aiws / search_engine.py
fikird
Enhance content processing with better extraction and summarization
f2c01c1
raw
history blame
13.5 kB
from typing import Dict, List, Any
import requests
from bs4 import BeautifulSoup
from transformers import pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
import time
import json
import os
from urllib.parse import urlparse, quote_plus
import logging
import random
logger = logging.getLogger(__name__)
class SearchResult:
def __init__(self, title: str, link: str, snippet: str):
self.title = title
self.link = link
self.snippet = snippet
class ModelManager:
"""Manages different AI models for specific tasks"""
def __init__(self):
self.device = "cpu"
self.models = {}
self.load_models()
def load_models(self):
# Use smaller models for CPU deployment
self.models['summarizer'] = pipeline(
"summarization",
model="facebook/bart-base",
device=self.device
)
self.models['embeddings'] = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2",
model_kwargs={"device": self.device}
)
class ContentProcessor:
"""Processes and analyzes different types of content"""
def __init__(self):
self.model_manager = ModelManager()
def clean_text(self, text: str) -> str:
"""Clean and normalize text content"""
# Remove extra whitespace
text = ' '.join(text.split())
# Remove common navigation elements
nav_elements = [
"skip to content",
"search",
"menu",
"navigation",
"subscribe",
"sign in",
"log in",
"submit",
"browse",
]
for element in nav_elements:
text = text.replace(element.lower(), "")
return text.strip()
def extract_main_content(self, soup: BeautifulSoup) -> str:
"""Extract main content from HTML, prioritizing article content"""
content = ""
# Try to find main content containers
priority_tags = [
('article', {}),
('div', {'class': ['article', 'post', 'content', 'main']}),
('div', {'id': ['article', 'post', 'content', 'main']}),
('main', {}),
]
for tag, attrs in priority_tags:
elements = soup.find_all(tag, attrs)
if elements:
content = " ".join(elem.get_text(strip=True) for elem in elements)
if content:
break
# If no main content found, try extracting paragraphs
if not content:
paragraphs = soup.find_all('p')
content = " ".join(p.get_text(strip=True) for p in paragraphs if len(p.get_text(strip=True)) > 100)
return self.clean_text(content)
def extract_key_points(self, text: str, max_points: int = 5) -> List[str]:
"""Extract key points from text using sentence transformers"""
try:
# Split into sentences
sentences = [s.strip() for s in text.split('.') if len(s.strip()) > 20]
if not sentences:
return []
# Get embeddings for sentences
embeddings = self.model_manager.models['embeddings'].embed_documents(sentences)
# Use simple clustering to find diverse sentences
selected_indices = [0] # Start with first sentence
for _ in range(min(max_points - 1, len(sentences) - 1)):
# Find sentence most different from selected ones
max_diff = -1
max_idx = -1
for i in range(len(sentences)):
if i not in selected_indices:
# Calculate average difference from selected sentences
diffs = [sum((embeddings[i][j] - embeddings[k][j])**2
for j in range(len(embeddings[i])))
for k in selected_indices]
avg_diff = sum(diffs) / len(diffs)
if avg_diff > max_diff:
max_diff = avg_diff
max_idx = i
if max_idx != -1:
selected_indices.append(max_idx)
return [sentences[i] for i in selected_indices]
except Exception as e:
logger.error(f"Error extracting key points: {str(e)}")
return []
def process_content(self, content: str, soup: BeautifulSoup = None) -> Dict:
"""Process content and generate insights"""
try:
# Extract main content if HTML is available
if soup:
content = self.extract_main_content(soup)
else:
content = self.clean_text(content)
# Generate summary
summary = self.model_manager.models['summarizer'](
content[:1024],
max_length=150,
min_length=50,
do_sample=False
)[0]['summary_text']
# Extract key points
key_points = self.extract_key_points(content)
return {
'summary': summary,
'content': content,
'key_points': key_points
}
except Exception as e:
return {
'summary': f"Error processing content: {str(e)}",
'content': content,
'key_points': []
}
class WebSearchEngine:
"""Main search engine class"""
def __init__(self):
self.processor = ContentProcessor()
self.session = requests.Session()
self.request_delay = 2.0
self.last_request_time = 0
self.max_retries = 3
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
def safe_get(self, url: str, max_retries: int = 3) -> requests.Response:
"""Make a GET request with retries and error handling"""
for i in range(max_retries):
try:
# Add delay between requests
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.request_delay:
time.sleep(self.request_delay - time_since_last + random.uniform(0.5, 1.5))
response = self.session.get(url, headers=self.headers, timeout=10)
self.last_request_time = time.time()
if response.status_code == 200:
return response
elif response.status_code == 429: # Rate limit
wait_time = (i + 1) * 5
time.sleep(wait_time)
continue
else:
response.raise_for_status()
except Exception as e:
if i == max_retries - 1:
raise
time.sleep((i + 1) * 2)
raise Exception(f"Failed to fetch URL after {max_retries} attempts")
def is_valid_url(self, url: str) -> bool:
"""Check if URL is valid for crawling"""
try:
parsed = urlparse(url)
return bool(parsed.netloc and parsed.scheme)
except:
return False
def get_metadata(self, soup: BeautifulSoup) -> Dict:
"""Extract metadata from page"""
title = soup.title.string if soup.title else "No title"
description = ""
if soup.find("meta", attrs={"name": "description"}):
description = soup.find("meta", attrs={"name": "description"}).get("content", "")
return {
'title': title,
'description': description
}
def process_url(self, url: str) -> Dict:
"""Process a single URL"""
if not self.is_valid_url(url):
return {'error': f"Invalid URL: {url}"}
try:
response = self.safe_get(url)
soup = BeautifulSoup(response.text, 'lxml')
# Get metadata
metadata = self.get_metadata(soup)
# Process content
processed = self.processor.process_content("", soup=soup)
return {
'url': url,
'title': metadata['title'],
'description': metadata['description'],
'summary': processed['summary'],
'key_points': processed['key_points'],
'content': processed['content']
}
except Exception as e:
return {'error': f"Error processing {url}: {str(e)}"}
def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]:
"""Search DuckDuckGo and parse HTML results"""
search_results = []
try:
# Encode query for URL
encoded_query = quote_plus(query)
# DuckDuckGo HTML search URL
search_url = f'https://html.duckduckgo.com/html/?q={encoded_query}'
# Get search results page
response = self.safe_get(search_url)
soup = BeautifulSoup(response.text, 'lxml')
# Find all result elements
results = soup.find_all('div', {'class': 'result'})
for result in results[:max_results]:
try:
# Extract link
link_elem = result.find('a', {'class': 'result__a'})
if not link_elem:
continue
link = link_elem.get('href', '')
if not link or not self.is_valid_url(link):
continue
# Extract title
title = link_elem.get_text(strip=True)
# Extract snippet
snippet_elem = result.find('a', {'class': 'result__snippet'})
snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
search_results.append({
'link': link,
'title': title,
'snippet': snippet
})
# Add delay between processing results
time.sleep(random.uniform(0.2, 0.5))
except Exception as e:
logger.warning(f"Error processing search result: {str(e)}")
continue
return search_results
except Exception as e:
logger.error(f"Error during DuckDuckGo search: {str(e)}")
return []
def search(self, query: str, max_results: int = 5) -> Dict:
"""Perform search and process results"""
try:
# Search using DuckDuckGo HTML
search_results = self.search_duckduckgo(query, max_results)
if not search_results:
return {'error': 'No results found'}
results = []
all_key_points = []
for result in search_results:
if 'link' in result:
processed = self.process_url(result['link'])
if 'error' not in processed:
results.append(processed)
if 'key_points' in processed:
all_key_points.extend(processed['key_points'])
time.sleep(random.uniform(0.5, 1.0))
if not results:
return {'error': 'Failed to process any search results'}
# Combine insights from all results
combined_summary = " ".join([r['summary'] for r in results if 'summary' in r])
# Generate overall insights
insights = self.processor.model_manager.models['summarizer'](
combined_summary,
max_length=200,
min_length=100,
do_sample=False
)[0]['summary_text']
return {
'results': results,
'insights': insights,
'key_points': all_key_points[:10], # Top 10 key points
'follow_up_questions': [
f"What are the recent breakthroughs in {query}?",
f"How does {query} impact various industries?",
f"What are the future prospects of {query}?"
]
}
except Exception as e:
return {'error': f"Search failed: {str(e)}"}
# Main search function
def search(query: str, max_results: int = 5) -> Dict:
"""Main search function"""
engine = WebSearchEngine()
return engine.search(query, max_results)