# AssignMaster / app.py
# arpit13's picture
# Update app.py
# b2b1442 verified
import streamlit as st
import os
import time
import tempfile
import subprocess
import re
import PyPDF2
import docx
from fpdf import FPDF
from pathlib import Path
import threading
import queue
import logging
from datetime import datetime
import io
# Logging: every record goes both to a file on disk and to the console stream.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_log_handlers = [
    logging.FileHandler("streamlit_app.log"),
    logging.StreamHandler(),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_log_handlers)

# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)
# Set page configuration: browser tab title/icon, wide layout, sidebar shown
# on load, and the Markdown text displayed under the "About" menu entry.
# NOTE(review): Streamlit has historically required set_page_config to be the
# first st.* call in the script — keep it ahead of the st.markdown below.
st.set_page_config(
    page_title="B.Tech Assignment Solution Generator",
    page_icon="🎓",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'About': "# B.Tech Assignment Solution Generator\nThis app processes B.Tech assignments and generates comprehensive solutions."
    }
)
# Apply dark theme by injecting a raw <style> block into the page.
# The *-message classes (success/error/info/warning) and .log-container are
# the ones main() emits when it renders the processing log.
# NOTE(review): the `.css-*` selectors look like Streamlit-generated class
# names — confirm they still match the deployed Streamlit version.
st.markdown("""
<style>
.main {
background-color: #0E1117;
color: white;
}
.stButton>button {
background-color: #4CAF50;
color: white;
border-radius: 8px;
padding: 10px 24px;
font-weight: bold;
border: none;
transition-duration: 0.4s;
}
.stButton>button:hover {
background-color: #45a049;
}
.stTextInput>div>div>input {
background-color: #1E2126;
color: white;
}
.stMarkdown {
color: white;
}
.css-1cpxqw2 {
background-color: #262730;
border-radius: 10px;
padding: 20px;
margin-bottom: 20px;
}
.css-1v3fvcr {
background-color: #0E1117;
}
.css-18e3th9 {
padding-top: 2rem;
}
.css-1kyxreq {
justify-content: center;
align-items: center;
}
.stProgress > div > div > div > div {
background-color: #4CAF50;
}
.log-container {
background-color: #1E2126;
color: #B0B0B0;
padding: 15px;
border-radius: 8px;
height: 300px;
overflow-y: auto;
font-family: monospace;
margin-bottom: 20px;
}
.success-message {
color: #4CAF50;
font-weight: bold;
}
.error-message {
color: #FF5252;
font-weight: bold;
}
.info-message {
color: #2196F3;
}
.warning-message {
color: #FFC107;
}
.pdf-container {
background-color: #262730;
padding: 20px;
border-radius: 10px;
margin-top: 20px;
}
.download-button {
background-color: #2196F3;
color: white;
padding: 10px 15px;
border-radius: 5px;
text-decoration: none;
display: inline-block;
margin-top: 10px;
}
</style>
""", unsafe_allow_html=True)
def extract_text_from_pdf(file):
    """Return the text of all pages of a PDF, each page followed by a newline.

    Args:
        file: Anything ``PyPDF2.PdfReader`` accepts (here, the uploaded file).

    Returns:
        The concatenated page text, or ``None`` when reading fails; in that
        case the error is logged and also shown in the Streamlit UI.
    """
    try:
        reader = PyPDF2.PdfReader(file)
        page_texts = [page.extract_text() + "\n" for page in reader.pages]
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {e}")
        st.error(f"Error extracting text from PDF: {e}")
        return None
    return "".join(page_texts)
def extract_text_from_docx(file):
    """Return the text of all paragraphs of a DOCX, one paragraph per line.

    Args:
        file: Anything ``docx.Document`` accepts (here, the uploaded file).

    Returns:
        The paragraph texts, each terminated by a newline, or ``None`` when
        reading fails; in that case the error is logged and also shown in
        the Streamlit UI.
    """
    try:
        document = docx.Document(file)
        lines = [paragraph.text + "\n" for paragraph in document.paragraphs]
    except Exception as e:
        logger.error(f"Error extracting text from DOCX: {e}")
        st.error(f"Error extracting text from DOCX: {e}")
        return None
    return "".join(lines)
