from typing import Any, Dict, List, Optional
import requests
from bs4 import BeautifulSoup
from transformers import pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
import time
import json
import os
import re
from urllib.parse import urlparse, quote_plus
import logging
import random

logger = logging.getLogger(__name__)


class SearchResult:
    def __init__(self, title: str, link: str, snippet: str):
        self.title = title
        self.link = link
        self.snippet = snippet


class ModelManager:
    """Manages different AI models for specific tasks"""

    def __init__(self):
        self.device = "cpu"
        self.models = {}
        self.load_models()

    def load_models(self):
        # Use smaller models for CPU deployment
        self.models['summarizer'] = pipeline(
            "summarization",
            model="facebook/bart-base",
            device=self.device
        )
        self.models['embeddings'] = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": self.device}
        )


class ContentProcessor:
    """Processes and analyzes different types of content"""

    def __init__(self):
        self.model_manager = ModelManager()

    def clean_text(self, text: str) -> str:
        """Clean and normalize text content"""
        # Remove extra whitespace
        text = ' '.join(text.split())
        # Remove common navigation elements (whole words only, case-insensitive,
        # so e.g. "research" is not mangled by removing "search")
        nav_elements = [
            "skip to content", "search", "menu", "navigation",
            "subscribe", "sign in", "log in", "submit", "browse",
        ]
        for element in nav_elements:
            text = re.sub(rf"\b{re.escape(element)}\b", "", text, flags=re.IGNORECASE)
        return text.strip()

    def extract_main_content(self, soup: BeautifulSoup) -> str:
        """Extract main content from HTML, prioritizing article content"""
        content = ""
        # Try to find main content containers
        priority_tags = [
            ('article', {}),
            ('div', {'class': ['article', 'post', 'content', 'main']}),
            ('div', {'id': ['article', 'post', 'content', 'main']}),
            ('main', {}),
        ]
        for tag, attrs in priority_tags:
            elements = soup.find_all(tag, attrs)
            if elements:
                content = " ".join(elem.get_text(strip=True) for elem in elements)
                if content:
                    break
        # If no main content found, try extracting paragraphs
        if not content:
            paragraphs = soup.find_all('p')
            content = " ".join(p.get_text(strip=True) for p in paragraphs
                               if len(p.get_text(strip=True)) > 100)
        return self.clean_text(content)

    def extract_key_points(self, text: str, max_points: int = 5) -> List[str]:
        """Extract key points from text using sentence transformers"""
        try:
            # Split into sentences
            sentences = [s.strip() for s in text.split('.') if len(s.strip()) > 20]
            if not sentences:
                return []
            # Get embeddings for sentences
            embeddings = self.model_manager.models['embeddings'].embed_documents(sentences)
            # Use simple clustering to find diverse sentences
            selected_indices = [0]  # Start with first sentence
            for _ in range(min(max_points - 1, len(sentences) - 1)):
                # Find sentence most different from selected ones
                max_diff = -1
                max_idx = -1
                for i in range(len(sentences)):
                    if i not in selected_indices:
                        # Calculate average squared distance from selected sentences
                        diffs = [sum((embeddings[i][j] - embeddings[k][j]) ** 2
                                     for j in range(len(embeddings[i])))
                                 for k in selected_indices]
                        avg_diff = sum(diffs) / len(diffs)
                        if avg_diff > max_diff:
                            max_diff = avg_diff
                            max_idx = i
                if max_idx != -1:
                    selected_indices.append(max_idx)
            return [sentences[i] for i in selected_indices]
        except Exception as e:
            logger.error(f"Error extracting key points: {str(e)}")
            return []

    def process_content(self, content: str, soup: Optional[BeautifulSoup] = None) -> Dict:
        """Process content and generate insights"""
        try:
            # Extract main content if HTML is available
            if soup:
                content = self.extract_main_content(soup)
            else:
                content = self.clean_text(content)
            # Avoid calling the summarizer on empty pages
            if not content:
                return {
                    'summary': "No content could be extracted",
                    'content': content,
                    'key_points': []
                }

            # Generate summary
            summary = self.model_manager.models['summarizer'](
                content[:1024],
                max_length=150,
                min_length=50,
                do_sample=False
            )[0]['summary_text']

            # Extract key points
            key_points = self.extract_key_points(content)

            return {
                'summary': summary,
                'content': content,
                'key_points': key_points
            }
        except Exception as e:
            return {
                'summary': f"Error processing content: {str(e)}",
                'content': content,
                'key_points': []
            }


class WebSearchEngine:
    """Main search engine class"""

    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        self.request_delay = 2.0
        self.last_request_time = 0
        self.max_retries = 3
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }

    def safe_get(self, url: str, max_retries: int = 3) -> requests.Response:
        """Make a GET request with retries and error handling"""
        for i in range(max_retries):
            try:
                # Throttle requests: wait out the remaining delay plus random jitter
                current_time = time.time()
                time_since_last = current_time - self.last_request_time
                if time_since_last < self.request_delay:
                    time.sleep(self.request_delay - time_since_last + random.uniform(0.5, 1.5))

                response = self.session.get(url, headers=self.headers, timeout=10)
                self.last_request_time = time.time()

                if response.status_code == 200:
                    return response
                elif response.status_code == 429:  # Rate limited: back off and retry
                    wait_time = (i + 1) * 5
                    time.sleep(wait_time)
                    continue
                else:
                    response.raise_for_status()
            except Exception:
                if i == max_retries - 1:
                    raise
                time.sleep((i + 1) * 2)

        raise Exception(f"Failed to fetch URL after {max_retries} attempts")

    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling"""
        try:
            parsed = urlparse(url)
            return bool(parsed.netloc and parsed.scheme)
        except Exception:
            return False

    def get_metadata(self, soup: BeautifulSoup) -> Dict:
        """Extract metadata from page"""
        title = soup.title.string.strip() if soup.title and soup.title.string else "No title"
        description = ""
        description_tag = soup.find("meta", attrs={"name": "description"})
        if description_tag:
            description = description_tag.get("content", "")
        return {
            'title': title,
            'description': description
        }

    def process_url(self, url: str) -> Dict:
        """Process a single URL"""
        if not self.is_valid_url(url):
            return {'error': f"Invalid URL: {url}"}

        try:
            response = self.safe_get(url)
            soup = BeautifulSoup(response.text, 'lxml')

            # Get metadata
            metadata = self.get_metadata(soup)

            # Process content
            processed = self.processor.process_content("", soup=soup)

            return {
                'url': url,
                'title': metadata['title'],
                'description': metadata['description'],
                'summary': processed['summary'],
                'key_points': processed['key_points'],
                'content': processed['content']
            }
        except Exception as e:
            return {'error': f"Error processing {url}: {str(e)}"}

    def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search DuckDuckGo and parse HTML results"""
        search_results = []
        try:
            # Encode query for URL
            encoded_query = quote_plus(query)
            # DuckDuckGo HTML search URL
            search_url = f'https://html.duckduckgo.com/html/?q={encoded_query}'

            # Get search results page
            response = self.safe_get(search_url)
            soup = BeautifulSoup(response.text, 'lxml')

            # Find all result elements
            results = soup.find_all('div', {'class': 'result'})

            for result in results[:max_results]:
                try:
                    # Extract link
                    link_elem = result.find('a', {'class': 'result__a'})
                    if not link_elem:
                        continue

                    link = link_elem.get('href', '')
                    if not link or not self.is_valid_url(link):
                        continue

                    # Extract title
                    title = link_elem.get_text(strip=True)

                    # Extract snippet
                    snippet_elem = result.find('a', {'class': 'result__snippet'})
                    snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""

                    search_results.append({
                        'link': link,
                        'title': title,
                        'snippet': snippet
                    })

                    # Add delay between processing results
                    time.sleep(random.uniform(0.2, 0.5))
                except Exception as e:
                    logger.warning(f"Error processing search result: {str(e)}")
                    continue

            return search_results
        except Exception as e:
            logger.error(f"Error during DuckDuckGo search: {str(e)}")
            return []

    def search(self, query: str, max_results: int = 5) -> Dict:
        """Perform search and process results"""
        try:
            # Search using DuckDuckGo HTML
            search_results = self.search_duckduckgo(query, max_results)
            if not search_results:
                return {'error': 'No results found'}

            results = []
            all_key_points = []

            for result in search_results:
                if 'link' in result:
                    processed = self.process_url(result['link'])
                    if 'error' not in processed:
                        results.append(processed)
                        if 'key_points' in processed:
                            all_key_points.extend(processed['key_points'])
                    time.sleep(random.uniform(0.5, 1.0))

            if not results:
                return {'error': 'Failed to process any search results'}

            # Combine insights from all results
            combined_summary = " ".join([r['summary'] for r in results if 'summary' in r])

            # Generate overall insights (truncate to stay within the model's input limit)
            insights = self.processor.model_manager.models['summarizer'](
                combined_summary[:1024],
                max_length=200,
                min_length=100,
                do_sample=False
            )[0]['summary_text']

            return {
                'results': results,
                'insights': insights,
                'key_points': all_key_points[:10],  # Top 10 key points
                'follow_up_questions': [
                    f"What are the recent breakthroughs in {query}?",
                    f"How does {query} impact various industries?",
                    f"What are the future prospects of {query}?"
                ]
            }
        except Exception as e:
            return {'error': f"Search failed: {str(e)}"}


# Main search function
def search(query: str, max_results: int = 5) -> Dict:
    """Main search function"""
    engine = WebSearchEngine()
    return engine.search(query, max_results)
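
# Example usage: a minimal sketch of how this module might be driven from the
# command line. The query string and result count below are illustrative only;
# running it requires network access and downloads the Hugging Face models on
# first use, so treat it as a smoke test rather than part of the library API.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    demo = search("large language models", max_results=3)  # hypothetical query
    if 'error' in demo:
        print(f"Search failed: {demo['error']}")
    else:
        print("Insights:", demo['insights'])
        for item in demo['results']:
            print(f"- {item['title']}: {item['url']}")
        print(json.dumps(demo['key_points'], indent=2))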