Zen0's picture
Update tasks/text.py
daf1822 verified
raw
history blame
3.77 kB
from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
import numpy as np
from .utils.evaluation import TextEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
router = APIRouter()
DESCRIPTION = "Climate Disinformation Detection - TF-IDF + LogReg"
ROUTE = "/text"
def create_pipeline():
"""Create an efficient text classification pipeline"""
return Pipeline([
('tfidf', TfidfVectorizer(
max_features=10000, # Limit features for efficiency
ngram_range=(1, 2), # Use unigrams and bigrams
stop_words='english',
min_df=2, # Remove very rare terms
max_df=0.95 # Remove very common terms
)),
('classifier', LogisticRegression(
C=1.0,
multi_class='multinomial',
max_iter=200,
n_jobs=-1 # Use all CPU cores
))
])
@router.post(ROUTE, tags=["Text Task"], description=DESCRIPTION)
async def evaluate_text(request: TextEvaluationRequest):
"""
Evaluate text classification for climate disinformation detection.
"""
# Get space info
username, space_url = get_space_info()
# Define the label mapping
LABEL_MAPPING = {
"0_not_relevant": 0,
"1_not_happening": 1,
"2_not_human": 2,
"3_not_bad": 3,
"4_solutions_harmful_unnecessary": 4,
"5_science_unreliable": 5,
"6_proponents_biased": 6,
"7_fossil_fuels_needed": 7
}
# Start tracking emissions
tracker.start()
tracker.start_task("inference")
try:
# Load and prepare the dataset
dataset = load_dataset(request.dataset_name)
# Convert string labels to integers
dataset = dataset.map(lambda x: {"label": LABEL_MAPPING[x["label"]]})
# Split dataset
train_test = dataset["train"].train_test_split(
test_size=request.test_size,
seed=request.test_seed
)
train_dataset = train_test["train"]
test_dataset = train_test["test"]
# Create and train pipeline
pipeline = create_pipeline()
# Train the model
pipeline.fit(
train_dataset["quote"],
train_dataset["label"]
)
# Make predictions
predictions = pipeline.predict(test_dataset["quote"])
# Get true labels
true_labels = test_dataset["label"]
# Stop tracking emissions
emissions_data = tracker.stop_task()
# Calculate accuracy
accuracy = accuracy_score(true_labels, predictions)
# Prepare results dictionary
results = {
"username": username,
"space_url": space_url,
"submission_timestamp": datetime.now().isoformat(),
"model_description": DESCRIPTION,
"accuracy": float(accuracy),
"energy_consumed_wh": emissions_data.energy_consumed * 1000,
"emissions_gco2eq": emissions_data.emissions * 1000,
"emissions_data": clean_emissions_data(emissions_data),
"api_route": ROUTE,
"dataset_config": {
"dataset_name": request.dataset_name,
"test_size": request.test_size,
"test_seed": request.test_seed
}
}
return results
except Exception as e:
# Stop tracking in case of error
tracker.stop_task()
raise e