File size: 13,903 Bytes
91525e6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
import asyncio
import json
import re
from pathlib import Path

from .EdgeGPT import Chatbot, ConversationStyle
from .ImageGen import ImageGen
from log2d import Log

# Create the "BingChat" logger; the next line reaches it as Log.BingChat.
# NOTE(review): relies on log2d registering the logger as a class attribute.
Log("BingChat")
log = Log.BingChat.debug  # shortcut to create a log entry

class Cookie:
    """
    Convenience class for Bing Cookie files, data, and configuration. This Class
    is updated dynamically by the Query class to allow cycling through >1
    cookie/credentials file e.g. when daily request limits (current 200 per
    account per day) are exceeded.

    The class is used as a namespace (never instantiated); all state is held
    in class attributes:

      current_file_index: position of the active file within .files()
      dir_path:           directory searched for cookie files
      current_file_path:  path of the active cookie file (dir_path until a
                          file has been imported)
      search_pattern:     glob pattern applied inside dir_path
      ignore_files:       paths excluded from .files() for this session
      request_count:      {cookie file name: number of requests made}
      supplied_files:     extra cookie files given explicitly by the caller
      rotate_cookies:     True -> Query advances to the next file per request

    .current_data and .image_token only exist after a successful
    .import_data(); other code uses hasattr(Cookie, "current_data") to tell
    whether any cookies have been loaded.
    """
    current_file_index = 0
    dir_path = Path.home().resolve() / "bing_cookies"
    current_file_path = dir_path  # Avoid Path errors when no cookie file used
    search_pattern = 'bing_cookies_*.json'
    ignore_files = set()
    request_count = {}
    supplied_files = set()
    rotate_cookies = True

    @classmethod
    def files(cls):
        """
        Return a sorted list of all cookie files matching .search_pattern in
        cls.dir_path, plus any supplied files, minus any ignored files.

        Entries of .supplied_files may be Paths or strings; entries that do
        not point at an existing file are silently skipped.
        """
        found = set(Path(cls.dir_path).glob(cls.search_pattern))
        for candidate in getattr(cls, "supplied_files", None) or ():
            # Coerce so that plain string paths work as well as Path objects.
            candidate = Path(candidate)
            if candidate.is_file():
                found.add(candidate)
        return sorted(found - cls.ignore_files)

    @classmethod
    def import_data(cls):
        """
        Read the active cookie file and populate the following attributes:

          .current_file_path
          .current_data
          .image_token

        Nothing is changed (a log entry is written instead) when there are no
        cookie files or when .current_file_index is out of range.
        .image_token is set to None when the file holds no cookie whose name
        starts with "_U".

        Raises whatever open() / json.load() raise for an unreadable or
        malformed file.
        """
        # Take a single snapshot so the directory is only scanned once.
        available = cls.files()
        if not available:
            log("No files in Cookie.dir_path")
            return
        try:
            cls.current_file_path = available[cls.current_file_index]
        except IndexError:
            log(f"Invalid file index [{cls.current_file_index}]")
            log("Files in Cookie.dir_path:")
            for file in available:
                log(f"{file}")
            return
        log(f"Importing cookies from: {cls.current_file_path.name}")
        with open(cls.current_file_path, encoding="utf-8") as file:
            cls.current_data = json.load(file)
        # Entries without a "name" key are skipped rather than crashing.
        tokens = [
            entry.get("value")
            for entry in cls.current_data
            if (entry.get("name") or "").startswith("_U")
        ]
        if tokens:
            cls.image_token = tokens[0]
        else:
            cls.image_token = None
            log(f"No '_U' cookie in {cls.current_file_path.name}; Cookie.image_token set to None")

    @classmethod
    def import_next(cls, discard=False):
        """
        Cycle through to the next cookies file then import it.

        discard (bool): True -Mark the previous file to be ignored for the
        remainder of the current session.  Otherwise cycle through all
        available cookie files (sharing the workload and 'resting' when not in
        use).
        """
        if not hasattr(cls, "current_file_path"):
            # Defensive only: the attribute is defined on the class, so this
            # branch is reached only if it has been deleted.
            cls.import_data()
            return
        if discard:
            # Removing the current file from .files() shifts the next file
            # into the same index, so the index is left untouched.
            cls.ignore_files.add(cls.current_file_path)
        else:
            cls.current_file_index += 1
        if cls.current_file_index >= len(cls.files()):
            cls.current_file_index = 0  # wrap around to the first file
        cls.import_data()

