Spaces:
Sleeping
Sleeping
from fastapi import APIRouter | |
from datetime import datetime | |
from datasets import load_dataset | |
from sklearn.metrics import accuracy_score | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.pipeline import Pipeline | |
import numpy as np | |
from .utils.evaluation import TextEvaluationRequest | |
from .utils.emissions import tracker, clean_emissions_data, get_space_info | |
# Human-readable description of the model served by this module; it is
# echoed back in the evaluation results as "model_description".
DESCRIPTION = "Climate Disinformation Detection - TF-IDF + LogReg"

# URL path of the text-classification task; reported as "api_route".
ROUTE = "/text"

# Router object that collects the endpoints declared in this module.
router = APIRouter()
def create_pipeline():
    """Build the (unfitted) TF-IDF + logistic-regression text classifier.

    Returns:
        A scikit-learn ``Pipeline`` with two named steps: ``'tfidf'``
        (a ``TfidfVectorizer``) followed by ``'classifier'``
        (a ``LogisticRegression``).
    """
    # Feature extraction: unigrams and bigrams with English stop words
    # removed. The vocabulary is capped to keep the model small, and terms
    # that are very rare (min_df) or very common (max_df) are dropped.
    vectorizer = TfidfVectorizer(
        max_features=10000,
        ngram_range=(1, 2),
        stop_words='english',
        min_df=2,
        max_df=0.95,
    )

    # NOTE(review): `multi_class` is deprecated in recent scikit-learn
    # releases (1.5+) -- confirm against the pinned version before upgrading.
    classifier = LogisticRegression(
        C=1.0,
        multi_class='multinomial',
        max_iter=200,
        n_jobs=-1,  # request all CPU cores
    )

    return Pipeline(steps=[
        ('tfidf', vectorizer),
        ('classifier', classifier),
    ])
# NOTE(review): no `@router.<method>(ROUTE, ...)` decorator is visible on this
# function -- confirm that the route is registered elsewhere.
async def evaluate_text(request: TextEvaluationRequest):
    """Evaluate text classification for climate disinformation detection.

    Loads the dataset named by ``request.dataset_name``, converts its string
    labels to integer ids, splits the ``"train"`` split into train/test parts
    using ``request.test_size`` and ``request.test_seed``, fits the pipeline
    from ``create_pipeline()`` on the ``"quote"`` column and scores its
    predictions on the held-out part. Dataset loading, training and
    prediction all run inside the emissions-tracking task named
    ``"inference"``.

    Args:
        request: Evaluation parameters; the attributes ``dataset_name``,
            ``test_size`` and ``test_seed`` are read.

    Returns:
        A dict with the space identity (``username``, ``space_url``), the
        submission timestamp, the model description, the accuracy, the
        energy/emissions figures, the API route and the dataset
        configuration that was used.

    Raises:
        Exception: Any error raised while loading data, training or
            predicting is re-raised unchanged after the tracking task has
            been stopped.
    """
    # Get space info
    username, space_url = get_space_info()

    # String label -> integer class id
    LABEL_MAPPING = {
        "0_not_relevant": 0,
        "1_not_happening": 1,
        "2_not_human": 2,
        "3_not_bad": 3,
        "4_solutions_harmful_unnecessary": 4,
        "5_science_unreliable": 5,
        "6_proponents_biased": 6,
        "7_fossil_fuels_needed": 7,
    }

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    # Only the tracked work lives inside the `try`, so the task is stopped
    # exactly once: in the handler on failure, or right after the block on
    # success. A later failure (scoring, building the response) can
    # therefore no longer trigger a second `stop_task()` call.
    try:
        # Load the dataset and convert string labels to integers
        dataset = load_dataset(request.dataset_name)
        dataset = dataset.map(lambda x: {"label": LABEL_MAPPING[x["label"]]})

        # Hold out part of the "train" split for evaluation
        train_test = dataset["train"].train_test_split(
            test_size=request.test_size,
            seed=request.test_seed,
        )
        train_dataset = train_test["train"]
        test_dataset = train_test["test"]

        # Create and train the pipeline
        pipeline = create_pipeline()
        pipeline.fit(train_dataset["quote"], train_dataset["label"])

        # Predict on the held-out quotes and fetch the reference labels
        predictions = pipeline.predict(test_dataset["quote"])
        true_labels = test_dataset["label"]
    except Exception:
        # Stop tracking in case of error, then re-raise the original
        # exception with its traceback untouched.
        tracker.stop_task()
        raise

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)

    return {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        # The x1000 factors presumably convert kWh -> Wh and kg -> g, as the
        # key names suggest -- confirm against the tracker's units.
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }