# NOTE: web-page residue captured together with this file ("Spaces: Build error");
# it is not part of the Python module.
import json
import logging
import os
import random
import re
import time
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, quote_plus, urlparse

import requests
from bs4 import BeautifulSoup
from langchain_community.embeddings import HuggingFaceEmbeddings
from transformers import pipeline
# Module-level logger shared by the classes and functions in this file.
logger = logging.getLogger(__name__)
class SearchResult:
    """A single search hit: its title, target link and text snippet."""

    def __init__(self, title: str, link: str, snippet: str):
        self.title = title
        self.link = link
        self.snippet = snippet

    def __repr__(self) -> str:
        # Readable form for logs and debugging sessions.
        return (
            f"{type(self).__name__}(title={self.title!r}, "
            f"link={self.link!r}, snippet={self.snippet!r})"
        )
class ModelManager:
    """Manages different AI models for specific tasks.

    After construction ``models`` maps ``'summarizer'`` to a ``transformers``
    summarization pipeline and ``'embeddings'`` to a ``HuggingFaceEmbeddings``
    instance.

    Args:
        device: Device identifier handed to both model loaders.
        summarizer_model: Model name used for the summarization pipeline.
        embedding_model: Model name used for sentence embeddings.
    """

    def __init__(
        self,
        device: str = "cpu",
        summarizer_model: str = "facebook/bart-base",
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
    ):
        self.device = device
        self.summarizer_model = summarizer_model
        self.embedding_model = embedding_model
        self.models: Dict[str, Any] = {}
        self.load_models()

    def load_models(self) -> None:
        """Instantiate the summarizer and the embedding model into ``models``."""
        # The defaults are small models so the module stays usable on CPU.
        self.models['summarizer'] = pipeline(
            "summarization",
            model=self.summarizer_model,
            device=self.device,
        )
        self.models['embeddings'] = HuggingFaceEmbeddings(
            model_name=self.embedding_model,
            model_kwargs={"device": self.device},
        )