class Query:
    """
    A convenience class that wraps around EdgeGPT.Chatbot to encapsulate input,
    config, and output all together.  Relies on Cookie class for authentication
    unless ignore_cookies=True

    Class attributes:

      index:          every Query object created during this session
      image_dir_path: directory handed to ImageGen.save_images as output_dir
    """
    index = []
    image_dir_path = Path.cwd().resolve() / "bing_images"
    _STYLES = ("creative", "balanced", "precise")

    def __init__(
        self,
        prompt,
        style="precise",
        content_type="text",
        cookie_files=None,
        ignore_cookies=False,
        echo=True,
        echo_prompt=False,
        locale = "en-GB",
        simplify_response = True,
    ):
        """
        Arguments:

        prompt: Text to enter into Bing Chat
        style: creative, balanced, or precise
        content_type: "text" for Bing Chat; "image" for Dall-e
        ignore_cookies (bool): Ignore cookie data altogether
        echo: Print something to confirm request made
        echo_prompt: Print confirmation of the evaluated prompt
        locale: locale string passed through to Chatbot.ask
        simplify_response: True -> single simplified prompt/response exchange
        cookie_files: iterable of Paths or strings of cookie files (json)

        Files in Cookie.dir_path will also be used if they exist.  This defaults
        to the "bing_cookies" directory inside the user's home directory, so
        set Cookie.dir_path before creating a Query if your cookie files are
        elsewhere.

        The request is sent immediately: creating a Query blocks until the
        chat response has been received (or the images have been saved).
        """
        type(self).index.append(self)
        self.prompt = prompt
        self.style = style
        self.locale = locale
        self.simplify_response = simplify_response
        self.ignore_cookies = ignore_cookies
        if not ignore_cookies:
            if cookie_files:
                # Convert singular argument to an iterable:
                if isinstance(cookie_files, (str, Path)):
                    cookie_files = {cookie_files}
                # Check all elements exist and are Paths:
                cookie_files = {
                    Path(x).resolve()
                    for x in cookie_files
                    if isinstance(x, (str, Path)) and x
                }
                Cookie.supplied_files = cookie_files
            if Cookie.rotate_cookies:
                Cookie.import_next()
            else:
                Cookie.import_data()
        if content_type == "text":
            self.log_and_send_query(echo, echo_prompt)
        elif content_type == "image":
            self.create_image()

    @classmethod
    def _normalise_style(cls, style):
        """Return style lower-cased if it is a known style, else "precise"."""
        candidate = style.lower() if isinstance(style, str) else ""
        return candidate if candidate in cls._STYLES else "precise"

    @staticmethod
    def _extract_code(text):
        """
        Return the code blocks (```) found in text, or the snippets (`) when
        text holds no triple back-ticks.  Snippets are returned untouched;
        code blocks lose a leading language line and empty blocks are dropped.
        """
        separator = '```' if '```' in text else '`'
        # Odd-numbered pieces lie between an opening and a closing delimiter.
        pieces = text.split(separator)[1:-1:2]
        if separator == '`':
            return pieces
        cleaned = []
        for piece in pieces:
            lines = piece.splitlines()
            if not lines:
                continue  # empty block such as "``````"
            # Remove language name if present:
            code = lines[1:] if re.match(r" *\w+ *", lines[0]) else lines
            cleaned.append("\n".join(code).removeprefix(separator))
        return [x for x in cleaned if x]

    def log_and_send_query(self, echo, echo_prompt):
        """
        Send the prompt to Bing Chat, store the result in .response, and add
        one to Cookie.request_count for the cookie file in use (or for
        "<no_cookies>" when no cookie data was used).
        """
        self.response = asyncio.run(self.send_to_bing(echo, echo_prompt))
        if self.ignore_cookies or not hasattr(Cookie, "current_data"):
            name = "<no_cookies>"
        else:
            name = Cookie.current_file_path.name
        Cookie.request_count[name] = Cookie.request_count.get(name, 0) + 1

    def create_image(self):
        """
        Request images for the prompt from ImageGen (authenticated with
        Cookie.image_token) and save them with output_dir=image_dir_path.
        """
        image_generator = ImageGen(Cookie.image_token)
        image_generator.save_images(
            image_generator.get_images(self.prompt),
            output_dir=self.__class__.image_dir_path,
        )

    async def send_to_bing(self, echo=True, echo_prompt=False):
        """
        Create, submit, then close a Chatbot instance.  Return the response.

        One attempt is made per available cookie file (at least one attempt).
        After a failed attempt the current cookie file is discarded for the
        session and the next one is tried.  Returns None when every attempt
        failed.
        """
        use_cookies = not getattr(self, "ignore_cookies", False)
        retries = (len(Cookie.files()) if use_cookies else 0) or 1
        # Normalise once so that e.g. "Precise" resolves to the enum member
        # ConversationStyle.precise instead of raising AttributeError.
        self.style = self._normalise_style(self.style)
        while retries:
            if use_cookies and hasattr(Cookie, "current_data"):
                bot = await Chatbot.create(cookies=Cookie.current_data)
            else:
                bot = await Chatbot.create()
            if echo_prompt:
                log(f"{self.prompt=}")
            if echo:
                log("Waiting for response...")
            try:
                response = await bot.ask(
                    prompt=self.prompt,
                    conversation_style=getattr(ConversationStyle, self.style),
                    simplify_response=self.simplify_response,
                    locale=self.locale,
                )
                return response
            except Exception as ex:
                # Broad on purpose: any failure is treated as a reason to try
                # the next cookie file.
                log(f"Exception: [{Cookie.current_file_path.name} may have exceeded the daily limit]\n{ex}")
                if use_cookies:
                    Cookie.import_next(discard=True)
                retries -= 1
            finally:
                await bot.close()
        return None

    @property
    def output(self):
        """
        The response from a completed Chatbot request: a string when
        simplify_response is True, otherwise a list with the text (or hidden
        text) of every bot message.

        Raises TypeError when no response was received.
        """
        try:
            if self.simplify_response:
                return self.response['text']
            return [
                x.get('text') or x.get('hiddenText')
                for x in self.response['item']['messages']
                if x['author']=='bot'
            ]
        except TypeError as te:
            raise TypeError(f"{te}\n(No response received - probably rate throttled...)") from te

    @property
    def sources(self):
        """The source names and details parsed from a completed Chatbot request"""
        if self.simplify_response:
            return self.response['sources_text']
        return [
            x.get('sourceAttributions') or []
            for x in self.response['item']['messages']
            if x['author']=='bot'
        ]

    @property
    def sources_dict(self):
        """
        The source names and details as a dictionary {1: url, 2: url, ...}.
        When simplify_response is False a list of such dictionaries is
        returned, one per bot message that has sources.
        """
        if self.simplify_response:
            text = self.response['sources_text']
            urls = re.findall(r'\((http.*?)\)', text)
            return {position: url for position, url in enumerate(urls, start=1)}
        name = 'providerDisplayName'
        url = 'seeMoreUrl'
        all_sources = []
        for sources in self.sources:
            if not sources:
                continue
            # Keep the original position as key, skipping incomplete entries.
            data = {
                position: source[url]
                for position, source in enumerate(sources, start=1)
                if name in source and url in source
            }
            all_sources.append(data)
        return all_sources

    @property
    def code_block_formats(self):
        """
        Extract a list of programming languages/formats used in code blocks
        """
        regex = r"``` *(\b\w+\b\+*) *"
        if self.simplify_response:
            return re.findall(regex, self.output)
        return re.findall(regex, "\n".join(x for x in self.output if x))

    @property
    def code_blocks(self):
        """
        Return a list of code blocks (```) or snippets (`) as strings.

        If the response contains a mix of snippets and code blocks, return the
        code blocks only.

        This method is not suitable if the main text response includes either of
        the delimiters but not as part of an actual snippet or code block.

        For example:
        'In Markdown, the back-tick (`) is used to denote a code snippet'

        """
        output = self.output
        if isinstance(output, str):  # I.e. simplify_response is True
            return self._extract_code(output)
        blocks = []
        for response in output:
            if not response:
                continue  # bot message without any text
            blocks.extend(self._extract_code(response))
        return [x for x in blocks if x]

    @property
    def code(self):
        """
        Extract and join any snippets of code or formatted data in the response
        """
        return "\n\n".join(self.code_blocks)

    @property
    def suggestions(self):
        """
        Follow-on questions suggested by the Chatbot, or None when the
        (non-simplified) response carries none.
        """
        if self.simplify_response:
            return self.response['suggestions']
        try:
            return [x['text'] for x in self.response['item']['messages'][1]['suggestedResponses']]
        except (KeyError, IndexError):
            return None

    def __repr__(self):
        return f"<EdgeGPT.Query: {self.prompt}>"

    def __str__(self):
        if self.simplify_response:
            return self.output
        return "\n\n".join(x for x in self.output if x)

