|
import base64
import hashlib
import hmac
import logging
import mimetypes
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Dict, List, Tuple

import gradio as gr
import openpyxl
import requests
import spacy
import torch
import torch.nn.functional as F
from fastapi.middleware.cors import CORSMiddleware
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
from transformers import AutoModelForSequenceClassification
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
MAX_LENGTH = 512 |
|
MODEL_NAME = "microsoft/deberta-v3-small" |
|
WINDOW_SIZE = 6 |
|
WINDOW_OVERLAP = 2 |
|
CONFIDENCE_THRESHOLD = 0.65 |
|
BATCH_SIZE = 8 |
|
MAX_WORKERS = 4 |
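# Example: with WINDOW_SIZE=6 and WINDOW_OVERLAP=2 the scan stride is
# 6 - 2 = 4 sentences, so a 14-sentence text is covered by the sentence
# windows [0:6], [4:10], and [8:14].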
|
|
|
|
|
|
|
# On CPU-only hosts, cap PyTorch's thread pools at MAX_WORKERS so inference
# does not starve the request-handling threads.
if not torch.cuda.is_available():
    torch.set_num_threads(MAX_WORKERS)
|
try: |
|
|
|
torch.set_num_interop_threads(MAX_WORKERS) |
|
except RuntimeError as e: |
|
logger.warning(f"Could not set interop threads: {str(e)}") |
|
|
|
|
|
ADMIN_PASSWORD_HASH = os.environ.get('ADMIN_PASSWORD_HASH') |
|
|
|
if not ADMIN_PASSWORD_HASH:
    # Fallback demo hash, used only when ADMIN_PASSWORD_HASH is not set in
    # the environment; see the note below for generating your own.
    ADMIN_PASSWORD_HASH = "5e22d1ed71b273b1b2b5331f2d3e0f6cf34595236f201c6924d6bc81de27cdcb"
|
|
|
|
|
EXCEL_LOG_PATH = "/tmp/prediction_logs.xlsx" |
|
|
|
|
|
OCR_API_KEY = os.environ.get("OCR_API_KEY", "9e11346f1288957")  # prefer an env var over the bundled demo key
|
OCR_API_ENDPOINT = "https://api.ocr.space/parse/image" |
|
OCR_MAX_PDF_PAGES = 3 |
|
OCR_MAX_FILE_SIZE_MB = 1 |
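# These roughly mirror OCR.space's free-tier limits (about 1 MB per upload
# and a few pages per PDF); note that only the file-size limit is actually
# checked client-side in OCRProcessor.process_file.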
|
|
|
|
|
ocr_logger = logging.getLogger("ocr_module") |
|
ocr_logger.setLevel(logging.INFO) |
|
|
|
class OCRProcessor: |
|
""" |
|
Handles OCR processing of image and document files using OCR.space API |
|
""" |
|
def __init__(self, api_key: str = OCR_API_KEY): |
|
self.api_key = api_key |
|
self.endpoint = OCR_API_ENDPOINT |
|
|
|
def process_file(self, file_path: str) -> Dict: |
|
""" |
|
Process a file using OCR.space API |
|
""" |
|
start_time = time.time() |
|
ocr_logger.info(f"Starting OCR processing for file: {os.path.basename(file_path)}") |
|
|
|
|
|
file_size_mb = os.path.getsize(file_path) / (1024 * 1024) |
|
if file_size_mb > OCR_MAX_FILE_SIZE_MB: |
|
ocr_logger.warning(f"File size ({file_size_mb:.2f} MB) exceeds limit of {OCR_MAX_FILE_SIZE_MB} MB") |
|
return { |
|
"success": False, |
|
"error": f"File size ({file_size_mb:.2f} MB) exceeds limit of {OCR_MAX_FILE_SIZE_MB} MB", |
|
"text": "" |
|
} |
|
|
|
|
|
file_type = self._get_file_type(file_path) |
|
ocr_logger.info(f"Detected file type: {file_type}") |
|
|
|
|
|
payload = { |
|
'isOverlayRequired': 'false', |
|
'language': 'eng', |
|
'OCREngine': '2', |
|
'scale': 'true', |
|
'detectOrientation': 'true', |
|
} |
|
|
|
|
|
        if file_type == 'application/pdf':
            # OCR_MAX_PDF_PAGES is declared above but never sent to the API;
            # any page limit is enforced by the OCR.space service tier itself.
            ocr_logger.info("PDF document detected; page limit is left to the API")
            payload['filetype'] = 'PDF'
|
|
|
|
|
with open(file_path, 'rb') as f: |
|
file_data = f.read() |
|
|
|
files = { |
|
'file': (os.path.basename(file_path), file_data, file_type) |
|
} |
|
|
|
headers = { |
|
'apikey': self.api_key, |
|
} |
|
|
|
|
|
try: |
|
ocr_logger.info(f"Sending request to OCR.space API for file: {os.path.basename(file_path)}") |
|
response = requests.post( |
|
self.endpoint, |
|
files=files, |
|
data=payload, |
|
headers=headers, |
|
timeout=60 |
|
) |
|
|
|
ocr_logger.info(f"OCR API status code: {response.status_code}") |
|
|
|
|
|
response_preview = response.text[:200] if hasattr(response, 'text') else "No text content" |
|
ocr_logger.info(f"OCR API response preview: {response_preview}...") |
|
|
|
try: |
|
response.raise_for_status() |
|
except Exception as e: |
|
ocr_logger.error(f"HTTP Error: {str(e)}") |
|
return { |
|
"success": False, |
|
"error": f"OCR API HTTP Error: {str(e)}", |
|
"text": "" |
|
} |
|
|
|
try: |
|
result = response.json() |
|
ocr_logger.info(f"OCR API exit code: {result.get('OCRExitCode')}") |
|
|
|
|
|
                # OCR.space exit codes 1 and 2 signal full and partial success.
                if result.get('OCRExitCode') in [1, 2]:
|
extracted_text = self._extract_text_from_result(result) |
|
processing_time = time.time() - start_time |
|
ocr_logger.info(f"OCR processing completed in {processing_time:.2f} seconds") |
|
ocr_logger.info(f"Extracted text word count: {len(extracted_text.split())}") |
|
|
|
return { |
|
"success": True, |
|
"text": extracted_text, |
|
"word_count": len(extracted_text.split()), |
|
"processing_time_ms": int(processing_time * 1000) |
|
} |
|
else: |
|
error_msg = result.get('ErrorMessage', 'OCR processing failed') |
|
ocr_logger.error(f"OCR API error: {error_msg}") |
|
return { |
|
"success": False, |
|
"error": error_msg, |
|
"text": "" |
|
} |
|
except ValueError as e: |
|
ocr_logger.error(f"Invalid JSON response: {str(e)}") |
|
return { |
|
"success": False, |
|
"error": f"Invalid response from OCR API: {str(e)}", |
|
"text": "" |
|
} |
|
|
|
except requests.exceptions.RequestException as e: |
|
ocr_logger.error(f"OCR API request failed: {str(e)}") |
|
return { |
|
"success": False, |
|
"error": f"OCR API request failed: {str(e)}", |
|
"text": "" |
|
} |
|
|
|
|
def _extract_text_from_result(self, result: Dict) -> str: |
|
""" |
|
Extract all text from the OCR API result |
|
""" |
|
extracted_text = "" |
|
|
|
if 'ParsedResults' in result and result['ParsedResults']: |
|
for parsed_result in result['ParsedResults']: |
|
if parsed_result.get('ParsedText'): |
|
extracted_text += parsed_result['ParsedText'] |
|
|
|
return extracted_text |
|
|
|
def _get_file_type(self, file_path: str) -> str: |
|
""" |
|
Determine MIME type of a file |
|
""" |
|
mime_type, _ = mimetypes.guess_type(file_path) |
|
if mime_type is None: |
|
|
|
return 'application/octet-stream' |
|
return mime_type |
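# Usage sketch for OCRProcessor (hypothetical path; a reachable OCR.space
# endpoint and a valid API key are assumed):
#   ocr = OCRProcessor()
#   result = ocr.process_file("/tmp/scan.png")
#   text = result["text"] if result["success"] else ""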
|
|
|
def is_admin_password(input_text: str) -> bool: |
|
""" |
|
Check if the input text matches the admin password using secure hash comparison. |
|
""" |
|
|
|
    input_hash = hashlib.sha256(input_text.strip().encode()).hexdigest()

    # Compare digests in constant time so the check does not leak timing
    # information about how close a guess is.
    return hmac.compare_digest(input_hash, ADMIN_PASSWORD_HASH)
|
|
|
class TextWindowProcessor: |
|
def __init__(self): |
|
try: |
|
self.nlp = spacy.load("en_core_web_sm") |
|
except OSError: |
|
logger.info("Downloading spacy model...") |
|
spacy.cli.download("en_core_web_sm") |
|
self.nlp = spacy.load("en_core_web_sm") |
|
|
|
if 'sentencizer' not in self.nlp.pipe_names: |
|
self.nlp.add_pipe('sentencizer') |
|
|
|
disabled_pipes = [pipe for pipe in self.nlp.pipe_names if pipe != 'sentencizer'] |
|
self.nlp.disable_pipes(*disabled_pipes) |
|
|
|
|
|
        # Reserved for parallel preprocessing; the current pipeline does not
        # actually submit work to this executor.
        self.executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
|
|
|
def split_into_sentences(self, text: str) -> List[str]: |
|
doc = self.nlp(text) |
|
return [str(sent).strip() for sent in doc.sents] |
|
|
|
    def create_windows(self, sentences: List[str], window_size: int, overlap: int) -> List[str]:
        if len(sentences) < window_size:
            return [" ".join(sentences)]

        windows = []
        stride = window_size - overlap
        for i in range(0, len(sentences) - window_size + 1, stride):
            windows.append(" ".join(sentences[i:i + window_size]))

        # When the stride does not land exactly on the end of the text, add a
        # final window so trailing sentences are not silently dropped.
        last_end = ((len(sentences) - window_size) // stride) * stride + window_size
        if last_end < len(sentences):
            windows.append(" ".join(sentences[-window_size:]))
        return windows
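    # Example: create_windows(["s1", "s2", "s3", "s4", "s5"], window_size=3,
    # overlap=1) uses stride 2 and returns ["s1 s2 s3", "s3 s4 s5"].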
|
|
|
def create_centered_windows(self, sentences: List[str], window_size: int) -> Tuple[List[str], List[List[int]]]: |
|
"""Create windows with better boundary handling""" |
|
windows = [] |
|
window_sentence_indices = [] |
|
|
|
for i in range(len(sentences)): |
|
|
|
half_window = window_size // 2 |
|
start_idx = max(0, i - half_window) |
|
end_idx = min(len(sentences), i + half_window + 1) |
|
|
|
|
|
window = sentences[start_idx:end_idx] |
|
windows.append(" ".join(window)) |
|
window_sentence_indices.append(list(range(start_idx, end_idx))) |
|
|
|
return windows, window_sentence_indices |
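    # Example: with 5 sentences and window_size=4 (half-window 2), sentence 0
    # gets the window sentences[0:3], sentence 2 gets sentences[0:5], and
    # sentence 4 gets sentences[2:5], with the matching index lists alongside.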
|
|
|
class TextClassifier: |
|
def __init__(self): |
|
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
self.model_name = MODEL_NAME |
|
self.tokenizer = None |
|
self.model = None |
|
self.processor = TextWindowProcessor() |
|
self.initialize_model() |
|
|
|
def initialize_model(self): |
|
"""Initialize the model and tokenizer.""" |
|
logger.info("Initializing model and tokenizer...") |
|
|
|
from transformers import DebertaV2TokenizerFast |
|
|
|
self.tokenizer = DebertaV2TokenizerFast.from_pretrained( |
|
self.model_name, |
|
model_max_length=MAX_LENGTH, |
|
use_fast=True |
|
) |
|
|
|
self.model = AutoModelForSequenceClassification.from_pretrained( |
|
self.model_name, |
|
num_labels=2 |
|
).to(self.device) |
|
|
|
model_path = "model_20250209_184929_acc1.0000.pt" |
|
if os.path.exists(model_path): |
|
logger.info(f"Loading custom model from {model_path}") |
|
checkpoint = torch.load(model_path, map_location=self.device) |
|
self.model.load_state_dict(checkpoint['model_state_dict']) |
|
else: |
|
logger.warning("Custom model file not found. Using base model.") |
|
|
|
self.model.eval() |
|
|
|
def quick_scan(self, text: str) -> Dict: |
|
"""Perform a quick scan using simple window analysis.""" |
|
if not text.strip(): |
|
return { |
|
'prediction': 'unknown', |
|
'confidence': 0.0, |
|
'num_windows': 0 |
|
} |
|
|
|
sentences = self.processor.split_into_sentences(text) |
|
windows = self.processor.create_windows(sentences, WINDOW_SIZE, WINDOW_OVERLAP) |
|
|
|
predictions = [] |
|
|
|
|
|
for i in range(0, len(windows), BATCH_SIZE): |
|
batch_windows = windows[i:i + BATCH_SIZE] |
|
|
|
inputs = self.tokenizer( |
|
batch_windows, |
|
truncation=True, |
|
padding=True, |
|
max_length=MAX_LENGTH, |
|
return_tensors="pt" |
|
).to(self.device) |
|
|
|
with torch.no_grad(): |
|
outputs = self.model(**inputs) |
|
probs = F.softmax(outputs.logits, dim=-1) |
|
|
|
for idx, window in enumerate(batch_windows): |
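                # Label convention used throughout this app: logit index 1 is
                # "human", index 0 is "ai".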
|
prediction = { |
|
'window': window, |
|
'human_prob': probs[idx][1].item(), |
|
'ai_prob': probs[idx][0].item(), |
|
'prediction': 'human' if probs[idx][1] > probs[idx][0] else 'ai' |
|
} |
|
predictions.append(prediction) |
|
|
|
|
|
del inputs, outputs, probs |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
|
|
if not predictions: |
|
return { |
|
'prediction': 'unknown', |
|
'confidence': 0.0, |
|
'num_windows': 0 |
|
} |
|
|
|
avg_human_prob = sum(p['human_prob'] for p in predictions) / len(predictions) |
|
avg_ai_prob = sum(p['ai_prob'] for p in predictions) / len(predictions) |
|
|
|
return { |
|
'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai', |
|
'confidence': max(avg_human_prob, avg_ai_prob), |
|
'num_windows': len(predictions) |
|
} |
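    # Example quick_scan output: {'prediction': 'ai', 'confidence': 0.82,
    # 'num_windows': 3} - the class with the larger mean probability across
    # windows wins, and that mean is reported as the confidence.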
|
|
|
def detailed_scan(self, text: str) -> Dict: |
|
"""Perform a detailed scan with improved sentence-level analysis.""" |
|
|
|
text = text.rstrip() |
|
|
|
if not text.strip(): |
|
return { |
|
'sentence_predictions': [], |
|
'highlighted_text': '', |
|
'full_text': '', |
|
'overall_prediction': { |
|
'prediction': 'unknown', |
|
'confidence': 0.0, |
|
'num_sentences': 0 |
|
} |
|
} |
|
|
|
sentences = self.processor.split_into_sentences(text) |
|
        if not sentences:
            # Return the same empty-result shape as the blank-text case above
            # so callers can index the usual keys without a KeyError.
            return {
                'sentence_predictions': [],
                'highlighted_text': '',
                'full_text': text,
                'overall_prediction': {
                    'prediction': 'unknown',
                    'confidence': 0.0,
                    'num_sentences': 0
                }
            }
|
|
|
|
|
windows, window_sentence_indices = self.processor.create_centered_windows(sentences, WINDOW_SIZE) |
|
|
|
|
|
sentence_appearances = {i: 0 for i in range(len(sentences))} |
|
sentence_scores = {i: {'human_prob': 0.0, 'ai_prob': 0.0} for i in range(len(sentences))} |
|
|
|
|
|
for i in range(0, len(windows), BATCH_SIZE): |
|
batch_windows = windows[i:i + BATCH_SIZE] |
|
batch_indices = window_sentence_indices[i:i + BATCH_SIZE] |
|
|
|
inputs = self.tokenizer( |
|
batch_windows, |
|
truncation=True, |
|
padding=True, |
|
max_length=MAX_LENGTH, |
|
return_tensors="pt" |
|
).to(self.device) |
|
|
|
with torch.no_grad(): |
|
outputs = self.model(**inputs) |
|
probs = F.softmax(outputs.logits, dim=-1) |
|
|
|
|
|
            for window_idx, indices in enumerate(batch_indices):
                # Each window was centered on sentence (i + window_idx); near
                # the start or end of the text that sentence is NOT at the
                # midpoint of the window, so locate it explicitly instead of
                # assuming len(indices) // 2.
                center_pos = indices.index(i + window_idx)
                center_weight = 0.7
                edge_weight = 0.3 / (len(indices) - 1) if len(indices) > 1 else 0.0

                for pos, sent_idx in enumerate(indices):
                    # The centered sentence carries 70% of the window's vote;
                    # the remaining 30% is split evenly among its neighbors.
                    weight = center_weight if pos == center_pos else edge_weight
                    sentence_appearances[sent_idx] += weight
                    sentence_scores[sent_idx]['human_prob'] += weight * probs[window_idx][1].item()
                    sentence_scores[sent_idx]['ai_prob'] += weight * probs[window_idx][0].item()
|
|
|
|
|
del inputs, outputs, probs |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
|
|
|
|
sentence_predictions = [] |
|
for i in range(len(sentences)): |
|
if sentence_appearances[i] > 0: |
|
human_prob = sentence_scores[i]['human_prob'] / sentence_appearances[i] |
|
ai_prob = sentence_scores[i]['ai_prob'] / sentence_appearances[i] |
|
|
|
|
|
                # Smooth isolated flips: when this sentence's label disagrees
                # with either neighbor's, nudge its probabilities 10% toward
                # the neighbors' average.
                if i > 0 and i < len(sentences) - 1:
|
prev_human = sentence_scores[i-1]['human_prob'] / max(sentence_appearances[i-1], 1e-10) |
|
prev_ai = sentence_scores[i-1]['ai_prob'] / max(sentence_appearances[i-1], 1e-10) |
|
next_human = sentence_scores[i+1]['human_prob'] / max(sentence_appearances[i+1], 1e-10) |
|
next_ai = sentence_scores[i+1]['ai_prob'] / max(sentence_appearances[i+1], 1e-10) |
|
|
|
|
|
current_pred = 'human' if human_prob > ai_prob else 'ai' |
|
prev_pred = 'human' if prev_human > prev_ai else 'ai' |
|
next_pred = 'human' if next_human > next_ai else 'ai' |
|
|
|
if current_pred != prev_pred or current_pred != next_pred: |
|
|
|
smooth_factor = 0.1 |
|
human_prob = (human_prob * (1 - smooth_factor) + |
|
(prev_human + next_human) * smooth_factor / 2) |
|
ai_prob = (ai_prob * (1 - smooth_factor) + |
|
(prev_ai + next_ai) * smooth_factor / 2) |
|
|
|
sentence_predictions.append({ |
|
'sentence': sentences[i], |
|
'human_prob': human_prob, |
|
'ai_prob': ai_prob, |
|
'prediction': 'human' if human_prob > ai_prob else 'ai', |
|
'confidence': max(human_prob, ai_prob) |
|
}) |
|
|
|
return { |
|
'sentence_predictions': sentence_predictions, |
|
'highlighted_text': self.format_predictions_html(sentence_predictions), |
|
'full_text': text, |
|
'overall_prediction': self.aggregate_predictions(sentence_predictions) |
|
} |
|
|
|
def format_predictions_html(self, sentence_predictions: List[Dict]) -> str: |
|
"""Format predictions as HTML with color-coding.""" |
|
html_parts = [] |
|
|
|
for pred in sentence_predictions: |
|
sentence = pred['sentence'] |
|
confidence = pred['confidence'] |
|
|
|
if confidence >= CONFIDENCE_THRESHOLD: |
|
if pred['prediction'] == 'human': |
|
color = "#90EE90" |
|
else: |
|
color = "#FFB6C6" |
|
else: |
|
if pred['prediction'] == 'human': |
|
color = "#E8F5E9" |
|
else: |
|
color = "#FFEBEE" |
|
|
|
html_parts.append(f'<span style="background-color: {color};">{sentence}</span>') |
|
|
|
return " ".join(html_parts) |
|
|
|
def aggregate_predictions(self, predictions: List[Dict]) -> Dict: |
|
"""Aggregate predictions from multiple sentences into a single prediction.""" |
|
if not predictions: |
|
return { |
|
'prediction': 'unknown', |
|
'confidence': 0.0, |
|
'num_sentences': 0 |
|
} |
|
|
|
total_human_prob = sum(p['human_prob'] for p in predictions) |
|
total_ai_prob = sum(p['ai_prob'] for p in predictions) |
|
num_sentences = len(predictions) |
|
|
|
avg_human_prob = total_human_prob / num_sentences |
|
avg_ai_prob = total_ai_prob / num_sentences |
|
|
|
return { |
|
'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai', |
|
'confidence': max(avg_human_prob, avg_ai_prob), |
|
'num_sentences': num_sentences |
|
} |
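    # Worked example: two sentences with human_prob 0.8 and 0.6 average to
    # 0.7 against an AI average of 0.3, yielding {'prediction': 'human',
    # 'confidence': 0.7, 'num_sentences': 2}.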
|
|
|
|
|
def handle_file_upload_and_analyze(file_obj, mode: str) -> tuple: |
|
""" |
|
Handle file upload, OCR processing, and text analysis |
|
""" |
|
|
|
    # The module-level classifier instance is reused for every request.
    classifier_to_use = classifier
|
|
|
if file_obj is None: |
|
return ( |
|
"No file uploaded", |
|
"Please upload a file to analyze", |
|
"No file uploaded for analysis" |
|
) |
|
|
|
|
|
logger.info(f"Received file upload of type: {type(file_obj)}") |
|
|
|
try: |
|
|
|
        if isinstance(file_obj, bytes):
            # With gr.File(type="binary") the handler receives raw bytes, so
            # sniff the file type from its magic numbers to pick a temp-file
            # extension.
            content_start = file_obj[:20]

            file_ext = ".bin"
|
|
|
|
|
if content_start.startswith(b'%PDF'): |
|
file_ext = ".pdf" |
|
|
|
elif content_start.startswith(b'\xff\xd8'): |
|
file_ext = ".jpg" |
|
elif content_start.startswith(b'\x89PNG'): |
|
file_ext = ".png" |
|
elif content_start.startswith(b'GIF'): |
|
file_ext = ".gif" |
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as temp_file: |
|
temp_file_path = temp_file.name |
|
|
|
temp_file.write(file_obj) |
|
logger.info(f"Saved uploaded file to {temp_file_path}") |
|
else: |
|
|
|
logger.error(f"Unexpected file object type: {type(file_obj)}") |
|
return ( |
|
"File upload error", |
|
"Unexpected file format", |
|
"Unable to process this file format" |
|
) |
|
|
|
|
|
ocr_processor = OCRProcessor() |
|
logger.info(f"Starting OCR processing for file: {temp_file_path}") |
|
ocr_result = ocr_processor.process_file(temp_file_path) |
|
|
|
if not ocr_result["success"]: |
|
logger.error(f"OCR processing failed: {ocr_result['error']}") |
|
return ( |
|
"OCR Processing Error", |
|
ocr_result["error"], |
|
"Failed to extract text from the uploaded file" |
|
) |
|
|
|
|
|
extracted_text = ocr_result["text"] |
|
logger.info(f"OCR processing complete. Extracted {len(extracted_text.split())} words") |
|
|
|
|
|
if not extracted_text.strip(): |
|
logger.warning("No text extracted from file") |
|
return ( |
|
"No text extracted", |
|
"The OCR process did not extract any text from the uploaded file.", |
|
"No text was found in the uploaded file" |
|
) |
|
|
|
|
|
logger.info("Proceeding with text analysis") |
|
return analyze_text(extracted_text, mode, classifier_to_use) |
|
|
|
except Exception as e: |
|
logger.error(f"Error in file upload processing: {str(e)}") |
|
return ( |
|
"Error Processing File", |
|
f"An error occurred while processing the file: {str(e)}", |
|
"File processing error. Please try again or try a different file." |
|
) |
|
finally: |
|
|
|
if 'temp_file_path' in locals() and os.path.exists(temp_file_path): |
|
try: |
|
os.remove(temp_file_path) |
|
logger.info(f"Removed temporary file: {temp_file_path}") |
|
except Exception as e: |
|
logger.warning(f"Could not remove temporary file: {str(e)}") |
|
|
|
def initialize_excel_log(): |
|
"""Initialize the Excel log file if it doesn't exist.""" |
|
if not os.path.exists(EXCEL_LOG_PATH): |
|
wb = Workbook() |
|
ws = wb.active |
|
ws.title = "Prediction Logs" |
|
|
|
|
|
headers = ["timestamp", "word_count", "prediction", "confidence", |
|
"execution_time_ms", "analysis_mode", "full_text"] |
|
|
|
for col_num, header in enumerate(headers, 1): |
|
ws.cell(row=1, column=col_num, value=header) |
|
|
|
|
|
        # Preset column widths: timestamp, numeric fields, and a wide final
        # column for the full submitted text.
        for col_num, width in enumerate([20, 10, 10, 10, 15, 15, 100], 1):
            ws.column_dimensions[get_column_letter(col_num)].width = width
|
|
|
|
|
wb.save(EXCEL_LOG_PATH) |
|
logger.info(f"Initialized Excel log file at {EXCEL_LOG_PATH}") |
|
|
|
|
|
def log_prediction_data(input_text, word_count, prediction, confidence, execution_time, mode): |
|
"""Log prediction data to an Excel file in the /tmp directory.""" |
|
|
|
if not os.path.exists(EXCEL_LOG_PATH): |
|
initialize_excel_log() |
|
|
|
try: |
|
|
|
wb = openpyxl.load_workbook(EXCEL_LOG_PATH) |
|
ws = wb.active |
|
|
|
|
|
next_row = ws.max_row + 1 |
|
|
|
|
|
cleaned_text = input_text.replace("\n", " ") |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
row_data = [ |
|
timestamp, |
|
word_count, |
|
prediction, |
|
f"{confidence:.2f}", |
|
f"{execution_time:.2f}", |
|
mode, |
|
cleaned_text |
|
] |
|
|
|
|
|
for col_num, value in enumerate(row_data, 1): |
|
ws.cell(row=next_row, column=col_num, value=value) |
|
|
|
|
|
wb.save(EXCEL_LOG_PATH) |
|
logger.info(f"Successfully logged prediction data to {EXCEL_LOG_PATH}") |
|
return True |
|
|
|
except Exception as e: |
|
logger.error(f"Error logging prediction data to Excel: {str(e)}") |
|
return False |
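# NOTE: the Excel log is read, appended to, and rewritten on every request
# with no locking, so concurrent submissions can race; this is fine for a
# demo, but a real deployment would want a lock or a proper datastore.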
|
|
|
|
|
def get_logs_as_base64(): |
|
"""Read the Excel logs file and return as base64 for downloading.""" |
|
if not os.path.exists(EXCEL_LOG_PATH): |
|
return None |
|
|
|
try: |
|
|
|
with open(EXCEL_LOG_PATH, "rb") as f: |
|
file_data = f.read() |
|
|
|
|
|
base64_data = base64.b64encode(file_data).decode('utf-8') |
|
return base64_data |
|
|
|
except Exception as e: |
|
logger.error(f"Error reading Excel logs: {str(e)}") |
|
return None |
|
|
|
|
|
def analyze_text(text: str, mode: str, classifier: TextClassifier) -> tuple: |
|
"""Analyze text using specified mode and return formatted results.""" |
|
|
|
if is_admin_password(text.strip()): |
|
|
|
base64_data = get_logs_as_base64() |
|
logs_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
logs_filename = f"prediction_logs_{logs_timestamp}.xlsx" |
|
|
|
if base64_data: |
|
|
|
html_content = f""" |
|
<div style="background-color: #e6f7ff; padding: 15px; border-radius: 5px;"> |
|
<h3>Admin Access Granted - Prediction Logs</h3> |
|
<p>Logs retrieved at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p> |
|
<p>Excel file contains all prediction data with full text of all submissions.</p> |
|
<a href="data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,{base64_data}" |
|
download="{logs_filename}" |
|
style="display: inline-block; margin-top: 10px; padding: 10px 15px; |
|
background-color: #4CAF50; color: white; text-decoration: none; |
|
border-radius: 4px;"> |
|
Download Excel Logs |
|
</a> |
|
</div> |
|
""" |
|
else: |
|
html_content = """ |
|
<div style="background-color: #ffe6e6; padding: 15px; border-radius: 5px;"> |
|
<h3>Admin Access Granted - No Logs Found</h3> |
|
<p>No prediction logs were found or there was an error reading the logs file.</p> |
|
</div> |
|
""" |
|
|
|
|
|
return ( |
|
html_content, |
|
f"Admin access granted. Logs retrieved at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", |
|
f"ADMIN MODE\nLogs available for download\nFile: {EXCEL_LOG_PATH}" |
|
) |
|
|
|
|
|
start_time = time.time() |
|
|
|
|
|
word_count = len(text.split()) |
|
|
|
|
|
original_mode = mode |
|
if word_count < 200 and mode == "detailed": |
|
mode = "quick" |
|
|
|
if mode == "quick": |
|
result = classifier.quick_scan(text) |
|
|
|
quick_analysis = f""" |
|
PREDICTION: {result['prediction'].upper()} |
|
Confidence: {result['confidence']*100:.1f}% |
|
Windows analyzed: {result['num_windows']} |
|
""" |
|
|
|
|
|
if original_mode == "detailed": |
|
quick_analysis += f"\n\nNote: Switched to quick mode because text contains only {word_count} words. Minimum 200 words required for detailed analysis." |
|
|
|
|
|
execution_time = (time.time() - start_time) * 1000 |
|
|
|
|
|
log_prediction_data( |
|
input_text=text, |
|
word_count=word_count, |
|
prediction=result['prediction'], |
|
confidence=result['confidence'], |
|
execution_time=execution_time, |
|
mode=original_mode |
|
) |
|
|
|
return ( |
|
text, |
|
"Quick scan mode - no sentence-level analysis available", |
|
quick_analysis |
|
) |
|
else: |
|
analysis = classifier.detailed_scan(text) |
|
|
|
detailed_analysis = [] |
|
for pred in analysis['sentence_predictions']: |
|
confidence = pred['confidence'] * 100 |
|
detailed_analysis.append(f"Sentence: {pred['sentence']}") |
|
detailed_analysis.append(f"Prediction: {pred['prediction'].upper()}") |
|
detailed_analysis.append(f"Confidence: {confidence:.1f}%") |
|
detailed_analysis.append("-" * 50) |
|
|
|
final_pred = analysis['overall_prediction'] |
|
overall_result = f""" |
|
FINAL PREDICTION: {final_pred['prediction'].upper()} |
|
Overall confidence: {final_pred['confidence']*100:.1f}% |
|
Number of sentences analyzed: {final_pred['num_sentences']} |
|
""" |
|
|
|
|
|
execution_time = (time.time() - start_time) * 1000 |
|
|
|
|
|
log_prediction_data( |
|
input_text=text, |
|
word_count=word_count, |
|
prediction=final_pred['prediction'], |
|
confidence=final_pred['confidence'], |
|
execution_time=execution_time, |
|
mode=original_mode |
|
) |
|
|
|
return ( |
|
analysis['highlighted_text'], |
|
"\n".join(detailed_analysis), |
|
overall_result |
|
) |
|
|
|
|
|
classifier = TextClassifier() |
|
|
|
|
|
def create_interface(): |
|
|
|
css = """ |
|
#analyze-btn { |
|
background-color: #FF8C00 !important; |
|
border-color: #FF8C00 !important; |
|
color: white !important; |
|
} |
|
|
|
/* Style the file upload to be more compact */ |
|
.file-upload { |
|
width: 150px !important; |
|
margin-left: 15px !important; |
|
} |
|
|
|
/* Hide file preview elements */ |
|
.file-upload .file-preview, |
|
    .file-upload p:not(:first-child),
|
.file-upload svg, |
|
.file-upload [data-testid="chunkFileDropArea"], |
|
.file-upload .file-drop { |
|
display: none !important; |
|
} |
|
|
|
/* Style the upload button */ |
|
.file-upload button { |
|
height: 40px !important; |
|
width: 100% !important; |
|
background-color: #f0f0f0 !important; |
|
border: 1px solid #d9d9d9 !important; |
|
border-radius: 4px !important; |
|
color: #333 !important; |
|
font-size: 14px !important; |
|
display: flex !important; |
|
align-items: center !important; |
|
justify-content: center !important; |
|
margin: 0 !important; |
|
padding: 0 !important; |
|
} |
|
|
|
/* Hide the "or" text */ |
|
.file-upload .or { |
|
display: none !important; |
|
} |
|
|
|
/* Make the container compact */ |
|
.file-upload [data-testid="block"] { |
|
margin: 0 !important; |
|
padding: 0 !important; |
|
} |
|
""" |
|
|
|
with gr.Blocks(css=css, title="AI Text Detector") as demo: |
|
gr.Markdown("# AI Text Detector") |
|
gr.Markdown("Analyze text to detect if it was written by a human or AI. Choose between quick scan and detailed sentence-level analysis. 200+ words suggested for accurate predictions.") |
|
|
|
with gr.Row(): |
|
|
|
with gr.Column(scale=1): |
|
|
|
text_input = gr.Textbox( |
|
lines=8, |
|
placeholder="Enter text to analyze...", |
|
label="Input Text" |
|
) |
|
|
|
|
|
gr.Markdown("Analysis Mode") |
|
gr.Markdown("Quick mode for faster analysis. Detailed mode for sentence-level analysis.") |
|
|
|
|
|
with gr.Row(): |
|
mode_selection = gr.Radio( |
|
choices=["quick", "detailed"], |
|
value="quick", |
|
label="", |
|
show_label=False |
|
) |
|
|
|
|
|
                    file_upload = gr.File(
                        file_types=["image", "pdf", "doc", "docx"],
                        # Deliver raw bytes so the bytes branch in
                        # handle_file_upload_and_analyze receives the upload.
                        type="binary",
                        elem_classes=["file-upload"]
                    )
|
|
|
|
|
analyze_btn = gr.Button("Analyze Text", elem_id="analyze-btn") |
|
|
|
|
|
with gr.Column(scale=1): |
|
output_html = gr.HTML(label="Highlighted Analysis") |
|
output_sentences = gr.Textbox(label="Sentence-by-Sentence Analysis", lines=10) |
|
output_result = gr.Textbox(label="Overall Result", lines=4) |
|
|
|
|
|
analyze_btn.click( |
|
fn=lambda text, mode: analyze_text(text, mode, classifier), |
|
inputs=[text_input, mode_selection], |
|
outputs=[output_html, output_sentences, output_result] |
|
) |
|
|
|
|
|
file_upload.change( |
|
fn=handle_file_upload_and_analyze, |
|
inputs=[file_upload, mode_selection], |
|
outputs=[output_html, output_sentences, output_result] |
|
) |
|
|
|
return demo |
|
|
|
|
|
def setup_app(): |
|
demo = create_interface() |
|
|
|
|
|
    # Blocks.app is Gradio's underlying FastAPI application; on some Gradio
    # versions it is only populated once the app is launched or mounted.
    app = demo.app
|
|
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=["*"], |
|
allow_credentials=True, |
|
allow_methods=["GET", "POST", "OPTIONS"], |
|
allow_headers=["*"], |
|
) |
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
demo = setup_app() |
|
|
|
|
|
demo.queue() |
|
demo.launch( |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
share=True |
|
) |