import streamlit as st
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TextIteratorStreamer
)
from huggingface_hub import login
from threading import Thread
import PyPDF2
import pandas as pd
import torch
import time
import os
# Hugging Face token via environment variable
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("Missing Hugging Face Token. Please set the HF_TOKEN environment variable.")

# Fine-tuned Llama 3.1 8B Instruct model (Lei Geral do Trabalho, Angola)
MODEL_NAME = "amiguel/Meta-Llama-3.1-8B-Instruct-lei-geral-trabalho"  # alternatives: "amiguel/lgt-Angola", "amiguel/lgt-2025"
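# Note: if the fine-tuned repo above is gated or private, the HF_TOKEN set earlier is what
# grants access when the tokenizer and model are downloaded in load_model().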
# UI setup
st.set_page_config(page_title="Assistente LGT | Angola", page_icon="📚", layout="centered")
st.title("Assistente LGT | Angola")
USER_AVATAR = "https://raw.githubusercontent.com/achilela/vila_fofoka_analysis/9904d9a0d445ab0488cf7395cb863cce7621d897/USER_AVATAR.png"
BOT_AVATAR = "https://raw.githubusercontent.com/achilela/vila_fofoka_analysis/991f4c6e4e1dc7a8e24876ca5aae5228bcdb4dba/Ataliba_Avatar.jpg"
# Upload sidebar | |
with st.sidebar: | |
st.header("Upload Documentos π") | |
uploaded_file = st.file_uploader("Escolhe um ficheiro PDF ou XLSX", type=["pdf", "xlsx"], label_visibility="collapsed") | |
# Process the uploaded file (PDF or XLSX) into plain-text context
def process_file(uploaded_file):
    if uploaded_file is None:
        return ""
    try:
        if uploaded_file.type == "application/pdf":
            reader = PyPDF2.PdfReader(uploaded_file)
            # extract_text() may return None (e.g. for scanned pages), hence the "or ''" guard
            return "\n".join(page.extract_text() or "" for page in reader.pages)
        elif uploaded_file.type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
            df = pd.read_excel(uploaded_file)
            return df.to_markdown()
        return ""
    except Exception as e:
        st.error(f"Erro ao processar o ficheiro: {str(e)}")
        return ""
# Cache model loading across reruns
@st.cache_resource
def load_model():
    try:
        login(token=HF_TOKEN)
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=HF_TOKEN, use_fast=False)
        # float32 weights for an 8B-parameter model need roughly 32 GB of memory; float16/bfloat16 halves that on GPU
        model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.float32).to("cuda" if torch.cuda.is_available() else "cpu")
        return model, tokenizer
    except Exception as e:
        st.error(f"Erro ao carregar o modelo: {str(e)}")
        return None, None
# Streaming response generation
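# Pattern: model.generate() is launched on a background thread with a TextIteratorStreamer
# attached; the streamer yields decoded text chunks as they are produced, so the Streamlit
# main thread can iterate over it and update the UI while generation is still running.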
def generate_response(prompt, context, model, tokenizer):
    full_prompt = f"Contexto:\n{context}\n\nPergunta: {prompt}\nResposta:"
    inputs = tokenizer(full_prompt, return_tensors="pt", truncation=True).to(model.device)
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        "input_ids": inputs["input_ids"],
        "attention_mask": inputs["attention_mask"],
        "max_new_tokens": 512,
        "temperature": 0.7,
        "top_p": 0.9,
        "repetition_penalty": 1.1,
        "do_sample": True,
        "use_cache": True,
        "streamer": streamer
    }
    Thread(target=model.generate, kwargs=generation_kwargs).start()
    return streamer
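# The caller consumes the returned streamer; iteration only finishes when generation ends,
# so the worker thread does not need to be joined explicitly.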
# Store chat history
if "messages" not in st.session_state:
    st.session_state.messages = []

# Show chat history
for message in st.session_state.messages:
    avatar = USER_AVATAR if message["role"] == "user" else BOT_AVATAR
    with st.chat_message(message["role"], avatar=avatar):
        st.markdown(message["content"])
# Chat input | |
if prompt := st.chat_input("Faca uma pergunta sobre a LGT..."): | |
with st.chat_message("user", avatar=USER_AVATAR): | |
st.markdown(prompt) | |
st.session_state.messages.append({"role": "user", "content": prompt}) | |
# Load model if not loaded | |
if "model" not in st.session_state: | |
with st.spinner("π A carregar o modelo PT-T5..."): | |
model, tokenizer = load_model() | |
if not model: | |
st.stop() | |
st.session_state.model = model | |
st.session_state.tokenizer = tokenizer | |
else: | |
model = st.session_state.model | |
tokenizer = st.session_state.tokenizer | |
context = process_file(uploaded_file) or "Sem contexto adicional disponΓvel." | |
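    # Note: the entire processed document is injected into the prompt as context. Very long
    # uploads can exceed the model's context window; truncation=True in generate_response()
    # simply cuts the prompt at the tokenizer's maximum length rather than summarising it.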
    # Generate assistant response
    with st.chat_message("assistant", avatar=BOT_AVATAR):
        response_box = st.empty()
        full_response = ""
        try:
            start_time = time.time()
            streamer = generate_response(prompt, context, model, tokenizer)
            for chunk in streamer:
                full_response += chunk
                response_box.markdown(full_response + "▌", unsafe_allow_html=True)
            end_time = time.time()
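            # Rough usage metrics: token counts come from re-tokenising the prompt and the reply,
            # speed is output tokens over wall-clock time, and the cost figures are only
            # illustrative (a flat per-million-token rate converted to AOA at a fixed exchange rate).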
            input_tokens = len(tokenizer(prompt)["input_ids"])
            output_tokens = len(tokenizer(full_response)["input_ids"])
            speed = output_tokens / (end_time - start_time)
            cost_usd = ((input_tokens / 1e6) * 0.0001) + ((output_tokens / 1e6) * 0.0001)
            cost_aoa = cost_usd * 1160
            st.caption(
                f"Tokens: {input_tokens} → {output_tokens} | Velocidade: {speed:.1f}t/s | "
                f"USD: ${cost_usd:.4f} | AOA: {cost_aoa:.2f}"
            )
            response_box.markdown(full_response.strip())
            st.session_state.messages.append({"role": "assistant", "content": full_response.strip()})
        except Exception as e:
            st.error(f"Erro ao gerar resposta: {str(e)}")