class ImageQuery(Query):
    """
    A Query that always asks for an image: content_type is forced to "image",
    replacing any value the caller passed.  All other keyword arguments are
    forwarded to Query unchanged.
    """

    def __init__(self, prompt, **kwargs):
        kwargs["content_type"] = "image"
        super().__init__(prompt, **kwargs)

    def __repr__(self):
        return f"<EdgeGPT.ImageQuery: {self.prompt}>"

def test_cookie_rotation():
    """
    Manual smoke test: send 49 queries (numbers 1 to 49) and, after each one,
    log the active cookie file, the code extracted from the reply, and the
    request tally recorded for that cookie file.
    """
    for number in range(1, 50):
        question = f"What is {number} in Roman numerals?  Give the answer in JSON"
        query = Query(question, style="precise")
        log(f"{number}: {Cookie.current_file_path.name}")
        log(query.code)
        tally = Cookie.request_count.get(Cookie.current_file_path.name)
        log(f"Cookie count: {tally}")

def test_features(i=1):
    """
    Manual smoke test: run one query and print every public attribute of
    Cookie and of the resulting Query.

    i: the number the chatbot is asked to convert to Roman numerals.

    Returns the Query object.  Any exception propagates unchanged, keeping its
    original type and traceback.

    Side effects: sends a real request, prints the loaded cookie data, and
    discards the current cookie file for the session via
    Cookie.import_next(discard=True).
    """
    q = Query(f"What is {i} in Roman numerals?  Give the answer in JSON", style="precise")
    log(f"{i}: {Cookie.current_file_path.name}")
    print(f"{Cookie.current_file_index=}")
    print(f"{Cookie.current_file_path=}")
    print(f"{Cookie.current_data=}")
    print(f"{Cookie.dir_path=}")
    print(f"{Cookie.search_pattern=}")
    print(f"{Cookie.files()=}")
    print(f"{Cookie.image_token=}")
    print(f"{Cookie.import_next(discard=True)=}")
    print(f"{Cookie.rotate_cookies=}")
    print(f"{Cookie.files()=}")
    print(f"{Cookie.ignore_files=}")
    print(f"{Cookie.supplied_files=}")
    print(f"{Cookie.request_count=}")  # Keeps a tally of requests made in using each cookie file during this session
    print(f"{q=}")
    print(f"{q.prompt=}")
    print(f"{q.ignore_cookies=}")
    print(f"{q.style=}")
    print(f"{q.simplify_response=}")
    print(f"{q.locale=}")
    print(f"{q.output=}")
    print(q)
    print(f"{q.sources=}")
    print(f"{q.sources_dict=}")
    print(f"{q.suggestions=}")
    print(f"{q.code=}")  # All code as a single string
    print(f"{q.code_blocks=}")  # Individual code blocks
    print(f"{q.code_block_formats=}")  # The language/format of each code block (if given)
    print(f"{Query.index=}")  # Keeps an index of Query objects created
    print(f"{Query.image_dir_path=}")
    return q