import sys
import os
import pandas as pd
import pdfplumber
import gradio as gr
from typing import List

# Make the sibling "src" directory importable so that `txagent` resolves
# when this file is run directly.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from txagent.txagent import TxAgent


def sanitize_utf8(text: str) -> str:
    """Return *text* with every character that cannot be UTF-8 encoded removed."""
    return text.encode("utf-8", "ignore").decode("utf-8")


def clean_final_response(text: str) -> str:
    """Strip tool-call markers and format the '[Final Analysis]' section.

    Every ``[TOOL_CALLS]`` marker is removed. If the remaining text contains
    ``[Final Analysis]``, only the section following the first marker (up to
    the next marker, if any) is kept and returned under a
    "🧠 Final Analysis" heading, one bullet per non-empty line. Otherwise
    the cleaned text is returned unchanged.

    Args:
        text: Raw model output.

    Returns:
        Markdown-formatted text for display in the chat window.
    """
    # NOTE(review): the markup originally embedded in these literals was lost
    # (the literals were unterminated and the computed bullet text was never
    # interpolated). Plain Markdown is used as a reconstruction — confirm the
    # intended styling.
    cleaned = text.replace("[TOOL_CALLS]", "").strip()
    sections = cleaned.split("[Final Analysis]")
    if len(sections) < 2:
        return cleaned

    final = sections[1].strip()
    bullets = "\n".join(
        f"• {line.strip()}" for line in final.splitlines() if line.strip()
    )
    return f"### 🧠 Final Analysis\n\n{bullets}"


def extract_all_text_from_csv_or_excel(file_path: str, progress=None, index=0, total=1) -> str:
    """Flatten a CSV/XLS/XLSX file into pipe-separated text, one line per row.

    Args:
        file_path: Path of the spreadsheet to read.
        progress: Optional callable ``progress(fraction, desc=...)`` used to
            report progress; skipped when ``None``.
        index: Zero-based position of this file among the files being read.
        total: Total number of files being read.

    Returns:
        A header line with the file name followed by the non-empty rows, or a
        human-readable error/notice string. This function never raises; any
        failure is reported in the returned text.
    """
    try:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"

        # Compare against None rather than truth-testing: truthiness of an
        # arbitrary progress object may go through its __len__/__bool__.
        if progress is not None:
            progress((index + 1) / total, desc=f"Reading spreadsheet: {os.path.basename(file_path)}")

        # Match extensions case-insensitively so "REPORT.CSV" is accepted.
        lowered = file_path.lower()
        if lowered.endswith(".csv"):
            # `read_csv` takes `encoding_errors`, not `errors`; the latter
            # raises TypeError and made every CSV fall into the error path.
            df = pd.read_csv(file_path, encoding="utf-8", encoding_errors="replace", low_memory=False)
        elif lowered.endswith((".xls", ".xlsx")):
            try:
                df = pd.read_excel(file_path, engine="openpyxl")
            except Exception:
                # Fall back to the other engine when openpyxl cannot read the file.
                df = pd.read_excel(file_path, engine="xlrd")
        else:
            return f"Unsupported spreadsheet format: {file_path}"

        lines = []
        for _, row in df.iterrows():
            # Missing cells are skipped so sparse rows stay compact.
            line = " | ".join(str(cell) for cell in row if pd.notna(cell))
            if line:
                lines.append(line)
        return f"\U0001F4C4 {os.path.basename(file_path)}\n\n" + "\n".join(lines)
    except Exception as e:
        return f"[Error reading {os.path.basename(file_path)}]: {str(e)}"


