Spaces:
Sleeping
Sleeping
File size: 5,456 Bytes
6a1e686 8524cf7 5d27647 2f3df87 8524cf7 da9a9de d45f589 2f3df87 5d27647 2f3df87 d45f589 2f3df87 5d27647 2f3df87 8524cf7 2f3df87 c4ad33b 8524cf7 c4ad33b 8524cf7 c4ad33b 8524cf7 6a1e686 8524cf7 d45f589 8524cf7 d45f589 8524cf7 d45f589 b0cd906 8524cf7 d45f589 8524cf7 d45f589 8524cf7 d45f589 8524cf7 da9a9de c4ad33b da9a9de c4ad33b da9a9de |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
import numpy as np
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import MultiLabelBinarizer
import zipfile
import json
import pandas as pd
import torch
from datasets import Dataset
from torch.utils.data import DataLoader
import requests
from .config import TAG_NAMES, DEVICE, SPACE_URL, EVAL_LIMIT
from .globals import global_model, global_tokenizer
def load_data(test_data_path):
# zip file handler
zip_file = zipfile.ZipFile(test_data_path)
# list available files in the container
names = zip_file.namelist()
data = []
features = ["prob_desc_description","prob_desc_input_spec","prob_desc_output_spec"]
cols = features + ["tags"]
# extract a specific file from the zip container
for name in names[1:1+EVAL_LIMIT]:
f = zip_file.open(name)
# save the extraced file
content = f.read()
d = json.loads(content)
# json_fmt = json.dumps(d, indent=2)
# print(json_fmt)
row = []
for c in cols:
row.append(d[c])
data.append(row)
df = pd.DataFrame(data, columns=cols)
return df
def preprocessing(df):
mlb = MultiLabelBinarizer()
tags_to_encode = ['math', 'graphs', 'strings', 'number theory', 'trees', 'geometry', 'games', 'probabilities']
# Filter tags and one-hot encode
df['tags_filtered'] = [[tag for tag in tags if tag in tags_to_encode] for tags in df["tags"]]
df.loc[df['tags_filtered'].apply(len) == 0, 'tags_filtered'] = df.loc[df['tags_filtered'].apply(len) == 0, 'tags_filtered'].apply(lambda x: ['other'])
encoded_tags = mlb.fit_transform(df['tags_filtered'])
# Create a new DataFrame with one-hot encoded columns
encoded_df = pd.DataFrame(encoded_tags, columns=mlb.classes_)
# Concatenate the encoded tags with the original DataFrame
df = pd.concat([df, encoded_df], axis=1)
texts = df["prob_desc_description"].values.tolist()
labels = df[TAG_NAMES].values.tolist()
# data:
# texts = ["text1", "text2", ...] # list of texts
# labels = [[0,1,0,0,1,0,1,1,0], [0,1,1,0,0,0,0,0,0],, ...] # list of labels
df = pd.DataFrame({'text':texts, 'labels': labels})
return df
def evaluate_batch(file_path, hf_repo, backend="local", hf_token=None):
if backend == "local":
return _evaluate_local(file_path, hf_repo)
elif backend == "hf":
return _evaluate_hf_api(file_path, hf_token)
else:
raise ValueError(f"Unknown backend: {backend}")
def _evaluate_local(test_data_path, hf_repo):
global global_model, global_tokenizer
# Lazy-loading to avoid slow startup
if global_model is None:
from .model import QwenClassifier
from transformers import AutoTokenizer
global_model = QwenClassifier.from_pretrained(hf_repo).eval()
global_tokenizer = AutoTokenizer.from_pretrained(hf_repo)
df = load_data(test_data_path)
df = preprocessing(df)
hf_dataset = Dataset.from_pandas(df)
# Then apply tokenization
def tokenize_function(examples):
return global_tokenizer(examples["text"], padding="max_length", truncation=True, max_length=512)
dataset = hf_dataset.map(tokenize_function, batched=True)
dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
global_model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
for batch in dataloader:
batch = {k: v.to(DEVICE) for k, v in batch.items()}
labels = batch["labels"].type(torch.float32)
logits = global_model(batch["input_ids"], batch["attention_mask"])
preds = torch.sigmoid(logits).cpu().numpy() > 0.5
labels = labels.cpu().numpy()
all_preds.extend(preds)
all_labels.extend(labels)
val_acc = accuracy_score(all_labels, all_preds)
val_prec = precision_score(all_labels, all_preds, average='macro', zero_division=0)
val_rec = recall_score(all_labels, all_preds, average='macro')
val_f1 = f1_score(all_labels, all_preds, average='macro')
val_prec_per_class = precision_score(all_labels, all_preds, average=None, zero_division=0)
val_rec_per_class = recall_score(all_labels, all_preds, average=None)
val_f1_per_class = f1_score(all_labels, all_preds, average=None)
metrics = {
val_acc,
val_prec,
val_rec,
val_f1,
val_prec_per_class,
val_rec_per_class,
val_f1_per_class
}
report = classification_report(all_labels, all_preds, target_names=TAG_NAMES, zero_division=0)
return metrics, report
def _evaluate_hf_api(file_path, hf_token=None):
try:
response = requests.post(
f"{SPACE_URL}/evaluate",
json={"file_path": file_path}, # This matches the Pydantic model
headers={
"Authorization": f"Bearer {hf_token}",
"Content-Type": "application/json"
} if hf_token else {"Content-Type": "application/json"},
timeout=10
)
response.raise_for_status() # Raise HTTP errors
return response.json()
except requests.exceptions.RequestException as e:
raise ValueError(f"API Error: {str(e)}\nResponse: {e.response.text if hasattr(e, 'response') else ''}") |