Spaces:
Build error
Build error
from typing import Dict, List, Any | |
import requests | |
from bs4 import BeautifulSoup | |
from transformers import pipeline | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
import time | |
import json | |
import os | |
from urllib.parse import urlparse, quote_plus | |
import logging | |
import random | |
logger = logging.getLogger(__name__) | |
class SearchResult: | |
def __init__(self, title: str, link: str, snippet: str): | |
self.title = title | |
self.link = link | |
self.snippet = snippet | |
class ModelManager: | |
"""Manages different AI models for specific tasks""" | |
def __init__(self): | |
self.device = "cpu" | |
self.models = {} | |
self.load_models() | |
def load_models(self): | |
# Use smaller models for CPU deployment | |
self.models['summarizer'] = pipeline( | |
"summarization", | |
model="facebook/bart-base", | |
device=self.device | |
) | |
self.models['embeddings'] = HuggingFaceEmbeddings( | |
model_name="sentence-transformers/all-MiniLM-L6-v2", | |
model_kwargs={"device": self.device} | |
) | |
class ContentProcessor: | |
"""Processes and analyzes different types of content""" | |
def __init__(self): | |
self.model_manager = ModelManager() | |
def clean_text(self, text: str) -> str: | |
"""Clean and normalize text content""" | |
# Remove extra whitespace | |
text = ' '.join(text.split()) | |
# Remove common navigation elements | |
nav_elements = [ | |
"skip to content", | |
"skip to navigation", | |
"search", | |
"menu", | |
"subscribe", | |
"sign in", | |
"log in", | |
"submit", | |
"browse", | |
"explore", | |
] | |
for element in nav_elements: | |
text = text.replace(element.lower(), "") | |
return text.strip() | |
def extract_main_content(self, soup: BeautifulSoup) -> str: | |
"""Extract main content from HTML soup""" | |
# Remove navigation, headers, footers, and sidebars | |
for elem in soup.find_all(['nav', 'header', 'footer', 'aside']): | |
elem.decompose() | |
# Remove script and style elements | |
for elem in soup.find_all(['script', 'style']): | |
elem.decompose() | |
# Try to find main content area | |
main_content = None | |
content_tags = ['article', 'main', '[role="main"]', '#content', '.content', '.post-content'] | |
for tag in content_tags: | |
main_content = soup.select_one(tag) | |
if main_content: | |
break | |
# If no main content found, use body | |
if not main_content: | |
main_content = soup.find('body') | |
if main_content: | |
text = main_content.get_text(separator=' ', strip=True) | |
else: | |
text = soup.get_text(separator=' ', strip=True) | |
return self.clean_text(text) | |
def extract_key_points(self, text: str, max_points: int = 5) -> List[str]: | |
"""Extract key points from text using AI""" | |
try: | |
# Split text into chunks for processing | |
chunks = [text[i:i + 1024] for i in range(0, len(text), 1024)] | |
all_points = [] | |
for chunk in chunks[:3]: # Process first 3 chunks to keep it manageable | |
summary = self.model_manager.models['summarizer']( | |
chunk, | |
max_length=100, | |
min_length=30, | |
do_sample=False | |
)[0]['summary_text'] | |
# Split summary into sentences | |
points = [s.strip() for s in summary.split('.') if s.strip()] | |
all_points.extend(points) | |
# Return top points | |
return all_points[:max_points] | |
except Exception as e: | |
logger.error(f"Error extracting key points: {str(e)}") | |
return [] | |
def process_content(self, content: str, soup: BeautifulSoup = None) -> Dict: | |
"""Process content and generate insights""" | |
try: | |
# Extract main content if soup is provided | |
if soup: | |
content = self.extract_main_content(soup) | |
else: | |
content = self.clean_text(content) | |
# Extract key points | |
key_points = self.extract_key_points(content) | |
# Generate overall summary | |
summary = self.model_manager.models['summarizer']( | |
content[:1024], | |
max_length=150, | |
min_length=50, | |
do_sample=False | |
)[0]['summary_text'] | |
return { | |
'summary': summary, | |
'key_points': key_points, | |
'content': content | |
} | |
except Exception as e: | |
logger.error(f"Error processing content: {str(e)}") | |
return { | |
'summary': f"Error processing content: {str(e)}", | |
'key_points': [], | |
'content': content | |
} | |
class WebSearchEngine: | |
"""Main search engine class""" | |
def __init__(self): | |
self.processor = ContentProcessor() | |
self.session = requests.Session() | |
self.request_delay = 2.0 | |
self.last_request_time = 0 | |
self.max_retries = 3 | |
self.headers = { | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', | |
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', | |
'Accept-Language': 'en-US,en;q=0.5', | |
'DNT': '1', | |
'Connection': 'keep-alive', | |
'Upgrade-Insecure-Requests': '1' | |
} | |
def safe_get(self, url: str, max_retries: int = 3) -> requests.Response: | |
"""Make a GET request with retries and error handling""" | |
for i in range(max_retries): | |
try: | |
# Add delay between requests | |
current_time = time.time() | |
time_since_last = current_time - self.last_request_time | |
if time_since_last < self.request_delay: | |
time.sleep(self.request_delay - time_since_last + random.uniform(0.5, 1.5)) | |
response = self.session.get(url, headers=self.headers, timeout=10) | |
self.last_request_time = time.time() | |
if response.status_code == 200: | |
return response | |
elif response.status_code == 429: # Rate limit | |
wait_time = (i + 1) * 5 | |
time.sleep(wait_time) | |
continue | |
else: | |
response.raise_for_status() | |
except Exception as e: | |
if i == max_retries - 1: | |
raise | |
time.sleep((i + 1) * 2) | |
raise Exception(f"Failed to fetch URL after {max_retries} attempts") | |
def is_valid_url(self, url: str) -> bool: | |
"""Check if URL is valid for crawling""" | |
try: | |
parsed = urlparse(url) | |
return bool(parsed.netloc and parsed.scheme) | |
except: | |
return False | |
def get_metadata(self, soup: BeautifulSoup) -> Dict: | |
"""Extract metadata from page""" | |
title = soup.title.string if soup.title else "No title" | |
description = "" | |
if soup.find("meta", attrs={"name": "description"}): | |
description = soup.find("meta", attrs={"name": "description"}).get("content", "") | |
return { | |
'title': title, | |
'description': description | |
} | |
def process_url(self, url: str) -> Dict: | |
"""Process a single URL""" | |
if not self.is_valid_url(url): | |
return {'error': f"Invalid URL: {url}"} | |
try: | |
response = self.safe_get(url) | |
soup = BeautifulSoup(response.text, 'lxml') | |
# Process content with BeautifulSoup object | |
processed = self.processor.process_content("", soup) | |
# Get metadata | |
metadata = self.get_metadata(soup) | |
return { | |
'url': url, | |
'title': metadata['title'], | |
'description': metadata['description'], | |
'summary': processed['summary'], | |
'key_points': processed['key_points'], | |
'content': processed['content'] | |
} | |
except Exception as e: | |
return {'error': f"Error processing {url}: {str(e)}"} | |
def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]: | |
"""Search DuckDuckGo and parse HTML results""" | |
search_results = [] | |
try: | |
# Encode query for URL | |
encoded_query = quote_plus(query) | |
# DuckDuckGo HTML search URL | |
search_url = f'https://html.duckduckgo.com/html/?q={encoded_query}' | |
# Get search results page | |
response = self.safe_get(search_url) | |
soup = BeautifulSoup(response.text, 'lxml') | |
# Find all result elements | |
results = soup.find_all('div', {'class': 'result'}) | |
for result in results[:max_results]: | |
try: | |
# Extract link | |
link_elem = result.find('a', {'class': 'result__a'}) | |
if not link_elem: | |
continue | |
link = link_elem.get('href', '') | |
if not link or not self.is_valid_url(link): | |
continue | |
# Extract title | |
title = link_elem.get_text(strip=True) | |
# Extract snippet | |
snippet_elem = result.find('a', {'class': 'result__snippet'}) | |
snippet = snippet_elem.get_text(strip=True) if snippet_elem else "" | |
search_results.append({ | |
'link': link, | |
'title': title, | |
'snippet': snippet | |
}) | |
# Add delay between processing results | |
time.sleep(random.uniform(0.2, 0.5)) | |
except Exception as e: | |
logger.warning(f"Error processing search result: {str(e)}") | |
continue | |
return search_results | |
except Exception as e: | |
logger.error(f"Error during DuckDuckGo search: {str(e)}") | |
return [] | |
def search(self, query: str, max_results: int = 5) -> Dict: | |
"""Perform search and process results""" | |
try: | |
# Search using DuckDuckGo HTML | |
search_results = self.search_duckduckgo(query, max_results) | |
if not search_results: | |
return {'error': 'No results found'} | |
results = [] | |
all_key_points = [] | |
for result in search_results: | |
if 'link' in result: | |
processed = self.process_url(result['link']) | |
if 'error' not in processed: | |
results.append(processed) | |
if 'key_points' in processed: | |
all_key_points.extend(processed['key_points']) | |
time.sleep(random.uniform(0.5, 1.0)) | |
if not results: | |
return {'error': 'Failed to process any search results'} | |
# Combine all summaries and key points | |
all_summaries = [r['summary'] for r in results if 'summary' in r] | |
combined_summary = " ".join(all_summaries) | |
# Format insights | |
insights = { | |
'main_summary': combined_summary[:500], | |
'key_findings': list(set(all_key_points))[:7], # Remove duplicates and limit to top 7 | |
'sources': [{'title': r['title'], 'url': r['url']} for r in results] | |
} | |
return { | |
'results': results, | |
'insights': insights, | |
'follow_up_questions': [ | |
f"What are the practical applications of {query}?", | |
f"How has {query} evolved over the past year?", | |
f"What challenges remain in {query}?" | |
] | |
} | |
except Exception as e: | |
return {'error': f"Search failed: {str(e)}"} | |
# Main search function | |
def search(query: str, max_results: int = 5) -> Dict: | |
"""Main search function""" | |
engine = WebSearchEngine() | |
return engine.search(query, max_results) | |