import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score, precision_recall_fscore_support, confusion_matrix
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
import matplotlib.pyplot as plt
import json
import joblib
import os
import seaborn as sns
from scipy import stats
import time
import argparse

def setup_gpu():
    if torch.cuda.is_available():
        return True
    else:
        print("No GPUs found. Using CPU.")
        return False


GPU_AVAILABLE = setup_gpu()
DEVICE = torch.device('cuda' if GPU_AVAILABLE else 'cpu')
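# Everything below moves both tensors and models to DEVICE, so the same code path runs on a
# CPU-only machine or on a single CUDA GPU when one is available.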

def load_data_from_json(directory_path):
    if os.path.isfile(directory_path):
        directory = os.path.dirname(directory_path)
    else:
        directory = directory_path
    print(f"Loading JSON files from directory: {directory}")
    json_files = [os.path.join(directory, f) for f in os.listdir(directory)
                  if f.endswith('.json') and os.path.isfile(os.path.join(directory, f))]
    if not json_files:
        raise ValueError(f"No JSON files found in directory {directory}")
    print(f"Found {len(json_files)} JSON files")
    all_data = []
    for file_path in json_files:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data_dict = json.load(f)
            if 'data' in data_dict:
                all_data.extend(data_dict['data'])
            else:
                print(f"Warning: 'data' key not found in {os.path.basename(file_path)}")
        except Exception as e:
            print(f"Error loading {os.path.basename(file_path)}: {str(e)}")
    if not all_data:
        raise ValueError("Failed to load data from JSON files")
    df = pd.DataFrame(all_data)
    label_mapping = {
        'ai': 'AI',
        'human': 'Human',
        'ai+rew': 'AI',
    }
    if 'source' in df.columns:
        df['label'] = df['source'].map(lambda x: label_mapping.get(x, x))
    else:
        print("Warning: 'source' column not found, using default label")
        df['label'] = 'Unknown'
    valid_labels = ['AI', 'Human']
    df = df[df['label'].isin(valid_labels)]
    print(f"Filtered to {len(df)} examples with labels: {valid_labels}")
    print(f"Label distribution: {df['label'].value_counts().to_dict()}")
    return df
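# Each input JSON file is expected to look roughly like
# {"data": [{"source": "ai" | "human" | "ai+rew", "score_chat": ..., "score_coder": ...,
#            "text_analysis": {...}}, ...]}  (a sketch of the schema implied by the code above
# and by select_features below); records whose mapped label is not "AI" or "Human" are dropped.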

class Medium_Binary_Network(nn.Module):
    def __init__(self, input_size, hidden_sizes=[256, 128, 64, 32], dropout=0.3):
        super(Medium_Binary_Network, self).__init__()
        layers = []
        prev_size = input_size
        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            prev_size = hidden_size
        layers.append(nn.Linear(prev_size, 2))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)
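# Minimal usage sketch (the feature count 60 is only an illustrative stand-in for whatever
# select_features() produces with a given feature_config):
#   model = Medium_Binary_Network(input_size=60, hidden_sizes=[256, 128, 64, 32], dropout=0.3).to(DEVICE)
#   logits = model(torch.randn(8, 60).to(DEVICE))   # shape (8, 2): one logit per class (AI vs Human)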

def cross_validate_simple_classifier(directory_path="experiments/results/two_scores_with_long_text_analyze_2048T",
                                     feature_config=None,
                                     n_splits=5,
                                     random_state=42,
                                     epochs=100,
                                     hidden_sizes=[256, 128, 64, 32],
                                     dropout=0.3,
                                     early_stopping_patience=10):
    print("\n" + "="*50)
    print("MEDIUM BINARY CLASSIFIER CROSS-VALIDATION")
    print("="*50)
    if feature_config is None:
        feature_config = {
            'basic_scores': True,
            'basic_text_stats': ['total_tokens', 'total_words', 'unique_words', 'stop_words', 'avg_word_length'],
            'morphological': ['pos_distribution', 'unique_lemmas', 'lemma_word_ratio'],
            'syntactic': ['dependencies', 'noun_chunks'],
            'entities': ['total_entities', 'entity_types'],
            'diversity': ['ttr', 'mtld'],
            'structure': ['sentence_count', 'avg_sentence_length', 'question_sentences', 'exclamation_sentences'],
            'readability': ['words_per_sentence', 'syllables_per_word', 'flesh_kincaid_score', 'long_words_percent'],
            'semantic': True
        }
    df = load_data_from_json(directory_path)
    features_df = select_features(df, feature_config)
    print(f"Selected {len(features_df.columns)} features")
    imputer = SimpleImputer(strategy='mean')
    X = imputer.fit_transform(features_df)
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(df['label'].values)
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    fold_metrics = []
    fold_models = []
    all_train_losses = []
    all_val_losses = []
    all_train_accs = []
    all_val_accs = []
    all_y_true = []
    all_y_pred = []
    best_fold_score = -1
    best_fold_index = -1
    for fold, (train_idx, test_idx) in enumerate(skf.split(X, y)):
        print(f"\n{'='*20} Fold {fold+1}/{n_splits} {'='*20}")
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        X_train_tensor = torch.FloatTensor(X_train_scaled).to(DEVICE)
        y_train_tensor = torch.LongTensor(y_train).to(DEVICE)
        X_test_tensor = torch.FloatTensor(X_test_scaled).to(DEVICE)
        y_test_tensor = torch.LongTensor(y_test).to(DEVICE)
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        model = Medium_Binary_Network(X_train_scaled.shape[1], hidden_sizes=hidden_sizes, dropout=dropout).to(DEVICE)
        print(f"Model created with {len(hidden_sizes)} hidden layers: {hidden_sizes}")
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None
        train_losses = []
        val_losses = []
        train_accs = []
        val_accs = []
        for epoch in range(epochs):
            model.train()
            running_loss = 0.0
            running_corrects = 0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()
            epoch_loss = running_loss / len(train_loader.dataset)
            epoch_acc = running_corrects / len(train_loader.dataset)
            train_losses.append(epoch_loss)
            train_accs.append(epoch_acc)
            model.eval()
            with torch.no_grad():
                val_outputs = model(X_test_tensor)
                val_loss = criterion(val_outputs, y_test_tensor)
                val_losses.append(val_loss.item())
                _, val_preds = torch.max(val_outputs, 1)
                val_acc = torch.sum(val_preds == y_test_tensor).item() / len(y_test_tensor)
                val_accs.append(val_acc)
            if val_loss < best_val_loss:
                best_val_loss = val_loss.item()
                patience_counter = 0
                # Clone the tensors: a plain dict copy of state_dict() would keep references to
                # the live parameters, which continue to change as training goes on.
                best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break
            scheduler.step(val_loss)
            if (epoch + 1) % 10 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        if best_model_state:
            model.load_state_dict(best_model_state)
            print("Loaded best model weights")
        model.eval()
        with torch.no_grad():
            test_outputs = model(X_test_tensor)
            _, predicted = torch.max(test_outputs, 1)
            test_acc = torch.sum(predicted == y_test_tensor).item() / len(y_test_tensor)
        y_test_np = y_test
        predicted_np = predicted.cpu().numpy()
        all_y_true.extend(y_test_np)
        all_y_pred.extend(predicted_np)
        precision, recall, f1, _ = precision_recall_fscore_support(y_test_np, predicted_np, average='weighted')
        fold_metric = {
            'fold': fold + 1,
            'accuracy': float(test_acc),
            'precision': float(precision),
            'recall': float(recall),
            'f1': float(f1),
            'val_loss': float(best_val_loss)
        }
        fold_metrics.append(fold_metric)
        fold_models.append({
            'model': model,
            'scaler': scaler,
            'label_encoder': label_encoder,
            'imputer': imputer,
            'score': test_acc
        })
        if test_acc > best_fold_score:
            best_fold_score = test_acc
            best_fold_index = fold
        all_train_losses.extend(train_losses)
        all_val_losses.extend(val_losses)
        all_train_accs.extend(train_accs)
        all_val_accs.extend(val_accs)
        print(f"Fold {fold+1} Results:")
        print(f"  Accuracy: {test_acc:.4f}")
        print(f"  Precision: {precision:.4f}")
        print(f"  Recall: {recall:.4f}")
        print(f"  F1 Score: {f1:.4f}")
    overall_accuracy = accuracy_score(all_y_true, all_y_pred)
    overall_precision, overall_recall, overall_f1, _ = precision_recall_fscore_support(
        all_y_true, all_y_pred, average='weighted'
    )
    fold_accuracies = [metrics['accuracy'] for metrics in fold_metrics]
    mean_accuracy = np.mean(fold_accuracies)
    std_accuracy = np.std(fold_accuracies)
    ci_lower = mean_accuracy - 1.96 * std_accuracy / np.sqrt(n_splits)
    ci_upper = mean_accuracy + 1.96 * std_accuracy / np.sqrt(n_splits)
    plot_learning_curve(all_train_losses, all_val_losses)
    plot_accuracy_curve(all_train_accs, all_val_accs)
    class_names = label_encoder.classes_
    cm = confusion_matrix(all_y_true, all_y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Binary Classification Confusion Matrix (Cross-Validation)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    os.makedirs('plots/binary', exist_ok=True)
    plt.savefig('plots/binary/confusion_matrix_medium.png')
    plt.close()
    print("\n" + "="*50)
    print("CROSS-VALIDATION SUMMARY")
    print("="*50)
    print(f"Mean Accuracy: {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"95% Confidence Interval: [{ci_lower:.4f}, {ci_upper:.4f}]")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Overall Precision: {overall_precision:.4f}")
    print(f"Overall Recall: {overall_recall:.4f}")
    print(f"Overall F1: {overall_f1:.4f}")
    print(f"\nBest Fold: {best_fold_index + 1} (Accuracy: {fold_metrics[best_fold_index]['accuracy']:.4f})")
    best_model_data = fold_models[best_fold_index]
    results = {
        'fold_metrics': fold_metrics,
        'overall': {
            'accuracy': float(overall_accuracy),
            'precision': float(overall_precision),
            'recall': float(overall_recall),
            'f1': float(overall_f1)
        },
        'cross_validation': {
            'mean_accuracy': float(mean_accuracy),
            'std_accuracy': float(std_accuracy),
            'confidence_interval_95': [float(ci_lower), float(ci_upper)]
        },
        'best_fold': {
            'fold': best_fold_index + 1,
            'accuracy': float(fold_metrics[best_fold_index]['accuracy'])
        },
        'model_config': {
            'hidden_sizes': hidden_sizes,
            'dropout': dropout
        }
    }
    output_dir = 'models/medium_binary_classifier'
    save_paths = save_binary_model(best_model_data, results, output_dir=output_dir)
    return best_model_data, results, save_paths
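# cross_validate_simple_classifier returns (best_model_data, results, save_paths): the best fold's
# model plus its scaler, imputer, and label encoder are written to models/medium_binary_classifier/
# by save_binary_model(), and the diagnostic plots go to plots/binary/.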

def plot_learning_curve(train_losses, val_losses):
    plt.figure(figsize=(10, 6))
    epochs = range(1, len(train_losses) + 1)
    plt.plot(epochs, train_losses, 'b-', label='Training Loss')
    plt.plot(epochs, val_losses, 'r-', label='Validation Loss')
    plt.title('Learning Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    os.makedirs('plots/binary', exist_ok=True)
    plt.savefig('plots/binary/learning_curve.png')
    plt.close()
    print("Learning curve saved to plots/binary/learning_curve.png")

def plot_accuracy_curve(train_accuracies, val_accuracies):
    plt.figure(figsize=(10, 6))
    epochs = range(1, len(train_accuracies) + 1)
    plt.plot(epochs, train_accuracies, 'g-', label='Training Accuracy')
    plt.plot(epochs, val_accuracies, 'm-', label='Validation Accuracy')
    plt.title('Accuracy Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)
    plt.ylim(0, 1.0)
    os.makedirs('plots/binary', exist_ok=True)
    plt.savefig('plots/binary/accuracy_curve.png')
    plt.close()
    print("Accuracy curve saved to plots/binary/accuracy_curve.png")

def select_features(df, feature_config):
    features_df = pd.DataFrame()
    if feature_config.get('basic_scores', True):
        if 'score_chat' in df.columns:
            features_df['score_chat'] = df['score_chat']
        if 'score_coder' in df.columns:
            features_df['score_coder'] = df['score_coder']
    if 'text_analysis' in df.columns:
        if feature_config.get('basic_text_stats'):
            for feature in feature_config['basic_text_stats']:
                features_df[f'basic_{feature}'] = df['text_analysis'].apply(
                    lambda x: x.get('basic_stats', {}).get(feature, 0) if isinstance(x, dict) else 0
                )
        if feature_config.get('morphological'):
            for feature in feature_config['morphological']:
                if feature == 'pos_distribution':
                    pos_types = ['NOUN', 'VERB', 'ADJ', 'ADV', 'PROPN', 'DET', 'ADP', 'PRON', 'CCONJ', 'SCONJ']
                    for pos in pos_types:
                        features_df[f'pos_{pos}'] = df['text_analysis'].apply(
                            lambda x: x.get('morphological_analysis', {}).get('pos_distribution', {}).get(pos, 0)
                            if isinstance(x, dict) else 0
                        )
                else:
                    features_df[f'morph_{feature}'] = df['text_analysis'].apply(
                        lambda x: x.get('morphological_analysis', {}).get(feature, 0) if isinstance(x, dict) else 0
                    )
        if feature_config.get('syntactic'):
            for feature in feature_config['syntactic']:
                if feature == 'dependencies':
                    dep_types = ['nsubj', 'obj', 'amod', 'nmod', 'ROOT', 'punct', 'case']
                    for dep in dep_types:
                        features_df[f'dep_{dep}'] = df['text_analysis'].apply(
                            lambda x: x.get('syntactic_analysis', {}).get('dependencies', {}).get(dep, 0)
                            if isinstance(x, dict) else 0
                        )
                else:
                    features_df[f'synt_{feature}'] = df['text_analysis'].apply(
                        lambda x: x.get('syntactic_analysis', {}).get(feature, 0) if isinstance(x, dict) else 0
                    )
        if feature_config.get('entities'):
            for feature in feature_config['entities']:
                if feature == 'entity_types':
                    entity_types = ['PER', 'LOC', 'ORG']
                    for ent in entity_types:
                        features_df[f'ent_{ent}'] = df['text_analysis'].apply(
                            lambda x: x.get('named_entities', {}).get('entity_types', {}).get(ent, 0)
                            if isinstance(x, dict) else 0
                        )
                else:
                    features_df[f'ent_{feature}'] = df['text_analysis'].apply(
                        lambda x: x.get('named_entities', {}).get(feature, 0) if isinstance(x, dict) else 0
                    )
        if feature_config.get('diversity'):
            for feature in feature_config['diversity']:
                features_df[f'div_{feature}'] = df['text_analysis'].apply(
                    lambda x: x.get('lexical_diversity', {}).get(feature, 0) if isinstance(x, dict) else 0
                )
        if feature_config.get('structure'):
            for feature in feature_config['structure']:
                features_df[f'struct_{feature}'] = df['text_analysis'].apply(
                    lambda x: x.get('text_structure', {}).get(feature, 0) if isinstance(x, dict) else 0
                )
        if feature_config.get('readability'):
            for feature in feature_config['readability']:
                features_df[f'read_{feature}'] = df['text_analysis'].apply(
                    lambda x: x.get('readability', {}).get(feature, 0) if isinstance(x, dict) else 0
                )
        if feature_config.get('semantic'):
            features_df['semantic_coherence'] = df['text_analysis'].apply(
                lambda x: x.get('semantic_coherence', {}).get('avg_coherence_score', 0) if isinstance(x, dict) else 0
            )
    print(f"Generated {len(features_df.columns)} features")
    return features_df

def augment_text_features(features_df, num_augmentations=5, noise_factor=0.05):
    augmented_dfs = [features_df]
    for i in range(num_augmentations):
        numeric_cols = features_df.select_dtypes(include=[np.number]).columns
        augmented_df = features_df.copy()
        for col in numeric_cols:
            augmented_df[col] = augmented_df[col].astype(float)
        noise = augmented_df[numeric_cols] * np.random.normal(0, noise_factor, size=augmented_df[numeric_cols].shape)
        augmented_df[numeric_cols] += noise
        augmented_dfs.append(augmented_df)
    return pd.concat(augmented_dfs, ignore_index=True)
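# Note: augment_text_features only replicates and perturbs the feature rows (multiplicative
# Gaussian noise scaled by noise_factor per feature); the caller must repeat the label vector to
# match, as done with np.tile(labels, num_augmentations + 1) in cross_validate_binary_classifier below.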

def cross_validate_binary_classifier(directory_path="experiments/results/two_scores_with_long_text_analyze_2048T",
                                     model_config=None,
                                     feature_config=None,
                                     n_splits=5,
                                     random_state=42,
                                     epochs=100,
                                     early_stopping_patience=10,
                                     use_augmentation=True,
                                     num_augmentations=2,
                                     noise_factor=0.05):
    if model_config is None:
        model_config = {
            'hidden_layers': [256, 128, 64],
            'dropout_rate': 0.3
        }
    if feature_config is None:
        feature_config = {
            'basic_scores': True,
            'basic_text_stats': ['total_tokens', 'total_words', 'unique_words', 'stop_words', 'avg_word_length'],
            'morphological': ['pos_distribution', 'unique_lemmas', 'lemma_word_ratio'],
            'syntactic': ['dependencies', 'noun_chunks'],
            'entities': ['total_entities', 'entity_types'],
            'diversity': ['ttr', 'mtld'],
            'structure': ['sentence_count', 'avg_sentence_length', 'question_sentences', 'exclamation_sentences'],
            'readability': ['words_per_sentence', 'syllables_per_word', 'flesh_kincaid_score', 'long_words_percent'],
            'semantic': True
        }
    print("\n" + "="*50)
    print("BINARY CLASSIFIER CROSS-VALIDATION")
    print("="*50)
    df = load_data_from_json(directory_path)
    features_df = select_features(df, feature_config)
    print(f"Selected features: {features_df.columns.tolist()}")
    imputer = SimpleImputer(strategy='mean')
    if use_augmentation:
        print(f"Augmenting data with {num_augmentations} copies (noise factor: {noise_factor})...")
        original_size = len(features_df)
        features_df_augmented = augment_text_features(features_df,
                                                      num_augmentations=num_augmentations,
                                                      noise_factor=noise_factor)
        y_augmented = np.tile(df['label'].values, num_augmentations + 1)
        print(f"Data size increased from {original_size} to {len(features_df_augmented)}")
        X = imputer.fit_transform(features_df_augmented)
        y = y_augmented
    else:
        X = imputer.fit_transform(features_df)
        y = df['label'].values
    label_encoder = LabelEncoder()
    y_encoded = label_encoder.fit_transform(y)
    print(f"Data size: {X.shape}")
    print(f"Labels distribution: {pd.Series(y).value_counts().to_dict()}")
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    fold_metrics = []
    fold_models = []
    all_y_true = []
    all_y_pred = []
    all_y_scores = []
    best_fold_score = -1
    best_fold_index = -1
    print(f"\nPerforming {n_splits}-fold cross-validation...")
    num_avg_epochs = 5
    saved_weights = []
    for fold, (train_idx, test_idx) in enumerate(skf.split(X, y_encoded)):
        print(f"\n{'='*20} Fold {fold+1}/{n_splits} {'='*20}")
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y_encoded[train_idx], y_encoded[test_idx]
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        X_train_tensor = torch.FloatTensor(X_train_scaled).to(DEVICE)
        y_train_tensor = torch.LongTensor(y_train).to(DEVICE)
        X_test_tensor = torch.FloatTensor(X_test_scaled).to(DEVICE)
        y_test_tensor = torch.LongTensor(y_test).to(DEVICE)
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        num_classes = len(label_encoder.classes_)
        # build_neural_network is assumed to be defined elsewhere in this project; the model is
        # moved to DEVICE so it matches the tensors prepared above.
        model = build_neural_network(X_train_scaled.shape[1], num_classes,
                                     hidden_layers=model_config['hidden_layers']).to(DEVICE)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None
        train_losses = []
        val_losses = []
        saved_weights = []
        for epoch in range(epochs):
            model.train()
            running_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * inputs.size(0)
            epoch_loss = running_loss / len(train_loader.dataset)
            train_losses.append(epoch_loss)
            model.eval()
            with torch.no_grad():
                val_outputs = model(X_test_tensor)
                val_loss = criterion(val_outputs, y_test_tensor)
                val_losses.append(val_loss.item())
            if val_loss < best_val_loss:
                best_val_loss = val_loss.item()
                patience_counter = 0
                # Clone the tensors so the snapshot is not overwritten by later updates.
                best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break
            if epoch >= epochs - num_avg_epochs:
                saved_weights.append({k: v.detach().clone() for k, v in model.state_dict().items()})
            scheduler.step()
            if (epoch + 1) % 10 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}")
        if len(saved_weights) > 0:
            print(f"Averaging weights from last {len(saved_weights)} epochs...")
            # Simple weight averaging over the snapshots collected in the final epochs.
            avg_state_dict = {k: v.clone() for k, v in saved_weights[0].items()}
            for key in avg_state_dict.keys():
                for i in range(1, len(saved_weights)):
                    avg_state_dict[key] += saved_weights[i][key]
                avg_state_dict[key] /= len(saved_weights)
            model.load_state_dict(avg_state_dict)
            print("Model loaded with averaged weights")
        elif best_model_state:
            model.load_state_dict(best_model_state)
            print("Model loaded with best validation weights")
        model.eval()
        with torch.no_grad():
            test_outputs = model(X_test_tensor)
            _, predicted = torch.max(test_outputs, 1)
            predicted_np = predicted.cpu().numpy()
            probabilities = torch.softmax(test_outputs, dim=1)
            pos_scores = probabilities[:, 1].cpu().numpy()
        all_y_true.extend(y_test)
        all_y_pred.extend(predicted_np)
        all_y_scores.extend(pos_scores)
        fold_acc = accuracy_score(y_test, predicted_np)
        precision, recall, f1, _ = precision_recall_fscore_support(y_test, predicted_np, average='weighted')
        try:
            fold_auc = roc_auc_score(y_test, pos_scores)
        except ValueError:
            fold_auc = 0.0
            print("Warning: Could not compute AUC")
        fold_metrics.append({
            'fold': fold + 1,
            'accuracy': float(fold_acc),
            'precision': float(precision),
            'recall': float(recall),
            'f1': float(f1),
            'auc': float(fold_auc),
            'best_val_loss': float(best_val_loss)
        })
        fold_models.append({
            'model': model,
            'scaler': scaler,
            'label_encoder': label_encoder,
            'imputer': imputer,
            'score': fold_acc
        })
        if fold_acc > best_fold_score:
            best_fold_score = fold_acc
            best_fold_index = fold
        print(f"Fold {fold+1} Results:")
        print(f"  Accuracy: {fold_acc:.4f}")
        print(f"  Precision: {precision:.4f}")
        print(f"  Recall: {recall:.4f}")
        print(f"  F1 Score: {f1:.4f}")
        if fold_auc > 0:
            print(f"  AUC: {fold_auc:.4f}")
    overall_accuracy = accuracy_score(all_y_true, all_y_pred)
    overall_precision, overall_recall, overall_f1, _ = precision_recall_fscore_support(
        all_y_true, all_y_pred, average='weighted'
    )
    try:
        overall_auc = roc_auc_score(all_y_true, all_y_scores)
    except ValueError:
        overall_auc = 0.0
        print("Warning: Could not compute overall AUC")
    fold_accuracies = [metrics['accuracy'] for metrics in fold_metrics]
    mean_accuracy = np.mean(fold_accuracies)
    std_accuracy = np.std(fold_accuracies)
    ci_lower = mean_accuracy - 1.96 * std_accuracy / np.sqrt(n_splits)
    ci_upper = mean_accuracy + 1.96 * std_accuracy / np.sqrt(n_splits)
    class_counts = np.bincount(y_encoded)
    baseline_accuracy = np.max(class_counts) / len(y_encoded)
    most_frequent_class = np.argmax(class_counts)
    t_stat, p_value = stats.ttest_1samp(fold_accuracies, baseline_accuracy)
    best_model_data = fold_models[best_fold_index]
    class_names = label_encoder.classes_
    cm = confusion_matrix(all_y_true, all_y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Binary Classification Confusion Matrix (Cross-Validation)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    os.makedirs('plots/binary', exist_ok=True)
    plt.savefig('plots/binary/confusion_matrix_cv.png')
    plt.close()
    if overall_auc > 0:
        from sklearn.metrics import roc_curve
        fpr, tpr, _ = roc_curve(all_y_true, all_y_scores)
        plt.figure(figsize=(10, 8))
        plt.plot(fpr, tpr, lw=2, label=f'ROC curve (AUC = {overall_auc:.4f})')
        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC)')
        plt.legend(loc="lower right")
        plt.savefig('plots/binary/roc_curve.png')
        plt.close()
    results = {
        'fold_metrics': fold_metrics,
        'overall': {
            'accuracy': float(overall_accuracy),
            'precision': float(overall_precision),
            'recall': float(overall_recall),
            'f1': float(overall_f1),
            'auc': float(overall_auc) if overall_auc > 0 else None
        },
        'cross_validation': {
            'mean_accuracy': float(mean_accuracy),
            'std_accuracy': float(std_accuracy),
            'confidence_interval_95': [float(ci_lower), float(ci_upper)],
            'baseline_accuracy': float(baseline_accuracy),
            'most_frequent_class': str(label_encoder.inverse_transform([most_frequent_class])[0]),
            't_statistic': float(t_stat),
            'p_value': float(p_value),
            'statistically_significant': "yes" if p_value < 0.05 else "no"
        },
        'best_fold': {
            'fold': best_fold_index + 1,
            'accuracy': float(fold_metrics[best_fold_index]['accuracy'])
        }
    }
    print("\n" + "="*50)
    print("CROSS-VALIDATION SUMMARY")
    print("="*50)
    print(f"Mean Accuracy: {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"95% Confidence Interval: [{ci_lower:.4f}, {ci_upper:.4f}]")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Baseline Accuracy: {baseline_accuracy:.4f} (most frequent class: {label_encoder.inverse_transform([most_frequent_class])[0]})")
    print(f"T-statistic: {t_stat:.4f}, p-value: {p_value:.6f}")
    if p_value < 0.05:
        print("The model is significantly better than the baseline (p < 0.05)")
    else:
        print("The model is NOT significantly better than the baseline (p >= 0.05)")
    print(f"\nBest Fold: {best_fold_index + 1} (Accuracy: {fold_metrics[best_fold_index]['accuracy']:.4f})")
    return best_model_data, results

def save_binary_model(model_data, results, output_dir='models/binary_classifier'):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    model_path = os.path.join(output_dir, 'nn_model.pt')
    torch.save(model_data['model'].state_dict(), model_path)
    scaler_path = os.path.join(output_dir, 'scaler.joblib')
    joblib.dump(model_data['scaler'], scaler_path)
    encoder_path = os.path.join(output_dir, 'label_encoder.joblib')
    joblib.dump(model_data['label_encoder'], encoder_path)
    imputer_path = os.path.join(output_dir, 'imputer.joblib')
    joblib.dump(model_data['imputer'], imputer_path)
    results_path = os.path.join(output_dir, 'cv_results.json')
    with open(results_path, 'w') as f:
        json.dump(results, f, indent=4)
    print(f"Binary model saved to {model_path}")
    print(f"CV results saved to {results_path}")
    return {
        'model_path': model_path,
        'scaler_path': scaler_path,
        'encoder_path': encoder_path,
        'imputer_path': imputer_path,
        'results_path': results_path
    }
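# Reloading the saved artifacts later is roughly the mirror image of save_binary_model()
# (a sketch, assuming the Medium_Binary_Network architecture and feature count used at training time):
#   scaler = joblib.load('models/medium_binary_classifier/scaler.joblib')
#   label_encoder = joblib.load('models/medium_binary_classifier/label_encoder.joblib')
#   imputer = joblib.load('models/medium_binary_classifier/imputer.joblib')
#   model = Medium_Binary_Network(input_size=len(scaler.mean_), hidden_sizes=[256, 192, 128, 64])
#   model.load_state_dict(torch.load('models/medium_binary_classifier/nn_model.pt', map_location=DEVICE))
#   model.to(DEVICE).eval()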

def parse_args():
    parser = argparse.ArgumentParser(description='Binary Neural Network Classifier (Human vs AI) with Cross-Validation')
    parser.add_argument('--random_seed', type=int, default=42,
                        help='Random seed for reproducibility')
    parser.add_argument('--folds', type=int, default=5,
                        help='Number of cross-validation folds')
    parser.add_argument('--epochs', type=int, default=100,
                        help='Maximum number of training epochs per fold')
    parser.add_argument('--patience', type=int, default=10,
                        help='Early stopping patience (epochs)')
    return parser.parse_args()

def main():
    print("\n" + "="*50)
    print("MEDIUM BINARY CLASSIFIER")
    print("="*50 + "\n")
    args = parse_args()
    seed = args.random_seed
    np.random.seed(seed)
    torch.manual_seed(seed)
    if GPU_AVAILABLE:
        torch.cuda.manual_seed_all(seed)
    plt.switch_backend('agg')
    feature_config = {
        'basic_scores': True,
        'basic_text_stats': ['total_tokens', 'total_words', 'unique_words', 'stop_words', 'avg_word_length'],
        'morphological': ['pos_distribution', 'unique_lemmas', 'lemma_word_ratio'],
        'syntactic': ['dependencies', 'noun_chunks'],
        'entities': ['total_entities', 'entity_types'],
        'diversity': ['ttr', 'mtld'],
        'structure': ['sentence_count', 'avg_sentence_length', 'question_sentences', 'exclamation_sentences'],
        'readability': ['words_per_sentence', 'syllables_per_word', 'flesh_kincaid_score', 'long_words_percent'],
        'semantic': True
    }
    model_data, results, save_paths = cross_validate_simple_classifier(
        directory_path="experiments/results/two_scores_with_long_text_analyze_2048T",
        feature_config=feature_config,
        n_splits=5,
        random_state=seed,
        epochs=150,
        hidden_sizes=[256, 192, 128, 64],
        dropout=0.3,
        early_stopping_patience=15
    )
    print("\nTraining completed.")
    print(f"Medium binary classifier saved to {save_paths['model_path']}")


if __name__ == "__main__":
    main()