import streamlit as st import pandas as pd import os import tempfile from typing import List, Optional, Dict, Any, Union import json from datetime import datetime from llama_cpp import Llama from langchain.output_parsers import PydanticOutputParser from langchain.prompts import ChatPromptTemplate from langchain.schema import HumanMessage, SystemMessage from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.schema.runnable import RunnablePassthrough from langchain.prompts.prompt import PromptTemplate from langchain.chains import ConversationalRetrievalChain from langchain.chains import LLMChain from langchain.memory import ConversationBufferMemory from langchain.vectorstores import Chroma from pydantic import BaseModel, Field from Ingestion.ingest import process_document, get_processor_for_file import warnings warnings.filterwarnings("ignore", category=RuntimeWarning) # Set page configuration st.set_page_config( page_title="DocMind AI: AI-Powered Document Analysis", page_icon="🧠", layout="wide", initial_sidebar_state="expanded", ) # Custom CSS for better dark/light mode compatibility st.markdown(""" """, unsafe_allow_html=True) # Define the output structures using Pydantic class DocumentAnalysis(BaseModel): summary: str = Field(description="A concise summary of the document") key_insights: List[str] = Field(description="A list of key insights from the document") action_items: Optional[List[str]] = Field(None, description="A list of action items derived from the document") open_questions: Optional[List[str]] = Field(None, description="A list of open questions or areas needing clarification") # Function to clean up LLM responses for better parsing def clean_llm_response(response): """Clean up the LLM response to extract JSON content from potential markdown code blocks.""" # Extract content from the response if isinstance(response, dict) and 'choices' in response: content = response['choices'][0]['message']['content'] else: content = str(response) # Remove markdown code block formatting if present if '```' in content: # Handle ```json format parts = content.split('```') if len(parts) >= 3: # Has opening and closing backticks # Take the content between first pair of backticks content = parts[1] # Remove json language specifier if present if content.startswith('json') or content.startswith('JSON'): content = content[4:].lstrip() elif '`json' in content: # Handle `json format parts = content.split('`json') if len(parts) >= 2: content = parts[1] if '`' in content: content = content.split('`')[0] # Strip any leading/trailing whitespace content = content.strip() return content # Initialize LLM and Model Cache @st.cache_resource(experimental_allow_widgets=True) def load_model(): with st.spinner("Loading model..."): try: llm = Llama.from_pretrained( repo_id="stduhpf/google-gemma-3-1b-it-qat-q4_0-gguf-small", filename="gemma-3-1b-it-q4_0_s.gguf", ) return llm except Exception as e: st.error(f"Error loading model: {str(e)}") return None # Initialize embeddings - but only when needed to avoid torch inspection issues @st.cache_resource(experimental_allow_widgets=True) def load_embeddings(): from langchain_community.embeddings import HuggingFaceEmbeddings with st.spinner("Loading embeddings..."): embeddings = HuggingFaceEmbeddings( model_name="sentence-transformers/all-MiniLM-L6-v2", model_kwargs={'device': 'cpu'} ) return embeddings # Sidebar Configuration with improved styling st.sidebar.markdown("

🧠 DocMind AI

