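"""DuckDuckGo web-search helper: fetches search results, scrapes the linked
pages for readable text, and builds a search-augmented prompt for the model.
"""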
from __future__ import annotations

import asyncio

from aiohttp import ClientSession, ClientTimeout
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
class SearchResults:
    def __init__(self, results: list):
        self.results = results

    def __iter__(self):
        yield from self.results

    def __str__(self):
        search = ""
        for idx, result in enumerate(self.results):
            if search:
                search += "\n\n\n"
            search += f"Title: {result.title}\n\n"
            if result.text:
                search += result.text
            else:
                search += result.snippet
            search += f"\n\nSource: [[{idx}]]({result.url})"
        return search
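# Rough illustration of what str(SearchResults(...)) produces, based on __str__
# above (the title, text, and URL are made-up placeholders, not real results):
#
#   Title: Example Page
#
#   Scraped page text or, if scraping failed, the DuckDuckGo snippet...
#
#   Source: [[0]](https://example.com/page)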
class SearchResultEntry:
    def __init__(self, title: str, url: str, snippet: str, text: str | None = None):
        self.title = title
        self.url = url
        self.snippet = snippet
        self.text = text

    def set_text(self, text: str):
        self.text = text
def scrape_text(html: str, max_words: int | None = None) -> str:
    soup = BeautifulSoup(html, "html.parser")
    for exclude in soup(["script", "style"]):
        exclude.extract()

    # Narrow the soup to the main content block if one of these selectors matches.
    for selector in [
        "main",
        ".main-content-wrapper",
        ".main-content",
        ".emt-container-inner",
        ".content-wrapper",
        "#content",
        "#mainContent",
    ]:
        select = soup.select_one(selector)
        if select:
            soup = select
            break

    # Zdnet
    for remove in [".c-globalDisclosure"]:
        select = soup.select_one(remove)
        if select:
            select.extract()

    clean_text = ""
    for paragraph in soup.select("p"):
        text = paragraph.get_text()
        for line in text.splitlines():
            words = [word for word in line.replace("\t", " ").split(" ") if word]
            count = len(words)
            if not count:
                continue
            if max_words:
                max_words -= count
                if max_words <= 0:
                    break
            if clean_text:
                clean_text += "\n"
            clean_text += " ".join(words)

    return clean_text
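# Rough illustration of scrape_text on a tiny document (the HTML and the
# expected output are made up for this sketch, not taken from any real page):
#
#   scrape_text("<main><p>Hello\tworld</p><p>Second paragraph</p></main>")
#   -> "Hello world\nSecond paragraph"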
async def fetch_and_scrape(session: ClientSession, url: str, max_words: int | None = None) -> str | None:
    try:
        async with session.get(url) as response:
            if response.status == 200:
                html = await response.text()
                return scrape_text(html, max_words)
    except Exception:
        # Network errors and timeouts are swallowed; the caller falls back to the snippet.
        return None
    return None
async def search(query: str, n_results: int = 5, max_words: int = 2500, add_text: bool = True) -> SearchResults:
    with DDGS() as ddgs:
        results = []
        for result in ddgs.text(
            query,
            region="wt-wt",
            safesearch="moderate",
            timelimit="y",
        ):
            results.append(SearchResultEntry(
                result["title"],
                result["href"],
                result["body"]
            ))
            if len(results) >= n_results:
                break

    if add_text:
        requests = []
        async with ClientSession(timeout=ClientTimeout(total=5)) as session:
            for entry in results:
                # Split the word budget across the fetched pages; max(..., 1)
                # avoids a division by zero when n_results is 1.
                requests.append(fetch_and_scrape(session, entry.url, int(max_words / max(n_results - 1, 1))))
            texts = await asyncio.gather(*requests)

    formatted_results = []
    left_words = max_words
    for i, entry in enumerate(results):
        if add_text:
            entry.text = texts[i]
        if left_words:
            left_words -= entry.title.count(" ") + 5
            if entry.text:
                left_words -= entry.text.count(" ")
            else:
                left_words -= entry.snippet.count(" ")
            if left_words < 0:
                break
        formatted_results.append(entry)

    return SearchResults(formatted_results)
def get_search_message(prompt: str) -> str:
    try:
        search_results = asyncio.run(search(prompt))
        message = f"""
{search_results}

Instruction: Using the provided web search results, write a comprehensive reply to the user request.
Make sure to cite sources using [[Number]](Url) notation after the reference. Example: [[0]](http://google.com)
If the provided search results refer to multiple subjects with the same name, write separate answers for each subject.

User request:
{prompt}
"""
        return message
    except Exception as e:
        print("Couldn't search DuckDuckGo:", e)
        return prompt
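# Minimal usage sketch (not part of the original module): builds a
# search-augmented prompt from the command line. The fallback query string
# below is a made-up example.
if __name__ == "__main__":
    import sys

    query = sys.argv[1] if len(sys.argv) > 1 else "latest aiohttp release"
    print(get_search_message(query))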