aiws / search_engine.py
fikird
Improve search reliability with multiple regions and better error handling
5e3672b
raw
history blame
9.91 kB
from typing import Dict, List, Any
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from transformers import pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
import time
import json
import os
from urllib.parse import urlparse
import logging
import random
logger = logging.getLogger(__name__)
class SearchResult:
def __init__(self, title: str, link: str, snippet: str):
self.title = title
self.link = link
self.snippet = snippet
class ModelManager:
"""Manages different AI models for specific tasks"""
def __init__(self):
self.device = "cpu"
self.models = {}
self.load_models()
def load_models(self):
# Use smaller models for CPU deployment
self.models['summarizer'] = pipeline(
"summarization",
model="facebook/bart-base",
device=self.device
)
self.models['embeddings'] = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2",
model_kwargs={"device": self.device}
)
class ContentProcessor:
"""Processes and analyzes different types of content"""
def __init__(self):
self.model_manager = ModelManager()
def process_content(self, content: str) -> Dict:
"""Process content and generate insights"""
try:
# Generate summary
summary = self.model_manager.models['summarizer'](
content[:1024],
max_length=100,
min_length=30,
do_sample=False
)[0]['summary_text']
return {
'summary': summary,
'content': content
}
except Exception as e:
return {
'summary': f"Error processing content: {str(e)}",
'content': content
}
class WebSearchEngine:
"""Main search engine class"""
def __init__(self):
self.processor = ContentProcessor()
self.session = requests.Session()
self.request_delay = 2.0
self.last_request_time = 0
self.max_retries = 3
self.ddgs = None
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.initialize_search()
def initialize_search(self):
"""Initialize DuckDuckGo search with retries"""
for _ in range(self.max_retries):
try:
self.ddgs = DDGS()
return
except Exception as e:
logger.error(f"Error initializing DDGS: {str(e)}")
time.sleep(random.uniform(1, 3))
raise Exception("Failed to initialize DuckDuckGo search after multiple attempts")
def safe_get(self, url: str, max_retries: int = 3) -> requests.Response:
"""Make a GET request with retries and error handling"""
for i in range(max_retries):
try:
# Add delay between requests
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.request_delay:
time.sleep(self.request_delay - time_since_last + random.uniform(0.5, 1.5))
response = self.session.get(url, headers=self.headers, timeout=10)
self.last_request_time = time.time()
if response.status_code == 200:
return response
elif response.status_code == 429: # Rate limit
wait_time = (i + 1) * 5
time.sleep(wait_time)
continue
else:
response.raise_for_status()
except Exception as e:
if i == max_retries - 1:
raise
time.sleep((i + 1) * 2)
raise Exception(f"Failed to fetch URL after {max_retries} attempts")
def is_valid_url(self, url: str) -> bool:
"""Check if URL is valid for crawling"""
try:
parsed = urlparse(url)
return bool(parsed.netloc and parsed.scheme)
except:
return False
def get_metadata(self, soup: BeautifulSoup) -> Dict:
"""Extract metadata from page"""
title = soup.title.string if soup.title else "No title"
description = ""
if soup.find("meta", attrs={"name": "description"}):
description = soup.find("meta", attrs={"name": "description"}).get("content", "")
return {
'title': title,
'description': description
}
def process_url(self, url: str) -> Dict:
"""Process a single URL"""
if not self.is_valid_url(url):
return {'error': f"Invalid URL: {url}"}
try:
response = self.safe_get(url)
soup = BeautifulSoup(response.text, 'lxml')
# Extract text content
for script in soup(["script", "style"]):
script.decompose()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
content = ' '.join(chunk for chunk in chunks if chunk)
# Get metadata
metadata = self.get_metadata(soup)
# Process content
processed = self.processor.process_content(content)
return {
'url': url,
'title': metadata['title'],
'description': metadata['description'],
'summary': processed['summary'],
'content': processed['content']
}
except Exception as e:
return {'error': f"Error processing {url}: {str(e)}"}
def search(self, query: str, max_results: int = 5) -> Dict:
"""Perform search and process results"""
try:
# Initialize search if needed
if self.ddgs is None:
self.initialize_search()
# Add delay before search
time.sleep(random.uniform(1, 2))
# Search using DuckDuckGo with retries
search_results = []
retry_count = 0
while retry_count < self.max_retries and len(search_results) < max_results:
try:
# Try different regions if search fails
regions = ['wt-wt', 'us-en', 'uk-en']
for region in regions:
if len(search_results) >= max_results:
break
results_gen = self.ddgs.text(
query,
region=region,
max_results=max_results - len(search_results)
)
for result in results_gen:
if len(search_results) >= max_results:
break
if result and isinstance(result, dict) and 'link' in result:
search_results.append(result)
time.sleep(random.uniform(0.2, 0.5))
if search_results:
break
if search_results:
break
except Exception as e:
retry_count += 1
if retry_count >= self.max_retries:
logger.error(f"Search failed after {self.max_retries} attempts: {str(e)}")
if not search_results:
return {'error': f"Search failed after {self.max_retries} attempts: {str(e)}"}
break
logger.warning(f"Search attempt {retry_count} failed: {str(e)}")
time.sleep(random.uniform(2, 5))
self.initialize_search()
if not search_results:
return {'error': 'No results found'}
results = []
for result in search_results:
if 'link' in result:
processed = self.process_url(result['link'])
if 'error' not in processed:
results.append(processed)
time.sleep(random.uniform(0.5, 1.0))
if not results:
return {'error': 'Failed to process any search results'}
# Generate insights from results
all_content = " ".join([r['summary'] for r in results if 'summary' in r])
return {
'results': results,
'insights': all_content[:1000] if all_content else "No insights available.",
'follow_up_questions': [
f"What are the key differences between {query} and related topics?",
f"Can you explain {query} in simple terms?",
f"What are the latest developments in {query}?"
]
}
except Exception as e:
return {'error': f"Search failed: {str(e)}"}
# Main search function
def search(query: str, max_results: int = 5) -> Dict:
"""Main search function"""
engine = WebSearchEngine()
return engine.search(query, max_results)