import time
from typing import Any, Union

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout


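class InvariantClient:
    """Minimal HTTP client for an Invariant server: sessions, policies, and monitors."""

    # Time budget, in seconds, for retrying session creation.
    timeout: int = 120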

    def __init__(self, server_url: str, session_id: str | None = None):
        self.server = server_url
        self.session_id, err = self._create_session(session_id)
        if err:
            raise RuntimeError(f'Failed to create session: {err}')
        self.Policy = self._Policy(self)
        self.Monitor = self._Monitor(self)

    def _create_session(
        self, session_id: str | None = None
    ) -> tuple[str | None, Exception | None]:
        # Retry until `self.timeout` seconds of wall-clock time have passed,
        # so slow requests count against the budget as well as the sleeps.
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline:
            try:
                if session_id:
                    response = requests.get(
                        f'{self.server}/session/new?session_id={session_id}', timeout=60
                    )
                else:
                    response = requests.get(f'{self.server}/session/new', timeout=60)
                response.raise_for_status()
                return response.json().get('id'), None
            except (ConnectionError, Timeout):
                # Server not reachable yet; wait briefly and retry.
                time.sleep(1)
            except Exception as err:
                # HTTP errors and anything unexpected are reported to the caller.
                return None, err
        return None, ConnectionError('Connection timed out')

    def close_session(self) -> Union[None, Exception]:
        try:
            response = requests.delete(
                f'{self.server}/session/?session_id={self.session_id}', timeout=60
            )
            response.raise_for_status()
        except (ConnectionError, Timeout, HTTPError) as err:
            return err
        return None

    class _Policy:
        def __init__(self, invariant):
            self.server = invariant.server
            self.session_id = invariant.session_id

        def _create_policy(self, rule: str) -> tuple[str | None, Exception | None]:
            try:
                response = requests.post(
                    f'{self.server}/policy/new?session_id={self.session_id}',
                    json={'rule': rule},
                    timeout=60,
                )
                response.raise_for_status()
                return response.json().get('policy_id'), None
            except (ConnectionError, Timeout, HTTPError) as err:
                return None, err

        def get_template(self) -> tuple[str | None, Exception | None]:
            try:
                response = requests.get(
                    f'{self.server}/policy/template',
                    timeout=60,
                )
                response.raise_for_status()
                return response.json(), None
            except (ConnectionError, Timeout, HTTPError) as err:
                return None, err

        def from_string(self, rule: str):
            policy_id, err = self._create_policy(rule)
            if err:
                raise err
            self.policy_id = policy_id
            return self

        def analyze(self, trace: list[dict]) -> tuple[Any, Exception | None]:
            try:
                response = requests.post(
                    f'{self.server}/policy/{self.policy_id}/analyze?session_id={self.session_id}',
                    json={'trace': trace},
                    timeout=60,
                )
                response.raise_for_status()
                return response.json(), None
            except (ConnectionError, Timeout, HTTPError) as err:
                return None, err

    class _Monitor:
        def __init__(self, invariant):
            self.server = invariant.server
            self.session_id = invariant.session_id
            self.policy = ''

        def _create_monitor(self, rule: str) -> tuple[str | None, Exception | None]:
            try:
                response = requests.post(
                    f'{self.server}/monitor/new?session_id={self.session_id}',
                    json={'rule': rule},
                    timeout=60,
                )
                response.raise_for_status()
                return response.json().get('monitor_id'), None
            except (ConnectionError, Timeout, HTTPError) as err:
                return None, err

        def from_string(self, rule: str):
            monitor_id, err = self._create_monitor(rule)
            if err:
                raise err
            self.monitor_id = monitor_id
            self.policy = rule
            return self

        def check(
            self, past_events: list[dict], pending_events: list[dict]
        ) -> tuple[Any, Exception | None]:
            try:
                response = requests.post(
                    f'{self.server}/monitor/{self.monitor_id}/check?session_id={self.session_id}',
                    json={'past_events': past_events, 'pending_events': pending_events},
                    timeout=60,
                )
                response.raise_for_status()
                return response.json(), None
            except (ConnectionError, Timeout, HTTPError) as err:
                return None, err