class ContentProcessor:
    """Processes and analyzes different types of content.

    Args:
        model_manager: Optional pre-built object exposing a ``models`` mapping
            with ``'summarizer'`` and ``'embeddings'`` entries. When omitted a
            new ``ModelManager`` is created (which loads the models).
    """

    # Boilerplate phrases stripped from extracted text.
    _NAV_PHRASES = (
        "skip to content",
        "search",
        "menu",
        "navigation",
        "subscribe",
        "sign in",
        "log in",
        "submit",
        "browse",
    )
    # Whole-word, case-insensitive match so that e.g. "research" or
    # "submitted" are left intact while "Menu" / "Sign In" are removed.
    _NAV_PATTERN = re.compile(
        r"\b(?:" + "|".join(re.escape(p) for p in _NAV_PHRASES) + r")\b",
        re.IGNORECASE,
    )
    # A period ends a sentence only when followed by whitespace or the end of
    # the text; this keeps decimals ("3.5") and domain names in one piece.
    _SENTENCE_END = re.compile(r"\.(?:\s+|$)")
    # Sentences at or below this length are ignored as key-point candidates.
    _MIN_SENTENCE_CHARS = 20
    # Paragraphs at or below this length are ignored by the <p> fallback.
    _MIN_PARAGRAPH_CHARS = 100
    # Containers tried in order when looking for the main page content.
    _CONTENT_SELECTORS = (
        ('article', {}),
        ('div', {'class': ['article', 'post', 'content', 'main']}),
        ('div', {'id': ['article', 'post', 'content', 'main']}),
        ('main', {}),
    )

    def __init__(self, model_manager: "Optional[ModelManager]" = None):
        self.model_manager = (
            model_manager if model_manager is not None else ModelManager()
        )

    def clean_text(self, text: str) -> str:
        """Collapse whitespace and drop common navigation phrases.

        Returns the cleaned text; an empty input yields an empty string.
        """
        if not text:
            return ""
        # Normalize first so multi-word phrases match across odd spacing.
        normalized = ' '.join(text.split())
        without_nav = self._NAV_PATTERN.sub(" ", normalized)
        # Removing phrases can leave double spaces behind; collapse again.
        return ' '.join(without_nav.split())

    def extract_main_content(self, soup: "BeautifulSoup") -> str:
        """Extract main content from HTML, prioritizing article content.

        The first selector in ``_CONTENT_SELECTORS`` that yields non-empty text
        wins; if none does, long ``<p>`` elements are used instead. The result
        is passed through ``clean_text``.
        """
        content = ""
        for tag, attrs in self._CONTENT_SELECTORS:
            # A space separator keeps words from adjacent tags from fusing.
            texts = [
                elem.get_text(" ", strip=True)
                for elem in soup.find_all(tag, attrs)
            ]
            content = " ".join(t for t in texts if t)
            if content:
                break

        if not content:
            # Fallback: keep only paragraphs long enough to be body text.
            paragraph_texts = (
                p.get_text(" ", strip=True) for p in soup.find_all('p')
            )
            content = " ".join(
                t for t in paragraph_texts
                if len(t) > self._MIN_PARAGRAPH_CHARS
            )
        return self.clean_text(content)

    @staticmethod
    def _squared_distance(a, b) -> float:
        """Return the squared Euclidean distance between two vectors."""
        return sum((x - y) ** 2 for x, y in zip(a, b))

    def extract_key_points(self, text: str, max_points: int = 5) -> List[str]:
        """Pick up to ``max_points`` mutually dissimilar sentences from ``text``.

        Starts with the first sentence and then repeatedly adds the sentence
        whose embedding is, on average, farthest from those already chosen.
        Sentences are returned in selection order, without the trailing period.
        Returns an empty list when nothing qualifies or when embedding fails.
        """
        if max_points <= 0:
            return []
        try:
            pieces = (part.strip() for part in self._SENTENCE_END.split(text))
            sentences = [s for s in pieces if len(s) > self._MIN_SENTENCE_CHARS]
            if not sentences:
                return []

            embeddings = self.model_manager.models['embeddings'].embed_documents(sentences)

            selected = [0]  # Start with first sentence
            remaining = list(range(1, len(sentences)))
            # Running sum of squared distances from each candidate to every
            # selected sentence. All candidates share the same divisor, so
            # comparing sums is equivalent to comparing averages.
            distance_sums = [0.0] * len(sentences)
            while remaining and len(selected) < max_points:
                newest = embeddings[selected[-1]]
                for idx in remaining:
                    distance_sums[idx] += self._squared_distance(
                        embeddings[idx], newest
                    )
                # max() keeps the earliest candidate on ties.
                farthest = max(remaining, key=distance_sums.__getitem__)
                selected.append(farthest)
                remaining.remove(farthest)
            return [sentences[i] for i in selected]
        except Exception as e:
            # Best effort: key points are optional, so log and carry on.
            logger.error("Error extracting key points: %s", e)
            return []

    def process_content(self, content: str, soup: "Optional[BeautifulSoup]" = None) -> Dict:
        """Process content and generate insights.

        Args:
            content: Raw text; used only when ``soup`` is not given.
            soup: Parsed HTML; when given, its main content replaces ``content``.

        Returns:
            A dict with ``'summary'``, ``'content'`` and ``'key_points'``. On
            failure the summary holds the error message and ``'key_points'`` is
            empty. Empty content yields an empty summary without calling the
            summarizer.
        """
        try:
            if soup is not None:
                content = self.extract_main_content(soup)
            else:
                content = self.clean_text(content)

            if not content:
                # Nothing to summarize; skip the model call entirely.
                return {'summary': '', 'content': '', 'key_points': []}

            # Only the first 1024 characters are summarized.
            summary = self.model_manager.models['summarizer'](
                content[:1024],
                max_length=150,
                min_length=50,
                do_sample=False,
            )[0]['summary_text']

            return {
                'summary': summary,
                'content': content,
                'key_points': self.extract_key_points(content),
            }
        except Exception as e:
            return {
                'summary': f"Error processing content: {str(e)}",
                'content': content,
                'key_points': [],
            }