def create_pdf(content, filename="solution.pdf"):
    """Render the solution text into a PDF held in memory.

    Lines that are non-empty, shorter than 100 characters and entirely
    upper-case are treated as headings (bold, 14pt); every other line is
    written as a wrapped 12pt paragraph.

    Args:
        content: The solution text; split on newlines.
        filename: Currently unused; kept so existing callers that pass it
            keep working.

    Returns:
        An ``io.BytesIO`` positioned at offset 0 containing the PDF, or
        ``None`` if rendering failed (the error is logged and shown in the
        Streamlit UI).
    """
    try:
        # The built-in "Arial" core font can only encode Latin-1. Generated
        # solutions routinely contain bullets, smart quotes or math symbols,
        # and a single such character would abort the whole PDF, so replace
        # anything outside Latin-1 with "?" up front.
        content = content.encode("latin-1", "replace").decode("latin-1")

        pdf = FPDF()
        pdf.add_page()
        pdf.set_auto_page_break(auto=True, margin=15)

        # Title
        pdf.set_font("Arial", "B", 16)
        pdf.cell(0, 10, "B.Tech Assignment Solution", ln=True, align="C")
        pdf.ln(5)

        # Body
        pdf.set_font("Arial", "", 12)
        for para in content.split('\n'):
            stripped = para.strip()
            # Simple heading heuristic: short, non-empty, all upper-case.
            if stripped and len(stripped) < 100 and stripped.isupper():
                pdf.set_font("Arial", "B", 14)
                pdf.cell(0, 10, stripped, ln=True)
                pdf.set_font("Arial", "", 12)
            else:
                pdf.multi_cell(0, 10, para)
                # ln() also returns the cursor to the left margin before the
                # next paragraph is written.
                pdf.ln(2)

        # Write the PDF into an in-memory buffer and rewind it for the caller.
        pdf_buffer = io.BytesIO()
        pdf.output(pdf_buffer)
        pdf_buffer.seek(0)
        return pdf_buffer
    except Exception as e:
        logger.error(f"Error creating PDF: {e}")
        st.error(f"Error creating PDF: {e}")
        return None
# Matches records written with the format
# '%(asctime)s - %(name)s - %(levelname)s - %(message)s'.
# The logger name is matched with \S+ so dotted names ("pkg.module") are
# recognised too; group 1 is the level, group 2 the message.
_LOG_LINE_RE = re.compile(
    r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - \S+ - (\w+) - (.*)'
)

# Display category for each standard logging level name.
_LEVEL_TO_MSG_TYPE = {
    "DEBUG": "info",
    "INFO": "info",
    "WARNING": "warning",
    "WARN": "warning",
    "ERROR": "error",
    "CRITICAL": "error",
}


def process_log_line(line):
    """Classify a log line and strip its timestamp/logger/level prefix.

    If the line carries the standard ``timestamp - name - LEVEL - message``
    prefix with a recognised level, that level decides the category and only
    the message part is returned. Otherwise the category is guessed from the
    keywords "error" / "warning" (case-insensitive) anywhere in the line.

    Args:
        line: One raw line of log or process output.

    Returns:
        ``(msg_type, message)`` where ``msg_type`` is ``"error"``,
        ``"warning"`` or ``"info"``; ``(None, None)`` for a blank line.
    """
    line = line.strip()
    if not line:
        return None, None

    match = _LOG_LINE_RE.search(line)
    message = match.group(2) if match else line

    # An explicit, recognised level is more reliable than keyword sniffing:
    # an INFO record such as "checked for errors" must not show up as an error.
    if match:
        msg_type = _LEVEL_TO_MSG_TYPE.get(match.group(1).upper())
        if msg_type is not None:
            return msg_type, message

    lowered = line.lower()
    if "error" in lowered:
        msg_type = "error"
    elif "warning" in lowered:
        msg_type = "warning"
    else:
        msg_type = "info"
    return msg_type, message
