Spaces:
Runtime error
Runtime error
File size: 13,903 Bytes
91525e6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
import asyncio
import json
import re
from pathlib import Path
from .EdgeGPT import Chatbot, ConversationStyle
from .ImageGen import ImageGen
from log2d import Log
Log("BingChat")
log = Log.BingChat.debug # shortcut to create a log entry
class Cookie:
"""
Convenience class for Bing Cookie files, data, and configuration. This Class
is updated dynamically by the Query class to allow cycling through >1
cookie/credentials file e.g. when daily request limits (current 200 per
account per day) are exceeded.
"""
current_file_index = 0
dir_path = Path.home().resolve() / "bing_cookies"
current_file_path = dir_path # Avoid Path errors when no cookie file used
search_pattern = 'bing_cookies_*.json'
ignore_files = set()
request_count = {}
supplied_files = set()
rotate_cookies = True
@classmethod
def files(cls):
"""
Return a sorted list of all cookie files matching .search_pattern in
cls.dir_path, plus any supplied files, minus any ignored files.
"""
all_files = set(Path(cls.dir_path).glob(cls.search_pattern))
if hasattr(cls, "supplied_files"):
supplied_files = {x for x in cls.supplied_files if x.is_file()}
all_files.update(supplied_files)
return sorted(list(all_files - cls.ignore_files))
@classmethod
def import_data(cls):
"""
Read the active cookie file and populate the following attributes:
.current_file_path
.current_data
.image_token
"""
if not cls.files():
log(f"No files in Cookie.dir_path")
return
try:
cls.current_file_path = cls.files()[cls.current_file_index]
except IndexError:
log(f"Invalid file index [{cls.current_file_index}]")
log("Files in Cookie.dir_path:")
for file in cls.files():
log(f"{file}")
return
log(f"Importing cookies from: {cls.current_file_path.name}")
with open(cls.current_file_path, encoding="utf-8") as file:
cls.current_data = json.load(file)
cls.image_token = [x for x in cls.current_data if x.get("name").startswith("_U")]
cls.image_token = cls.image_token[0].get("value")
@classmethod
def import_next(cls, discard=False):
"""
Cycle through to the next cookies file then import it.
discard (bool): True -Mark the previous file to be ignored for the remainder of the current session. Otherwise cycle through all available
cookie files (sharing the workload and 'resting' when not in use).
"""
if not hasattr(cls, "current_file_path"):
cls.import_data()
return
try:
if discard:
cls.ignore_files.add(cls.current_file_path)
else:
Cookie.current_file_index += 1
except AttributeError:
# Will fail on first instantiation because no current_file_path
pass
if Cookie.current_file_index >= len(cls.files()):
Cookie.current_file_index = 0
Cookie.import_data()
class Query:
"""
A convenience class that wraps around EdgeGPT.Chatbot to encapsulate input,
config, and output all together. Relies on Cookie class for authentication
unless ignore_cookies=True
"""
index = []
image_dir_path = Path.cwd().resolve() / "bing_images"
def __init__(
self,
prompt,
style="precise",
content_type="text",
cookie_files=None,
ignore_cookies=False,
echo=True,
echo_prompt=False,
locale = "en-GB",
simplify_response = True,
):
"""
Arguments:
prompt: Text to enter into Bing Chat
style: creative, balanced, or precise
content_type: "text" for Bing Chat; "image" for Dall-e
ignore_cookies (bool): Ignore cookie data altogether
echo: Print something to confirm request made
echo_prompt: Print confirmation of the evaluated prompt
simplify_response: True -> single simplified prompt/response exchange
cookie_files: iterable of Paths or strings of cookie files (json)
Files in Cookie.dir_path will also be used if they exist. This defaults
to the current working directory, so set Cookie.dir_path before
creating a Query if your cookie files are elsewhere.
"""
self.__class__.index += [self]
self.prompt = prompt
self.locale = locale
self.simplify_response = simplify_response
self.ignore_cookies = ignore_cookies
if not ignore_cookies:
if cookie_files:
# Convert singular argument to an iterable:
if isinstance(cookie_files, (str, Path)):
cookie_files = {cookie_files}
# Check all elements exist and are Paths:
cookie_files = {
Path(x).resolve()
for x in cookie_files
if isinstance(x, (str, Path)) and x
}
Cookie.supplied_files = cookie_files
files = Cookie.files() # includes .supplied_files
if Cookie.rotate_cookies:
Cookie.import_next()
else:
Cookie.import_data()
if content_type == "text":
self.style = style
self.log_and_send_query(echo, echo_prompt)
if content_type == "image":
self.create_image()
def log_and_send_query(self, echo, echo_prompt):
self.response = asyncio.run(self.send_to_bing(echo, echo_prompt))
if not hasattr(Cookie, "current_data"):
name = "<no_cookies>"
else:
name = Cookie.current_file_path.name
if not Cookie.request_count.get(name):
Cookie.request_count[name] = 1
else:
Cookie.request_count[name] += 1
def create_image(self):
image_generator = ImageGen(Cookie.image_token)
image_generator.save_images(
image_generator.get_images(self.prompt),
output_dir=self.__class__.image_dir_path,
)
async def send_to_bing(self, echo=True, echo_prompt=False):
"""Creat, submit, then close a Chatbot instance. Return the response"""
retries = len(Cookie.files()) or 1
while retries:
if not hasattr(Cookie, "current_data"):
bot = await Chatbot.create()
else:
bot = await Chatbot.create(cookies=Cookie.current_data)
if echo_prompt:
log(f"{self.prompt=}")
if echo:
log("Waiting for response...")
if self.style.lower() not in "creative balanced precise".split():
self.style = "precise"
try:
response = await bot.ask(
prompt=self.prompt,
conversation_style=getattr(ConversationStyle, self.style),simplify_response=self.simplify_response,
locale=self.locale,
)
return response
except Exception as ex:
log(f"Exception: [{Cookie.current_file_path.name} may have exceeded the daily limit]\n{ex}")
Cookie.import_next(discard=True)
retries -= 1
finally:
await bot.close()
@property
def output(self):
"""The response from a completed Chatbot request"""
if self.simplify_response:
try:
return self.response['text']
except TypeError as te:
raise TypeError(f"{te}\n(No response received - probably rate throttled...)")
else:
return [
x.get('text') or x.get('hiddenText')
for x in self.response['item']['messages']
if x['author']=='bot'
]
@property
def sources(self):
"""The source names and details parsed from a completed Chatbot request"""
if self.simplify_response:
return self.response['sources_text']
else:
return [
x.get('sourceAttributions') or []
for x in self.response['item']['messages']
if x['author']=='bot'
]
@property
def sources_dict(self):
"""The source names and details as a dictionary"""
if self.simplify_response:
text = self.response['sources_text']
sources = enumerate(re.findall(r'\((http.*?)\)', text))
return {index+1: value for index, value in sources}
else:
all_sources = []
name = 'providerDisplayName'
url = 'seeMoreUrl'
for sources in self.sources:
if not sources:
continue
data = {}
for index, source in enumerate(sources):
if name in source.keys() and url in source.keys():
data[index+1] = source[url]
else:
continue
all_sources += [data]
return all_sources
@property
def code_block_formats(self):
"""
Extract a list of programming languages/formats used in code blocks
"""
regex = r"``` *(\b\w+\b\+*) *"
if self.simplify_response:
return re.findall(regex, self.output)
else:
return re.findall(regex, "\n".join(self.output))
@property
def code_blocks(self):
"""
Return a list of code blocks (```) or snippets (`) as strings.
If the response contains a mix of snippets and code blocks, return the
code blocks only.
This method is not suitable if the main text response includes either of
the delimiters but not as part of an actual snippet or code block.
For example:
'In Markdown, the back-tick (`) is used to denote a code snippet'
"""
final_blocks = []
if isinstance(self.output, str): # I.e. simplify_response is True
separator = '```' if '```' in self.output else '`'
code_blocks = self.output.split(separator)[1:-1:2]
if separator == '`':
return code_blocks
else:
code_blocks = []
for response in self.output:
separator = '```' if '```' in response else '`'
code_blocks.extend(response.split(separator)[1:-1:2])
code_blocks = [x for x in code_blocks if x]
# Remove language name if present:
for block in code_blocks:
lines = block.splitlines()
code = lines[1:] if re.match(" *\w+ *", lines[0]) else lines
final_blocks += ["\n".join(code).removeprefix(separator)]
return [x for x in final_blocks if x]
@property
def code(self):
"""
Extract and join any snippets of code or formatted data in the response
"""
return "\n\n".join(self.code_blocks)
@property
def suggestions(self):
"""Follow-on questions suggested by the Chatbot"""
if self.simplify_response:
return self.response['suggestions']
else:
try:
return [x['text'] for x in self.response['item']['messages'][1]['suggestedResponses']]
except KeyError:
return
def __repr__(self):
return f"<EdgeGPT.Query: {self.prompt}>"
def __str__(self):
if self.simplify_response:
return self.output
else:
return "\n\n".join(self.output)
class ImageQuery(Query):
def __init__(self, prompt, **kwargs):
kwargs.update({"content_type": "image"})
super().__init__(prompt, **kwargs)
def __repr__(self):
return f"<EdgeGPT.ImageQuery: {self.prompt}>"
def test_cookie_rotation():
for i in range(1, 50):
q = Query(f"What is {i} in Roman numerals? Give the answer in JSON", style="precise")
log(f"{i}: {Cookie.current_file_path.name}")
log(q.code)
log(f"Cookie count: {Cookie.request_count.get(Cookie.current_file_path.name)}")
def test_features():
try:
q = Query(f"What is {i} in Roman numerals? Give the answer in JSON", style="precise")
log(f"{i}: {Cookie.current_file_path.name}")
print(f"{Cookie.current_file_index=}")
print(f"{Cookie.current_file_path=}")
print(f"{Cookie.current_data=}")
print(f"{Cookie.dir_path=}")
print(f"{Cookie.search_pattern=}")
print(f"{Cookie.files()=}")
print(f"{Cookie.image_token=}")
print(f"{Cookie.import_next(discard=True)=}")
print(f"{Cookie.rotate_cookies=}")
print(f"{Cookie.files()=}")
print(f"{Cookie.ignore_files=}")
print(f"{Cookie.supplied_files=}")
print(f"{Cookie.request_count=}") # Keeps a tally of requests made in using each cookie file during this session
print(f"{q=}")
print(f"{q.prompt=}")
print(f"{q.ignore_cookies=}")
print(f"{q.style=}")
print(f"{q.simplify_response=}")
print(f"{q.locale=}")
print(f"{q.output=}")
print(q)
print(f"{q.sources=}")
print(f"{q.sources_dict=}")
print(f"{q.suggestions=}")
print(f"{q.code=}") # All code as a single string
print(f"{q.code_blocks=}") # Individual code blocks
print(f"{q.code_block_formats=}") # The language/format of each code block (if given)
print(f"{Query.index=}") # Keeps an index of Query objects created
print(f"{Query.image_dir_path=}")
except Exception as E:
raise Exception(E)
finally:
return q
|