def extract_all_text_from_pdf(file_path: str, progress=None, index=0, total=1) -> str:
    """Extract the text of every page of a PDF.

    Args:
        file_path: Path of the PDF to read.
        progress: Optional callable ``progress(fraction, desc=...)`` used to
            report progress; skipped when ``None``.
        index: Zero-based position of this file among the files being read.
        total: Total number of files being read.

    Returns:
        A header line with the file name followed by the page texts separated
        by blank lines, or a human-readable error string. A page that fails
        is replaced by an inline error note; this function never raises.
    """
    try:
        if not os.path.exists(file_path):
            return f"PDF not found: {file_path}"

        extracted = []
        with pdfplumber.open(file_path) as pdf:
            num_pages = len(pdf.pages)
            for i, page in enumerate(pdf.pages):
                try:
                    text = page.extract_text() or ""
                    extracted.append(text.strip())
                    if progress is not None:
                        # Report completed pages (i + 1) so the fraction
                        # reaches this file's full share on the last page.
                        progress(
                            (index + ((i + 1) / num_pages)) / total,
                            desc=f"Reading PDF: {os.path.basename(file_path)} ({i+1}/{num_pages})",
                        )
                except Exception as e:
                    extracted.append(f"[Error reading page {i+1}]: {str(e)}")
        return f"\U0001F4C4 {os.path.basename(file_path)}\n\n" + "\n\n".join(extracted)
    except Exception as e:
        return f"[Error reading PDF {os.path.basename(file_path)}]: {str(e)}"


def chunk_text(text: str, max_tokens: int = 8192) -> List[str]:
    """Split *text* on whitespace into chunks of at most roughly *max_tokens*.

    The token cost of a word is estimated as ``len(word) // 4 + 1``. A single
    word whose estimate exceeds *max_tokens* becomes a chunk of its own.

    Args:
        text: Text to split.
        max_tokens: Estimated-token budget per chunk.

    Returns:
        The chunks in order, each a space-joined run of words. Empty or
        whitespace-only input yields an empty list.
    """
    chunks = []
    chunk = []
    token_count = 0
    for word in text.split():
        cost = len(word) // 4 + 1
        # Only flush a non-empty chunk: an oversized first word must not
        # produce an empty leading chunk.
        if chunk and token_count + cost > max_tokens:
            chunks.append(" ".join(chunk))
            chunk = [word]
            token_count = cost
        else:
            chunk.append(word)
            token_count += cost
    if chunk:
        chunks.append(" ".join(chunk))
    return chunks
