import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

import gradio as gr
import nltk
import pandas as pd
import requests
from dotenv import load_dotenv
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from transformers import pipeline

# Download the VADER lexicon only if it is not already present.
try:
    nltk.data.find('sentiment/vader_lexicon')
except LookupError:
    nltk.download('vader_lexicon')

summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")

load_dotenv()
api_key = os.getenv("API_KEY")
if not api_key:
    raise ValueError("API_KEY not found. Make sure to set it in the .env file.")

# Rough outlet-level bias ratings used to annotate each article's source.
SOURCE_BIAS_MAP = {
    "fox news": "right",
    "breitbart": "right",
    "new york post": "right",
    "the wall street journal": "center-right",
    "reuters": "center",
    "associated press": "center",
    "bloomberg": "center",
    "npr": "center-left",
    "cnn": "left",
    "msnbc": "left",
    "the new york times": "left",
    "the washington post": "left",
    "the guardian": "left",
    "bbc news": "center",
    "sky news": "center-right",
    "the telegraph": "right",
    "the times": "center-right",
    "daily mail": "right",
    "the independent": "center-left",
    "the sun": "right",
    "financial times": "center",
}

# Numeric bias scores on a -1 (left) to +1 (right) scale.
BIAS_SCORE_MAP = {
    "left": -1,
    "center-left": -0.5,
    "center": 0,
    "center-right": 0.5,
    "right": 1,
    "unknown": 0,
}


def query(topic, sort_by="popularity"):
    """Fetch up to 20 articles about `topic` from the last 7 days via NewsAPI."""
    if not topic:
        print("Topic must be provided.")
        return None

    today = datetime.today()
    last_week = today - timedelta(days=7)
    from_date = last_week.strftime('%Y-%m-%d')
    to_date = today.strftime('%Y-%m-%d')

    base_url = "https://newsapi.org/v2/everything"
    url = (
        f"{base_url}?q={topic}&from={from_date}&to={to_date}"
        f"&sortBy={sort_by}&pageSize=20&apiKey={api_key}"
    )

    try:
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            print(f"API returned error: {response.status_code}")
            return None

        data = response.json()
        if data.get("totalResults", 0) == 0:
            print("No articles found for the given query and date range.")
            return None

        articles = data.get("articles", [])
        extracted = [
            {
                "title": article.get("title", "N/A"),
                "description": article.get("description", "N/A"),
                "source_name": article.get("source", {}).get("name", "N/A"),
                "url": article.get("url", "N/A"),
                "publishedAt": article.get("publishedAt", "N/A"),
            }
            for article in articles
        ]
        return pd.DataFrame(extracted)
    except Exception as e:
        print(f"An error occurred: {e}")
        return None


def process_data(df):
    """Drop empty or duplicate rows and build a combined `text` column."""
    if df is None or df.empty or not all(col in df.columns for col in ["title", "description"]):
        print("Invalid or empty DataFrame passed to process_data()")
        return pd.DataFrame()

    df_cleaned = df.dropna(subset=["title", "description"]).copy()
    df_cleaned = df_cleaned[df_cleaned["title"].str.strip() != ""]
    df_cleaned = df_cleaned[df_cleaned["description"].str.strip() != ""]
    df_cleaned = df_cleaned.drop_duplicates(subset=["title", "url"])
    # Join title and description with a separator so the combined text reads
    # naturally; casing is preserved because VADER uses capitalisation as an
    # intensity cue.
    df_cleaned["text"] = df_cleaned["title"].str.strip() + ". " + df_cleaned["description"].str.strip()
    return df_cleaned


def analyse_sentiment(df):
    """Add VADER sentiment scores and a coarse positive/neutral/negative label."""
    analyser = SentimentIntensityAnalyzer()

    def get_scores(text):
        scores = analyser.polarity_scores(text)
        return scores['compound'], scores['neg'], scores['neu'], scores['pos']

    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(get_scores, df['text']))
    score_cols = ['compound', 'neg', 'neu', 'pos']
    df[score_cols] = pd.DataFrame(results, columns=score_cols, index=df.index)

    def label_sentiment(score):
        # Standard VADER convention: |compound| < 0.05 counts as neutral.
        if score >= 0.05:
            return "positive"
        elif score <= -0.05:
            return "negative"
        return "neutral"

    df['sentiment_label'] = df['compound'].apply(label_sentiment)
    return df
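# Illustration of the labelling thresholds above (a sketch, not asserted
# output): a row whose text scores a VADER compound of 0.6 gets
# sentiment_label == "positive", a compound of -0.2 gets "negative", and any
# compound strictly between -0.05 and 0.05 falls through to "neutral".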
def get_bias_label(source_name):
    source = source_name.strip().lower()
    return SOURCE_BIAS_MAP.get(source, "unknown")


def add_bias_annotation(df):
    bias_series = pd.Series(SOURCE_BIAS_MAP)
    df['bias_label'] = (
        df['source_name'].str.strip().str.lower().map(bias_series).fillna("unknown")
    )
    return df


def set_article_extremity(df, top_n=5):
    """Score each article's 'extremity' and flag the top_n most extreme."""
    def get_bias_extremity(label):
        return BIAS_SCORE_MAP.get(label, 0)

    df['bias_score'] = df['bias_label'].apply(get_bias_extremity)
    # Both |compound| and |bias_score| lie in [0, 1], so extremity_score lies
    # in [0, 2]; dividing by 2 converts it to a percentage.
    df['extremity_score'] = df['compound'].abs() + df['bias_score'].abs()
    df['extremity_pct'] = ((df['extremity_score'] / 2) * 100).round(1)
    df = df.sort_values(by='extremity_score', ascending=False)
    df['extreme'] = False
    df.loc[df.index[:top_n], 'extreme'] = True
    return df


def summarise_text(row, max_tokens=512):
    """Summarise one article's text; returns a single-field Series."""
    try:
        text = row['text'] if 'text' in row and pd.notna(row['text']) else ''
        input_length = len(text.split())
        # Keep the summary shorter than the input, and guard against very
        # short texts where input_length - 10 would be zero or negative.
        max_length = max(20, min(input_length - 10, max_tokens))
        min_length = max(10, max_length - 10)

        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
        return pd.Series({'summary': summary[0]['summary_text']})
    except Exception as e:
        print(f"Error summarising row: {e}")
        return pd.Series({'summary': 'Summary unavailable'})


def add_article_summaries(df, max_tokens=512):
    # Summaries are computed in parallel; bias labels and source names are
    # already present in the 'bias_label' and 'source_name' columns, so only
    # the summary itself is written back (writing a textual label into
    # 'bias_score' here would clobber the numeric score from
    # set_article_extremity()).
    with ThreadPoolExecutor(max_workers=4) as executor:
        summaries = list(
            executor.map(lambda row: summarise_text(row, max_tokens), df.to_dict('records'))
        )
    df['summary'] = [s['summary'] for s in summaries]
    return df


def main():
    raw_df = query("Tesla")
    if raw_df is None or raw_df.empty:
        print("No data found!")
        return

    processed_df = process_data(raw_df)
    sentiment_df = analyse_sentiment(processed_df)
    bias_df = add_bias_annotation(sentiment_df)
    extremity_df = set_article_extremity(bias_df)
    final_df = add_article_summaries(extremity_df)
    print(final_df.head())


if __name__ == "__main__":
    main()
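# ---------------------------------------------------------------------------
# The gradio import above is currently unused. Below is a minimal sketch of
# how the pipeline could be exposed as a Gradio app; run_pipeline and the
# Interface wiring are assumptions, not part of the original script.
# Uncomment to try it:
#
# def run_pipeline(topic):
#     raw_df = query(topic)
#     if raw_df is None or raw_df.empty:
#         return pd.DataFrame({"message": ["No articles found."]})
#     df = analyse_sentiment(process_data(raw_df))
#     df = add_article_summaries(set_article_extremity(add_bias_annotation(df)))
#     return df[["title", "source_name", "sentiment_label",
#                "bias_label", "extremity_pct", "summary"]]
#
# demo = gr.Interface(fn=run_pipeline, inputs="text", outputs="dataframe",
#                     title="News sentiment and bias explorer")
# demo.launch()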