ApsidalSolid4 commited on
Commit
ad52429
·
verified ·
1 Parent(s): 395a6e3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +153 -149
app.py CHANGED
@@ -1,34 +1,32 @@
1
  import torch
2
- import torch.nn.functional as F
3
  from transformers import AutoTokenizer, AutoModelForSequenceClassification
 
4
  import spacy
5
  from typing import List, Dict
6
  import logging
7
  import os
8
- from colorama import init, Fore, Back, Style
9
-
10
- # Initialize colorama for colored terminal output
11
- init()
12
 
13
  # Configure logging
14
  logging.basicConfig(level=logging.INFO)
15
  logger = logging.getLogger(__name__)
16
 
17
- # Constants - matching original implementations
18
  MAX_LENGTH = 512
19
  MODEL_NAME = "microsoft/deberta-v3-small"
20
  WINDOW_SIZE = 17
21
  WINDOW_OVERLAP = 2
22
  CONFIDENCE_THRESHOLD = 0.65
23
- BATCH_SIZE = 16 # Matching original batch size
24
 
25
- class TextProcessor:
26
  def __init__(self):
27
  try:
28
  self.nlp = spacy.load("en_core_web_sm")
29
  except OSError:
30
  logger.info("Downloading spacy model...")
31
- os.system("python -m spacy download en_core_web_sm")
32
  self.nlp = spacy.load("en_core_web_sm")
33
 
34
  if 'sentencizer' not in self.nlp.pipe_names:
@@ -42,6 +40,7 @@ class TextProcessor:
42
  return [str(sent).strip() for sent in doc.sents]
43
 
44
  def create_windows(self, sentences: List[str], window_size: int, overlap: int) -> List[str]:
 
45
  if len(sentences) < window_size:
46
  return [" ".join(sentences)]
47
 
@@ -53,17 +52,15 @@ class TextProcessor:
53
  return windows
54
 
55
  def create_centered_windows(self, sentences: List[str], window_size: int) -> tuple[List[str], List[List[int]]]:
56
- """Create windows centered around each sentence for detailed analysis."""
57
  windows = []
58
  window_sentence_indices = []
59
 
60
  for i in range(len(sentences)):
61
- # Calculate window boundaries centered on current sentence
62
  half_window = window_size // 2
63
  start_idx = max(0, i - half_window)
64
  end_idx = min(len(sentences), i + half_window + 1)
65
 
66
- # Adjust window if we're near the edges
67
  if start_idx == 0:
68
  end_idx = min(len(sentences), window_size)
69
  elif end_idx == len(sentences):
@@ -75,45 +72,58 @@ class TextProcessor:
75
 
76
  return windows, window_sentence_indices
77
 
78
- class AITextDetector:
79
  def __init__(self):
80
  self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
81
- self.processor = TextProcessor()
82
  self.tokenizer = None
83
  self.model = None
84
- self._initialize_model()
85
-
86
- def _initialize_model(self):
87
- """Initialize model and tokenizer."""
88
- self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
 
 
 
 
 
 
 
 
 
 
 
89
  self.model = AutoModelForSequenceClassification.from_pretrained(
90
- MODEL_NAME,
91
  num_labels=2
92
  ).to(self.device)
93
 
94
- try:
95
- model_path = "model_20250209_184929_acc1.0000.pt"
 
96
  checkpoint = torch.load(model_path, map_location=self.device)
97
  self.model.load_state_dict(checkpoint['model_state_dict'])
98
- logger.info(f"Loaded model from {model_path}")
99
- except Exception as e:
100
- logger.error(f"Failed to load model: {e}")
101
- raise
102
 
103
  def quick_scan(self, text: str) -> Dict:
104
- """
105
- Quick scan implementation matching the second original program's predict method.
106
- """
107
- if self.model is None or self.tokenizer is None:
108
- self._initialize_model()
 
 
109
 
110
- self.model.eval()
111
  sentences = self.processor.split_into_sentences(text)
112
  windows = self.processor.create_windows(sentences, WINDOW_SIZE, WINDOW_OVERLAP)
113
 
114
  predictions = []
115
-
116
- # Process windows in batches to save memory
117
  for i in range(0, len(windows), BATCH_SIZE):
118
  batch_windows = windows[i:i + BATCH_SIZE]
119
 
@@ -138,21 +148,9 @@ class AITextDetector:
138
  }
139
  predictions.append(prediction)
140
 
141
- # Clear memory
142
- del inputs, outputs, probs
143
- if torch.cuda.is_available():
144
- torch.cuda.empty_cache()
145
-
146
- return self._aggregate_quick_predictions(predictions)
147
-
148
- def _aggregate_quick_predictions(self, predictions: List[Dict]) -> Dict:
149
- """
150
- Aggregate predictions matching the second original program.
151
- """
152
  if not predictions:
153
  return {
154
- 'human_prob': 0.0,
155
- 'ai_prob': 0.0,
156
  'prediction': 'unknown',
157
  'confidence': 0.0,
158
  'num_windows': 0
@@ -162,22 +160,25 @@ class AITextDetector:
162
  avg_ai_prob = sum(p['ai_prob'] for p in predictions) / len(predictions)
163
 
164
  return {
165
- 'human_prob': avg_human_prob,
166
- 'ai_prob': avg_ai_prob,
167
  'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai',
168
  'confidence': max(avg_human_prob, avg_ai_prob),
169
  'num_windows': len(predictions)
170
  }
171
 
172
  def detailed_scan(self, text: str) -> Dict:
173
- """
174
- Detailed scan implementation matching the first original program's
175
- predict_with_sentence_scores method.
176
- """
177
- if self.model is None or self.tokenizer is None:
178
- self._initialize_model()
 
 
 
 
 
 
179
 
180
- self.model.eval()
181
  sentences = self.processor.split_into_sentences(text)
182
  if not sentences:
183
  return {}
@@ -189,7 +190,7 @@ class AITextDetector:
189
  sentence_appearances = {i: 0 for i in range(len(sentences))}
190
  sentence_scores = {i: {'human_prob': 0.0, 'ai_prob': 0.0} for i in range(len(sentences))}
191
 
192
- # Process windows in batches to save memory
193
  for i in range(0, len(windows), BATCH_SIZE):
194
  batch_windows = windows[i:i + BATCH_SIZE]
195
  batch_indices = window_sentence_indices[i:i + BATCH_SIZE]
@@ -206,18 +207,12 @@ class AITextDetector:
206
  outputs = self.model(**inputs)
207
  probs = F.softmax(outputs.logits, dim=-1)
208
 
209
- # Attribute window predictions back to individual sentences
210
  for window_idx, indices in enumerate(batch_indices):
211
  for sent_idx in indices:
212
  sentence_appearances[sent_idx] += 1
213
  sentence_scores[sent_idx]['human_prob'] += probs[window_idx][1].item()
214
  sentence_scores[sent_idx]['ai_prob'] += probs[window_idx][0].item()
215
 
216
- # Clear memory
217
- del inputs, outputs, probs
218
- if torch.cuda.is_available():
219
- torch.cuda.empty_cache()
220
-
221
  # Average the scores and create final sentence-level predictions
222
  sentence_predictions = []
223
  for i in range(len(sentences)):
@@ -232,46 +227,40 @@ class AITextDetector:
232
  'confidence': max(human_prob, ai_prob)
233
  })
234
 
235
- # Generate highlighted text output
236
- highlighted_text = self._generate_highlighted_text(sentence_predictions)
237
-
238
  return {
239
  'sentence_predictions': sentence_predictions,
240
- 'highlighted_text': highlighted_text,
241
  'full_text': text,
242
- 'overall_prediction': self._aggregate_detailed_predictions(sentence_predictions)
243
  }
244
 
245
- def _generate_highlighted_text(self, sentence_predictions: List[Dict]) -> str:
246
- """Generate colored text output with highlighting based on predictions."""
247
- highlighted_parts = []
248
-
249
  for pred in sentence_predictions:
250
  sentence = pred['sentence']
251
  confidence = pred['confidence']
252
-
253
  if confidence >= CONFIDENCE_THRESHOLD:
254
  if pred['prediction'] == 'human':
255
- highlighted_parts.append(f"{Back.GREEN}{sentence}{Style.RESET_ALL}")
256
  else:
257
- highlighted_parts.append(f"{Back.RED}{sentence}{Style.RESET_ALL}")
258
  else:
259
- # Low confidence predictions get a lighter highlight
260
  if pred['prediction'] == 'human':
261
- highlighted_parts.append(f"{Back.LIGHTGREEN_EX}{sentence}{Style.RESET_ALL}")
262
  else:
263
- highlighted_parts.append(f"{Back.LIGHTRED_EX}{sentence}{Style.RESET_ALL}")
264
-
265
- return " ".join(highlighted_parts)
 
 
266
 
267
- def _aggregate_detailed_predictions(self, predictions: List[Dict]) -> Dict:
268
- """
269
- Aggregate predictions matching the first original program.
270
- """
271
  if not predictions:
272
  return {
273
- 'human_prob': 0.0,
274
- 'ai_prob': 0.0,
275
  'prediction': 'unknown',
276
  'confidence': 0.0,
277
  'num_sentences': 0
@@ -285,73 +274,88 @@ class AITextDetector:
285
  avg_ai_prob = total_ai_prob / num_sentences
286
 
287
  return {
288
- 'human_prob': avg_human_prob,
289
- 'ai_prob': avg_ai_prob,
290
  'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai',
291
  'confidence': max(avg_human_prob, avg_ai_prob),
292
  'num_sentences': num_sentences
293
  }
294
 
295
- def main():
296
- try:
297
- detector = AITextDetector()
 
 
298
 
299
- while True:
300
- print("\nAI Text Detector")
301
- print("===============")
302
- print("1. Quick Scan")
303
- print("2. Detailed Scan")
304
- print("3. Exit")
305
-
306
- choice = input("\nSelect an option (1-3): ").strip()
307
-
308
- if choice == "3":
309
- break
310
-
311
- if choice not in ["1", "2"]:
312
- print("Invalid choice. Please select 1, 2, or 3.")
313
- continue
314
-
315
- text = input("\nEnter text to analyze: ").strip()
316
-
317
- if choice == "1":
318
- # Quick scan
319
- result = detector.quick_scan(text)
320
- print("\nQuick Scan Results:")
321
- print("==================")
322
- print(f"Prediction: {result['prediction'].upper()}")
323
- print(f"Confidence: {result['confidence']*100:.1f}%")
324
- print(f"Human Probability: {result['human_prob']*100:.1f}%")
325
- print(f"AI Probability: {result['ai_prob']*100:.1f}%")
326
- print(f"Number of windows analyzed: {result['num_windows']}")
327
-
328
- else:
329
- # Detailed scan
330
- result = detector.detailed_scan(text)
331
- print("\nDetailed Analysis:")
332
- print("=================")
333
-
334
- # Print sentence-level predictions
335
- for pred in result['sentence_predictions']:
336
- confidence = pred['confidence'] * 100
337
- print(f"\nSentence: {pred['sentence']}")
338
- print(f"Prediction: {pred['prediction'].upper()}")
339
- print(f"Confidence: {confidence:.1f}%")
340
-
341
- # Print highlighted text
342
- print("\nHighlighted Text Analysis:")
343
- print("=========================")
344
- print(result['highlighted_text'])
345
-
346
- # Print final prediction
347
- final_pred = result['overall_prediction']
348
- print(f"\nFINAL PREDICTION: {final_pred['prediction'].upper()}")
349
- print(f"Overall confidence: {final_pred['confidence']*100:.1f}%")
350
- print(f"Number of sentences analyzed: {final_pred['num_sentences']}")
351
-
352
- except Exception as e:
353
- logger.error(f"An error occurred: {e}")
354
- raise
355
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
356
  if __name__ == "__main__":
357
- main()
 
1
  import torch
2
+ import numpy as np
3
  from transformers import AutoTokenizer, AutoModelForSequenceClassification
4
+ import torch.nn.functional as F
5
  import spacy
6
  from typing import List, Dict
7
  import logging
8
  import os
9
+ import gradio as gr
 
 
 
10
 
11
  # Configure logging
12
  logging.basicConfig(level=logging.INFO)
13
  logger = logging.getLogger(__name__)
14
 
15
+ # Constants
16
  MAX_LENGTH = 512
17
  MODEL_NAME = "microsoft/deberta-v3-small"
18
  WINDOW_SIZE = 17
19
  WINDOW_OVERLAP = 2
20
  CONFIDENCE_THRESHOLD = 0.65
21
+ BATCH_SIZE = 16
22
 
23
+ class TextWindowProcessor:
24
  def __init__(self):
25
  try:
26
  self.nlp = spacy.load("en_core_web_sm")
27
  except OSError:
28
  logger.info("Downloading spacy model...")
29
+ spacy.cli.download("en_core_web_sm")
30
  self.nlp = spacy.load("en_core_web_sm")
31
 
32
  if 'sentencizer' not in self.nlp.pipe_names:
 
40
  return [str(sent).strip() for sent in doc.sents]
41
 
42
  def create_windows(self, sentences: List[str], window_size: int, overlap: int) -> List[str]:
43
+ """Create overlapping windows for quick scan mode."""
44
  if len(sentences) < window_size:
45
  return [" ".join(sentences)]
46
 
 
52
  return windows
53
 
54
  def create_centered_windows(self, sentences: List[str], window_size: int) -> tuple[List[str], List[List[int]]]:
55
+ """Create centered windows for detailed analysis mode."""
56
  windows = []
57
  window_sentence_indices = []
58
 
59
  for i in range(len(sentences)):
 
60
  half_window = window_size // 2
61
  start_idx = max(0, i - half_window)
62
  end_idx = min(len(sentences), i + half_window + 1)
63
 
 
64
  if start_idx == 0:
65
  end_idx = min(len(sentences), window_size)
66
  elif end_idx == len(sentences):
 
72
 
73
  return windows, window_sentence_indices
74
 
75
+ class TextClassifier:
76
  def __init__(self):
77
  self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
78
+ self.model_name = MODEL_NAME
79
  self.tokenizer = None
80
  self.model = None
81
+ self.processor = TextWindowProcessor()
82
+ self.initialize_model()
83
+
84
+ def initialize_model(self):
85
+ """Initialize the model and tokenizer."""
86
+ logger.info("Initializing model and tokenizer...")
87
+
88
+ from transformers import DebertaV2TokenizerFast
89
+
90
+ self.tokenizer = DebertaV2TokenizerFast.from_pretrained(
91
+ self.model_name,
92
+ model_max_length=MAX_LENGTH,
93
+ use_fast=False,
94
+ from_slow=True
95
+ )
96
+
97
  self.model = AutoModelForSequenceClassification.from_pretrained(
98
+ self.model_name,
99
  num_labels=2
100
  ).to(self.device)
101
 
102
+ model_path = "model.pt"
103
+ if os.path.exists(model_path):
104
+ logger.info(f"Loading custom model from {model_path}")
105
  checkpoint = torch.load(model_path, map_location=self.device)
106
  self.model.load_state_dict(checkpoint['model_state_dict'])
107
+ else:
108
+ logger.warning("Custom model file not found. Using base model.")
109
+
110
+ self.model.eval()
111
 
112
  def quick_scan(self, text: str) -> Dict:
113
+ """Perform a quick scan using simple window analysis."""
114
+ if not text.strip():
115
+ return {
116
+ 'prediction': 'unknown',
117
+ 'confidence': 0.0,
118
+ 'num_windows': 0
119
+ }
120
 
 
121
  sentences = self.processor.split_into_sentences(text)
122
  windows = self.processor.create_windows(sentences, WINDOW_SIZE, WINDOW_OVERLAP)
123
 
124
  predictions = []
125
+
126
+ # Process windows in batches
127
  for i in range(0, len(windows), BATCH_SIZE):
128
  batch_windows = windows[i:i + BATCH_SIZE]
129
 
 
148
  }
149
  predictions.append(prediction)
150
 
151
+ # Calculate aggregate prediction
 
 
 
 
 
 
 
 
 
 
152
  if not predictions:
153
  return {
 
 
154
  'prediction': 'unknown',
155
  'confidence': 0.0,
156
  'num_windows': 0
 
160
  avg_ai_prob = sum(p['ai_prob'] for p in predictions) / len(predictions)
161
 
162
  return {
 
 
163
  'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai',
164
  'confidence': max(avg_human_prob, avg_ai_prob),
165
  'num_windows': len(predictions)
166
  }
167
 
168
  def detailed_scan(self, text: str) -> Dict:
169
+ """Perform a detailed scan with sentence-level analysis."""
170
+ if not text.strip():
171
+ return {
172
+ 'sentence_predictions': [],
173
+ 'highlighted_text': '',
174
+ 'full_text': '',
175
+ 'overall_prediction': {
176
+ 'prediction': 'unknown',
177
+ 'confidence': 0.0,
178
+ 'num_sentences': 0
179
+ }
180
+ }
181
 
 
182
  sentences = self.processor.split_into_sentences(text)
183
  if not sentences:
184
  return {}
 
190
  sentence_appearances = {i: 0 for i in range(len(sentences))}
191
  sentence_scores = {i: {'human_prob': 0.0, 'ai_prob': 0.0} for i in range(len(sentences))}
192
 
193
+ # Process windows in batches
194
  for i in range(0, len(windows), BATCH_SIZE):
195
  batch_windows = windows[i:i + BATCH_SIZE]
196
  batch_indices = window_sentence_indices[i:i + BATCH_SIZE]
 
207
  outputs = self.model(**inputs)
208
  probs = F.softmax(outputs.logits, dim=-1)
209
 
 
210
  for window_idx, indices in enumerate(batch_indices):
211
  for sent_idx in indices:
212
  sentence_appearances[sent_idx] += 1
213
  sentence_scores[sent_idx]['human_prob'] += probs[window_idx][1].item()
214
  sentence_scores[sent_idx]['ai_prob'] += probs[window_idx][0].item()
215
 
 
 
 
 
 
216
  # Average the scores and create final sentence-level predictions
217
  sentence_predictions = []
218
  for i in range(len(sentences)):
 
227
  'confidence': max(human_prob, ai_prob)
228
  })
