import streamlit as st from PyPDF2 import PdfReader from langchain.text_splitter import RecursiveCharacterTextSplitter import os from langchain_community.embeddings import HuggingFaceEmbeddings from langchain.vectorstores import FAISS from langchain_groq import ChatGroq from langchain.chains.question_answering import load_qa_chain from langchain.prompts import PromptTemplate from dotenv import load_dotenv import re load_dotenv() os.getenv("GROQ_API_KEY") css_style = """ """ def eliminar_proceso_pensamiento(texto): texto_limpio = re.sub(r'<.*?>', '', texto, flags=re.DOTALL) lineas = [line.strip() for line in texto_limpio.split('\n') if line.strip()] return lineas[-1] if lineas else "Respuesta no disponible" def get_pdf_text(pdf_docs): text = "" for pdf in pdf_docs: pdf_reader = PdfReader(pdf) for page in pdf_reader.pages: text += page.extract_text() return text def get_text_chunks(text): text_splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=500) return text_splitter.split_text(text) def get_vector_store(text_chunks): embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") return FAISS.from_texts(text_chunks, embedding=embeddings) def get_conversational_chain(): prompt_template = """ Responde en español exclusivamente con la información solicitada usando el contexto, además sé lo más extenso y detallado posible siempre que se pueda desarollar, como explicando el contenido de referencias nombradas. Formato: Respuesta directa sin prefijos. Si no hay información, di "No disponible". Contexto: {context} Pregunta: {question} Respuesta: """ model = ChatGroq( temperature=0.2, model_name="deepseek-r1-distill-llama-70b", groq_api_key=os.getenv("GROQ_API_KEY") ) return load_qa_chain(model, chain_type="stuff", prompt=PromptTemplate(template=prompt_template, input_variables=["context", "question"])) def extract_metadata(vector_store): metadata_questions = { "title": "¿Cuál es el título principal del documento? Formato: Respuesta simple con algunas letras en mayúscula si hiciera falta", "entity": "¿A qué organización pertenece este documento?. Formato: Respuesta directa con el nombre de la entidad.", "date": "¿A qué fecha corresponde el documento? Si existen indicios indica la fecha, sino di 'No disponible'" } metadata = {} chain = get_conversational_chain() for key, question in metadata_questions.items(): docs = vector_store.similarity_search(question, k=2) response = chain({"input_documents": docs, "question": question}, return_only_outputs=True) clean_response = eliminar_proceso_pensamiento(response['output_text']) metadata[key] = clean_response if clean_response else "No disponible" return metadata def mostrar_respuesta(texto): st.markdown(f'
{texto}
', unsafe_allow_html=True) def generar_sugerencias(): if 'vector_store' not in st.session_state: return try: docs = st.session_state.vector_store.similarity_search("", k=3) context = "\n".join([doc.page_content for doc in docs]) prompt_template = """ Genera exactamente 3 preguntas en español basadas en el contexto. Las preguntas deben ser en español, simples y sencillas de máximo 10 palabras. Formato de respuesta: 1. [Pregunta completa en español] 2. [Pregunta completa en español] 3. [Pregunta completa en español] Contexto: {context} """ model = ChatGroq( temperature=0.4, model_name="deepseek-r1-distill-llama-70b", groq_api_key=os.getenv("GROQ_API_KEY") ) response = model.invoke(prompt_template.format(context=context)) preguntas = [] for line in response.content.split("\n"): line = line.strip() if line and line[0].isdigit(): pregunta = line.split('. ', 1)[1] if '. ' in line else line[2:] if pregunta: preguntas.append(pregunta) return preguntas[:3] except Exception as e: st.error(f"Error generando sugerencias: {str(e)}") return def procesar_consulta(user_question): if 'vector_store' not in st.session_state: st.error("Por favor carga un documento primero") return chain = get_conversational_chain() docs = st.session_state.vector_store.similarity_search(user_question) with st.spinner("Analizando documento..."): response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True) respuesta_final = eliminar_proceso_pensamiento(response['output_text']) mostrar_respuesta(respuesta_final) def main(): st.set_page_config(page_title="PDF Consultor 🔍", page_icon="🔍", layout="wide") st.markdown(css_style, unsafe_allow_html=True) st.markdown('

PDF Consultor 🔍

', unsafe_allow_html=True) estados = { 'documento_cargado': False, 'sugerencias': [], 'pregunta_actual': "", 'respuestas': [] } for key, value in estados.items(): if key not in st.session_state: st.session_state[key] = value with st.sidebar: st.markdown('

1 Subir archivos

', unsafe_allow_html=True) pdf_docs = st.file_uploader("Subir PDF(s)", accept_multiple_files=True, type=["pdf"], label_visibility="collapsed") if pdf_docs and not st.session_state.documento_cargado: with st.spinner("Analizando documento..."): try: raw_text = get_pdf_text(pdf_docs) text_chunks = get_text_chunks(raw_text) vector_store = get_vector_store(text_chunks) st.session_state.metadata = extract_metadata(vector_store) st.session_state.vector_store = vector_store st.session_state.documento_cargado = True st.session_state.sugerencias = generar_sugerencias() st.rerun() except Exception as e: st.error(f"Error procesando documento: {str(e)}") if 'metadata' in st.session_state: st.markdown("---") cols = st.columns(3) campos_metadata = [ ("📄 Título", "title"), ("🏛️ Entidad", "entity"), ("📅 Fecha", "date") ] for col, (icono, key) in zip(cols, campos_metadata): with col: st.markdown(f"""
{icono}
{st.session_state.metadata[key]}
""", unsafe_allow_html=True) if st.session_state.sugerencias: st.markdown("---") with st.container(): st.markdown("""
💡 ¿Necesitas ideas?
""", unsafe_allow_html=True) cols_sugerencias = st.columns(3) for i, (col, pregunta) in enumerate(zip(cols_sugerencias, st.session_state.sugerencias)): with col: if st.button(pregunta, key=f"sug_{i}", help="Haz clic para usar esta pregunta", use_container_width=True): st.session_state.pregunta_actual = pregunta st.markdown("
", unsafe_allow_html=True) if st.session_state.documento_cargado: with st.form(key="consulta_form"): col1, col2 = st.columns([5, 1]) with col1: pregunta_usuario = st.text_input("Escribe tu pregunta:", value=st.session_state.get('pregunta_actual', ''), placeholder="Ej: ¿De qué trata este documento?", label_visibility="collapsed") with col2: st.markdown("
", unsafe_allow_html=True) enviar = st.form_submit_button("Enviar ▶") if enviar or st.session_state.pregunta_actual: pregunta_final = pregunta_usuario or st.session_state.pregunta_actual procesar_consulta(pregunta_final) if 'pregunta_actual' in st.session_state: del st.session_state.pregunta_actual elif not st.session_state.documento_cargado: st.info("Por favor, sube un documento PDF para comenzar.") if __name__ == "__main__": main()