def monitor_log_file(log_file, message_queue, stop_event):
    """Tail *log_file* and forward each new line to *message_queue*.

    Runs until *stop_event* is set. Each non-blank line is passed through
    ``process_log_line`` and queued as a ``(msg_type, message)`` tuple.

    Args:
        log_file: Path of the log file to follow.
        message_queue: Queue receiving ``(msg_type, message)`` tuples.
        stop_event: ``threading.Event`` that ends the monitoring loop.
    """
    try:
        # The file may not exist yet when monitoring starts (nothing has
        # logged to it so far). Wait for it instead of failing immediately
        # and reporting a spurious error to the UI.
        existed_at_start = os.path.exists(log_file)
        while not os.path.exists(log_file):
            if stop_event.wait(0.1):
                return

        # errors='replace' keeps a stray undecodable byte from killing the
        # monitor thread.
        with open(log_file, 'r', errors='replace') as f:
            if existed_at_start:
                # Skip content left over from earlier runs.
                f.seek(0, 2)
            # A file created after we started belongs to this run, so it is
            # read from the beginning.
            while not stop_event.is_set():
                line = f.readline()
                if line:
                    msg_type, message = process_log_line(line)
                    if msg_type and message:
                        message_queue.put((msg_type, message))
                else:
                    # No new line yet; wait a bit, but wake up at once when
                    # asked to stop.
                    stop_event.wait(0.1)
    except Exception as e:
        logger.error(f"Error monitoring log file: {e}")
        message_queue.put(("error", f"Error monitoring log file: {e}"))
def run_assignment_processor(input_file_path, message_queue):
    """Run ``streamlit_main.py`` on *input_file_path* and relay its output.

    The script is looked up next to this file and started with the
    ``venv/bin/python`` interpreter found there, falling back to ``python3``.
    Every stdout line is classified with ``process_log_line`` and queued;
    stderr is only reported if the process exits with a non-zero code.

    Args:
        input_file_path: Path of the text file handed to the processor.
        message_queue: Queue receiving ``(msg_type, message)`` tuples.

    Returns:
        ``True`` if the process exited with code 0, ``False`` on a non-zero
        exit code or any exception (both are reported through the queue).
    """
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        main_script = os.path.join(script_dir, "streamlit_main.py")

        # Prefer the project's virtual environment interpreter if present.
        venv_python = os.path.join(script_dir, "venv", "bin", "python")
        python_cmd = venv_python if os.path.exists(venv_python) else "python3"

        cmd = [python_cmd, main_script, input_file_path]
        message_queue.put(("info", "Starting assignment processing..."))

        # stderr goes to a temporary file rather than a second pipe: with two
        # pipes and only stdout being drained, a child that writes a lot to
        # stderr blocks once the pipe buffer is full and never finishes.
        with tempfile.TemporaryFile(mode="w+", errors="replace") as stderr_file:
            with subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=stderr_file,
                text=True,
            ) as process:
                for line in process.stdout:
                    msg_type, message = process_log_line(line)
                    if msg_type and message:
                        message_queue.put((msg_type, message))
                return_code = process.wait()

            if return_code != 0:
                # Report what the child wrote to stderr.
                stderr_file.seek(0)
                error_output = stderr_file.read()
                message_queue.put(("error", f"Process failed with return code {return_code}: {error_output}"))
                return False

        # main() watches for this exact text to know the solution is ready.
        message_queue.put(("success", "COMPLETED B.TECH ASSIGNMENT SOLUTION PIPELINE"))
        return True
    except Exception as e:
        logger.error(f"Error running assignment processor: {e}")
        message_queue.put(("error", f"Error running assignment processor: {e}"))
        return False