229
 
 
 
 
230
  return {
231
  'sentence_predictions': sentence_predictions,
232
+ 'highlighted_text': self.format_predictions_html(sentence_predictions),
233
  'full_text': text,
234
+ 'overall_prediction': self.aggregate_predictions(sentence_predictions)
235
  }
236
 
237
+ def format_predictions_html(self, sentence_predictions: List[Dict]) -> str:
238
+ """Format predictions as HTML with color-coding."""
239
+ html_parts = []
240
+
241
  for pred in sentence_predictions:
242
  sentence = pred['sentence']
243
  confidence = pred['confidence']
244
+
245
  if confidence >= CONFIDENCE_THRESHOLD:
246
  if pred['prediction'] == 'human':
247
+ color = "#90EE90" # Light green
248
  else:
249
+ color = "#FFB6C6" # Light red
250
  else:
 
251
  if pred['prediction'] == 'human':
252
+ color = "#E8F5E9" # Very light green
253
  else:
254
+ color = "#FFEBEE" # Very light red
255
+
256
+ html_parts.append(f'<span style="background-color: {color};">{sentence}</span>')
257
+
258
+ return " ".join(html_parts)
259
 
260
+ def aggregate_predictions(self, predictions: List[Dict]) -> Dict:
261
+ """Aggregate predictions from multiple sentences into a single prediction."""
 
 
262
  if not predictions:
263
  return {
 
 
264
  'prediction': 'unknown',
265
  'confidence': 0.0,
266
  'num_sentences': 0
 
274
  avg_ai_prob = total_ai_prob / num_sentences
275
 
276
  return {
 
 
277
  'prediction': 'human' if avg_human_prob > avg_ai_prob else 'ai',
278
  'confidence': max(avg_human_prob, avg_ai_prob),
279
  'num_sentences': num_sentences
280
  }
281
 
282
+ def analyze_text(text: str, mode: str, classifier: TextClassifier) -> tuple:
283
+ """Analyze text using specified mode and return formatted results."""
284
+ if mode == "quick":
285
+ # Quick scan
286
+ result = classifier.quick_scan(text)
287
 
288
+ quick_analysis = f"""
289
+ PREDICTION: {result['prediction'].upper()}
290
+ Confidence: {result['confidence']*100:.1f}%
291
+ Windows analyzed: {result['num_windows']}
292
+ """
293
+
294
+ return (
295
+ text, # No highlighting in quick mode
296
+ "Quick scan mode - no sentence-level analysis available",
297
+ quick_analysis
298
+ )
299
+ else:
300
+ # Detailed scan
301
+ analysis = classifier.detailed_scan(text)
302
+
303
+ # Format sentence-by-sentence analysis
304
+ detailed_analysis = []
305
+ for pred in analysis['sentence_predictions']:
306
+ confidence = pred['confidence'] * 100
307
+ detailed_analysis.append(f"Sentence: {pred['sentence']}")
308
+ detailed_analysis.append(f"Prediction: {pred['prediction'].upper()}")
309
+ detailed_analysis.append(f"Confidence: {confidence:.1f}%")
310
+ detailed_analysis.append("-" * 50)
311
+
312
+ # Format overall prediction
313
+ final_pred = analysis['overall_prediction']
314
+ overall_result = f"""
315
+ FINAL PREDICTION: {final_pred['prediction'].upper()}
316
+ Overall confidence: {final_pred['confidence']*100:.1f}%
317
+ Number of sentences analyzed: {final_pred['num_sentences']}
318
+ """
319
+
320
+ return (
321
+ analysis['highlighted_text'],
322
+ "\n".join(detailed_analysis),
323
+ overall_result
324
+ )
325
+
326
+ # Initialize the classifier globally
327
+ classifier = TextClassifier()
328
+
329
+ # Create Gradio interface
330
+ demo = gr.Interface(
331
+ fn=lambda text, mode: analyze_text(text, mode, classifier),
332
+ inputs=[
333
+ gr.Textbox(
334
+ lines=8,
335
+ placeholder="Enter text to analyze...",
336
+ label="Input Text"
337
+ ),
338
+ gr.Radio(
339
+ choices=["quick", "detailed"],
340
+ value="quick",
341
+ label="Analysis Mode",
342
+ info="Quick mode for faster analysis, Detailed mode for sentence-level analysis"
343
+ )
344
+ ],
345
+ outputs=[
346
+ gr.HTML(label="Highlighted Analysis"),
347
+ gr.Textbox(label="Sentence-by-Sentence Analysis", lines=10),
348
+ gr.Textbox(label="Overall Result", lines=4)
349
+ ],
350
+ title="AI Text Detector",
351
+ description="Analyze text to detect if it was written by a human or AI. Choose between quick scan and detailed sentence-level analysis.",
352
+ examples=[
353
+ ["This is a sample text written by a human. It contains multiple sentences with different ideas. The analysis will show how each sentence is classified.", "quick"],
354
+ ["This is a sample text written by a human. It contains multiple sentences with different ideas. The analysis will show how each sentence is classified.", "detailed"],
355
+ ],
356
+ allow_flagging="never"
357
+ )
358
+
359
+ # Launch the interface
360
  if __name__ == "__main__":
361
+ demo.launch(share=True)