class WebSearchEngine:
    """Main search engine class.

    Queries DuckDuckGo's HTML endpoint, fetches every hit through a throttled
    ``requests`` session and summarizes the pages with a ``ContentProcessor``.

    Args:
        processor: Optional pre-built content processor. When omitted a new
            ``ContentProcessor`` is created (which loads the models).
    """

    # Schemes that can actually be fetched through the ``requests`` session.
    _FETCHABLE_SCHEMES = frozenset({"http", "https"})
    # Status codes treated as transient and therefore worth retrying.
    _RETRYABLE_STATUS = frozenset({408, 425, 429, 500, 502, 503, 504})

    def __init__(self, processor: "Optional[ContentProcessor]" = None):
        self.processor = processor if processor is not None else ContentProcessor()
        self.session = requests.Session()
        self.request_delay = 2.0  # minimum seconds between two requests
        self.last_request_time = 0
        self.max_retries = 3
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def _wait_for_request_slot(self) -> None:
        """Sleep until ``request_delay`` (plus jitter) has passed since the last request."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.request_delay:
            time.sleep(self.request_delay - elapsed + random.uniform(0.5, 1.5))

    def safe_get(self, url: str, max_retries: "Optional[int]" = None) -> "requests.Response":
        """Make a GET request with retries and error handling.

        Args:
            url: Address to fetch.
            max_retries: Number of attempts; defaults to ``self.max_retries``.

        Returns:
            The first response whose status is below 400.

        Raises:
            requests.RequestException: When the last attempt fails at the
                transport level, or the server answers with an error status
                (``HTTPError``). Non-transient statuses such as 404 are raised
                immediately instead of being retried.
            Exception: When every attempt was rate limited (429), or
                ``max_retries`` is 0.
        """
        attempts = self.max_retries if max_retries is None else max_retries
        for attempt in range(attempts):
            is_last_attempt = attempt == attempts - 1
            self._wait_for_request_slot()
            try:
                response = self.session.get(url, headers=self.headers, timeout=10)
            except requests.RequestException:
                if is_last_attempt:
                    raise
                time.sleep((attempt + 1) * 2)
                continue
            finally:
                # Failed requests count towards throttling as well.
                self.last_request_time = time.time()

            if response.ok:
                return response

            status = response.status_code
            if status not in self._RETRYABLE_STATUS:
                # Permanent error (e.g. 404): retrying cannot help.
                response.raise_for_status()
            if is_last_attempt:
                if status == 429:
                    break  # reported through the generic failure below
                response.raise_for_status()
            # Back off harder when the server explicitly rate limits us.
            time.sleep((attempt + 1) * (5 if status == 429 else 2))
        raise Exception(f"Failed to fetch URL after {attempts} attempts")

    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling.

        A URL qualifies when it has an ``http``/``https`` scheme and a host.
        """
        try:
            parsed = urlparse(url)
        except (ValueError, AttributeError, TypeError):
            # Malformed URL (e.g. broken IPv6 literal) or a non-string value.
            return False
        return parsed.scheme in self._FETCHABLE_SCHEMES and bool(parsed.netloc)

    @staticmethod
    def _resolve_result_link(href: str) -> str:
        """Turn a result ``href`` into the absolute target URL.

        NOTE(review): DuckDuckGo's HTML results have been observed to use
        protocol-relative redirect links of the form
        ``//duckduckgo.com/l/?uddg=<encoded target>``; confirm against the live
        markup. Links that do not match that shape are returned unchanged
        (apart from gaining an ``https:`` prefix when protocol-relative).
        """
        if not href:
            return ""
        if href.startswith("//"):
            href = "https:" + href
        try:
            parsed = urlparse(href)
        except ValueError:
            return href  # left for is_valid_url() to reject
        host = (parsed.hostname or "").lower()
        is_ddg_host = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
        if is_ddg_host and parsed.path.startswith("/l/"):
            targets = parse_qs(parsed.query).get("uddg")
            if targets:
                return targets[0]
        return href

    def get_metadata(self, soup: "BeautifulSoup") -> Dict:
        """Extract metadata from page.

        Returns a dict with ``'title'`` (``"No title"`` when the page has no
        usable ``<title>``) and ``'description'`` (the ``description`` meta
        tag's content, or an empty string).
        """
        title_tag = soup.title
        # get_text() also copes with titles that contain nested markup, where
        # ``.string`` would be None.
        title = title_tag.get_text(strip=True) if title_tag is not None else ""
        meta = soup.find("meta", attrs={"name": "description"})
        description = (meta.get("content") or "") if meta is not None else ""
        return {
            'title': title or "No title",
            'description': description,
        }

    def process_url(self, url: str) -> Dict:
        """Process a single URL.

        Returns a dict with ``'url'``, ``'title'``, ``'description'``,
        ``'summary'``, ``'key_points'`` and ``'content'``; or a dict with a
        single ``'error'`` key when the URL is invalid or processing fails.
        """
        if not self.is_valid_url(url):
            return {'error': f"Invalid URL: {url}"}
        try:
            response = self.safe_get(url)
            soup = BeautifulSoup(response.text, 'lxml')
            metadata = self.get_metadata(soup)
            processed = self.processor.process_content("", soup=soup)
            return {
                'url': url,
                'title': metadata['title'],
                'description': metadata['description'],
                'summary': processed['summary'],
                'key_points': processed['key_points'],
                'content': processed['content'],
            }
        except Exception as e:
            return {'error': f"Error processing {url}: {str(e)}"}

    def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search DuckDuckGo and parse HTML results.

        Returns up to ``max_results`` dicts with ``'link'``, ``'title'`` and
        ``'snippet'``. Entries without a usable link are skipped and do not
        count towards the limit. Returns an empty list when the search fails.
        """
        search_results = []
        try:
            search_url = f'https://html.duckduckgo.com/html/?q={quote_plus(query)}'
            response = self.safe_get(search_url)
            soup = BeautifulSoup(response.text, 'lxml')

            for result in soup.find_all('div', {'class': 'result'}):
                if len(search_results) >= max_results:
                    break
                try:
                    link_elem = result.find('a', {'class': 'result__a'})
                    if not link_elem:
                        continue
                    link = self._resolve_result_link(link_elem.get('href', ''))
                    if not link or not self.is_valid_url(link):
                        continue
                    snippet_elem = result.find('a', {'class': 'result__snippet'})
                    search_results.append({
                        'link': link,
                        'title': link_elem.get_text(strip=True),
                        'snippet': snippet_elem.get_text(strip=True) if snippet_elem else "",
                    })
                except Exception as e:
                    # One malformed entry must not discard the others.
                    logger.warning("Error processing search result: %s", e)
            return search_results
        except Exception as e:
            logger.error("Error during DuckDuckGo search: %s", e)
            return []

    def _summarize_insights(self, combined_summary: str) -> str:
        """Condense the per-page summaries; fall back to the raw text on failure."""
        if not combined_summary:
            return ""
        try:
            return self.processor.model_manager.models['summarizer'](
                combined_summary,
                max_length=200,
                min_length=100,
                do_sample=False,
            )[0]['summary_text']
        except Exception as e:
            # Keep the already processed results instead of failing the search.
            logger.warning("Error generating overall insights: %s", e)
            return combined_summary

    def search(self, query: str, max_results: int = 5) -> Dict:
        """Perform search and process results.

        Returns a dict with ``'results'`` (one entry per processed page),
        ``'insights'``, ``'key_points'`` (at most 10, duplicates removed) and
        ``'follow_up_questions'``; or a dict with a single ``'error'`` key.
        """
        try:
            search_results = self.search_duckduckgo(query, max_results)
            if not search_results:
                return {'error': 'No results found'}

            results = []
            all_key_points = []
            for position, hit in enumerate(search_results):
                if position:
                    # Extra pause between pages, on top of safe_get's throttle.
                    time.sleep(random.uniform(0.5, 1.0))
                processed = self.process_url(hit['link'])
                if 'error' in processed:
                    continue
                results.append(processed)
                all_key_points.extend(processed.get('key_points', []))

            if not results:
                return {'error': 'Failed to process any search results'}

            combined_summary = " ".join(
                r['summary'] for r in results if r.get('summary')
            )
            insights = self._summarize_insights(combined_summary)

            # dict.fromkeys removes duplicates while keeping first-seen order.
            unique_key_points = list(dict.fromkeys(all_key_points))
            return {
                'results': results,
                'insights': insights,
                'key_points': unique_key_points[:10],  # Top 10 key points
                'follow_up_questions': [
                    f"What are the recent breakthroughs in {query}?",
                    f"How does {query} impact various industries?",
                    f"What are the future prospects of {query}?",
                ],
            }
        except Exception as e:
            return {'error': f"Search failed: {str(e)}"}
# Main search function
# Engine shared by all calls to search(); created on first use because
# building a WebSearchEngine loads the summarization and embedding models.
_default_engine = None


def search(query: str, max_results: int = 5) -> Dict:
    """Main search function.

    Runs ``query`` through a lazily created, module-wide ``WebSearchEngine``
    and returns that engine's ``search`` result dict.
    """
    global _default_engine
    if _default_engine is None:
        _default_engine = WebSearchEngine()
    return _default_engine.search(query, max_results)