ApsidalSolid4 commited on
Commit
7b9d8b2
·
verified ·
1 Parent(s): 1351283

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +206 -121
app.py CHANGED
@@ -1,31 +1,34 @@
1
  import torch
2
- import numpy as np
3
- from transformers import AutoTokenizer, AutoModelForSequenceClassification
4
  import torch.nn.functional as F
 
5
  import spacy
6
  from typing import List, Dict
7
  import logging
8
  import os
9
- import gradio as gr
 
 
 
10
 
11
  # Configure logging
12
  logging.basicConfig(level=logging.INFO)
13
  logger = logging.getLogger(__name__)
14
 
15
- # Constants
16
  MAX_LENGTH = 512
17
  MODEL_NAME = "microsoft/deberta-v3-small"
18
  WINDOW_SIZE = 17
19
  WINDOW_OVERLAP = 2
20
  CONFIDENCE_THRESHOLD = 0.65
 
21
 
22
- class TextWindowProcessor:
23
  def __init__(self):
24
  try:
25
  self.nlp = spacy.load("en_core_web_sm")
26
  except OSError:
27
  logger.info("Downloading spacy model...")
28
- spacy.cli.download("en_core_web_sm")
29
  self.nlp = spacy.load("en_core_web_sm")
30
 
31
  if 'sentencizer' not in self.nlp.pipe_names:
@@ -38,16 +41,29 @@ class TextWindowProcessor:
38
  doc = self.nlp(text)
39
  return [str(sent).strip() for sent in doc.sents]
40
 
 
 
 
 
 
 
 
 
 
 
 
41
  def create_centered_windows(self, sentences: List[str], window_size: int) -> tuple[List[str], List[List[int]]]:
42
  """Create windows centered around each sentence for detailed analysis."""
43
  windows = []
44
  window_sentence_indices = []
45
 
46
  for i in range(len(sentences)):
 
47
  half_window = window_size // 2
48
  start_idx = max(0, i - half_window)
49
  end_idx = min(len(sentences), i + half_window + 1)
50
 
 
51
  if start_idx == 0:
52
  end_idx = min(len(sentences), window_size)
53
  elif end_idx == len(sentences):
@@ -59,60 +75,109 @@ class TextWindowProcessor:
59
 
60
  return windows, window_sentence_indices
61
 
62
- class TextClassifier:
63
  def __init__(self):
64
  self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
65
- self.model_name = MODEL_NAME
66
  self.tokenizer = None
67
  self.model = None
68
- self.processor = TextWindowProcessor()
69
- self.initialize_model()
70
-
71
- def initialize_model(self):
72
- """Initialize the model and tokenizer."""
73
- logger.info("Initializing model and tokenizer...")
74
-
75
- from transformers import DebertaV2TokenizerFast
76
-
77
- # Try to load tokenizer directly from the Hub
78
- self.tokenizer = DebertaV2TokenizerFast.from_pretrained(
79
- self.model_name,
80
- model_max_length=MAX_LENGTH,
81
- use_fast=False,
82
- from_slow=True
83
- )
84
-
85
- # Initialize the model as before
86
  self.model = AutoModelForSequenceClassification.from_pretrained(
87
- self.model_name,
88
  num_labels=2
89
  ).to(self.device)
90
 
91
- # Your existing model loading code
92
- model_path = "model_20250209_184929_acc1.0000.pt"
93
- if os.path.exists(model_path):
94
- logger.info(f"Loading custom model from {model_path}")
95
  checkpoint = torch.load(model_path, map_location=self.device)
96
  self.model.load_state_dict(checkpoint['model_state_dict'])
97
- else:
98
- logger.warning("Custom model file not found. Using base model.")
99
-
 
 
 
 
 
 
 
 
 
100
  self.model.eval()
 
 
 
 
 
 
 
 
101
 
102
- def predict_with_sentence_scores(self, text: str) -> Dict:
103
- """Predict with sentence-level granularity using overlapping windows."""
104
- if not text.strip():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
  return {
106
- 'sentence_predictions': [],
107
- 'highlighted_text': '',
108
- 'full_text': '',
109
- 'overall_prediction': {
110
- 'prediction': 'unknown',
111
- 'confidence': 0.0,
112
- 'num_sentences': 0
113
- }
114
  }
115
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
  sentences = self.processor.split_into_sentences(text)
117
  if not sentences:
118
  return {}
@@ -125,10 +190,9 @@ class TextClassifier:
125
  sentence_scores = {i: {'human_prob': 0.0, 'ai_prob': 0.0} for i in range(len(sentences))}
