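# NOTE: the hosting page header ("Spaces: Running Running") was captured here
# along with the source; it is not part of the program.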
import difflib
import html
from io import StringIO

import docx2txt
import pdfplumber
import PyPDF4
import streamlit as st
from sentence_transformers import SentenceTransformer, util
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from predict import run_prediction
# ========== CONFIGURATION ==========
# Page-wide settings, issued before any other Streamlit call in this script.
# NOTE(review): the "π" icon looks like a mis-decoded emoji — confirm the
# intended character against the deployed app.
st.set_page_config(
    page_title="Contract Analysis Suite",
    page_icon="π",
    layout="wide",
)

# Seed the per-session result slots so later reads never hit a missing key.
for _slot in ('comparison_results', 'analysis_results'):
    if _slot not in st.session_state:
        st.session_state[_slot] = None
# ========== DATA LOADING ==========
def _read_question_lines(path):
    """Return the non-blank lines of the text file at *path*, whitespace-stripped."""
    # Explicit UTF-8 so the result does not depend on the host's locale encoding.
    with open(path, encoding="utf-8") as handle:
        return [text for text in map(str.strip, handle) if text]


def load_questions():
    """Load the entries of ``data/questions.txt``.

    Returns:
        The stripped, non-blank lines of the file in file order, or an empty
        list when the file cannot be read (the failure is shown via
        ``st.error``).
    """
    try:
        return _read_question_lines('data/questions.txt')
    except Exception as e:
        # Best-effort by design: report in the UI and let the caller see [].
        st.error(f"Error loading questions: {str(e)}")
        return []


def load_questions_short():
    """Load the entries of ``data/questions_short.txt``.

    Returns:
        The stripped, non-blank lines of the file in file order, or an empty
        list when the file cannot be read (the failure is shown via
        ``st.error``).
    """
    try:
        return _read_question_lines('data/questions_short.txt')
    except Exception as e:
        # Best-effort by design: report in the UI and let the caller see [].
        st.error(f"Error loading short questions: {str(e)}")
        return []
# ========== UTILITY FUNCTIONS ==========
def extract_text_from_pdf(uploaded_file):
    """Extract the text of every page of a PDF with pdfplumber.

    Args:
        uploaded_file: Whatever ``pdfplumber.open`` accepts; the caller in
            this file passes the uploaded file object.

    Returns:
        The text of each page followed by a blank line, concatenated in page
        order. Returns "" when only whitespace was extracted or when
        extraction fails (the failure is shown via ``st.error``).
    """
    try:
        page_texts = []
        with pdfplumber.open(uploaded_file) as pdf:
            for page in pdf.pages:
                try:
                    text = page.extract_text_formatted()
                except AttributeError:
                    # Page objects without that method fall through to the
                    # standard extractor below.
                    text = None
                if not text:
                    # extract_text() may yield None for a page with no text;
                    # normalise to "" so one empty page cannot abort the
                    # whole document with a TypeError.
                    text = page.extract_text() or ""
                page_texts.append(text + "\n\n")
        full_text = "".join(page_texts)
        return full_text if full_text.strip() else ""
    except Exception as e:
        st.error(f"PDF extraction error: {str(e)}")
        return ""
def _highlight_span(color, word):
    """Wrap *word* (HTML-escaped) in an inline span with the given background colour."""
    return (
        f'<span style="background-color:{color}; display: inline-block;">'
        f'{html.escape(word, quote=False)}</span>'
    )


def highlight_differences_words(text1, text2):
    """Build two HTML fragments that highlight the word-level differences.

    Both texts are split on whitespace and compared with ``difflib.Differ``.
    Every emitted word is followed by a single space.

    * A word only in ``text1`` is shown in red (#ffcccc) on the left; the
      right side receives a blank placeholder.
    * A word only in ``text2`` is shown in green (#ccffcc) on the right; the
      left side receives a blank placeholder.
    * A removal immediately followed by an addition is treated as a
      replacement: red on the left, yellow (#ffffcc) on the right.
    * Unchanged words are copied to both sides.

    All words are HTML-escaped because the fragments are meant to be rendered
    as raw HTML and the documents are user-supplied.

    Args:
        text1: The original text.
        text2: The modified text.

    Returns:
        A ``(left_html, right_html)`` tuple of strings.
    """
    entries = [
        entry
        for entry in difflib.Differ().compare(text1.split(), text2.split())
        # "? " entries are Differ's intraline hints, not words; dropping them
        # lets a removal and the addition that replaces it sit side by side.
        if not entry.startswith("? ")
    ]

    left_parts = []
    right_parts = []
    position = 0
    while position < len(entries):
        marker, word = entries[position][:2], entries[position][2:]
        following = entries[position + 1] if position + 1 < len(entries) else ""
        if marker == "- ":
            left_parts.append(_highlight_span("#ffcccc", word) + " ")
            if following.startswith("+ "):
                right_parts.append(_highlight_span("#ffffcc", following[2:]) + " ")
                position += 1  # the paired addition has been consumed
            else:
                right_parts.append(" ")
        elif marker == "+ ":
            left_parts.append(" ")
            right_parts.append(_highlight_span("#ccffcc", word) + " ")
        else:
            plain = html.escape(word, quote=False) + " "
            left_parts.append(plain)
            right_parts.append(plain)
        position += 1
    return "".join(left_parts), "".join(right_parts)
# NOTE(review): relies on st.cache_resource being available in the deployed
# Streamlit version — confirm against the Space's pinned requirements.
@st.cache_resource(show_spinner=False)
def _load_similarity_model():
    """Load the sentence-embedding model once instead of on every comparison."""
    return SentenceTransformer('all-MiniLM-L6-v2')


def calculate_similarity(text1, text2):
    """Return the embedding similarity of two texts as a percentage.

    Both texts are encoded with the 'all-MiniLM-L6-v2' sentence-transformers
    model and the cosine similarity of the two embeddings is multiplied by 100.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        The similarity as a float, or 0.0 when either text is blank or when
        the model fails (the failure is shown via ``st.error``).
    """
    if not text1.strip() or not text2.strip():
        return 0.0
    try:
        # The load stays inside the try so download/initialisation failures
        # are reported like any other similarity error.
        model = _load_similarity_model()
        # NOTE(review): the model presumably truncates long inputs, so for a
        # full contract the score may reflect only its opening — confirm.
        embeddings = model.encode([text1, text2], convert_to_tensor=True)
        similarity = util.cos_sim(embeddings[0], embeddings[1])
        return float(similarity.item()) * 100
    except Exception as e:
        st.error(f"Similarity calculation error: {e}")
        return 0.0
def load_contract(file):
    """Read an uploaded document and return its text without outer whitespace.

    The format is chosen from the file name's extension: ``txt`` is decoded
    as UTF-8, ``pdf`` goes through ``extract_text_from_pdf`` with PyPDF4 as a
    second attempt when that yields nothing, and ``docx`` is handled by
    docx2txt. Any other extension produces a warning.

    Args:
        file: Uploaded file object exposing ``name`` (and ``getvalue()`` for
            text files), or ``None``.

    Returns:
        The stripped text, or "" for ``None``, unsupported types, empty
        content, or any read error (shown via ``st.error``).
    """
    if file is None:
        return ""

    ext = file.name.split('.')[-1].lower()
    try:
        if ext == 'txt':
            content = file.getvalue().decode("utf-8")
        elif ext == 'pdf':
            content = extract_text_from_pdf(file)
            if not content:
                # pdfplumber produced nothing; try PyPDF4 page by page.
                reader = PyPDF4.PdfFileReader(file)
                extracted = (page.extractText() for page in reader.pages)
                content = "".join(chunk + "\n\n" for chunk in extracted if chunk)
        elif ext == 'docx':
            content = docx2txt.process(file)
        else:
            st.warning('Unsupported file type')
            return ""

        if not content:
            return ""
        return content.strip()
    except Exception as e:
        st.error(f"Error loading {ext.upper()} file: {str(e)}")
        return ""
# ========== MAIN APP ==========
def _render_diff_panel(heading, highlighted_html):
    """Show one side of the word-level diff in a bordered, scrollable box."""
    st.markdown(heading)
    st.markdown(
        '<div style="border:1px solid #ccc; padding:10px; white-space: pre-wrap; '
        'font-family: monospace; font-size: 0.9em; max-height: 500px; '
        f'overflow-y: auto;">{highlighted_html}</div>',
        unsafe_allow_html=True,
    )


def _answer_question(question, contract_text):
    """Run the clause-extraction model on one document.

    Returns:
        ``(succeeded, message)``: the extracted answer (or a "nothing found"
        placeholder) with ``True``, or the failure description with ``False``.
    """
    try:
        predictions = run_prediction(
            [question], contract_text, 'marshmellow77/roberta-base-cuad', n_best_size=5
        )
        answer = predictions.get('0', 'No answer found')
        return True, (answer if answer else "No relevant clause found")
    except Exception as e:
        return False, f"Analysis failed: {str(e)}"


def main():
    """Render the app: upload two documents, compare them, analyse a clause.

    Results are kept in ``st.session_state`` so they survive reruns, and each
    stored result remembers the inputs it was computed from. A result is only
    displayed while those inputs still match what is on screen, so replacing
    a document (or picking another question) never leaves a stale result
    visible.
    """
    questions = load_questions()
    questions_short = load_questions_short()
    if not questions or not questions_short or len(questions) != len(questions_short):
        st.error("Failed to load questions or questions mismatch. Please check data files.")
        return

    # NOTE(review): the leading "π" looks like a mis-decoded emoji — confirm.
    st.title("π Contract Analysis Suite")
    st.markdown("""
    Compare documents and analyze legal clauses using AI-powered question answering.
    """)

    # ----- 1. Upload -----
    st.header("1. Upload Documents")
    col1, col2 = st.columns(2)
    with col1:
        uploaded_file1 = st.file_uploader("Upload First Document", type=["txt", "pdf", "docx"], key="file1")
        contract_text1 = load_contract(uploaded_file1) if uploaded_file1 else ""
        doc1_display = st.empty()
    with col2:
        uploaded_file2 = st.file_uploader("Upload Second Document", type=["txt", "pdf", "docx"], key="file2")
        contract_text2 = load_contract(uploaded_file2) if uploaded_file2 else ""
        doc2_display = st.empty()

    if uploaded_file1:
        doc1_display.text_area("Document 1 Content", value=contract_text1, height=400, key="area1")
    if uploaded_file2:
        doc2_display.text_area("Document 2 Content", value=contract_text2, height=400, key="area2")

    if not (uploaded_file1 and uploaded_file2):
        st.warning("Please upload both documents to proceed")
        return

    # Identifies which documents a stored result belongs to.
    current_documents = (contract_text1, contract_text2)

    # ----- 2. Comparison -----
    st.header("2. Document Comparison")
    with st.expander("Show Document Differences", expanded=True):
        if st.button("Compare Documents"):
            with st.spinner("Analyzing documents..."):
                if not contract_text1.strip() or not contract_text2.strip():
                    st.error("One or both documents appear to be empty or couldn't be read properly")
                    return
                similarity_score = calculate_similarity(contract_text1, contract_text2)
                highlighted_diff1, highlighted_diff2 = highlight_differences_words(contract_text1, contract_text2)
                st.session_state.comparison_results = {
                    'similarity_score': similarity_score,
                    'highlighted_diff1': highlighted_diff1,
                    'highlighted_diff2': highlighted_diff2,
                    'documents': current_documents,
                }

        comparison = st.session_state.comparison_results
        # Skip results computed for documents that have since been replaced.
        if comparison and comparison.get('documents') == current_documents:
            st.metric("Document Similarity Score", f"{comparison['similarity_score']:.2f}%")
            if comparison['similarity_score'] < 50:
                st.warning("Significant differences detected")
            st.markdown("**Visual Difference Highlighting:**")
            col1, col2 = st.columns(2)
            with col1:
                _render_diff_panel("### Original Document", comparison["highlighted_diff1"])
            with col2:
                _render_diff_panel("### Modified Document", comparison["highlighted_diff2"])

    # ----- 3. Clause analysis -----
    st.header("3. Clause Analysis")
    try:
        question_selected = st.selectbox('Select a legal question to analyze:', questions_short, index=0, key="question_select")
        question_idx = questions_short.index(question_selected)
        selected_question = questions[question_idx]
    except Exception as e:
        st.error(f"Error selecting question: {str(e)}")
        return

    # Identifies which question and documents a stored analysis belongs to.
    analysis_inputs = (selected_question, contract_text1, contract_text2)

    if st.button("Analyze Both Documents"):
        if not (contract_text1.strip() and contract_text2.strip()):
            st.error("Please ensure both documents have readable content")
            return
        with st.spinner('Processing first document...'):
            doc1_ok, doc1_message = _answer_question(selected_question, contract_text1)
        with st.spinner('Processing second document...'):
            doc2_ok, doc2_message = _answer_question(selected_question, contract_text2)
        st.session_state.analysis_results = {
            'inputs': analysis_inputs,
            'doc1': doc1_message,
            'doc1_ok': doc1_ok,
            'doc2': doc2_message,
            'doc2_ok': doc2_ok,
        }

    analysis = st.session_state.analysis_results
    if analysis and analysis.get('inputs') == analysis_inputs:
        col1, col2 = st.columns(2)
        panels = (
            (col1, "First Document Analysis", 'doc1'),
            (col2, "Second Document Analysis", 'doc2'),
        )
        for column, heading, key in panels:
            with column:
                st.subheader(heading)
                # Failures get an error box rather than a green success box.
                show = st.success if analysis.get(f'{key}_ok', True) else st.error
                show(analysis.get(key, 'No analysis performed yet'))
# Launch the app only when this file is executed as the main script.
if __name__ == '__main__':
    main()