def main():
    """Render the app: upload a file, preview its text, generate a solution.

    Flow: the uploaded PDF/DOCX is converted to text, the text is written to
    a temporary file, and on "Generate Solution" two daemon threads are
    started — one running the assignment processor, one tailing
    ``btech_solution_system.log``. Their messages arrive through a queue and
    drive the progress bar and the log panel. When the completion message is
    seen, ``solution.txt`` is read, displayed and offered as a PDF download.
    The temporary input file is always removed afterwards.
    """
    # Local import so this function does not depend on the file's import block.
    import html

    # Header
    st.title("B.Tech Assignment Solution Generator")
    st.markdown("Upload your assignment file (PDF or DOCX) and get comprehensive solutions.")

    # File uploader
    uploaded_file = st.file_uploader("Choose an assignment file", type=["pdf", "docx"])
    if uploaded_file is None:
        return

    st.success(f"File uploaded: {uploaded_file.name}")

    # Compare the extension case-insensitively so "REPORT.PDF" is accepted.
    file_name = uploaded_file.name.lower()
    if file_name.endswith('.pdf'):
        text = extract_text_from_pdf(uploaded_file)
    elif file_name.endswith('.docx'):
        text = extract_text_from_docx(uploaded_file)
    else:
        st.error("Unsupported file format. Please upload a PDF or DOCX file.")
        return

    if text is None:
        st.error("Failed to extract text from the file.")
        return

    # Preview the extracted text
    with st.expander("Preview Extracted Text", expanded=False):
        st.text_area("Extracted Text", text, height=200)

    if not st.button("Generate Solution"):
        return

    # The processor takes its input as a file path, so persist the text.
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
        temp_file.write(text)
        temp_file_path = temp_file.name

    try:
        # Progress widgets
        progress_bar = st.progress(0)
        status_text = st.empty()

        # Log panel
        st.subheader("Processing Status")
        log_container = st.empty()
        log_messages = []

        message_queue = queue.Queue()
        stop_event = threading.Event()

        # Thread tailing the processor's log file.
        log_file = "btech_solution_system.log"
        log_thread = threading.Thread(
            target=monitor_log_file,
            args=(log_file, message_queue, stop_event)
        )
        log_thread.daemon = True
        log_thread.start()

        # Thread running the processor itself.
        processing_thread = threading.Thread(
            target=run_assignment_processor,
            args=(temp_file_path, message_queue)
        )
        processing_thread.daemon = True
        processing_thread.start()

        start_time = time.time()
        solution_content = None

        # Marker text looked for in messages, mapped to a progress percentage.
        stages = {
            "ANALYZING QUESTIONS": 10,
            "RESEARCHING INFORMATION": 30,
            "DEVELOPING INITIAL SOLUTIONS": 50,
            "VERIFYING SOLUTIONS": 70,
            "REVISING SOLUTIONS": 85,
            "FORMATTING FINAL REPORT": 95,
            "COMPLETED": 100
        }
        current_progress = 0

        # Keep polling until the processor is done and the queue is drained.
        while processing_thread.is_alive() or not message_queue.empty():
            try:
                while not message_queue.empty():
                    msg_type, message = message_queue.get(block=False)

                    # First matching stage marker sets the progress.
                    for stage, progress_value in stages.items():
                        if stage in message:
                            current_progress = progress_value
                            break

                    # Messages come from process output and may contain
                    # "<", ">" or "&" (e.g. "<module>" in a traceback);
                    # escape them because the panel is rendered as raw HTML.
                    log_messages.append(
                        f"<div class='{msg_type}-message'>{html.escape(message)}</div>"
                    )

                    # On completion, pick up the solution written by the
                    # processor.
                    if "COMPLETED B.TECH ASSIGNMENT SOLUTION PIPELINE" in message:
                        try:
                            solution_file = "solution.txt"
                            if os.path.exists(solution_file):
                                with open(solution_file, 'r') as f:
                                    solution_content = f.read()
                        except Exception as e:
                            logger.error(f"Error reading solution file: {e}")
                            log_messages.append(
                                f"<div class='error-message'>Error reading solution file: {html.escape(str(e))}</div>"
                            )

                    # Keep only the last 20 messages to avoid cluttering.
                    if len(log_messages) > 20:
                        log_messages = log_messages[-20:]

                    # Render after all appends so a read error is visible too.
                    log_container.markdown(
                        f"<div class='log-container'>{''.join(log_messages)}</div>",
                        unsafe_allow_html=True
                    )
            except queue.Empty:
                pass

            progress_bar.progress(current_progress)
            status_text.text(f"Processing... ({current_progress}%)")

            if current_progress >= 100:
                break

            # Give up after 30 minutes.
            if time.time() - start_time > 1800:
                status_text.error("Processing timed out after 30 minutes.")
                break

            time.sleep(0.5)

        # Stop the log monitoring thread
        stop_event.set()

        if solution_content:
            progress_bar.progress(100)
            status_text.success("Solution generated successfully!")

            st.subheader("Generated Solution")
            st.markdown(solution_content)

            pdf_buffer = create_pdf(solution_content)
            if pdf_buffer:
                st.download_button(
                    label="Download Solution as PDF",
                    data=pdf_buffer,
                    file_name="btech_assignment_solution.pdf",
                    mime="application/pdf",
                    key="solution-pdf",
                    help="Click to download the solution as a PDF file"
                )
        else:
            status_text.error("Failed to generate solution.")
    finally:
        # Clean up temporary file
        if os.path.exists(temp_file_path):
            os.unlink(temp_file_path)


if __name__ == "__main__":
    main()