"""Utility functions for news extraction, sentiment analysis, and text-to-speech.""" import requests from bs4 import BeautifulSoup from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification from gtts import gTTS import os from typing import List, Dict, Any import pandas as pd from sklearn.feature_extraction.text import TfidfVectorizer from config import * import re from datetime import datetime, timedelta import time import json from googletrans import Translator, LANGUAGES import statistics def analyze_company_data(company_name: str) -> Dict[str, Any]: """Analyze company news and generate insights.""" try: # Initialize components news_extractor = NewsExtractor() sentiment_analyzer = SentimentAnalyzer() text_summarizer = TextSummarizer() comparative_analyzer = ComparativeAnalyzer() # Get news articles articles = news_extractor.search_news(company_name) if not articles: return { "articles": [], "comparative_sentiment_score": {}, "final_sentiment_analysis": "No articles found for analysis.", "audio_path": None } # Process each article processed_articles = [] sentiment_scores = {} for article in articles: # Generate summary summary = text_summarizer.summarize(article['content']) article['summary'] = summary # Analyze overall sentiment sentiment = sentiment_analyzer.analyze(article['content']) article['sentiment'] = sentiment # Analyze fine-grained sentiment try: fine_grained_results = sentiment_analyzer._get_fine_grained_sentiment(article['content']) article['fine_grained_sentiment'] = fine_grained_results # Add sentiment indices sentiment_indices = sentiment_analyzer._calculate_sentiment_indices(fine_grained_results) article['sentiment_indices'] = sentiment_indices # Add entities and sentiment targets entities = sentiment_analyzer._extract_entities(article['content']) article['entities'] = entities sentiment_targets = sentiment_analyzer._extract_sentiment_targets(article['content'], entities) article['sentiment_targets'] = sentiment_targets except Exception as e: print(f"Error in fine-grained sentiment analysis: {str(e)}") # Track sentiment by source source = article['source'] if source not in sentiment_scores: sentiment_scores[source] = [] sentiment_scores[source].append(sentiment) processed_articles.append(article) # Calculate overall sentiment overall_sentiment = sentiment_analyzer.get_overall_sentiment(processed_articles) # Ensure consistent array lengths in sentiment_scores max_length = max(len(scores) for scores in sentiment_scores.values()) for source in sentiment_scores: # Pad shorter arrays with 'neutral' to match the longest array sentiment_scores[source].extend(['neutral'] * (max_length - len(sentiment_scores[source]))) # Get comparative analysis comparative_analysis = comparative_analyzer.analyze_coverage(processed_articles, company_name) # Combine all results result = { "articles": processed_articles, "comparative_sentiment_score": { "sentiment_distribution": comparative_analysis.get("sentiment_distribution", {}), "sentiment_indices": comparative_analysis.get("sentiment_indices", {}), "source_distribution": comparative_analysis.get("source_distribution", {}), "common_topics": comparative_analysis.get("common_topics", []), "coverage_differences": comparative_analysis.get("coverage_differences", []), "total_articles": len(processed_articles) }, "final_sentiment_analysis": overall_sentiment, "ensemble_info": sentiment_analyzer._get_ensemble_sentiment("\n".join([a['content'] for a in processed_articles])), "audio_path": None } return result except Exception 
as e: print(f"Error analyzing company data: {str(e)}") return { "articles": [], "comparative_sentiment_score": {}, "final_sentiment_analysis": f"Error during analysis: {str(e)}", "audio_path": None } # Initialize translator with retry mechanism def get_translator(): max_retries = 3 for attempt in range(max_retries): try: translator = Translator() # Test the translator translator.translate('test', dest='en') return translator except Exception as e: if attempt == max_retries - 1: print(f"Failed to initialize translator after {max_retries} attempts: {str(e)}") return None time.sleep(1) # Wait before retrying return None class NewsExtractor: def __init__(self): self.headers = HEADERS self.start_time = None self.timeout = 30 # 30 seconds timeout def search_news(self, company_name: str) -> List[Dict[str, str]]: """Extract news articles about the company ensuring minimum count.""" self.start_time = time.time() all_articles = [] retries = 2 # Number of retries if we don't get enough articles min_articles = MIN_ARTICLES # Start with default minimum while retries > 0 and len(all_articles) < min_articles: # Check for timeout if time.time() - self.start_time > self.timeout: print(f"\nTimeout reached after {self.timeout} seconds. Proceeding with available articles.") break for source, url_template in NEWS_SOURCES.items(): try: url = url_template.format(company_name.replace(" ", "+")) print(f"\nSearching {source} for news about {company_name}...") # Try different page numbers for more articles for page in range(2): # Try first two pages # Check for timeout again if time.time() - self.start_time > self.timeout: break page_url = url if page > 0: if source == "google": page_url += f"&start={page * 10}" elif source == "bing": page_url += f"&first={page * 10 + 1}" elif source == "yahoo": page_url += f"&b={page * 10 + 1}" elif source == "reuters": page_url += f"&page={page + 1}" elif source == "marketwatch": page_url += f"&page={page + 1}" elif source == "investing": page_url += f"&page={page + 1}" elif source == "techcrunch": page_url += f"/page/{page + 1}" elif source == "zdnet": page_url += f"&page={page + 1}" response = requests.get(page_url, headers=self.headers, timeout=15) if response.status_code != 200: print(f"Error: {source} page {page+1} returned status code {response.status_code}") continue soup = BeautifulSoup(response.content, 'html.parser') source_articles = [] if source == "google": source_articles = self._parse_google_news(soup) elif source == "bing": source_articles = self._parse_bing_news(soup) elif source == "yahoo": source_articles = self._parse_yahoo_news(soup) elif source == "reuters": source_articles = self._parse_reuters_news(soup) elif source == "marketwatch": source_articles = self._parse_marketwatch_news(soup) elif source == "investing": source_articles = self._parse_investing_news(soup) elif source == "techcrunch": source_articles = self._parse_techcrunch_news(soup) elif source == "zdnet": source_articles = self._parse_zdnet_news(soup) # Limit articles per source if source_articles: source_articles = source_articles[:MAX_ARTICLES_PER_SOURCE] all_articles.extend(source_articles) print(f"Found {len(source_articles)} articles from {source} page {page+1}") # If we have enough articles, break the page loop if len(all_articles) >= min_articles: break except Exception as e: print(f"Error fetching from {source}: {str(e)}") continue # If we have enough articles, break the source loop if len(all_articles) >= min_articles: break retries -= 1 if len(all_articles) < min_articles and retries > 
0: print(f"\nFound only {len(all_articles)} articles, retrying...") # Lower the minimum requirement if we're close if len(all_articles) >= 15: # If we have at least 15 articles min_articles = len(all_articles) print(f"Adjusting minimum requirement to {min_articles} articles") # Remove duplicates unique_articles = self._remove_duplicates(all_articles) print(f"\nFound {len(unique_articles)} unique articles") if len(unique_articles) < MIN_ARTICLES: print(f"Warning: Could only find {len(unique_articles)} unique articles, fewer than minimum {MIN_ARTICLES}") print("Proceeding with available articles...") # Balance articles across sources balanced_articles = self._balance_sources(unique_articles) return balanced_articles[:max(len(unique_articles), MAX_ARTICLES)] def _balance_sources(self, articles: List[Dict[str, str]]) -> List[Dict[str, str]]: """Balance articles across sources while maintaining minimum count.""" source_articles = {} # Group articles by source for article in articles: source = article['source'] if source not in source_articles: source_articles[source] = [] source_articles[source].append(article) # Calculate target articles per source total_sources = len(source_articles) target_per_source = max(MIN_ARTICLES // total_sources, MAX_ARTICLES_PER_SOURCE) # Get articles from each source balanced = [] for source, articles_list in source_articles.items(): balanced.extend(articles_list[:target_per_source]) # If we still need more articles to meet minimum, add more from sources # that have additional articles if len(balanced) < MIN_ARTICLES: remaining = [] for articles_list in source_articles.values(): remaining.extend(articles_list[target_per_source:]) # Sort remaining by source to maintain balance remaining.sort(key=lambda x: len([a for a in balanced if a['source'] == x['source']])) while len(balanced) < MIN_ARTICLES and remaining: balanced.append(remaining.pop(0)) return balanced def _parse_google_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]: """Parse Google News search results.""" articles = [] for div in soup.find_all(['div', 'article'], class_=['g', 'xuvV6b', 'WlydOe']): try: title_elem = div.find(['h3', 'h4']) snippet_elem = div.find('div', class_=['VwiC3b', 'yy6M1d']) link_elem = div.find('a') source_elem = div.find(['div', 'span'], class_='UPmit') if title_elem and snippet_elem and link_elem: source = source_elem.get_text(strip=True) if source_elem else 'Google News' articles.append({ 'title': title_elem.get_text(strip=True), 'content': snippet_elem.get_text(strip=True), 'url': link_elem['href'], 'source': source }) except Exception as e: print(f"Error parsing Google article: {str(e)}") continue return articles def _parse_bing_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]: """Parse Bing News search results.""" articles = [] for article in soup.find_all(['div', 'article'], class_=['news-card', 'newsitem', 'item-info']): try: title_elem = article.find(['a', 'h3'], class_=['title', 'news-card-title']) snippet_elem = article.find(['div', 'p'], class_=['snippet', 'description']) source_elem = article.find(['div', 'span'], class_=['source', 'provider']) if title_elem and snippet_elem: source = source_elem.get_text(strip=True) if source_elem else 'Bing News' url = title_elem['href'] if 'href' in title_elem.attrs else '' articles.append({ 'title': title_elem.get_text(strip=True), 'content': snippet_elem.get_text(strip=True), 'url': url, 'source': source }) except Exception as e: print(f"Error parsing Bing article: {str(e)}") return articles def _parse_yahoo_news(self, 
    def _parse_yahoo_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse Yahoo News search results."""
        articles = []
        for article in soup.find_all('div', class_='NewsArticle'):
            try:
                title_elem = article.find(['h4', 'h3', 'a'])
                snippet_elem = article.find('p')
                source_elem = article.find(['span', 'div'], class_=['provider', 'source'])

                if title_elem and snippet_elem:
                    source = source_elem.get_text(strip=True) if source_elem else 'Yahoo News'
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True),
                        'url': url,
                        'source': source
                    })
            except Exception as e:
                print(f"Error parsing Yahoo article: {str(e)}")
        return articles

    def _parse_reuters_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse Reuters search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['search-result-content', 'story']):
            try:
                title_elem = article.find(['h3', 'a'], class_='story-title')
                snippet_elem = article.find(['p', 'div'], class_=['story-description', 'description'])

                if title_elem:
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    if url and not url.startswith('http'):
                        url = 'https://www.reuters.com' + url
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'Reuters'
                    })
            except Exception as e:
                print(f"Error parsing Reuters article: {str(e)}")
        return articles

    def _parse_marketwatch_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse MarketWatch search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['element--article', 'article__content']):
            try:
                title_elem = article.find(['h3', 'h2'], class_=['article__headline', 'title'])
                snippet_elem = article.find('p', class_=['article__summary', 'description'])

                if title_elem:
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'MarketWatch'
                    })
            except Exception as e:
                print(f"Error parsing MarketWatch article: {str(e)}")
        return articles

    def _parse_investing_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse Investing.com search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['articleItem', 'news-item']):
            try:
                title_elem = article.find(['a', 'h3'], class_=['title', 'articleTitle'])
                snippet_elem = article.find(['p', 'div'], class_=['description', 'articleContent'])

                if title_elem:
                    url = title_elem['href'] if 'href' in title_elem.attrs else title_elem.find('a')['href']
                    if url and not url.startswith('http'):
                        url = 'https://www.investing.com' + url
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'Investing.com'
                    })
            except Exception as e:
                print(f"Error parsing Investing.com article: {str(e)}")
        return articles
    def _parse_techcrunch_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse TechCrunch search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['post-block', 'article-block']):
            try:
                title_elem = article.find(['h2', 'h3', 'a'], class_=['post-block__title', 'article-title'])
                snippet_elem = article.find(['div', 'p'], class_=['post-block__content', 'article-content'])

                if title_elem:
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'TechCrunch'
                    })
            except Exception as e:
                print(f"Error parsing TechCrunch article: {str(e)}")
        return articles

    def _parse_zdnet_news(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        """Parse ZDNet search results."""
        articles = []
        for article in soup.find_all(['div', 'article'], class_=['item', 'article']):
            try:
                title_elem = article.find(['h3', 'a'], class_=['title', 'headline'])
                snippet_elem = article.find(['p', 'div'], class_=['summary', 'content'])

                if title_elem:
                    url = title_elem.find('a')['href'] if title_elem.find('a') else ''
                    if url and not url.startswith('http'):
                        url = 'https://www.zdnet.com' + url
                    articles.append({
                        'title': title_elem.get_text(strip=True),
                        'content': snippet_elem.get_text(strip=True) if snippet_elem else '',
                        'url': url,
                        'source': 'ZDNet'
                    })
            except Exception as e:
                print(f"Error parsing ZDNet article: {str(e)}")
        return articles

    def _remove_duplicates(self, articles: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Remove duplicate articles based on title similarity."""
        unique_articles = []
        seen_titles = set()

        for article in articles:
            title = article['title'].lower()
            if not any(title in seen_title or seen_title in title for seen_title in seen_titles):
                unique_articles.append(article)
                seen_titles.add(title)

        return unique_articles
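# Each NewsExtractor._parse_* helper above normalizes a search result into the same flat
# dict consumed by the rest of the pipeline, e.g.:
#   {'title': 'Company beats earnings estimates',
#    'content': 'Short snippet or description text...',
#    'url': 'https://example.com/article',
#    'source': 'Reuters'}
# (Field names come from the code above; the values shown are made-up examples.)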
class SentimentAnalyzer:
    def __init__(self):
        try:
            # Primary financial sentiment model
            self.sentiment_pipeline = pipeline("sentiment-analysis", model=SENTIMENT_MODEL)

            # Initialize fine-grained sentiment models
            self.fine_grained_models = {}
            try:
                # Initialize the default fine-grained model for backward compatibility
                self.fine_grained_sentiment = pipeline("sentiment-analysis", model=SENTIMENT_FINE_GRAINED_MODEL)

                # Initialize additional fine-grained models
                for model_name, model_path in FINE_GRAINED_MODELS.items():
                    try:
                        print(f"Loading fine-grained model: {model_name}")
                        self.fine_grained_models[model_name] = pipeline("sentiment-analysis", model=model_path)
                    except Exception as e:
                        print(f"Error loading fine-grained model {model_name}: {str(e)}")
            except Exception as e:
                print(f"Error initializing fine-grained models: {str(e)}")
                self.fine_grained_sentiment = None

            # Initialize additional sentiment analyzers if available
            self.has_textblob = False
            self.has_vader = False
            try:
                from textblob import TextBlob
                self.TextBlob = TextBlob
                self.has_textblob = True
            except Exception:
                print("TextBlob not available. Install with: pip install textblob")

            try:
                from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
                self.vader = SentimentIntensityAnalyzer()
                self.has_vader = True
            except Exception:
                print("VADER not available. Install with: pip install vaderSentiment")

            self.summarizer = pipeline("summarization", model=SUMMARIZATION_MODEL)
            self.vectorizer = TfidfVectorizer(stop_words='english', max_features=10)

            # Initialize NER pipeline if spaCy is available
            try:
                import spacy
                self.nlp = spacy.load("en_core_web_sm")
                self.has_ner = True
            except Exception:
                self.has_ner = False
                print("spaCy not available for NER. Install with: pip install spacy && python -m spacy download en_core_web_sm")
        except Exception as e:
            print(f"Error initializing sentiment models: {str(e)}")
            # Fallback to default models if specific models fail
            self.sentiment_pipeline = pipeline("sentiment-analysis")
            self.fine_grained_sentiment = None
            self.fine_grained_models = {}
            self.summarizer = pipeline("summarization")
            self.vectorizer = TfidfVectorizer(stop_words='english', max_features=10)
            self.has_ner = False
            self.has_textblob = False
            self.has_vader = False

    def analyze(self, text: str) -> str:
        """Analyze sentiment of text and return sentiment label."""
        try:
            # Get ensemble sentiment analysis
            sentiment_analysis = self._get_ensemble_sentiment(text)
            return sentiment_analysis['ensemble_sentiment']
        except Exception as e:
            print(f"Error in sentiment analysis: {str(e)}")
            return 'neutral'  # Default to neutral on error

    def get_overall_sentiment(self, articles: List[Dict[str, Any]]) -> str:
        """Get overall sentiment from a list of articles."""
        try:
            # Combine all article texts
            combined_text = ' '.join([
                f"{article.get('title', '')} {article.get('content', '')}"
                for article in articles
            ])

            # Get ensemble sentiment analysis
            sentiment_analysis = self._get_ensemble_sentiment(combined_text)
            return sentiment_analysis['ensemble_sentiment']
        except Exception as e:
            print(f"Error getting overall sentiment: {str(e)}")
            return 'neutral'  # Default to neutral on error

    def analyze_article(self, article: Dict[str, str]) -> Dict[str, Any]:
        """Analyze sentiment and generate summary for an article."""
        try:
            # Get the full text by combining title and content
            full_text = f"{article['title']} {article['content']}"

            # Generate summary
            summary = self.summarize_text(full_text)

            # Get ensemble sentiment analysis
            sentiment_analysis = self._get_ensemble_sentiment(full_text)
            sentiment_label = sentiment_analysis['ensemble_sentiment']
            sentiment_score = sentiment_analysis['ensemble_score']

            # Add fine-grained sentiment analysis
            fine_grained_sentiment = self._get_fine_grained_sentiment(full_text)

            # Extract key topics
            topics = self.extract_topics(full_text)

            # Extract named entities
            entities = self._extract_entities(full_text)

            # Extract sentiment targets (entities associated with sentiment)
            sentiment_targets = self._extract_sentiment_targets(full_text, entities)

            # Add analysis to article
            analyzed_article = article.copy()
            analyzed_article.update({
                'summary': summary,
                'sentiment': sentiment_label,
                'sentiment_score': sentiment_score,
                'sentiment_details': sentiment_analysis,
                'fine_grained_sentiment': fine_grained_sentiment,
                'topics': topics,
                'entities': entities,
                'sentiment_targets': sentiment_targets,
                'sentiment_indices': fine_grained_sentiment.get('indices', {}),
                'analysis_timestamp': datetime.now().isoformat()
            })
            return analyzed_article
        except Exception as e:
            print(f"Error analyzing article: {str(e)}")
            # Return original article with default values if analysis fails
            article.update({
                'summary': article.get('content', '')[:200] + '...',
                'sentiment': 'neutral',
                'sentiment_score': 0.0,
                'sentiment_details': {},
                'fine_grained_sentiment': {},
                'topics': [],
                'entities': {},
                'sentiment_targets': [],
                'sentiment_indices': {
                    'positivity_index': 0.5,
                    'negativity_index': 0.5,
                    'emotional_intensity': 0.0,
                    'controversy_score': 0.0,
                    'confidence_score': 0.0,
                    'esg_relevance': 0.0
                },
                'analysis_timestamp': datetime.now().isoformat()
            })
            return article
    def _get_ensemble_sentiment(self, text: str) -> Dict[str, Any]:
        """Get ensemble sentiment by combining multiple sentiment models."""
        results = {}

        # Initialize with default values
        ensemble_result = {
            'ensemble_sentiment': 'neutral',
            'ensemble_score': 0.5,
            'models': {}
        }

        try:
            # 1. Primary transformer model (finbert)
            try:
                primary_result = self.sentiment_pipeline(text[:512])[0]  # Limit text length
                primary_label = primary_result['label'].lower()
                primary_score = primary_result['score']

                # Map to standard format
                if primary_label == 'positive':
                    primary_normalized = primary_score
                elif primary_label == 'negative':
                    primary_normalized = 1 - primary_score
                else:  # neutral
                    primary_normalized = 0.5

                ensemble_result['models']['transformer'] = {
                    'sentiment': primary_label,
                    'score': round(primary_score, 3),
                    'normalized_score': round(primary_normalized, 3)
                }
            except Exception:
                ensemble_result['models']['transformer'] = {
                    'sentiment': 'error',
                    'score': 0,
                    'normalized_score': 0.5
                }

            # 2. TextBlob sentiment
            if self.has_textblob:
                try:
                    blob = self.TextBlob(text)
                    polarity = blob.sentiment.polarity

                    # Convert to standard format
                    if polarity > 0.1:
                        textblob_sentiment = 'positive'
                        textblob_score = polarity
                    elif polarity < -0.1:
                        textblob_sentiment = 'negative'
                        textblob_score = abs(polarity)
                    else:
                        textblob_sentiment = 'neutral'
                        textblob_score = 0.5

                    # Normalize to 0-1 scale
                    textblob_normalized = (polarity + 1) / 2

                    ensemble_result['models']['textblob'] = {
                        'sentiment': textblob_sentiment,
                        'score': round(textblob_score, 3),
                        'normalized_score': round(textblob_normalized, 3)
                    }
                except Exception:
                    ensemble_result['models']['textblob'] = {
                        'sentiment': 'error',
                        'score': 0,
                        'normalized_score': 0.5
                    }

            # 3. VADER sentiment
            if self.has_vader:
                try:
                    vader_scores = self.vader.polarity_scores(text)
                    compound = vader_scores['compound']

                    # Convert to standard format
                    if compound > 0.05:
                        vader_sentiment = 'positive'
                        vader_score = compound
                    elif compound < -0.05:
                        vader_sentiment = 'negative'
                        vader_score = abs(compound)
                    else:
                        vader_sentiment = 'neutral'
                        vader_score = 0.5

                    # Normalize to 0-1 scale
                    vader_normalized = (compound + 1) / 2

                    ensemble_result['models']['vader'] = {
                        'sentiment': vader_sentiment,
                        'score': round(vader_score, 3),
                        'normalized_score': round(vader_normalized, 3)
                    }
                except Exception:
                    ensemble_result['models']['vader'] = {
                        'sentiment': 'error',
                        'score': 0,
                        'normalized_score': 0.5
                    }

            # Calculate ensemble result
            # Get all normalized scores
            normalized_scores = []
            for model_name, model_result in ensemble_result['models'].items():
                if model_result['sentiment'] != 'error':
                    normalized_scores.append(model_result['normalized_score'])

            # Calculate average if we have scores
            if normalized_scores:
                avg_score = sum(normalized_scores) / len(normalized_scores)

                # Convert to sentiment label
                if avg_score > 0.6:
                    ensemble_sentiment = 'positive'
                elif avg_score < 0.4:
                    ensemble_sentiment = 'negative'
                else:
                    ensemble_sentiment = 'neutral'

                ensemble_result['ensemble_sentiment'] = ensemble_sentiment
                ensemble_result['ensemble_score'] = round(avg_score, 3)

                # Add confidence level
                if len(normalized_scores) > 1:
                    # Calculate standard deviation to measure agreement
                    std_dev = statistics.stdev(normalized_scores) if len(normalized_scores) > 1 else 0
                    agreement = 1 - (std_dev * 2)  # Lower std_dev means higher agreement
                    agreement = max(0, min(1, agreement))  # Clamp to 0-1
                    ensemble_result['model_agreement'] = round(agreement, 3)

            return ensemble_result
        except Exception as e:
            print(f"Error in ensemble sentiment analysis: {str(e)}")
            return {
                'ensemble_sentiment': 'neutral',
                'ensemble_score': 0.5,
                'models': {}
            }
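    # Shape of the dict returned by _get_ensemble_sentiment (keys taken from the code
    # above; the numbers are made-up examples):
    #   {'ensemble_sentiment': 'positive',
    #    'ensemble_score': 0.712,
    #    'models': {'transformer': {'sentiment': 'positive', 'score': 0.91, 'normalized_score': 0.91},
    #               'textblob':    {'sentiment': 'neutral',  'score': 0.5,  'normalized_score': 0.55},
    #               'vader':       {'sentiment': 'positive', 'score': 0.68, 'normalized_score': 0.84}},
    #    'model_agreement': 0.81}   # only present when more than one model contributed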
    def _get_fine_grained_sentiment(self, text: str) -> Dict[str, Any]:
        """Get fine-grained sentiment analysis with more detailed categories."""
        # Initialize result structure
        result = {
            "primary": {"category": "unknown", "confidence": 0.0},
            "models": {}
        }

        # Check if we have any fine-grained models
        if not self.fine_grained_sentiment and not self.fine_grained_models:
            return result

        try:
            # Split text into manageable chunks if too long
            chunks = self._split_text(text)

            # Process with default fine-grained model for backward compatibility
            if self.fine_grained_sentiment:
                primary_results = []
                for chunk in chunks:
                    if not chunk.strip():
                        continue
                    chunk_result = self.fine_grained_sentiment(chunk)[0]
                    primary_results.append(chunk_result)

                if primary_results:
                    # Aggregate results from all chunks
                    categories = {}
                    for res in primary_results:
                        label = res['label'].lower()
                        score = res['score']
                        if label in categories:
                            categories[label] += score
                        else:
                            categories[label] = score

                    # Normalize scores
                    total = sum(categories.values())
                    if total > 0:
                        categories = {k: round(v/total, 3) for k, v in categories.items()}

                    # Get dominant category
                    dominant_category = max(categories.items(), key=lambda x: x[1])
                    result["primary"] = {
                        "category": dominant_category[0],
                        "confidence": dominant_category[1],
                        "distribution": categories
                    }

            # Process with additional fine-grained models
            for model_name, model in self.fine_grained_models.items():
                model_results = []
                for chunk in chunks:
                    if not chunk.strip():
                        continue
                    try:
                        chunk_result = model(chunk)[0]
                        model_results.append(chunk_result)
                    except Exception as e:
                        print(f"Error analyzing chunk with model {model_name}: {str(e)}")

                if model_results:
                    # Aggregate results from all chunks
                    categories = {}
                    for res in model_results:
                        # Ensure the label is lowercase for consistency
                        label = res['label'].lower() if isinstance(res.get('label'), str) else "unknown"
                        score = res['score']
                        if label in categories:
                            categories[label] += score
                        else:
                            categories[label] = score

                    # Normalize scores
                    total = sum(categories.values())
                    if total > 0:
                        categories = {k: round(v/total, 3) for k, v in categories.items()}

                    # Get dominant category
                    dominant_category = max(categories.items(), key=lambda x: x[1])

                    # Store results for this model
                    result["models"][model_name] = {
                        "category": dominant_category[0],
                        "confidence": dominant_category[1],
                        "distribution": categories
                    }

            # Calculate sentiment indices based on the fine-grained results
            result["indices"] = self._calculate_sentiment_indices(result)

            return result
        except Exception as e:
            print(f"Error in fine-grained sentiment analysis: {str(e)}")
            return result
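    # Shape of the dict returned by _get_fine_grained_sentiment (keys from the code above;
    # the model names and numbers are illustrative only):
    #   {'primary': {'category': 'positive', 'confidence': 0.62,
    #                'distribution': {'positive': 0.62, 'neutral': 0.25, 'negative': 0.13}},
    #    'models': {'emotion': {'category': 'joy', 'confidence': 0.48, 'distribution': {...}}},
    #    'indices': {'positivity_index': 0.62, 'negativity_index': 0.13, ...}}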
"financial" or model_name == "primary" or model_name == "news_tone" or model_name == "aspect": pos_score = dist.get("positive", 0.0) positive_scores.append(pos_score) elif model_name == "emotion": # For emotion model, consider joy as positive pos_score = dist.get("joy", 0.0) + dist.get("surprise", 0.0) * 0.5 positive_scores.append(pos_score) if positive_scores: indices["positivity_index"] = round(sum(positive_scores) / len(positive_scores), 3) # Calculate negativity index negative_scores = [] for model_name, dist in distributions.items(): if model_name == "financial" or model_name == "primary" or model_name == "news_tone" or model_name == "aspect": neg_score = dist.get("negative", 0.0) negative_scores.append(neg_score) elif model_name == "emotion": # For emotion model, consider sadness, anger, fear, disgust as negative neg_score = dist.get("sadness", 0.0) + dist.get("anger", 0.0) + \ dist.get("fear", 0.0) + dist.get("disgust", 0.0) negative_scores.append(neg_score / 4) # Average of 4 negative emotions if negative_scores: indices["negativity_index"] = round(sum(negative_scores) / len(negative_scores), 3) # Calculate emotional intensity emotion_dist = distributions.get("emotion", {}) if emotion_dist: # Sum all emotional intensities except neutral emotional_sum = sum(v for k, v in emotion_dist.items() if k != "neutral") indices["emotional_intensity"] = round(emotional_sum, 3) # Calculate controversy score (high when both positive and negative are high) indices["controversy_score"] = round(indices["positivity_index"] * indices["negativity_index"] * 4, 3) # Calculate confidence score (average of all model confidences) if confidence_scores: indices["confidence_score"] = round(sum(confidence_scores.values()) / len(confidence_scores), 3) # Calculate ESG relevance if available esg_dist = distributions.get("esg", {}) if esg_dist: # Sum of all ESG categories esg_sum = sum(v for k, v in esg_dist.items() if k in ["environmental", "social", "governance"]) indices["esg_relevance"] = round(esg_sum, 3) return indices except Exception as e: print(f"Error calculating sentiment indices: {str(e)}") return indices def summarize_text(self, text: str) -> str: """Generate a concise summary of the text.""" try: # Clean and prepare text text = text.replace('\n', ' ').strip() # For very short texts, return as is if len(text.split()) < 30: return text # Split text into chunks if it's too long chunks = self._split_text(text) summaries = [] for chunk in chunks: # Calculate appropriate max_length based on input length input_words = len(chunk.split()) max_length = min(130, max(30, input_words // 2)) min_length = min(30, max(10, input_words // 4)) # Generate summary for each chunk summary = self.summarizer(chunk, max_length=max_length, min_length=min_length, do_sample=False)[0]['summary_text'] summaries.append(summary) # Combine summaries if there were multiple chunks final_summary = ' '.join(summaries) return final_summary except Exception as e: print(f"Error generating summary: {str(e)}") return text[:200] + '...' 
    def extract_topics(self, text: str) -> List[str]:
        """Extract key topics from the text using TF-IDF."""
        try:
            # Prepare text
            text = text.lower()

            # Fit and transform the text
            tfidf_matrix = self.vectorizer.fit_transform([text])

            # Get feature names and scores
            feature_names = self.vectorizer.get_feature_names_out()
            scores = tfidf_matrix.toarray()[0]

            # Get top topics
            top_indices = scores.argsort()[-5:][::-1]  # Get top 5 topics
            topics = [feature_names[i] for i in top_indices]

            return topics
        except Exception as e:
            print(f"Error extracting topics: {str(e)}")
            return []

    def _split_text(self, text: str, max_length: int = 1024) -> List[str]:
        """Split text into chunks that fit within model's maximum token limit."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0

        for word in words:
            word_length = len(word) + 1  # +1 for space
            if current_length + word_length > max_length:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def _extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract named entities from text."""
        entities = {
            'PERSON': [],
            'ORG': [],
            'GPE': [],  # Countries, cities, states
            'MONEY': [],
            'PERCENT': [],
            'DATE': []
        }

        if not self.has_ner:
            return entities

        try:
            # Process text with spaCy
            doc = self.nlp(text[:10000])  # Limit text length for performance

            # Extract entities
            for ent in doc.ents:
                if ent.label_ in entities:
                    # Clean entity text and deduplicate
                    clean_text = ent.text.strip()
                    if clean_text and clean_text not in entities[ent.label_]:
                        entities[ent.label_].append(clean_text)

            return entities
        except Exception as e:
            print(f"Error extracting entities: {str(e)}")
            return entities

    def _extract_sentiment_targets(self, text: str, entities: Dict[str, List[str]]) -> List[Dict[str, Any]]:
        """Extract entities that are targets of sentiment expressions."""
        if not self.has_ner:
            return []

        try:
            # Get all entities as a flat list
            all_entities = []
            for entity_type, entity_list in entities.items():
                for entity in entity_list:
                    all_entities.append({
                        'text': entity,
                        'type': entity_type
                    })

            # Find sentiment targets
            targets = []

            # Split text into sentences
            doc = self.nlp(text[:10000])  # Limit text length

            for sentence in doc.sents:
                # Skip short sentences
                if len(sentence.text.split()) < 3:
                    continue

                # Check for sentiment in this sentence
                try:
                    sentiment = self.sentiment_pipeline(sentence.text)[0]

                    # Only process if sentiment is strong
                    if sentiment['score'] > 0.7:
                        # Find entities in this sentence
                        for entity in all_entities:
                            if entity['text'] in sentence.text:
                                targets.append({
                                    'entity': entity['text'],
                                    'type': entity['type'],
                                    'sentiment': sentiment['label'].lower(),
                                    'confidence': round(sentiment['score'], 3),
                                    'context': sentence.text
                                })
                except Exception:
                    continue

            # Return unique targets
            unique_targets = []
            seen = set()
            for target in targets:
                key = f"{target['entity']}_{target['sentiment']}"
                if key not in seen:
                    seen.add(key)
                    unique_targets.append(target)

            return unique_targets
        except Exception as e:
            print(f"Error extracting sentiment targets: {str(e)}")
            return []
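# Note: _split_text (in SentimentAnalyzer above and TextSummarizer below) chunks by
# character count (max_length characters, default 1024), which is a rough proxy for the
# underlying models' token limits rather than an exact token count.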
class TextSummarizer:
    def __init__(self):
        try:
            # Initialize the summarization pipeline
            self.summarizer = pipeline("summarization", model=SUMMARIZATION_MODEL)
        except Exception as e:
            print(f"Error initializing TextSummarizer: {str(e)}")
            # Fallback to default model if specific model fails
            self.summarizer = pipeline("summarization")

    def summarize(self, text: str) -> str:
        """Generate a concise summary of the text."""
        try:
            # Clean and prepare text
            text = text.replace('\n', ' ').strip()

            # For very short texts, return as is
            if len(text.split()) < 30:
                return text

            # Split text into chunks if it's too long
            chunks = self._split_text(text)
            summaries = []

            for chunk in chunks:
                # Calculate appropriate max_length based on input length
                input_words = len(chunk.split())
                max_length = min(130, max(30, input_words // 2))
                min_length = min(30, max(10, input_words // 4))

                # Generate summary for each chunk
                summary = self.summarizer(chunk, max_length=max_length, min_length=min_length, do_sample=False)[0]['summary_text']
                summaries.append(summary)

            # Combine summaries if there were multiple chunks
            final_summary = ' '.join(summaries)
            return final_summary
        except Exception as e:
            print(f"Error generating summary: {str(e)}")
            return text[:200] + '...'  # Return truncated text as fallback

    def _split_text(self, text: str, max_length: int = 1024) -> List[str]:
        """Split text into chunks that fit within model's maximum token limit."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0

        for word in words:
            word_length = len(word) + 1  # +1 for space
            if current_length + word_length > max_length:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks
class TextToSpeechConverter:
    def __init__(self):
        self.output_dir = AUDIO_OUTPUT_DIR
        self.translator = get_translator()
        os.makedirs(self.output_dir, exist_ok=True)

    def generate_audio(self, text: str, filename: str) -> str:
        """Convert text to Hindi speech and save as audio file."""
        try:
            print(f"Translating text to Hindi: {text[:100]}...")

            # First translate the text to Hindi
            # Use chunking for long text to avoid translation limits
            chunks = []
            for i in range(0, len(text), 1000):
                chunk = text[i:i+1000]
                try:
                    translated_chunk = self.translator.translate(chunk, dest='hi').text
                    chunks.append(translated_chunk)
                    print(f"Translated chunk {i//1000 + 1}")
                except Exception as e:
                    print(f"Error translating chunk {i//1000 + 1}: {str(e)}")
                    # If translation fails, use original text
                    chunks.append(chunk)

            hindi_text = ' '.join(chunks)
            print(f"Translation complete. Hindi text length: {len(hindi_text)}")

            # Generate Hindi speech
            print("Generating Hindi speech...")
            tts = gTTS(text=hindi_text, lang='hi', slow=False)
            output_path = os.path.join(self.output_dir, f"{filename}.mp3")
            tts.save(output_path)
            print(f"Audio saved to {output_path}")

            return output_path
        except Exception as e:
            print(f"Error in TTS conversion: {str(e)}")
            # Fallback to original text if translation fails
            print("Using fallback English TTS")
            tts = gTTS(text=text, lang='en')
            output_path = os.path.join(self.output_dir, f"{filename}.mp3")
            tts.save(output_path)
            return output_path


class ComparativeAnalyzer:
    def __init__(self):
        pass

    def analyze_coverage(self, articles: List[Dict[str, Any]], company_name: str = None) -> Dict[str, Any]:
        """Perform comparative analysis across articles."""
        if not articles:
            return {
                "topics": [],
                "sentiment_distribution": {},
                "coverage_differences": ["No articles found for analysis."],
                "final_sentiment": "No articles found for analysis.",
                "total_articles": 0,
                "sentiment_indices": {}
            }

        # Debug: Print articles for analysis
        print(f"Analyzing {len(articles)} articles for company: {company_name}")

        # Add company name to each article if provided
        if company_name:
            for article in articles:
                article['company'] = company_name

        # Calculate sentiment distribution
        print("Calculating sentiment distribution...")
        sentiment_dist = self._get_sentiment_distribution(articles)
        print("Sentiment distribution result:")
        print(sentiment_dist)

        # Analyze common topics
        topics = self._analyze_topics(articles)

        # Analyze coverage differences
        differences = self._analyze_coverage_differences(articles)

        # Get final sentiment analysis
        final_sentiment = self._get_final_sentiment(sentiment_dist, articles)

        result = {
            "topics": topics,
            "sentiment_distribution": sentiment_dist,
            "coverage_differences": differences,
            "final_sentiment": final_sentiment,
            "total_articles": len(articles),
            "sentiment_indices": sentiment_dist.get("sentiment_indices", {})
        }

        # Debug: Print final result
        print("Final comparative analysis result:")
        print(result)

        return result
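    # analyze_coverage returns (keys from the code above):
    #   topics                 - top TF-IDF terms shared across articles
    #   sentiment_distribution - output of _get_sentiment_distribution below
    #   coverage_differences   - list of human-readable difference statements
    #   final_sentiment        - summary sentence(s) from _get_final_sentiment
    #   total_articles         - number of articles analyzed
    #   sentiment_indices      - averaged indices copied out of sentiment_distribution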
    def _get_sentiment_distribution(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate distribution of sentiments across articles."""
        # Basic sentiment distribution
        basic_distribution = {'positive': 0, 'negative': 0, 'neutral': 0}

        # Fine-grained sentiment distribution
        fine_grained_distribution = {}

        # Sentiment scores
        sentiment_scores = []

        # Sentiment indices aggregation
        sentiment_indices = {
            "positivity_index": [],
            "negativity_index": [],
            "emotional_intensity": [],
            "controversy_score": [],
            "confidence_score": [],
            "esg_relevance": []
        }

        # Debug: Print articles for sentiment distribution
        print(f"Processing {len(articles)} articles for sentiment distribution")

        # Process each article
        for i, article in enumerate(articles):
            try:
                # Debug: Print article sentiment data
                print(f"Article {i+1} sentiment data:")
                print(f"  Basic sentiment: {article.get('sentiment', 'N/A')}")
                print(f"  Fine-grained: {article.get('fine_grained_sentiment', {})}")
                print(f"  Sentiment indices: {article.get('sentiment_indices', {})}")

                # Basic sentiment
                sentiment = article.get('sentiment', 'neutral')
                if isinstance(sentiment, str):
                    sentiment = sentiment.lower()
                    # Ensure we have a valid sentiment category
                    if sentiment not in basic_distribution:
                        sentiment = 'neutral'
                    basic_distribution[sentiment] = basic_distribution.get(sentiment, 0) + 1
                else:
                    # Handle non-string sentiment values
                    basic_distribution['neutral'] = basic_distribution.get('neutral', 0) + 1

                # Sentiment score
                score = article.get('sentiment_score', 0.0)
                if isinstance(score, (int, float)):
                    sentiment_scores.append(score)

                # Fine-grained sentiment
                fine_grained = article.get('fine_grained_sentiment', {})
                if isinstance(fine_grained, dict) and 'category' in fine_grained:
                    category = fine_grained['category']
                    if isinstance(category, str):
                        category = category.lower()
                        fine_grained_distribution[category] = fine_grained_distribution.get(category, 0) + 1

                # Collect sentiment indices
                indices = article.get('sentiment_indices', {})
                if isinstance(indices, dict):
                    for index_name, index_values in sentiment_indices.items():
                        if index_name in indices and isinstance(indices[index_name], (int, float)):
                            index_values.append(indices[index_name])
            except Exception as e:
                print(f"Error processing article {i+1} for sentiment distribution: {str(e)}")
                # Continue with next article
                continue

        # Debug: Print collected data
        print("Collected sentiment data:")
        print(f"  Basic distribution: {basic_distribution}")
        print(f"  Fine-grained distribution: {fine_grained_distribution}")
        print(f"  Sentiment scores: {sentiment_scores}")
        print(f"  Sentiment indices collected: {sentiment_indices}")

        # Calculate average sentiment score with fallback
        avg_sentiment_score = 0.5  # Default neutral value
        if sentiment_scores:
            avg_sentiment_score = sum(sentiment_scores) / len(sentiment_scores)

        # Calculate sentiment volatility (standard deviation) with fallback
        sentiment_volatility = 0
        if len(sentiment_scores) > 1:
            try:
                sentiment_volatility = statistics.stdev(sentiment_scores)
            except Exception as e:
                print(f"Error calculating sentiment volatility: {str(e)}")

        # Calculate average sentiment indices with fallbacks
        avg_indices = {}
        for index_name, values in sentiment_indices.items():
            if values:
                avg_indices[index_name] = round(sum(values) / len(values), 3)
            else:
                # Provide default values for empty indices
                if index_name in ["positivity_index", "confidence_score"]:
                    avg_indices[index_name] = 0.5  # Neutral default
                else:
                    avg_indices[index_name] = 0.0  # Zero default for other indices

        # Ensure all expected indices exist
        for index_name in ["positivity_index", "negativity_index", "emotional_intensity",
                           "controversy_score", "confidence_score", "esg_relevance"]:
            if index_name not in avg_indices:
                avg_indices[index_name] = 0.5 if index_name in ["positivity_index", "confidence_score"] else 0.0

        # Ensure we have at least one item in each distribution
        if not any(basic_distribution.values()):
            basic_distribution['neutral'] = 1

        # Ensure fine_grained_distribution has at least one entry if empty
        if not fine_grained_distribution:
            fine_grained_distribution['neutral'] = 1

        result = {
            "basic": basic_distribution,
            "fine_grained": fine_grained_distribution,
            "avg_score": round(avg_sentiment_score, 3),
            "volatility": round(sentiment_volatility, 3),
            "sentiment_indices": avg_indices
        }

        # Debug: Print final sentiment distribution result
        print("Final sentiment distribution result:")
        print(result)

        return result
    def _analyze_topics(self, articles: List[Dict[str, Any]]) -> List[str]:
        """Analyze common topics across articles using TF-IDF."""
        try:
            # Combine title and content for better topic extraction
            texts = [f"{article.get('title', '')} {article.get('content', '')}" for article in articles]

            # Create and fit TF-IDF
            vectorizer = TfidfVectorizer(
                max_features=10,
                stop_words='english',
                ngram_range=(1, 2),
                token_pattern=r'(?u)\b[A-Za-z][A-Za-z+\'-]*[A-Za-z]+\b'  # Improved pattern
            )

            # Clean and normalize texts
            cleaned_texts = []
            for text in texts:
                # Remove numbers and special characters
                cleaned = re.sub(r'\d+', '', text)
                cleaned = re.sub(r'[^\w\s]', ' ', cleaned)
                cleaned_texts.append(cleaned.lower())

            tfidf_matrix = vectorizer.fit_transform(cleaned_texts)
            feature_names = vectorizer.get_feature_names_out()

            # Get average TF-IDF scores for each term
            avg_scores = tfidf_matrix.mean(axis=0).A1

            # Sort terms by score and return top meaningful terms
            sorted_indices = avg_scores.argsort()[-5:][::-1]
            meaningful_topics = []

            for idx in sorted_indices:
                topic = feature_names[idx]
                # Filter out single characters and common words
                if len(topic) > 1 and topic not in {'000', 'com', 'said', 'says', 'year', 'new', 'one'}:
                    meaningful_topics.append(topic)
                    if len(meaningful_topics) >= 5:
                        break

            return meaningful_topics
        except Exception as e:
            print(f"Error analyzing topics: {str(e)}")
            return []

    def _analyze_coverage_differences(self, articles: List[Dict[str, Any]]) -> List[str]:
        """Analyze how coverage differs across articles."""
        if not articles:
            return ["No articles available for comparison"]

        differences = []

        # Compare sentiment differences
        sentiments = [article.get('sentiment', 'neutral').lower() for article in articles]
        unique_sentiments = set(sentiments)

        if len(unique_sentiments) > 1:
            pos_count = sentiments.count('positive')
            neg_count = sentiments.count('negative')
            neu_count = sentiments.count('neutral')

            if pos_count > 0 and neg_count > 0:
                differences.append(f"Coverage sentiment varies significantly: {pos_count} positive, {neg_count} negative, and {neu_count} neutral articles.")

        # Compare fine-grained sentiment differences
        fine_grained_categories = []
        for article in articles:
            fine_grained = article.get('fine_grained_sentiment', {})
            if isinstance(fine_grained, dict) and 'category' in fine_grained:
                category = fine_grained['category']
                if isinstance(category, str):
                    fine_grained_categories.append(category.lower())

        unique_categories = set(fine_grained_categories)
        if len(unique_categories) > 2:  # More than 2 different categories
            category_counts = {}
            for category in fine_grained_categories:
                category_counts[category] = category_counts.get(category, 0) + 1

            top_categories = sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:3]
            categories_str = ", ".join([f"{cat} ({count})" for cat, count in top_categories])
            differences.append(f"Articles show diverse sentiment categories: {categories_str}")

        # Compare sentiment indices
        indices_differences = []
        positivity_values = []
        negativity_values = []
        controversy_values = []

        for article in articles:
            indices = article.get('sentiment_indices', {})
            if indices:
                if 'positivity_index' in indices:
                    positivity_values.append(indices['positivity_index'])
                if 'negativity_index' in indices:
                    negativity_values.append(indices['negativity_index'])
                if 'controversy_score' in indices:
                    controversy_values.append(indices['controversy_score'])

        # Check for high variance in positivity
        if positivity_values and len(positivity_values) > 1:
            if max(positivity_values) - min(positivity_values) > 0.4:
                indices_differences.append("Articles show significant variation in positivity levels")

        # Check for high variance in negativity
        if negativity_values and len(negativity_values) > 1:
            if max(negativity_values) - min(negativity_values) > 0.4:
                indices_differences.append("Articles show significant variation in negativity levels")

        # Check for high controversy scores
        if controversy_values:
            high_controversy = [v for v in controversy_values if v > 0.5]
            if high_controversy:
                indices_differences.append(f"{len(high_controversy)} articles show high controversy scores")
".join(indices_differences)) # Compare source differences sources = [article.get('source', '').lower() for article in articles] source_counts = {} for source in sources: if source: source_counts[source] = source_counts.get(source, 0) + 1 if len(source_counts) > 1: top_sources = sorted(source_counts.items(), key=lambda x: x[1], reverse=True)[:3] sources_str = ", ".join([f"{source} ({count})" for source, count in top_sources]) differences.append(f"Coverage spans multiple sources: {sources_str}") # If no significant differences found if not differences: differences.append("Coverage is relatively consistent across articles") return differences def _get_final_sentiment(self, distribution: Dict[str, Any], articles: List[Dict[str, Any]]) -> str: """Generate final sentiment analysis based on distribution and article content.""" try: # Get basic sentiment counts basic_dist = distribution.get('basic', {}) positive_count = basic_dist.get('positive', 0) negative_count = basic_dist.get('negative', 0) neutral_count = basic_dist.get('neutral', 0) total_articles = positive_count + negative_count + neutral_count if total_articles == 0: return "No sentiment data available" # Calculate percentages positive_pct = (positive_count / total_articles) * 100 negative_pct = (negative_count / total_articles) * 100 neutral_pct = (neutral_count / total_articles) * 100 # Get average sentiment score avg_score = distribution.get('avg_score', 0.5) # Get volatility volatility = distribution.get('volatility', 0) # Get sentiment indices indices = distribution.get('sentiment_indices', {}) positivity_index = indices.get('positivity_index', 0.5) negativity_index = indices.get('negativity_index', 0.5) emotional_intensity = indices.get('emotional_intensity', 0) controversy_score = indices.get('controversy_score', 0) esg_relevance = indices.get('esg_relevance', 0) # Generate analysis text analysis = [] # Overall sentiment if positive_pct > 60: analysis.append(f"Overall sentiment is predominantly positive ({positive_pct:.1f}%).") elif negative_pct > 60: analysis.append(f"Overall sentiment is predominantly negative ({negative_pct:.1f}%).") elif neutral_pct > 60: analysis.append(f"Overall sentiment is predominantly neutral ({neutral_pct:.1f}%).") elif positive_pct > negative_pct and positive_pct > neutral_pct: analysis.append(f"Overall sentiment leans positive ({positive_pct:.1f}%), with some mixed coverage.") elif negative_pct > positive_pct and negative_pct > neutral_pct: analysis.append(f"Overall sentiment leans negative ({negative_pct:.1f}%), with some mixed coverage.") else: analysis.append(f"Sentiment is mixed across sources (Positive: {positive_pct:.1f}%, Negative: {negative_pct:.1f}%, Neutral: {neutral_pct:.1f}%).") # Sentiment indices insights if positivity_index > 0.7: analysis.append(f"High positivity index ({positivity_index:.2f}) indicates strong positive sentiment.") elif positivity_index < 0.3 and negativity_index > 0.7: analysis.append(f"High negativity index ({negativity_index:.2f}) with low positivity suggests strongly negative coverage.") if emotional_intensity > 0.6: analysis.append(f"Coverage shows high emotional intensity ({emotional_intensity:.2f}).") if controversy_score > 0.5: analysis.append(f"Coverage shows significant controversy ({controversy_score:.2f}), with polarized opinions.") if esg_relevance > 0.4: analysis.append(f"Coverage includes significant ESG-related content ({esg_relevance:.2f}).") # Volatility if volatility > 0.2: analysis.append(f"Sentiment varies considerably across articles (volatility: 
{volatility:.2f}).") else: analysis.append(f"Sentiment is relatively consistent across articles (volatility: {volatility:.2f}).") return " ".join(analysis) except Exception as e: print(f"Error generating final sentiment: {str(e)}") return "Unable to generate final sentiment analysis due to an error."