File size: 5,264 Bytes
246d201 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import time
from typing import Any, Union
import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout
class InvariantClient:
timeout: int = 120
def __init__(self, server_url: str, session_id: str | None = None):
self.server = server_url
self.session_id, err = self._create_session(session_id)
if err:
raise RuntimeError(f'Failed to create session: {err}')
self.Policy = self._Policy(self)
self.Monitor = self._Monitor(self)
def _create_session(
self, session_id: str | None = None
) -> tuple[str | None, Exception | None]:
elapsed = 0
while elapsed < self.timeout:
try:
if session_id:
response = requests.get(
f'{self.server}/session/new?session_id={session_id}', timeout=60
)
else:
response = requests.get(f'{self.server}/session/new', timeout=60)
response.raise_for_status()
return response.json().get('id'), None
except (ConnectionError, Timeout):
elapsed += 1
time.sleep(1)
except HTTPError as http_err:
return None, http_err
except Exception as err:
return None, err
return None, ConnectionError('Connection timed out')
def close_session(self) -> Union[None, Exception]:
try:
response = requests.delete(
f'{self.server}/session/?session_id={self.session_id}', timeout=60
)
response.raise_for_status()
except (ConnectionError, Timeout, HTTPError) as err:
return err
return None
class _Policy:
def __init__(self, invariant):
self.server = invariant.server
self.session_id = invariant.session_id
def _create_policy(self, rule: str) -> tuple[str | None, Exception | None]:
try:
response = requests.post(
f'{self.server}/policy/new?session_id={self.session_id}',
json={'rule': rule},
timeout=60,
)
response.raise_for_status()
return response.json().get('policy_id'), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
def get_template(self) -> tuple[str | None, Exception | None]:
try:
response = requests.get(
f'{self.server}/policy/template',
timeout=60,
)
response.raise_for_status()
return response.json(), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
def from_string(self, rule: str):
policy_id, err = self._create_policy(rule)
if err:
raise err
self.policy_id = policy_id
return self
def analyze(self, trace: list[dict]) -> Union[Any, Exception]:
try:
response = requests.post(
f'{self.server}/policy/{self.policy_id}/analyze?session_id={self.session_id}',
json={'trace': trace},
timeout=60,
)
response.raise_for_status()
return response.json(), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
class _Monitor:
def __init__(self, invariant):
self.server = invariant.server
self.session_id = invariant.session_id
self.policy = ''
def _create_monitor(self, rule: str) -> tuple[str | None, Exception | None]:
try:
response = requests.post(
f'{self.server}/monitor/new?session_id={self.session_id}',
json={'rule': rule},
timeout=60,
)
response.raise_for_status()
return response.json().get('monitor_id'), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
def from_string(self, rule: str):
monitor_id, err = self._create_monitor(rule)
if err:
raise err
self.monitor_id = monitor_id
self.policy = rule
return self
def check(
self, past_events: list[dict], pending_events: list[dict]
) -> Union[Any, Exception]:
try:
response = requests.post(
f'{self.server}/monitor/{self.monitor_id}/check?session_id={self.session_id}',
json={'past_events': past_events, 'pending_events': pending_events},
timeout=60,
)
response.raise_for_status()
return response.json(), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
|