126
 
127
  # Process windows in batches to save memory
128
- batch_size = 16
129
- for i in range(0, len(windows), batch_size):
130
- batch_windows = windows[i:i + batch_size]
131
- batch_indices = window_sentence_indices[i:i + batch_size]
132
 
133
  inputs = self.tokenizer(
134
  batch_windows,
@@ -142,12 +206,18 @@ class TextClassifier:
142
  outputs = self.model(**inputs)
143
  probs = F.softmax(outputs.logits, dim=-1)
144
 
 
145
  for window_idx, indices in enumerate(batch_indices):
146
  for sent_idx in indices:
147
  sentence_appearances[sent_idx] += 1
148
  sentence_scores[sent_idx]['human_prob'] += probs[window_idx][1].item()
149
  sentence_scores[sent_idx]['ai_prob'] += probs[window_idx][0].item()
150
 
 
 
 
 
 
151
  # Average the scores and create final sentence-level predictions
152
  sentence_predictions = []
153
  for i in range(len(sentences)):
@@ -162,41 +232,46 @@ class TextClassifier:
162
  'confidence': max(human_prob, ai_prob)
163
  })
164
 
165
- # Generate analysis outputs
 
 
166
  return {
167
  'sentence_predictions': sentence_predictions,
168
- 'highlighted_text': self.format_predictions_html(sentence_predictions),
169
  'full_text': text,
170
- 'overall_prediction': self.aggregate_predictions(sentence_predictions)
171
  }
172
 
173
- def format_predictions_html(self, sentence_predictions: List[Dict]) -> str:
174
- """Format predictions as HTML with color-coding."""
175
- html_parts = []
176
-
177
  for pred in sentence_predictions:
178
  sentence = pred['sentence']
179
  confidence = pred['confidence']
180
-
181
  if confidence >= CONFIDENCE_THRESHOLD:
182
  if pred['prediction'] == 'human':
183
- color = "#90EE90" # Light green
184
  else:
185
- color = "#FFB6C6" # Light red
186
  else:
 
187
  if pred['prediction'] == 'human':
188
- color = "#E8F5E9" # Very light green
189
  else:
190
- color = "#FFEBEE" # Very light red
191
-
192
- html_parts.append(f'<span style="background-color: {color};">{sentence}</span>')
193
-
194
- return " ".join(html_parts)
195
 
196
- def aggregate_predictions(self, predictions: List[Dict]) -> Dict:
197
- """Aggregate predictions from multiple sentences into a single prediction."""
 
 
198
  if not predictions:
199
  return {
 
 
200
  'prediction': 'unknown',
201
  'confidence': 0.0,
202
  'num_sentences': 0
@@ -210,63 +285,73 @@ class TextClassifier:
210
  avg_ai_prob = total_ai_prob / num_sentences
211
 
212
  return {
 
 
213
  'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai',
214
  'confidence': max(avg_human_prob, avg_ai_prob),
215
  'num_sentences': num_sentences
216
  }
217
 
218
- def analyze_text(text: str, classifier: TextClassifier) -> tuple:
219
- """Analyze text and return formatted results for Gradio interface."""
220
- # Get predictions
221
- analysis = classifier.predict_with_sentence_scores(text)
222
-
223
- # Format sentence-by-sentence analysis
224
- detailed_analysis = []
225
- for pred in analysis['sentence_predictions']:
226
- confidence = pred['confidence'] * 100
227
- detailed_analysis.append(f"Sentence: {pred['sentence']}")
228
- detailed_analysis.append(f"Prediction: {pred['prediction'].upper()}")
229
- detailed_analysis.append(f"Confidence: {confidence:.1f}%")
230
- detailed_analysis.append("-" * 50)
231
-
232
- # Format overall prediction
233
- final_pred = analysis['overall_prediction']
234
- overall_result = f"""
235
- FINAL PREDICTION: {final_pred['prediction'].upper()}
236
- Overall confidence: {final_pred['confidence']*100:.1f}%
237
- Number of sentences analyzed: {final_pred['num_sentences']}
238
- """
239
-
240
- return (
241
- analysis['highlighted_text'],
242
- "\n".join(detailed_analysis),
243
- overall_result
244
- )
245
-
246
- # Initialize the classifier globally
247
- classifier = TextClassifier()
248
-
249
- # Create Gradio interface
250
- demo = gr.Interface(
251
- fn=lambda text: analyze_text(text, classifier),
252
- inputs=gr.Textbox(
253
- lines=8,
254
- placeholder="Enter text to analyze...",
255
- label="Input Text"
256
- ),
257
- outputs=[
258
- gr.HTML(label="Highlighted Analysis"),
259
- gr.Textbox(label="Sentence-by-Sentence Analysis", lines=10),
260
- gr.Textbox(label="Overall Result", lines=4)
261
- ],
262
- title="AI Text Detector",
263
- description="Analyze text to detect if it was written by a human or AI. Text is analyzed sentence by sentence, with color coding indicating the prediction confidence.",
264
- examples=[
265
- ["This is a sample text written by a human. It contains multiple sentences with different ideas. The analysis will show how each sentence is classified. This demonstrates the AI detection capabilities."],
266
- ],
267
- allow_flagging="never"
268
- )
269
-
270
- # Launch the interface
 
 
 
 
 
 
 
 
271
  if __name__ == "__main__":
272
- demo.launch(share=True)
 
1
  import torch
 
 
2
  import torch.nn.functional as F
3
+ from transformers import AutoTokenizer, AutoModelForSequenceClassification
4
  import spacy
5
  from typing import List, Dict
6
  import logging
7
  import os
8
+ from colorama import init, Fore, Back, Style
9
+
10
+ # Initialize colorama for colored terminal output
11
+ init()
12
 
13
  # Configure logging
14
  logging.basicConfig(level=logging.INFO)
15
  logger = logging.getLogger(__name__)
16
 
17
+ # Constants - matching original implementations
18
  MAX_LENGTH = 512
19
  MODEL_NAME = "microsoft/deberta-v3-small"
20
  WINDOW_SIZE = 17
21
  WINDOW_OVERLAP = 2
22
  CONFIDENCE_THRESHOLD = 0.65
23
+ BATCH_SIZE = 16 # Matching original batch size
24
 
25
+ class TextProcessor:
26
  def __init__(self):
27
  try:
28
  self.nlp = spacy.load("en_core_web_sm")
29
  except OSError:
30
  logger.info("Downloading spacy model...")
31
+ os.system("python -m spacy download en_core_web_sm")
32
  self.nlp = spacy.load("en_core_web_sm")
33
 
34
  if 'sentencizer' not in self.nlp.pipe_names:
 
41
  doc = self.nlp(text)
42
  return [str(sent).strip() for sent in doc.sents]
43
 
44
+ def create_windows(self, sentences: List[str], window_size: int, overlap: int) -> List[str]:
45
+ if len(sentences) < window_size:
46
+ return [" ".join(sentences)]
47
+
48
+ windows = []
49
+ stride = window_size - overlap
50
+ for i in range(0, len(sentences) - window_size + 1, stride):
51
+ window = sentences[i:i + window_size]
52
+ windows.append(" ".join(window))
53
+ return windows
54
+
55
  def create_centered_windows(self, sentences: List[str], window_size: int) -> tuple[List[str], List[List[int]]]:
56
  """Create windows centered around each sentence for detailed analysis."""
57
  windows = []
58
  window_sentence_indices = []
59
 
60
  for i in range(len(sentences)):
61
+ # Calculate window boundaries centered on current sentence
62
  half_window = window_size // 2
63
  start_idx = max(0, i - half_window)
64
  end_idx = min(len(sentences), i + half_window + 1)
65
 
66
+ # Adjust window if we're near the edges
67
  if start_idx == 0:
68
  end_idx = min(len(sentences), window_size)
69
  elif end_idx == len(sentences):
 
75
 
76
  return windows, window_sentence_indices
77
 
78
+ class AITextDetector:
79
  def __init__(self):
80
  self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
81
+ self.processor = TextProcessor()
82
  self.tokenizer = None
83
  self.model = None
84
+ self._initialize_model()
85
+
86
+ def _initialize_model(self):
87
+ """Initialize model and tokenizer."""
88
+ self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
 
 
 
 
 
 
 
 
 
 
 
 
 
89
  self.model = AutoModelForSequenceClassification.from_pretrained(
90
+ MODEL_NAME,
91
  num_labels=2
92
  ).to(self.device)
93
 
94
+ try:
95
+ model_path = "model_20250209_184929_acc1.0000.pt"
 
 
96
  checkpoint = torch.load(model_path, map_location=self.device)
97
  self.model.load_state_dict(checkpoint['model_state_dict'])
98
+ logger.info(f"Loaded model from {model_path}")
99
+ except Exception as e:
100
+ logger.error(f"Failed to load model: {e}")
101
+ raise
102
+
103
+ def quick_scan(self, text: str) -> Dict:
104
+ """
105
+ Quick scan implementation matching the second original program's predict method.
106
+ """
107
+ if self.model is None or self.tokenizer is None:
108
+ self._initialize_model()
109
+
110
  self.model.eval()
111
+ sentences = self.processor.split_into_sentences(text)
112
+ windows = self.processor.create_windows(sentences, WINDOW_SIZE, WINDOW_OVERLAP)
113
+
114
+ predictions = []
115
+
116
+ # Process windows in batches to save memory
117
+ for i in range(0, len(windows), BATCH_SIZE):
118
+ batch_windows = windows[i:i + BATCH_SIZE]
119
 
120
+ inputs = self.tokenizer(
121
+ batch_windows,
122
+ truncation=True,
123
+ padding=True,
124
+ max_length=MAX_LENGTH,
125
+ return_tensors="pt"
126
+ ).to(self.device)
127
+
128
+ with torch.no_grad():
129
+ outputs = self.model(**inputs)
130
+ probs = F.softmax(outputs.logits, dim=-1)
131
+
132
+ for idx, window in enumerate(batch_windows):
133
+ prediction = {
134
+ 'window': window,
135
+ 'human_prob': probs[idx][1].item(),
136
+ 'ai_prob': probs[idx][0].item(),
137
+ 'prediction': 'human' if probs[idx][1] > probs[idx][0] else 'ai'
138
+ }
139
+ predictions.append(prediction)
140
+
141
+ # Clear memory
142
+ del inputs, outputs, probs
143
+ if torch.cuda.is_available():
144
+ torch.cuda.empty_cache()
145
+
146
+ return self._aggregate_quick_predictions(predictions)
147
+
148
+ def _aggregate_quick_predictions(self, predictions: List[Dict]) -> Dict:
149
+ """
150
+ Aggregate predictions matching the second original program.
151
+ """
152
+ if not predictions:
153
  return {
154
+ 'human_prob': 0.0,
155
+ 'ai_prob': 0.0,
156
+ 'prediction': 'unknown',
157
+ 'confidence': 0.0,
158
+ 'num_windows': 0
 
 
 
159
  }
160
 
161
+ avg_human_prob = sum(p['human_prob'] for p in predictions) / len(predictions)
162
+ avg_ai_prob = sum(p['ai_prob'] for p in predictions) / len(predictions)
163
+
164
+ return {
165
+ 'human_prob': avg_human_prob,
166
+ 'ai_prob': avg_ai_prob,
167
+ 'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai',
168
+ 'confidence': max(avg_human_prob, avg_ai_prob),
169
+ 'num_windows': len(predictions)
170
+ }
171
+
172
+ def detailed_scan(self, text: str) -> Dict:
173
+ """
174
+ Detailed scan implementation matching the first original program's
175
+ predict_with_sentence_scores method.
176
+ """
177
+ if self.model is None or self.tokenizer is None:
178
+ self._initialize_model()
179
+
180
+ self.model.eval()
181
  sentences = self.processor.split_into_sentences(text)
182
  if not sentences:
183
  return {}
 
190
  sentence_scores = {i: {'human_prob': 0.0, 'ai_prob': 0.0} for i in range(len(sentences))}
191
 
192
  # Process windows in batches to save memory
193
+ for i in range(0, len(windows), BATCH_SIZE):
194
+ batch_windows = windows[i:i + BATCH_SIZE]
195
+ batch_indices = window_sentence_indices[i:i + BATCH_SIZE]
 
196
 
197
  inputs = self.tokenizer(
198
  batch_windows,
 
206
  outputs = self.model(**inputs)
207
  probs = F.softmax(outputs.logits, dim=-1)
208
 
209
+ # Attribute window predictions back to individual sentences
210
  for window_idx, indices in enumerate(batch_indices):
211
  for sent_idx in indices:
212
  sentence_appearances[sent_idx] += 1
213
  sentence_scores[sent_idx]['human_prob'] += probs[window_idx][1].item()
214
  sentence_scores[sent_idx]['ai_prob'] += probs[window_idx][0].item()
215
 
216
+ # Clear memory
217
+ del inputs, outputs, probs
218
+ if torch.cuda.is_available():
219
+ torch.cuda.empty_cache()
220
+
221
  # Average the scores and create final sentence-level predictions
222
  sentence_predictions = []
223
  for i in range(len(sentences)):
 
232
  'confidence': max(human_prob, ai_prob)
233
  })
