import logging
import os
import random
import time
from functools import lru_cache
from typing import Dict, Optional

import gradio as gr
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from transformers import pipeline
from translatepy import Translator
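
# Assumed third-party dependencies (not pinned anywhere in this file):
# gradio, transformers (plus a backend such as torch), translatepy, and nltk.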

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log')
    ]
)
logger = logging.getLogger(__name__)


class Config:
    NLTK_DATA = os.getenv('NLTK_DATA', '/home/user/nltk_data')
    CACHE_DIR = os.getenv('CACHE_DIR', '/home/user/model_cache')
    MAX_TEXT_LENGTH = 10000
    CHUNK_SIZE = 500
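
# The defaults above assume a Hugging Face Spaces-style container where
# /home/user is writable; elsewhere they can be overridden via environment
# variables, e.g.:
#   export NLTK_DATA=/tmp/nltk_data
#   export CACHE_DIR=/tmp/model_cache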


def setup_nltk():
    """Ensure the NLTK tokenizer data used by sent_tokenize/word_tokenize is available."""
    try:
        os.makedirs(Config.NLTK_DATA, exist_ok=True)
        nltk.data.path.append(Config.NLTK_DATA)
        # 'punkt' backs the tokenizers; newer NLTK releases also look for the
        # 'punkt_tab' resource, so try to fetch both.
        required_packages = ['punkt', 'punkt_tab']
        for package in required_packages:
            try:
                nltk.data.find(f'tokenizers/{package}')
            except LookupError:
                nltk.download(package, download_dir=Config.NLTK_DATA, quiet=True)
    except Exception as e:
        logger.error(f"NLTK setup failed: {str(e)}")
        raise


class TextHumanizer:
    def __init__(self, cache_dir: str = Config.CACHE_DIR):
        """Load the detector and rewriting pipelines, the translator, and the tone prompts."""
        try:
            os.makedirs(cache_dir, exist_ok=True)

            # Classifier used to estimate how "AI-like" a piece of text reads.
            self.detector = self._init_pipeline(
                "text-classification",
                "roberta-base-openai-detector",
                cache_dir
            )

            # Seq2seq model used to rewrite the input according to the tone prompt.
            self.humanizer = self._init_pipeline(
                "text2text-generation",
                "facebook/bart-large-cnn",
                cache_dir
            )

            self.translator = Translator()

            self.tone_prompts = {
                "Casual": [
                    "Rewrite this casually as if you're texting a friend: {text}",
                    "Make this sound like natural conversation: {text}",
                    "Convert this to everyday spoken English: {text}"
                ],
                "Business": [
                    "Rephrase this in professional corporate language: {text}",
                    "Transform this into formal business communication: {text}",
                    "Rewrite for a professional email: {text}"
                ],
                "Academic": [
                    "Rephrase this in scholarly academic language: {text}",
                    "Convert to academic paper style: {text}",
                    "Rewrite for a research publication: {text}"
                ],
                "Creative": [
                    "Transform this into vivid, imaginative writing: {text}",
                    "Rewrite with creative metaphors and sensory details: {text}",
                    "Convert to engaging storytelling style: {text}"
                ]
            }

            self.human_patterns = self._load_patterns()

        except Exception as e:
            logger.error(f"Initialization failed: {str(e)}")
            raise

    @staticmethod
    def _init_pipeline(task: str, model: str, cache_dir: str, max_retries: int = 3):
        """Initialize a transformers pipeline with exponential-backoff retries."""
        for attempt in range(max_retries):
            try:
                # cache_dir is forwarded via model_kwargs so it reaches
                # from_pretrained(); device=-1 keeps inference on the CPU.
                return pipeline(
                    task,
                    model=model,
                    model_kwargs={"cache_dir": cache_dir},
                    device=-1
                )
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                logger.warning(f"Pipeline initialization attempt {attempt + 1} failed: {str(e)}")
                time.sleep(2 ** attempt)

    @staticmethod
    def _load_patterns():
        """Load human-like patterns: filler words, contractions, and sentence variants."""
        return {
            'fillers': ["well", "you know", "actually", "I mean", "basically",
                        "to be honest", "kind of", "sort of", "like"],
            'contractions': {
                "cannot": "can't",
                "could not": "couldn't",
                "would not": "wouldn't",
                "is not": "isn't",
                "do not": "don't",
                "will not": "won't",
                "should not": "shouldn't",
                "have not": "haven't"
            },
            'sentence_variants': [
                lambda s: s.lower(),
                lambda s: s.capitalize(),
                lambda s: s[:-1] + ", which is interesting." if s.endswith('.') else s,
                lambda s: s[:-1] + ", you know?" if s.endswith('.') else s,
                lambda s: s[:-1] + "..." if s.endswith('.') else s
            ]
        }

    @lru_cache(maxsize=1000)
    def _add_human_touches(self, text: str) -> str:
        """Apply several layers of human-like edits (fillers, splits, contractions).

        Results are memoized, so repeated inputs return the same output even
        though the individual edits are randomized.
        """
        try:
            sentences = sent_tokenize(text)

            modified_sentences = []
            for sent in sentences:
                # Occasionally open a sentence with a conversational filler.
                if random.random() < 0.4:
                    filler = random.choice(self.human_patterns['fillers'])
                    sent = f"{filler}, {sent.lower()}"

                # Occasionally split long sentences roughly in half.
                if len(sent.split()) > 12 and random.random() < 0.3:
                    words = word_tokenize(sent)
                    split_point = len(words) // 2 + random.randint(-2, 2)
                    modified_sentences.extend([
                        ' '.join(words[:split_point]) + ',',
                        ' '.join(words[split_point:])
                    ])
                else:
                    modified_sentences.append(sent)

            # Swap formal phrasings for contractions.
            text = ' '.join(modified_sentences)
            for formal, casual in self.human_patterns['contractions'].items():
                text = text.replace(f" {formal} ", f" {casual} ")

            # Apply a random surface-level variant to most sentences.
            final_sentences = []
            for sent in sent_tokenize(text):
                if random.random() < 0.7:
                    sent = random.choice(self.human_patterns['sentence_variants'])(sent)
                final_sentences.append(sent)

            return ' '.join(final_sentences)

        except Exception as e:
            logger.error(f"Humanization error: {str(e)}")
            return text

    def detect_ai_text(self, text: str) -> float:
        """Estimate how likely the text is AI-generated, averaged over fixed-size chunks."""
        try:
            if not text.strip():
                return 0.0

            chunks = [text[i:i + Config.CHUNK_SIZE] for i in range(0, len(text), Config.CHUNK_SIZE)]
            scores = []

            for chunk in chunks:
                # Very short chunks give unreliable scores, so skip them.
                if len(chunk.strip()) < 50:
                    continue
                result = self.detector(chunk)[0]
                # The detector labels text as machine-generated ("Fake"/"ARTIFICIAL",
                # depending on the checkpoint's config) or human-written ("Real").
                if result['label'].lower() in ('fake', 'artificial'):
                    scores.append(result['score'])
                else:
                    scores.append(1.0 - result['score'])

            return sum(scores) / len(scores) if scores else 0.0

        except Exception as e:
            logger.error(f"Detection error: {str(e)}")
            return 0.0

    def humanize_text(self, text: str, tone: str, translate_to: Optional[str] = None) -> str:
        """Rewrite text in the requested tone, re-check its AI score, and optionally translate."""
        try:
            if not text or len(text) > Config.MAX_TEXT_LENGTH:
                raise ValueError(f"Text must be between 1 and {Config.MAX_TEXT_LENGTH} characters")

            metrics = {'start_time': time.time()}

            original_score = self.detect_ai_text(text)
            logger.info(f"Initial AI score: {original_score:.2f}")

            # Rewrite the text with a randomly chosen prompt for the selected tone.
            prompt = random.choice(self.tone_prompts[tone]).format(text=text)
            generated = self.humanizer(
                prompt,
                # max_length counts tokens; the character count is only a rough ceiling.
                max_length=min(len(text) * 2, 1024),
                do_sample=True,  # temperature/top_p only take effect when sampling is enabled
                temperature=0.9,
                top_p=0.95,
                num_beams=4,
                repetition_penalty=1.2,
                no_repeat_ngram_size=3
            )[0]['generated_text']

            humanized = self._add_human_touches(generated)
            final_score = self.detect_ai_text(humanized)

            # If the rewrite barely moved the detector score, run one more pass.
            if final_score > original_score * 0.8:
                logger.info("Applying additional humanization pass")
                humanized = self._add_human_touches(humanized)

            if translate_to and translate_to != "None":
                try:
                    # Dropdown entries look like "es (Spanish)"; keep only the language code.
                    lang_code = translate_to.split()[0]
                    humanized = self.translator.translate(humanized, lang_code).result
                except Exception as e:
                    logger.error(f"Translation failed: {str(e)}")
                    raise ValueError(f"Translation failed: {str(e)}")

            metrics['processing_time'] = time.time() - metrics['start_time']
            logger.info(f"Processing completed in {metrics['processing_time']:.2f} seconds")

            return humanized

        except Exception as e:
            logger.error(f"Humanization failed: {str(e)}")
            raise


def create_interface():
    """Build the Gradio interface and launch the app."""
    try:
        # Fetch tokenizer data before loading the models so setup failures surface early.
        setup_nltk()
        humanizer = TextHumanizer()

        def process_text(text: str, tone: str, translate_to: str) -> Dict:
            try:
                if not text.strip():
                    return {
                        "data": ["Please enter some text to process"],
                        "success": False,
                        "error": "Empty input"
                    }

                start_time = time.time()
                result = humanizer.humanize_text(text, tone, translate_to)
                processing_time = time.time() - start_time

                return {
                    "data": [result],
                    "success": True,
                    "metrics": {
                        "processing_time": round(processing_time, 2),
                        "characters_processed": len(text),
                        "words_processed": len(text.split())
                    }
                }
            except Exception as e:
                logger.error(f"Text processing failed: {str(e)}")
                return {
                    "data": [],
                    "success": False,
                    "error": str(e)
                }

        iface = gr.Interface(
            fn=process_text,
            inputs=[
                gr.Textbox(
                    label="Input Text",
                    lines=5,
                    placeholder="Enter text to humanize..."
                ),
                gr.Dropdown(
                    choices=list(humanizer.tone_prompts.keys()),
                    label="Writing Style",
                    value="Casual"
                ),
                gr.Dropdown(
                    choices=["None"] + [f"{c} ({n})" for c, n in [
                        ("da", "Danish"), ("no", "Norwegian"),
                        ("sv", "Swedish"), ("es", "Spanish"),
                        ("fr", "French"), ("de", "German")
                    ]],
                    label="Translate to",
                    value="None"
                )
            ],
            outputs=gr.JSON(),
            title="Advanced AI Text Humanizer",
            description="Transform AI-generated text into more natural, human-like writing",
            examples=[
                ["Large language models demonstrate remarkable capabilities in natural language understanding tasks.", "Casual", "None"],
                ["The implementation requires careful consideration of multiple interdependent factors.", "Business", "es (Spanish)"]
            ],
            flagging_mode="never"
        )

        iface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True
        )

    except Exception as e:
        logger.error(f"Interface creation failed: {str(e)}")
        raise


if __name__ == "__main__":
    create_interface()
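
# Minimal sketch of programmatic use without the Gradio UI (assumes the models
# and NLTK data can be downloaded in the current environment):
#
#   setup_nltk()
#   humanizer = TextHumanizer()
#   print(humanizer.humanize_text(
#       "Large language models demonstrate remarkable capabilities in natural language understanding tasks.",
#       tone="Casual",
#   ))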