Spaces:
Running
Running
import streamlit as st | |
from PyPDF2 import PdfReader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
import os | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain.vectorstores import FAISS | |
from langchain_groq import ChatGroq | |
from langchain.chains.question_answering import load_qa_chain | |
from langchain.prompts import PromptTemplate | |
from dotenv import load_dotenv | |
import re | |
load_dotenv() | |
os.getenv("GROQ_API_KEY") | |
css_style = """ | |
<style> | |
.step-number { font-size: 24px; font-weight: bold; } | |
.response-box { padding: 20px; background-color: #f8f9fa; border-radius: 10px; border-left: 5px solid #252850; margin: 20px 0; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } | |
.metadata-box { padding: 20px; background-color: #f0f2f6; border-radius: 10px; margin-bottom: 20px; } | |
.custom-input { font-size: 16px; padding: 10px; border-radius: 5px; border: 1px solid #ccc; } | |
.suggestion-container { border: 1px solid #e0e0e0; border-radius: 8px; padding: 15px; margin: 10px 0; background: #f8f9fa; } | |
.suggestion-btn { width: 100%; margin: 3px 0; padding: 8px; border-radius: 5px; border: 1px solid #252850; background: white; cursor: pointer; transition: all 0.2s; } | |
.suggestion-btn:hover { background: #252850; color: white; } | |
</style> | |
""" | |
def eliminar_proceso_pensamiento(texto): | |
texto_limpio = re.sub(r'<.*?>', '', texto, flags=re.DOTALL) | |
lineas = [line.strip() for line in texto_limpio.split('\n') if line.strip()] | |
return lineas[-1] if lineas else "Respuesta no disponible" | |
def get_pdf_text(pdf_docs): | |
text = "" | |
for pdf in pdf_docs: | |
pdf_reader = PdfReader(pdf) | |
for page in pdf_reader.pages: | |
text += page.extract_text() | |
return text | |
def get_text_chunks(text): | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=500) | |
return text_splitter.split_text(text) | |
def get_vector_store(text_chunks): | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
return FAISS.from_texts(text_chunks, embedding=embeddings) | |
def get_conversational_chain(): | |
prompt_template = """ | |
Responde en español exclusivamente con la información solicitada usando el contexto, además sé lo más extenso y detallado posible | |
siempre que se pueda desarollar, como explicando el contenido de referencias nombradas. | |
Formato: Respuesta directa sin prefijos. Si no hay información, di "No disponible". | |
Contexto: | |
{context} | |
Pregunta: | |
{question} | |
Respuesta: | |
""" | |
model = ChatGroq( | |
temperature=0.2, | |
model_name="deepseek-r1-distill-llama-70b", | |
groq_api_key=os.getenv("GROQ_API_KEY") | |
) | |
return load_qa_chain(model, chain_type="stuff", prompt=PromptTemplate(template=prompt_template, input_variables=["context", "question"])) | |
def extract_metadata(vector_store): | |
metadata_questions = { | |
"title": "¿Cuál es el título principal del documento? Formato: Respuesta simple con algunas letras en mayúscula si hiciera falta", | |
"entity": "¿A qué organización pertenece este documento?. Formato: Respuesta directa con el nombre de la entidad.", | |
"date": "¿A qué fecha corresponde el documento? Si existen indicios indica la fecha, sino di 'No disponible'" | |
} | |
metadata = {} | |
chain = get_conversational_chain() | |
for key, question in metadata_questions.items(): | |
docs = vector_store.similarity_search(question, k=2) | |
response = chain({"input_documents": docs, "question": question}, return_only_outputs=True) | |
clean_response = eliminar_proceso_pensamiento(response['output_text']) | |
metadata[key] = clean_response if clean_response else "No disponible" | |
return metadata | |
def mostrar_respuesta(texto): | |
st.markdown(f'<div class="response-box">{texto}</div>', unsafe_allow_html=True) | |
def generar_sugerencias(): | |
if 'vector_store' not in st.session_state: | |
return | |
try: | |
docs = st.session_state.vector_store.similarity_search("", k=3) | |
context = "\n".join([doc.page_content for doc in docs]) | |
prompt_template = """ | |
Genera exactamente 3 preguntas en español basadas en el contexto. | |
Las preguntas deben ser en español, simples y sencillas de máximo 10 palabras. | |
Formato de respuesta: | |
1. [Pregunta completa en español] | |
2. [Pregunta completa en español] | |
3. [Pregunta completa en español] | |
Contexto: | |
{context} | |
""" | |
model = ChatGroq( | |
temperature=0.4, | |
model_name="deepseek-r1-distill-llama-70b", | |
groq_api_key=os.getenv("GROQ_API_KEY") | |
) | |
response = model.invoke(prompt_template.format(context=context)) | |
preguntas = [] | |
for line in response.content.split("\n"): | |
line = line.strip() | |
if line and line[0].isdigit(): | |
pregunta = line.split('. ', 1)[1] if '. ' in line else line[2:] | |
if pregunta: | |
preguntas.append(pregunta) | |
return preguntas[:3] | |
except Exception as e: | |
st.error(f"Error generando sugerencias: {str(e)}") | |
return | |
def procesar_consulta(user_question): | |
if 'vector_store' not in st.session_state: | |
st.error("Por favor carga un documento primero") | |
return | |
chain = get_conversational_chain() | |
docs = st.session_state.vector_store.similarity_search(user_question) | |
with st.spinner("Analizando documento..."): | |
response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True) | |
respuesta_final = eliminar_proceso_pensamiento(response['output_text']) | |
mostrar_respuesta(respuesta_final) | |
def main(): | |
st.set_page_config(page_title="PDF Consultor 🔍", page_icon="🔍", layout="wide") | |
st.markdown(css_style, unsafe_allow_html=True) | |
st.markdown('<h1>PDF Consultor 🔍</h1>', unsafe_allow_html=True) | |
estados = { | |
'documento_cargado': False, | |
'sugerencias': [], | |
'pregunta_actual': "", | |
'respuestas': [] | |
} | |
for key, value in estados.items(): | |
if key not in st.session_state: | |
st.session_state[key] = value | |
with st.sidebar: | |
st.markdown('<p class="step-number">1 Subir archivos</p>', unsafe_allow_html=True) | |
pdf_docs = st.file_uploader("Subir PDF(s)", accept_multiple_files=True, type=["pdf"], label_visibility="collapsed") | |
if pdf_docs and not st.session_state.documento_cargado: | |
with st.spinner("Analizando documento..."): | |
try: | |
raw_text = get_pdf_text(pdf_docs) | |
text_chunks = get_text_chunks(raw_text) | |
vector_store = get_vector_store(text_chunks) | |
st.session_state.metadata = extract_metadata(vector_store) | |
st.session_state.vector_store = vector_store | |
st.session_state.documento_cargado = True | |
st.session_state.sugerencias = generar_sugerencias() | |
st.rerun() | |
except Exception as e: | |
st.error(f"Error procesando documento: {str(e)}") | |
if 'metadata' in st.session_state: | |
st.markdown("---") | |
cols = st.columns(3) | |
campos_metadata = [ | |
("📄 Título", "title"), | |
("🏛️ Entidad", "entity"), | |
("📅 Fecha", "date") | |
] | |
for col, (icono, key) in zip(cols, campos_metadata): | |
with col: | |
st.markdown(f""" | |
<div class="metadata-box"> | |
<div style="font-size:16px; margin-bottom:10px;">{icono}</div> | |
{st.session_state.metadata[key]} | |
</div> | |
""", unsafe_allow_html=True) | |
if st.session_state.sugerencias: | |
st.markdown("---") | |
with st.container(): | |
st.markdown(""" | |
<div class="suggestion-container"> | |
<div style="font-size:14px; color:#666; margin-bottom:8px;">💡 ¿Necesitas ideas?</div> | |
""", unsafe_allow_html=True) | |
cols_sugerencias = st.columns(3) | |
for i, (col, pregunta) in enumerate(zip(cols_sugerencias, st.session_state.sugerencias)): | |
with col: | |
if st.button(pregunta, key=f"sug_{i}", help="Haz clic para usar esta pregunta", use_container_width=True): | |
st.session_state.pregunta_actual = pregunta | |
st.markdown("</div>", unsafe_allow_html=True) | |
if st.session_state.documento_cargado: | |
with st.form(key="consulta_form"): | |
col1, col2 = st.columns([5, 1]) | |
with col1: | |
pregunta_usuario = st.text_input("Escribe tu pregunta:", value=st.session_state.get('pregunta_actual', ''), placeholder="Ej: ¿De qué trata este documento?", label_visibility="collapsed") | |
with col2: | |
st.markdown("<br>", unsafe_allow_html=True) | |
enviar = st.form_submit_button("Enviar ▶") | |
if enviar or st.session_state.pregunta_actual: | |
pregunta_final = pregunta_usuario or st.session_state.pregunta_actual | |
procesar_consulta(pregunta_final) | |
if 'pregunta_actual' in st.session_state: | |
del st.session_state.pregunta_actual | |
elif not st.session_state.documento_cargado: | |
st.info("Por favor, sube un documento PDF para comenzar.") | |
if __name__ == "__main__": | |
main() |