234
 
235
+ # Generate highlighted text output
236
+ highlighted_text = self._generate_highlighted_text(sentence_predictions)
237
+
238
  return {
239
  'sentence_predictions': sentence_predictions,
240
+ 'highlighted_text': highlighted_text,
241
  'full_text': text,
242
+ 'overall_prediction': self._aggregate_detailed_predictions(sentence_predictions)
243
  }
244
 
245
+ def _generate_highlighted_text(self, sentence_predictions: List[Dict]) -> str:
246
+ """Generate colored text output with highlighting based on predictions."""
247
+ highlighted_parts = []
248
+
249
  for pred in sentence_predictions:
250
  sentence = pred['sentence']
251
  confidence = pred['confidence']
252
+
253
  if confidence >= CONFIDENCE_THRESHOLD:
254
  if pred['prediction'] == 'human':
255
+ highlighted_parts.append(f"{Back.GREEN}{sentence}{Style.RESET_ALL}")
256
  else:
257
+ highlighted_parts.append(f"{Back.RED}{sentence}{Style.RESET_ALL}")
258
  else:
259
+ # Low confidence predictions get a lighter highlight
260
  if pred['prediction'] == 'human':
261
+ highlighted_parts.append(f"{Back.LIGHTGREEN_EX}{sentence}{Style.RESET_ALL}")
262
  else:
