File size: 7,799 Bytes
44198e0
 
 
d7b6953
44198e0
d7b6953
44198e0
 
 
 
6c83b94
 
 
 
44198e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d7b6953
44198e0
 
 
d7b6953
 
44198e0
 
 
 
 
 
 
 
6c83b94
44198e0
6c83b94
 
 
 
 
 
 
 
 
 
 
 
 
 
d7b6953
44198e0
 
 
 
d7b6953
44198e0
 
 
 
 
d7b6953
44198e0
 
 
 
d7b6953
 
44198e0
 
 
 
d7b6953
 
 
44198e0
6c83b94
44198e0
d7b6953
 
6c83b94
 
44198e0
 
 
 
d7b6953
 
 
44198e0
 
d7b6953
 
 
 
 
 
 
44198e0
d7b6953
 
44198e0
d7b6953
 
44198e0
 
 
 
d7b6953
 
 
44198e0
 
 
d7b6953
44198e0
 
 
 
6c83b94
 
 
 
 
 
 
 
edb4444
6c83b94
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44198e0
d7b6953
44198e0
d7b6953
44198e0
d7b6953
 
6c83b94
 
d7b6953
 
 
44198e0
 
d7b6953
 
 
 
 
 
 
44198e0
 
 
d7b6953
44198e0
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
from typing import Dict, List, Any
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from transformers import pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
import time
import json
import os
from urllib.parse import urlparse
import logging
import random

logger = logging.getLogger(__name__)

class ModelManager:
    """Manages different AI models for specific tasks"""
    
    def __init__(self):
        self.device = "cpu"
        self.models = {}
        self.load_models()
        
    def load_models(self):
        # Use smaller models for CPU deployment
        self.models['summarizer'] = pipeline(
            "summarization",
            model="facebook/bart-base",
            device=self.device
        )
        
        self.models['embeddings'] = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": self.device}
        )

class ContentProcessor:
    """Processes and analyzes different types of content"""
    
    def __init__(self):
        self.model_manager = ModelManager()
    
    def process_content(self, content: str) -> Dict:
        """Process content and generate insights"""
        try:
            # Generate summary
            summary = self.model_manager.models['summarizer'](
                content[:1024],
                max_length=100,
                min_length=30,
                do_sample=False
            )[0]['summary_text']
            
            return {
                'summary': summary,
                'content': content
            }
        except Exception as e:
            return {
                'summary': f"Error processing content: {str(e)}",
                'content': content
            }

class WebSearchEngine:
    """Main search engine class"""
    
    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        self.request_delay = 2.0  # Increased delay between requests
        self.last_request_time = 0
        self.max_retries = 3
        self.ddgs = None
        self.initialize_search()
    
    def initialize_search(self):
        """Initialize DuckDuckGo search with retries"""
        for _ in range(self.max_retries):
            try:
                self.ddgs = DDGS()
                return
            except Exception as e:
                logger.error(f"Error initializing DDGS: {str(e)}")
                time.sleep(random.uniform(1, 3))
        raise Exception("Failed to initialize DuckDuckGo search after multiple attempts")
        
    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling"""
        try:
            parsed = urlparse(url)
            return bool(parsed.netloc and parsed.scheme)
        except:
            return False
    
    def get_metadata(self, soup: BeautifulSoup) -> Dict:
        """Extract metadata from page"""
        title = soup.title.string if soup.title else "No title"
        description = ""
        if soup.find("meta", attrs={"name": "description"}):
            description = soup.find("meta", attrs={"name": "description"}).get("content", "")
        return {
            'title': title,
            'description': description
        }
    
    def process_url(self, url: str) -> Dict:
        """Process a single URL"""
        if not self.is_valid_url(url):
            return {'error': f"Invalid URL: {url}"}
            
        try:
            # Rate limiting with random delay
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            if time_since_last < self.request_delay:
                delay = self.request_delay - time_since_last + random.uniform(0.5, 1.5)
                time.sleep(delay)
            
            response = self.session.get(url, timeout=10)
            self.last_request_time = time.time()
            
            if response.status_code != 200:
                return {'error': f"Failed to fetch URL: {url}, status code: {response.status_code}"}
                
            soup = BeautifulSoup(response.text, 'lxml')
            
            # Extract text content
            for script in soup(["script", "style"]):
                script.decompose()
            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            content = ' '.join(chunk for chunk in chunks if chunk)
            
            # Get metadata
            metadata = self.get_metadata(soup)
            
            # Process content
            processed = self.processor.process_content(content)
            
            return {
                'url': url,
                'title': metadata['title'],
                'description': metadata['description'],
                'summary': processed['summary'],
                'content': processed['content']
            }
            
        except Exception as e:
            return {'error': f"Error processing {url}: {str(e)}"}
    
    def search(self, query: str, max_results: int = 5) -> Dict:
        """Perform search and process results"""
        try:
            # Initialize search if needed
            if self.ddgs is None:
                self.initialize_search()
            
            # Add delay before search
            time.sleep(random.uniform(1, 2))
            
            # Search using DuckDuckGo with retries
            search_results = []
            retry_count = 0
            
            while retry_count < self.max_retries:
                try:
                    for result in self.ddgs.text(query, max_results=max_results):
                        search_results.append(result)
                        # Add small delay between results
                        time.sleep(random.uniform(0.2, 0.5))
                    break
                except Exception as e:
                    retry_count += 1
                    if retry_count >= self.max_retries:
                        return {'error': f"Search failed after {self.max_retries} attempts: {str(e)}"}
                    logger.warning(f"Search attempt {retry_count} failed: {str(e)}")
                    time.sleep(random.uniform(2, 5))
                    self.initialize_search()
            
            if not search_results:
                return {'error': 'No results found'}
            
            results = []
            for result in search_results:
                if 'link' in result:
                    processed = self.process_url(result['link'])
                    if 'error' not in processed:
                        results.append(processed)
                        # Add delay between processing URLs
                        time.sleep(random.uniform(0.5, 1.0))
                        
            # Generate insights from results
            all_content = " ".join([r['summary'] for r in results if 'summary' in r])
            
            return {
                'results': results,
                'insights': all_content[:1000] if all_content else "No insights available.",
                'follow_up_questions': [
                    f"What are the key differences between {query} and related topics?",
                    f"Can you explain {query} in simple terms?",
                    f"What are the latest developments in {query}?"
                ]
            }
            
        except Exception as e:
            return {'error': f"Search failed: {str(e)}"}

# Main search function
def search(query: str, max_results: int = 5) -> Dict:
    """Main search function"""
    engine = WebSearchEngine()
    return engine.search(query, max_results)