# media-trust-analyser / media_trust.py
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

import nltk
import pandas as pd
import requests
from dotenv import load_dotenv
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from transformers import pipeline

# Fetch the VADER lexicon on first run only.
try:
    nltk.data.find('sentiment/vader_lexicon')
except LookupError:
    nltk.download('vader_lexicon')

# Load the summarisation model once at import time so all worker threads share it.
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")

load_dotenv()
api_key = os.getenv("API_KEY")
if not api_key:
raise ValueError("API_KEY not found. Make sure to set it in the .env file.")
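# Hand-maintained, coarse outlet bias labels; indicative rather than authoritative.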
SOURCE_BIAS_MAP = {
"fox news": "right",
"breitbart": "right",
"new york post": "right",
"the wall street journal": "center-right",
"reuters": "center",
"associated press": "center",
"bloomberg": "center",
"npr": "center-left",
"cnn": "left",
"msnbc": "left",
"the new york times": "left",
"the washington post": "left",
"the guardian": "left",
"bbc news": "center",
"sky news": "center-right",
"the telegraph": "right",
"the times": "center-right",
"daily mail": "right",
"the independent": "center-left",
"the sun": "right",
"financial times": "center",
}
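# Numeric scale for bias extremity: -1 (left) through 0 (center) to +1 (right).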
BIAS_SCORE_MAP = {
"left": -1,
"center-left": -0.5,
"center": 0,
"center-right": 0.5,
"right": 1,
"unknown": 0
}
def query(topic, sort_by="popularity"):
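    """Fetch up to 20 articles about `topic` from the past 7 days via NewsAPI.

    Returns a DataFrame with title, description, source_name, url and
    publishedAt columns, or None on any error or empty result.
    """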
if not topic:
print("Topic must be provided.")
return None
today = datetime.today()
last_week = today - timedelta(days=7)
from_date = last_week.strftime('%Y-%m-%d')
to_date = today.strftime('%Y-%m-%d')
    base_url = "https://newsapi.org/v2/everything"
    # Pass query parameters as a dict so requests URL-encodes the topic.
    params = {
        "q": topic,
        "from": from_date,
        "to": to_date,
        "sortBy": sort_by,
        "pageSize": 20,
        "apiKey": api_key,
    }
try:
        response = requests.get(base_url, params=params, timeout=10)
if response.status_code != 200:
print(f"API returned error: {response.status_code}")
return None
data = response.json()
if data.get("totalResults", 0) == 0:
print("No articles found for the given query and date range.")
return None
articles = data.get("articles", [])
extracted = [
{
"title": article.get("title", "N/A"),
"description": article.get("description", "N/A"),
"source_name": article.get("source", {}).get("name", "N/A"),
"url": article.get("url", "N/A"),
"publishedAt": article.get("publishedAt", "N/A"),
}
for article in articles
]
return pd.DataFrame(extracted)
except Exception as e:
print(f"An error occurred: {e}")
return None
def process_data(df):
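    """Drop rows with missing or blank titles/descriptions, de-duplicate on
    (title, url), and build a combined lowercase `text` column for analysis."""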
if df is None or df.empty or not all(col in df.columns for col in ["title", "description"]):
print("Invalid or empty DataFrame passed to process_data()")
return pd.DataFrame()
df_cleaned = df.dropna(subset=["title", "description"])
df_cleaned = df_cleaned[df_cleaned["title"].str.strip() != ""]
df_cleaned = df_cleaned[df_cleaned["description"].str.strip() != ""]
    df_cleaned = df_cleaned.drop_duplicates(subset=["title", "url"]).copy()
    # Combine title and description into one lowercase field for analysis.
    df_cleaned["text"] = (df_cleaned["title"] + ". " + df_cleaned["description"]).str.lower()
return df_cleaned
def analyse_sentiment(df):
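    """Score each article's `text` with VADER across a small thread pool,
    adding compound/neg/neu/pos columns and a coarse sentiment_label."""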
analyser = SentimentIntensityAnalyzer()
def get_scores(text):
scores = analyser.polarity_scores(text)
return scores['compound'], scores['neg'], scores['neu'], scores['pos']
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(get_scores, df['text']))
df[['compound', 'neg', 'neu', 'pos']] = results
def label_sentiment(score):
if score >= 0.05:
return "positive"
elif score <= -0.05:
return "negative"
else:
return "neutral"
df['sentiment_label'] = df['compound'].apply(label_sentiment)
return df
def get_bias_label(source_name):
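    """Look up an outlet's bias label, defaulting to "unknown"."""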
    source = str(source_name or "").strip().lower()
return SOURCE_BIAS_MAP.get(source, "unknown")
def add_bias_annotation(df):
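    """Attach a `bias_label` column derived from each article's source."""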
    # Reuse get_bias_label so the lookup logic lives in one place.
    df['bias_label'] = df['source_name'].apply(get_bias_label)
return df
def set_article_extremity(df, top_n=5):
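    """Rank articles by combined sentiment and bias extremity.

    extremity_score = |compound| + |bias_score|, giving a 0-2 range that is
    also expressed as a percentage; the top_n articles are flagged extreme.
    """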
def get_bias_extremity(label):
return BIAS_SCORE_MAP.get(label, 0)
df['bias_score'] = df['bias_label'].apply(get_bias_extremity)
df['extremity_score'] = df['compound'].abs() + df['bias_score'].abs()
df['extremity_pct'] = (df['extremity_score'] / 2) * 100
df['extremity_pct'] = df['extremity_pct'].round(1)
df = df.sort_values(by='extremity_score', ascending=False)
df['extreme'] = False
df.loc[df.index[:top_n], 'extreme'] = True
return df
def summarise_text(row, max_tokens=512):
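    """Summarise one article's combined text with DistilBART.

    Returns the summary string, or 'Summary unavailable' when the text is
    empty or the model raises.
    """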
    try:
        text = row['text'] if 'text' in row and pd.notna(row['text']) else ''
        if not text.strip():
            return 'Summary unavailable'
        input_length = len(text.split())
        # Clamp the output window: shorter than the input and within the token
        # budget, but never so small that min_length could exceed max_length.
        max_length = max(20, min(input_length - 10, max_tokens))
        min_length = max(10, max_length - 10)
        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
        return summary[0]['summary_text']
    except Exception as e:
        print(f"Error summarising row: {e}")
        return 'Summary unavailable'
def add_article_summaries(df, max_tokens=512):
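    """Add a `summary` column by summarising each article in a thread pool."""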
    with ThreadPoolExecutor(max_workers=4) as executor:
        summaries = list(executor.map(lambda row: summarise_text(row, max_tokens), df.to_dict('records')))
    # Assign as a plain list: list assignment is positional, so it stays
    # aligned even though the index is no longer sequential after sorting.
    df['summary'] = summaries
    return df
def main():
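    """End-to-end demo: fetch, clean, score, annotate, rank, summarise."""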
raw_df = query("Tesla")
if raw_df is None or raw_df.empty:
print("No data found!")
return
processed_df = process_data(raw_df)
    sentiment_df = analyse_sentiment(processed_df)
bias_df = add_bias_annotation(sentiment_df)
extremity_df = set_article_extremity(bias_df)
final_df = add_article_summaries(extremity_df)
print(final_df.head())
if __name__ == "__main__":
main()