263
+ highlighted_parts.append(f"{Back.LIGHTRED_EX}{sentence}{Style.RESET_ALL}")
264
+
265
+ return " ".join(highlighted_parts)
 
 
266
 
267
+ def _aggregate_detailed_predictions(self, predictions: List[Dict]) -> Dict:
268
+ """
269
+ Aggregate predictions matching the first original program.
270
+ """
271
  if not predictions:
272
  return {
273
+ 'human_prob': 0.0,
274
+ 'ai_prob': 0.0,
275
  'prediction': 'unknown',
276
  'confidence': 0.0,
277
  'num_sentences': 0
 
285
  avg_ai_prob = total_ai_prob / num_sentences
286
 
287
  return {
288
+ 'human_prob': avg_human_prob,
289
+ 'ai_prob': avg_ai_prob,
290
  'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai',
291
  'confidence': max(avg_human_prob, avg_ai_prob),
292
  'num_sentences': num_sentences
293
  }
294
 
295
+ def main():
296
+ try:
297
+ detector = AITextDetector()
298
+
299
+ while True:
300
+ print("\nAI Text Detector")
301
+ print("===============")
302
+ print("1. Quick Scan")
303
+ print("2. Detailed Scan")
304
+ print("3. Exit")
305
+
306
+ choice = input("\nSelect an option (1-3): ").strip()
307
+
308
+ if choice == "3":
309
+ break
310
+
311
+ if choice not in ["1", "2"]:
312
+ print("Invalid choice. Please select 1, 2, or 3.")
313
+ continue
314
+
315
+ text = input("\nEnter text to analyze: ").strip()
316
+
317
+ if choice == "1":
318
+ # Quick scan
319
+ result = detector.quick_scan(text)
320
+ print("\nQuick Scan Results:")
321
+ print("==================")
322
+ print(f"Prediction: {result['prediction'].upper()}")
323
+ print(f"Confidence: {result['confidence']*100:.1f}%")
324
+ print(f"Human Probability: {result['human_prob']*100:.1f}%")
325
+ print(f"AI Probability: {result['ai_prob']*100:.1f}%")
326
+ print(f"Number of windows analyzed: {result['num_windows']}")
327
+
328
+ else:
329
+ # Detailed scan
330
+ result = detector.detailed_scan(text)
331
+ print("\nDetailed Analysis:")
332
+ print("=================")
333
+
334
+ # Print sentence-level predictions
335
+ for pred in result['sentence_predictions']:
336
+ confidence = pred['confidence'] * 100
337
+ print(f"\nSentence: {pred['sentence']}")
338
+ print(f"Prediction: {pred['prediction'].upper()}")
339
+ print(f"Confidence: {confidence:.1f}%")
340
+
341
+ # Print highlighted text
342
+ print("\nHighlighted Text Analysis:")
343
+ print("=========================")
344
+ print(result['highlighted_text'])
345
+
346
+ # Print final prediction
347
+ final_pred = result['overall_prediction']
348
+ print(f"\nFINAL PREDICTION: {final_pred['prediction'].upper()}")
349
+ print(f"Overall confidence: {final_pred['confidence']*100:.1f}%")
350
+ print(f"Number of sentences analyzed: {final_pred['num_sentences']}")
351
+
352
+ except Exception as e:
353
+ logger.error(f"An error occurred: {e}")
354
+ raise
355
+
356
  if __name__ == "__main__":
357
+ main()