""" Advanced RAG-based search engine with multi-source intelligence. """ from typing import List, Dict, Any, Optional import asyncio from langchain.chains import RetrievalQAWithSourcesChain from langchain.embeddings import HuggingFaceEmbeddings from langchain.vectorstores import FAISS from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.docstore.document import Document from duckduckgo_search import DDGS from googlesearch import search as gsearch import requests from bs4 import BeautifulSoup from tenacity import retry, stop_after_attempt, wait_exponential import json import time from datetime import datetime, timedelta import hashlib from urllib.parse import urlparse import re class SearchEngine: def __init__(self): self.embeddings = HuggingFaceEmbeddings( model_name="sentence-transformers/all-mpnet-base-v2" ) self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=50 ) self.cache = {} self.cache_ttl = timedelta(hours=24) self.search_delay = 2 # seconds between searches self.last_search_time = datetime.min def _get_cache_key(self, query: str, **kwargs) -> str: """Generate cache key from query and kwargs.""" cache_data = { "query": query, **kwargs } return hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest() def _get_cached_result(self, cache_key: str) -> Optional[Dict[str, Any]]: """Get result from cache if valid.""" if cache_key in self.cache: result, timestamp = self.cache[cache_key] if datetime.now() - timestamp < self.cache_ttl: return result del self.cache[cache_key] return None def _set_cached_result(self, cache_key: str, result: Dict[str, Any]): """Store result in cache.""" self.cache[cache_key] = (result, datetime.now()) @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def search_web(self, query: str, max_results: int = 10) -> List[Dict[str, str]]: """Perform web search using multiple search engines.""" results = [] # Respect rate limiting time_since_last = datetime.now() - self.last_search_time if time_since_last.total_seconds() < self.search_delay: await asyncio.sleep(self.search_delay - time_since_last.total_seconds()) # DuckDuckGo Search try: with DDGS() as ddgs: ddg_results = [r for r in ddgs.text(query, max_results=max_results)] results.extend(ddg_results) except Exception as e: print(f"DuckDuckGo search error: {e}") # Google Search try: google_results = gsearch(query, num_results=max_results) results.extend([{"link": url, "title": url} for url in google_results]) except Exception as e: print(f"Google search error: {e}") self.last_search_time = datetime.now() return results[:max_results] def _clean_html(self, html: str) -> str: """Clean HTML content.""" # Remove script and style elements html = re.sub(r']*>.*?', '', html, flags=re.DOTALL) html = re.sub(r']*>.*?', '', html, flags=re.DOTALL) # Remove comments html = re.sub(r'', '', html, flags=re.DOTALL) # Remove remaining tags html = re.sub(r'<[^>]+>', ' ', html) # Clean whitespace html = re.sub(r'\s+', ' ', html).strip() return html @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def fetch_content(self, url: str) -> Optional[str]: """Fetch and extract content from a webpage.""" try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() # Extract main content soup = BeautifulSoup(response.text, "html.parser") # Remove unwanted elements for 
            for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
                element.decompose()

            # Try to find the main content container
            main_content = None

            # Look for an article tag
            if soup.find("article"):
                main_content = soup.find("article")
            # Look for a main tag
            elif soup.find("main"):
                main_content = soup.find("main")
            # Look for a div with common content class names
            elif soup.find("div", class_=re.compile(r"content|article|post|entry")):
                main_content = soup.find("div", class_=re.compile(r"content|article|post|entry"))

            # Use the body if no main content was found
            if not main_content:
                main_content = soup.body

            # Extract text
            if main_content:
                text = self._clean_html(str(main_content))
            else:
                text = self._clean_html(response.text)

            return text

        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

    def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Extract metadata from a webpage."""
        metadata = {
            "url": url,
            "domain": urlparse(url).netloc,
            "title": None,
            "description": None,
            "published_date": None,
            "author": None,
            "keywords": None,
        }

        # Extract title
        if soup.title:
            metadata["title"] = soup.title.string

        # Extract meta tags
        for meta in soup.find_all("meta"):
            name = meta.get("name", "").lower()
            prop = meta.get("property", "").lower()
            content = meta.get("content")

            if name == "description" or prop == "og:description":
                metadata["description"] = content
            elif name == "author":
                metadata["author"] = content
            elif name == "keywords":
                metadata["keywords"] = content
            elif name == "published_time" or prop == "article:published_time":
                metadata["published_date"] = content

        return metadata

    async def process_search_results(self, query: str) -> Dict[str, Any]:
        """Process search results and create a RAG-based answer."""
        cache_key = self._get_cache_key(query)
        cached_result = self._get_cached_result(cache_key)
        if cached_result:
            return cached_result

        # Perform web search
        search_results = await self.search_web(query)

        # Fetch content from search results
        documents = []
        metadata_list = []

        for result in search_results:
            url = result.get("link")
            if not url:
                continue

            content = await self.fetch_content(url)
            if content:
                # Split content into chunks
                chunks = self.text_splitter.split_text(content)

                # Store metadata
                metadata = {
                    "source": url,
                    "title": result.get("title", url),
                    **result,
                }
                metadata_list.append(metadata)

                # Create documents
                for chunk in chunks:
                    documents.append(Document(page_content=chunk, metadata=metadata))

        if not documents:
            return {
                "answer": "I couldn't find any relevant information.",
                "sources": [],
                "metadata": [],
            }

        # Create vector store and retrieve the most relevant chunks.
        # Answer synthesis is intentionally simple: the top chunks are
        # concatenated rather than passed through an LLM.
        vectorstore = FAISS.from_documents(documents, self.embeddings)
        relevant_docs = vectorstore.similarity_search(query, k=5)  # top 5 most relevant chunks

        # Extract unique sources and content
        sources = []
        content = []
        used_metadata = []

        for doc in relevant_docs:
            source = doc.metadata["source"]
            if source not in sources:
                sources.append(source)
                content.append(doc.page_content)

                # Find corresponding metadata
                for meta in metadata_list:
                    if meta["source"] == source:
                        used_metadata.append(meta)
                        break

        result = {
            "answer": "\n\n".join(content),
            "sources": sources,
            "metadata": used_metadata,
        }

        # Cache the result
        self._set_cached_result(cache_key, result)
        return result

    async def search(self, query: str) -> Dict[str, Any]:
        """Main search interface."""
        try:
            return await self.process_search_results(query)
        except Exception as e:
            return {
                "answer": f"An error occurred: {str(e)}",
                "sources": [],
                "metadata": [],
            }
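

# Example usage: a minimal sketch of driving the engine from a script.
# This block is illustrative only; the query string is an arbitrary example,
# and it assumes network access plus a local download of the embedding model.
if __name__ == "__main__":
    async def _demo():
        engine = SearchEngine()
        result = await engine.search("What is retrieval-augmented generation?")
        print(result["answer"][:500])
        print("Sources:", result["sources"])

    asyncio.run(_demo())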