", unsafe_allow_html=True) st.sidebar.markdown("
AI-Powered Document Analysis
", unsafe_allow_html=True) st.sidebar.markdown("---") # Load LLM with st.sidebar: llm = load_model() if llm is not None: st.markdown("
✅ Model loaded successfully!
", unsafe_allow_html=True) else: st.markdown("
❌ Error loading model. Check logs for details.
", unsafe_allow_html=True) st.stop() # Mode Selection with st.sidebar: st.markdown("### Analysis Configuration") analysis_mode = st.radio( "Analysis Mode", ["Analyze each document separately", "Combine analysis for all documents"] ) # Prompt Selection prompt_options = { "Comprehensive Document Analysis": "Analyze the provided document comprehensively. Generate a summary, extract key insights, identify action items, and list open questions.", "Extract Key Insights and Action Items": "Extract key insights and action items from the provided document.", "Summarize and Identify Open Questions": "Summarize the provided document and identify any open questions that need clarification.", "Custom Prompt": "Enter a custom prompt below:" } with st.sidebar: st.markdown("### Prompt Settings") selected_prompt_option = st.selectbox("Select Prompt", list(prompt_options.keys())) custom_prompt = "" if selected_prompt_option == "Custom Prompt": custom_prompt = st.text_area("Enter Custom Prompt", height=100) # Tone Selection tone_options = [ "Professional", "Academic", "Informal", "Creative", "Neutral", "Direct", "Empathetic", "Humorous", "Authoritative", "Inquisitive" ] with st.sidebar: selected_tone = st.selectbox("Select Tone", tone_options) # Instructions Selection instruction_options = { "General Assistant": "Act as a helpful assistant.", "Researcher": "Act as a researcher providing in-depth analysis.", "Software Engineer": "Act as a software engineer focusing on code and technical details.", "Product Manager": "Act as a product manager considering strategy and user experience.", "Data Scientist": "Act as a data scientist emphasizing data analysis.", "Business Analyst": "Act as a business analyst considering strategic aspects.", "Technical Writer": "Act as a technical writer creating clear documentation.", "Marketing Specialist": "Act as a marketing specialist focusing on branding.", "HR Manager": "Act as an HR manager considering people aspects.", "Legal Advisor": "Act as a legal advisor providing legal perspective.", "Custom Instructions": "Enter custom instructions below:" } with st.sidebar: st.markdown("### Assistant Behavior") selected_instruction = st.selectbox("Select Instructions", list(instruction_options.keys())) custom_instruction = "" if selected_instruction == "Custom Instructions": custom_instruction = st.text_area("Enter Custom Instructions", height=100) # Length/Detail Selection length_options = ["Concise", "Detailed", "Comprehensive", "Bullet Points"] with st.sidebar: st.markdown("### Response Format") selected_length = st.selectbox("Select Length/Detail", length_options) # Main Area st.markdown("

📄 DocMind AI: Document Analysis

", unsafe_allow_html=True) st.markdown("

Upload documents and analyze them using the Gemma model

", unsafe_allow_html=True) # File Upload with improved UI uploaded_files = st.file_uploader( "Upload Documents", accept_multiple_files=True, type=["pdf", "docx", "txt", "xlsx", "md", "json", "xml", "rtf", "csv", "msg", "pptx", "odt", "epub", "py", "js", "java", "ts", "tsx", "c", "cpp", "h", "html", "css", "sql", "rb", "go", "rs", "php"] ) # Display uploaded files with better visual indication if uploaded_files: st.markdown("
", unsafe_allow_html=True) st.markdown("### Uploaded Documents") st.markdown("", unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) # Function to process the documents and run analysis def run_analysis(): if not uploaded_files: st.error("Please upload at least one document.") return # Save uploaded files to temporary directory temp_dir = tempfile.mkdtemp() file_paths = [] for uploaded_file in uploaded_files: file_path = os.path.join(temp_dir, uploaded_file.name) with open(file_path, "wb") as f: f.write(uploaded_file.getbuffer()) file_paths.append(file_path) # Process documents with st.spinner("Processing documents..."): all_texts = [] processed_docs = [] progress_bar = st.progress(0) for i, file_path in enumerate(file_paths): processor = get_processor_for_file(file_path) if processor: try: doc_data = process_document(file_path) if doc_data is not None and len(doc_data.strip()) > 0: # Ensure we have content all_texts.append(doc_data) processed_docs.append({"name": os.path.basename(file_path), "data": doc_data}) except Exception as e: st.error(f"Error processing {os.path.basename(file_path)}: {str(e)}") progress_bar.progress((i + 1) / len(file_paths)) if not all_texts: st.error("No documents could be processed. Please check the file formats and try again.") return # Build the prompt if selected_prompt_option == "Custom Prompt": prompt_text = custom_prompt else: prompt_text = prompt_options[selected_prompt_option] if selected_instruction == "Custom Instructions": instruction_text = custom_instruction else: instruction_text = instruction_options[selected_instruction] # Add tone guidance tone_guidance = f"Use a {selected_tone.lower()} tone in your response." # Add length guidance length_guidance = "" if selected_length == "Concise": length_guidance = "Keep your response brief and to the point." elif selected_length == "Detailed": length_guidance = "Provide a detailed response with thorough explanations." elif selected_length == "Comprehensive": length_guidance = "Provide a comprehensive in-depth analysis covering all aspects." elif selected_length == "Bullet Points": length_guidance = "Format your response primarily using bullet points for clarity." # Set up the output parser output_parser = PydanticOutputParser(pydantic_object=DocumentAnalysis) format_instructions = output_parser.get_format_instructions() if analysis_mode == "Analyze each document separately": results = [] for doc in processed_docs: with st.spinner(f"Analyzing {doc['name']}..."): # Create system message with combined instructions system_message = f"{instruction_text} {tone_guidance} {length_guidance} Format your response according to these instructions: {format_instructions}" prompt = f""" {prompt_text} Document: {doc['name']} Content: {doc['data']} """ # Get response from LLM try: response = llm.create_chat_completion( messages = [ { "role": "system", "content": system_message }, { "role": "user", "content": prompt } ] ) # Try to parse the response into the pydantic model try: # Clean the response before parsing cleaned_response = clean_llm_response(response) parsed_response = output_parser.parse(cleaned_response) results.append({ "document_name": doc['name'], "analysis": parsed_response.dict() }) except Exception as e: # If parsing fails, include the raw response if isinstance(response, dict) and 'choices' in response: raw_response = response['choices'][0]['message']['content'] else: raw_response = str(response) results.append({ "document_name": doc['name'], "analysis": raw_response, "parsing_error": str(e) }) except Exception as e: st.error(f"Error analyzing {doc['name']}: {str(e)}") # Display results with card-based UI for result in results: st.markdown(f"
", unsafe_allow_html=True) st.markdown(f"

Analysis for: {result['document_name']}

", unsafe_allow_html=True) if isinstance(result['analysis'], dict) and 'parsing_error' not in result: # Structured output st.markdown("
", unsafe_allow_html=True) st.markdown("### Summary") st.write(result['analysis']['summary']) st.markdown("
", unsafe_allow_html=True) st.markdown("### Key Insights") for insight in result['analysis']['key_insights']: st.markdown(f"- {insight}") if result['analysis'].get('action_items'): st.markdown("
", unsafe_allow_html=True) st.markdown("### Action Items") for item in result['analysis']['action_items']: st.markdown(f"- {item}") st.markdown("
", unsafe_allow_html=True) if result['analysis'].get('open_questions'): st.markdown("### Open Questions") for question in result['analysis']['open_questions']: st.markdown(f"- {question}") else: # Raw output st.markdown(result['analysis']) if 'parsing_error' in result: st.info(f"Note: The response could not be parsed into the expected format. Error: {result['parsing_error']}") st.markdown("
", unsafe_allow_html=True) else: with st.spinner("Analyzing all documents together..."): # Combine all documents combined_content = "\n\n".join([f"Document: {doc['name']}\n\nContent: {doc['data']}" for doc in processed_docs]) # Create system message with combined instructions system_message = f"{instruction_text} {tone_guidance} {length_guidance} Format your response according to these instructions: {format_instructions}" # Create the prompt template for HuggingFace models prompt = f""" {prompt_text} {combined_content} """ # Get response from LLM try: response = llm.create_chat_completion( messages = [ { "role": "system", "content": system_message }, { "role": "user", "content": prompt } ] ) # Try to parse the response into the pydantic model try: # Clean the response before parsing cleaned_response = clean_llm_response(response) parsed_response = output_parser.parse(cleaned_response) st.markdown("
", unsafe_allow_html=True) st.markdown("

Combined Analysis for All Documents

", unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) st.markdown("### Summary") st.write(parsed_response.summary) st.markdown("
", unsafe_allow_html=True) st.markdown("### Key Insights") for insight in parsed_response.key_insights: st.markdown(f"- {insight}") if parsed_response.action_items: st.markdown("
", unsafe_allow_html=True) st.markdown("### Action Items") for item in parsed_response.action_items: st.markdown(f"- {item}") st.markdown("
", unsafe_allow_html=True) if parsed_response.open_questions: st.markdown("### Open Questions") for question in parsed_response.open_questions: st.markdown(f"- {question}") st.markdown("
", unsafe_allow_html=True) except Exception as e: # If parsing fails, return the raw response st.markdown("
", unsafe_allow_html=True) st.markdown("

Combined Analysis for All Documents

", unsafe_allow_html=True) # Get raw content from response if isinstance(response, dict) and 'choices' in response: raw_response = response['choices'][0]['message']['content'] else: raw_response = str(response) st.markdown(raw_response) st.info(f"Note: The response could not be parsed into the expected format. Error: {str(e)}") st.markdown("
", unsafe_allow_html=True) except Exception as e: st.error(f"Error analyzing documents: {str(e)}") # Create text chunks for embeddings with st.spinner("Setting up document chat..."): try: text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200 ) all_chunks = [] for doc in processed_docs: if doc['data'] and len(doc['data'].strip()) > 0: # Verify data exists and is not empty chunks = text_splitter.split_text(doc['data']) all_chunks.extend(chunks) # Only create embeddings if we have chunks if all_chunks and len(all_chunks) > 0: # Load embeddings embeddings = load_embeddings() # Using 'None' as namespace to avoid unique ID issues with Chroma vectorstore = Chroma.from_texts( texts=all_chunks, embedding=embeddings, collection_name="docmind_collection", collection_metadata={"timestamp": datetime.now().isoformat()} ) retriever = vectorstore.as_retriever() # Set up conversation memory memory = ConversationBufferMemory( memory_key="chat_history", return_messages=True ) # Create conversational chain qa_chain = ConversationalRetrievalChain.from_llm( llm=llm, retriever=retriever, memory=memory ) st.session_state['qa_chain'] = qa_chain st.session_state['chat_history'] = [] st.success("Document chat is ready! Ask questions about your documents below.") else: st.warning("No text chunks were created from the documents. Chat functionality is unavailable.") except Exception as e: st.error(f"Error setting up document chat: {str(e)}") # For debugging purposes st.exception(e) # Initialize chat history if 'chat_history' not in st.session_state: st.session_state['chat_history'] = [] # Chat Interface with improved styling st.markdown("---") st.markdown("

💬 Chat with your Documents

", unsafe_allow_html=True) st.markdown("

Ask follow-up questions about the analyzed documents.

", unsafe_allow_html=True) # Process the analysis if button is clicked col1, col2, col3 = st.columns([1, 2, 1]) with col2: if st.button("Extract and Analyze", use_container_width=True): run_analysis() # Chat input and display if 'qa_chain' in st.session_state: st.markdown("
", unsafe_allow_html=True) user_question = st.text_input("Ask a question about your documents:") if user_question: with st.spinner("Generating response..."): try: response = st.session_state['qa_chain'].invoke({"question": user_question}) st.session_state['chat_history'].append({"question": user_question, "answer": response['answer']}) except Exception as e: st.error(f"Error generating response: {str(e)}") # Display chat history with improved styling for exchange in st.session_state['chat_history']: st.markdown("
", unsafe_allow_html=True) st.markdown(f"
You: {exchange['question']}
", unsafe_allow_html=True) st.markdown(f"
DocMind AI: {exchange['answer']}
", unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) # Footer st.markdown("---") st.markdown( """

Built with ❤️ using Streamlit, LangChain, and Gemma model

DocMind AI - AI-Powered Document Analysis

""", unsafe_allow_html=True )