def create_ui(agent: TxAgent):
    """Build the Gradio Blocks interface for the clinical assistant.

    Args:
        agent: Agent whose ``run_gradio_chat`` generator produces the replies.

    Returns:
        The assembled ``gr.Blocks`` app (not launched).
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        # NOTE(review): the title's original markup was lost (the literal was
        # unterminated); a Markdown heading is used as a reconstruction.
        gr.Markdown("# \U0001F4CB CPS: Clinical Patient Support System")
        chatbot = gr.Chatbot(label="CPS Assistant", height=600, type="tuples")
        file_upload = gr.File(
            label="Upload Medical File",
            file_types=[".pdf", ".txt", ".docx", ".jpg", ".png", ".csv", ".xls", ".xlsx"],
            file_count="multiple"
        )
        message_input = gr.Textbox(placeholder="Ask a biomedical question or just upload the files...", show_label=False)
        send_button = gr.Button("Send", variant="primary")
        conversation_state = gr.State([])

        def handle_chat(message: str, history: list, conversation: list, uploaded_files: list, progress=gr.Progress()):
            """Stream chat history updates for one user turn.

            Yields the history twice: first with a "processing" placeholder,
            then with the final reply (or an error message) in its place.
            """
            context = (
                "You are an expert clinical AI assistant reviewing medical form or interview data. "
                "Your job is to analyze this data and reason about any information or red flags that a human doctor might have overlooked. "
                "Provide a **detailed and structured response**, including examples, supporting evidence from the form, and clinical rationale for why these items matter. "
                "Ensure the output is informative and helpful for improving patient care. "
                "Do not hallucinate. Base the response only on the provided form content. "
                "End with a section labeled '[Final Analysis]' where you summarize key findings the doctor may have missed."
            )
            if history is None:
                history = []

            try:
                history.append((message, "⏳ Processing your request..."))
                yield history

                # Gather text from every uploaded file; per-file failures are
                # recorded inline so one bad file does not abort the turn.
                extracted_parts = []
                if uploaded_files and isinstance(uploaded_files, list):
                    total_files = len(uploaded_files)
                    for index, file in enumerate(uploaded_files):
                        if not hasattr(file, 'name'):
                            continue
                        path = file.name
                        try:
                            if path.endswith((".csv", ".xls", ".xlsx")):
                                extracted_parts.append(extract_all_text_from_csv_or_excel(path, progress, index, total_files))
                            elif path.endswith(".pdf"):
                                extracted_parts.append(extract_all_text_from_pdf(path, progress, index, total_files))
                            else:
                                extracted_parts.append(f"(Uploaded file: {os.path.basename(path)})")
                        except Exception as file_error:
                            extracted_parts.append(f"[Error processing {os.path.basename(path)}]: {str(file_error)}")

                sanitized = sanitize_utf8("\n".join(extracted_parts).strip())
                chunks = chunk_text(sanitized)
                question = (message or "").strip()

                if not chunks and not question:
                    history[-1] = (message, "Please enter a question or upload a file.")
                    yield history
                    return

                # A question without any file content must still reach the
                # agent, so run one pass with an empty chunk.
                if not chunks:
                    chunks = [""]

                cleaned_responses = []
                for i, chunk in enumerate(chunks):
                    prompt_sections = [context]
                    if question:
                        # Previously the user's message never reached the agent.
                        prompt_sections.append(f"--- User Question ---\n\n{question}")
                    if chunk:
                        prompt_sections.append(
                            f"--- Uploaded File Content (Chunk {i+1}/{len(chunks)}) ---\n\n{chunk}\n\n"
                            f"--- End of Chunk ---"
                        )
                    prompt_sections.append("Now begin your analysis:")
                    chunked_prompt = "\n\n".join(prompt_sections)

                    generator = agent.run_gradio_chat(
                        message=chunked_prompt,
                        history=[],
                        temperature=0.3,
                        max_new_tokens=1024,
                        max_token=8192,
                        call_agent=False,
                        conversation=conversation,
                        uploaded_files=uploaded_files,
                        max_round=30
                    )

                    # NOTE(review): assumes each update is an incremental
                    # piece of the reply; if the agent yields cumulative
                    # message lists this will duplicate text — confirm.
                    chunk_response = ""
                    for update in generator:
                        if isinstance(update, str):
                            chunk_response += update
                        elif isinstance(update, list):
                            for msg in update:
                                content = getattr(msg, 'content', None)
                                if isinstance(content, str):
                                    chunk_response += content

                    # Clean each chunk on its own: every chunk ends with its
                    # own '[Final Analysis]' section, and splitting the
                    # concatenation once would mix one chunk's summary with
                    # the next chunk's body and drop later summaries.
                    cleaned_responses.append(clean_final_response(chunk_response.strip()))

                history[-1] = (message, "\n\n".join(cleaned_responses))
                yield history

            except Exception as chat_error:
                print(f"Chat handling error: {chat_error}")
                error_msg = "An error occurred while processing your request. Please try again."
                # Replace the placeholder if it is still showing; the last
                # reply may not be a string, so check before `startswith`.
                last_reply = history[-1][1] if history else None
                if isinstance(last_reply, str) and last_reply.startswith("⏳"):
                    history[-1] = (history[-1][0], error_msg)
                else:
                    history.append((message, error_msg))
                yield history

        inputs = [message_input, chatbot, conversation_state, file_upload]
        send_button.click(fn=handle_chat, inputs=inputs, outputs=chatbot)
        message_input.submit(fn=handle_chat, inputs=inputs, outputs=chatbot)

        gr.Examples([
            ["Upload your medical form and ask what the doctor might've missed."],
            ["This patient was treated with antibiotics for UTI. What else should we check?"],
            ["Is there anything abnormal in the attached blood work report?"]
        ], inputs=message_input)

    return demo