from fastapi import APIRouter from datetime import datetime from datasets import load_dataset from sklearn.metrics import accuracy_score from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.linear_model import LogisticRegression from sklearn.pipeline import Pipeline import numpy as np from .utils.evaluation import TextEvaluationRequest from .utils.emissions import tracker, clean_emissions_data, get_space_info router = APIRouter() DESCRIPTION = "Climate Disinformation Detection - TF-IDF + LogReg" ROUTE = "/text" def create_pipeline(): """Create an efficient text classification pipeline""" return Pipeline([ ('tfidf', TfidfVectorizer( max_features=10000, # Limit features for efficiency ngram_range=(1, 2), # Use unigrams and bigrams stop_words='english', min_df=2, # Remove very rare terms max_df=0.95 # Remove very common terms )), ('classifier', LogisticRegression( C=1.0, multi_class='multinomial', max_iter=200, n_jobs=-1 # Use all CPU cores )) ]) @router.post(ROUTE, tags=["Text Task"], description=DESCRIPTION) async def evaluate_text(request: TextEvaluationRequest): """ Evaluate text classification for climate disinformation detection. """ # Get space info username, space_url = get_space_info() # Define the label mapping LABEL_MAPPING = { "0_not_relevant": 0, "1_not_happening": 1, "2_not_human": 2, "3_not_bad": 3, "4_solutions_harmful_unnecessary": 4, "5_science_unreliable": 5, "6_proponents_biased": 6, "7_fossil_fuels_needed": 7 } # Start tracking emissions tracker.start() tracker.start_task("inference") try: # Load and prepare the dataset dataset = load_dataset(request.dataset_name) # Convert string labels to integers dataset = dataset.map(lambda x: {"label": LABEL_MAPPING[x["label"]]}) # Split dataset train_test = dataset["train"].train_test_split( test_size=request.test_size, seed=request.test_seed ) train_dataset = train_test["train"] test_dataset = train_test["test"] # Create and train pipeline pipeline = create_pipeline() # Train the model pipeline.fit( train_dataset["quote"], train_dataset["label"] ) # Make predictions predictions = pipeline.predict(test_dataset["quote"]) # Get true labels true_labels = test_dataset["label"] # Stop tracking emissions emissions_data = tracker.stop_task() # Calculate accuracy accuracy = accuracy_score(true_labels, predictions) # Prepare results dictionary results = { "username": username, "space_url": space_url, "submission_timestamp": datetime.now().isoformat(), "model_description": DESCRIPTION, "accuracy": float(accuracy), "energy_consumed_wh": emissions_data.energy_consumed * 1000, "emissions_gco2eq": emissions_data.emissions * 1000, "emissions_data": clean_emissions_data(emissions_data), "api_route": ROUTE, "dataset_config": { "dataset_name": request.dataset_name, "test_size": request.test_size, "test_seed": request.test_seed } } return results except Exception as e: # Stop tracking in case of error tracker.stop_task() raise e