# gprocess / app.py (v.1.11)
import gradio as gr
import spaces
import pandas as pd
import torch
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
import plotly.graph_objects as go
import logging
import io
from rapidfuzz import fuzz
def fuzzy_deduplicate(df, column, threshold=55):
    """Deduplicate rows based on fuzzy matching of text content"""
    seen_texts = []
    indices_to_keep = []
    for i, text in enumerate(df[column]):
        if pd.isna(text):
            indices_to_keep.append(i)
            continue
        text = str(text)
        if not seen_texts or all(fuzz.ratio(text, seen) < threshold for seen in seen_texts):
            seen_texts.append(text)
            indices_to_keep.append(i)
    return df.iloc[indices_to_keep]
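
# Usage sketch (hypothetical data, not from the original file): a row is dropped
# when its fuzz.ratio against any already-kept row reaches the threshold.
#   demo = pd.DataFrame({"Выдержки из текста": ["Прибыль выросла на 10%",
#                                               "Прибыль выросла на 10 %",
#                                               "Компания сменила CEO"]})
#   fuzzy_deduplicate(demo, "Выдержки из текста", threshold=55)
#   # -> keeps the first and third rows; the near-duplicate second row is removed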
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EventDetector:
    def __init__(self):
        self.model_name = "google/mt5-small"
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = None
        self.finbert = None
        self.roberta = None
        self.finbert_tone = None
    @spaces.GPU
    def initialize_models(self):
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.info(f"Initializing models on device: {device}")
            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name).to(device)
            self.finbert = pipeline("sentiment-analysis", model="ProsusAI/finbert", device=device)
            self.roberta = pipeline("sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment", device=device)
            self.finbert_tone = pipeline("sentiment-analysis", model="yiyanghkust/finbert-tone", device=device)
            return True
        except Exception as e:
            logger.error(f"Model initialization error: {str(e)}")
            return False
    @spaces.GPU
    def detect_events(self, text, entity):
        if not text or not entity:
            return "Нет", "Invalid input"
        try:
            if self.model is None:
                if not self.initialize_models():
                    return "Нет", "Model initialization failed"
            device = "cuda" if torch.cuda.is_available() else "cpu"
            # Truncate input text to avoid tensor size mismatch
            text = text[:500]  # Adjust this value if needed
            prompt = f"""<s>Analyze the following news about {entity}:
Text: {text}
Task: Identify the main event type and provide a brief summary.</s>"""
            inputs = self.tokenizer(prompt, return_tensors="pt", padding=True,
                                    truncation=True, max_length=512).to(device)
            outputs = self.model.generate(**inputs, max_length=300, num_return_sequences=1)
            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            event_type = "Нет"
            if any(term in text.lower() for term in ['отчет', 'выручка', 'прибыль', 'ebitda']):
                event_type = "Отчетность"
            elif any(term in text.lower() for term in ['облигаци', 'купон', 'дефолт']):
                event_type = "РЦБ"
            elif any(term in text.lower() for term in ['суд', 'иск', 'арбитраж']):
                event_type = "Суд"
            return event_type, response
        except Exception as e:
            logger.error(f"Event detection error: {str(e)}")
            return "Нет", f"Error: {str(e)}"
    @spaces.GPU
    def analyze_sentiment(self, text):
        try:
            if self.finbert is None:
                if not self.initialize_models():
                    return "Neutral"
            # Truncate text to avoid tensor size issues
            truncated_text = text[:500]
            results = []
            try:
                # Process text with all models in a batch
                inputs = [truncated_text]
                finbert_result = self.finbert(inputs, truncation=True, max_length=512)[0]
                roberta_result = self.roberta(inputs, truncation=True, max_length=512)[0]
                finbert_tone_result = self.finbert_tone(inputs, truncation=True, max_length=512)[0]
                results = [
                    self._get_sentiment(finbert_result),
                    self._get_sentiment(roberta_result),
                    self._get_sentiment(finbert_tone_result)
                ]
            except Exception as e:
                logger.error(f"Model inference error: {e}")
                return "Neutral"
            # Majority vote across the three models; fall back to Neutral when they all disagree
            sentiment_counts = pd.Series(results).value_counts()
            return sentiment_counts.index[0] if sentiment_counts.iloc[0] >= 2 else "Neutral"
        except Exception as e:
            logger.error(f"Sentiment analysis error: {e}")
            return "Neutral"
def create_visualizations(df):
    if df is None or df.empty:
        return None, None
    try:
        sentiments = df['Sentiment'].value_counts()
        fig_sentiment = go.Figure(data=[go.Pie(
            labels=sentiments.index,
            values=sentiments.values,
            marker_colors=['#FF6B6B', '#4ECDC4', '#95A5A6']
        )])
        fig_sentiment.update_layout(title="Распределение тональности")
        events = df['Event_Type'].value_counts()
        fig_events = go.Figure(data=[go.Bar(
            x=events.index,
            y=events.values,
            marker_color='#2196F3'
        )])
        fig_events.update_layout(title="Распределение событий")
        return fig_sentiment, fig_events
    except Exception as e:
        logger.error(f"Visualization error: {e}")
        return None, None
@spaces.GPU
def process_file(file_obj):
    try:
        logger.info("Starting to read Excel file...")
        df = pd.read_excel(file_obj, sheet_name='Публикации')
        logger.info(f"Successfully read Excel file. Shape: {df.shape}")
        # Perform deduplication; reset the index so the progress log below stays accurate
        original_count = len(df)
        df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55).reset_index(drop=True)
        logger.info(f"Removed {original_count - len(df)} duplicate entries")
        detector = EventDetector()
        processed_rows = []
        total = len(df)
        # Initialize models once for all rows
        if not detector.initialize_models():
            raise Exception("Failed to initialize models")
        for idx, row in df.iterrows():
            try:
                text = str(row.get('Выдержки из текста', ''))
                if not text.strip():
                    continue
                entity = str(row.get('Объект', ''))
                if not entity.strip():
                    continue
                event_type, event_summary = detector.detect_events(text, entity)
                sentiment = detector.analyze_sentiment(text)
                processed_rows.append({
                    'Объект': entity,
                    'Заголовок': str(row.get('Заголовок', '')),
                    'Sentiment': sentiment,
                    'Event_Type': event_type,
                    'Event_Summary': event_summary,
                    'Текст': text[:1000]  # Truncate text for display
                })
                if idx % 5 == 0:
                    logger.info(f"Processed {idx + 1}/{total} rows")
            except Exception as e:
                logger.error(f"Error processing row {idx}: {str(e)}")
                continue
        result_df = pd.DataFrame(processed_rows)
        logger.info(f"Processing complete. Final DataFrame shape: {result_df.shape}")
        return result_df
    except Exception as e:
        logger.error(f"File processing error: {str(e)}")
        raise
def create_interface():
    with gr.Blocks(theme=gr.themes.Soft()) as app:
        gr.Markdown("# AI-анализ мониторинга новостей v.1.11")
        with gr.Row():
            file_input = gr.File(
                label="Загрузите Excel файл",
                file_types=[".xlsx"],
                type="binary"
            )
        with gr.Row():
            analyze_btn = gr.Button(
                "Начать анализ",
                variant="primary"
            )
        with gr.Row():
            progress = gr.Textbox(
                label="Статус обработки",
                interactive=False,
                value="Ожидание файла..."
            )
        with gr.Row():
            stats = gr.DataFrame(
                label="Результаты анализа",
                interactive=False,
                wrap=True
            )
        with gr.Row():
            with gr.Column():
                sentiment_plot = gr.Plot(label="Распределение тональности")
            with gr.Column():
                events_plot = gr.Plot(label="Распределение событий")

        def analyze(file_bytes):
            # Generator callback: every update must be yielded (not returned)
            # for Gradio to stream it to the outputs.
            if file_bytes is None:
                gr.Warning("Пожалуйста, загрузите файл")
                yield None, None, None, "Ожидание файла..."
                return
            try:
                # Wrap the uploaded bytes in a BytesIO object for pandas
                file_obj = io.BytesIO(file_bytes)
                logger.info("File loaded into BytesIO successfully")
                # Process file with progress updates
                progress_status = "Начинаем обработку файла..."
                yield None, None, None, progress_status
                df = process_file(file_obj)
                if df.empty:
                    yield None, None, None, "Нет данных для обработки"
                    return
                progress_status = "Создание визуализаций..."
                yield None, None, None, progress_status
                fig_sentiment, fig_events = create_visualizations(df)
                yield (
                    df,
                    fig_sentiment,
                    fig_events,
                    f"Обработка завершена успешно! Обработано {len(df)} строк"
                )
            except Exception as e:
                error_msg = f"Ошибка анализа: {str(e)}"
                logger.error(error_msg)
                gr.Error(error_msg)
                yield None, None, None, error_msg

        analyze_btn.click(
            fn=analyze,
            inputs=[file_input],
            outputs=[stats, sentiment_plot, events_plot, progress]
        )
    return app
if __name__ == "__main__":
    app = create_interface()
    app.launch(share=True)