GradebookReport / app.py
simonraj's picture
Upload 4 files
81a5ff5 verified
raw
history blame
11.7 kB
import os
import traceback
from typing import List, Tuple, Union

import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
# NTU Singapore colors (hex strings) used to style the Plotly figures below
NTU_BLUE = "#003D7C"  # chart font colour and bar-chart marker colour
NTU_RED = "#C11E38"  # scatter-plot marker colour
NTU_GOLD = "#E7B820"  # not referenced anywhere in the visible code
def _safe_filename_part(value) -> str:
    """Return *value* as text that is safe to embed in a single file name."""
    # user_id values come straight from the uploaded spreadsheet. Replace path
    # separators, characters that are reserved on common file systems and
    # control characters, so a crafted id cannot redirect the per-user CSV
    # outside the output directory.
    return "".join(
        "_" if ch in '\\/:*?"<>|' or ord(ch) < 32 else ch
        for ch in str(value)
    )


def process_data(file: gr.File, progress=gr.Progress()) -> Tuple[str, str, pd.DataFrame]:
    """Build the mail-merge workbook and per-user CSV exports from an Excel upload.

    Side effects (all relative to the current working directory):
      * ``mailmerge <uploaded name>.xlsx`` - one row per user plus a totals row.
      * ``mailmerge/User_<user_id>_data.csv`` - the raw rows of each user.

    Args:
        file: Uploaded file object; only its ``name`` attribute (a path) is read.
        progress: Callable invoked as ``progress(fraction, desc=...)`` after
            each processing stage.

    Returns:
        ``(status message, CSV location message, per-user summary DataFrame)``.
        Any failure is caught and reported as
        ``(error text with traceback, "Processing failed", empty DataFrame)``
        instead of being raised.
    """
    try:
        # Check if file is uploaded
        if file is None:
            raise ValueError("No file uploaded. Please upload an Excel file.")

        # Check file extension
        if not file.name.lower().endswith(('.xls', '.xlsx')):
            raise ValueError("Invalid file format. Please upload an Excel file (.xls or .xlsx).")

        # Load the raw Excel file
        try:
            raw_data = pd.read_excel(file.name)
        except Exception as e:
            raise ValueError(f"Error reading Excel file: {str(e)}") from e

        # Validate every column needed later up front, so nothing is computed
        # or written for a file that cannot be processed completely.
        required_columns = ['user_id', 'lastname', 'course_id']
        missing_columns = [col for col in required_columns if col not in raw_data.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")

        csv_columns = ['course_id', 'course_pk1', 'data', 'event_type', 'internal_handle',
                       'lastname', 'session_id', 'timestamp', 'user_id', 'system_role']
        missing_csv_columns = [col for col in csv_columns if col not in raw_data.columns]
        if missing_csv_columns:
            raise ValueError(f"Missing columns for individual CSV files: {', '.join(missing_csv_columns)}")

        # Extract filename without extension
        base_filename = os.path.splitext(os.path.basename(file.name))[0]

        # Define output paths
        final_file_path = f'mailmerge {base_filename}.xlsx'
        base_path = 'mailmerge'

        # Step 1: Extract User Information. Rows without a user_id (e.g.
        # summary rows) are dropped here so they never reach the merges below.
        user_info = (
            raw_data[['user_id', 'lastname']]
            .dropna(subset=['user_id'])
            .drop_duplicates()
            .copy()
        )
        user_info['Username'] = user_info['user_id']
        user_info['Name'] = user_info['lastname']
        # astype(str) keeps this working when the ids were parsed as numbers.
        user_info['Email'] = user_info['user_id'].astype(str) + '@ntu.edu.sg'
        progress(0.2, desc="Extracting user information")

        # Step 2: Calculate Course Count (distinct courses per user)
        course_counts = raw_data.groupby('user_id')['course_id'].nunique().reset_index()
        course_counts.columns = ['Username', 'Courses']
        user_info = user_info.merge(course_counts, on='Username', how='left')
        progress(0.4, desc="Calculating course counts")

        # Step 3: Calculate Grand Total (number of raw rows per user)
        event_counts = raw_data.groupby('user_id').size().reset_index(name='Grand Total')
        event_counts.columns = ['Username', 'Grand Total']
        user_info = user_info.merge(event_counts, on='Username', how='left')
        progress(0.6, desc="Calculating grand totals")

        # Step 4: Generate Filenames and Paths (same sanitising as the CSV
        # writer below, so the sheet always points at the file actually written)
        user_info['File'] = user_info['Username'].map(
            lambda uid: f'User_{_safe_filename_part(uid)}_data.csv'
        )
        user_info['Path'] = user_info['File'].map(lambda name: os.path.join(base_path, name))

        # Keep the output columns, one row per user, ordered by username
        user_info = (
            user_info[['Username', 'Name', 'Courses', 'Grand Total', 'Email', 'File', 'Path']]
            .drop_duplicates(subset=['Username'])
            .sort_values(by='Username')
        )
        progress(0.8, desc="Generating individual CSV files")

        # Generate individual CSV files for each user
        try:
            os.makedirs(base_path, exist_ok=True)
        except PermissionError as e:
            raise PermissionError(
                f"Unable to create directory {base_path}. Please check your permissions."
            ) from e

        # A single groupby pass replaces one full-table scan per user; it
        # yields exactly the non-null user_ids, i.e. the users in user_info.
        for user_id, user_rows in raw_data.groupby('user_id', sort=False):
            user_file_path = os.path.join(base_path, f'User_{_safe_filename_part(user_id)}_data.csv')
            try:
                user_rows[csv_columns].to_csv(user_file_path, index=False)
            except PermissionError as e:
                raise PermissionError(
                    f"Unable to save file {user_file_path}. Please check your permissions."
                ) from e
        progress(0.9, desc="Saving final Excel file")

        # Save the final dataframe to the output Excel file
        try:
            with pd.ExcelWriter(final_file_path, engine='xlsxwriter') as writer:
                user_info.to_excel(writer, index=False, sheet_name='Sheet1')
                worksheet = writer.sheets['Sheet1']
                # Excel row of the last user: data rows plus the header row
                last_row = len(user_info) + 1
                # Write the totals in columns B, C and D of the first empty row
                worksheet.write(f'B{last_row + 1}', 'Total')
                worksheet.write(f'C{last_row + 1}', user_info['Courses'].sum())
                worksheet.write(f'D{last_row + 1}', user_info['Grand Total'].sum())
        except PermissionError as e:
            raise PermissionError(
                f"Unable to save file {final_file_path}. Please check if the file is open or if you have the necessary permissions."
            ) from e
        except Exception as e:
            raise Exception(f"An error occurred while saving the final Excel file: {str(e)}") from e

        progress(1.0, desc="Processing complete")
        return (
            f"Processing complete. Output saved to {final_file_path}",
            f"Individual CSV files saved in {base_path} directory",
            user_info,
        )
    except Exception as e:
        # UI boundary: report the failure as text rather than raising.
        error_msg = f"Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
        return error_msg, "Processing failed", pd.DataFrame()
def create_summary_stats(df: pd.DataFrame) -> dict:
    """Summarise the per-user table as a dict of headline figures.

    Reads the ``Courses`` and ``Grand Total`` columns of *df*. If anything
    goes wrong (for example a column is missing) a single-entry dict keyed
    ``"Error"`` describing the failure is returned instead.
    """
    labels = (
        "Total Users",
        "Total Courses",
        "Total Activity",
        "Avg Courses per User",
        "Avg Activity per User",
    )
    try:
        user_count = len(df)
        course_total = df['Courses'].sum()
        activity_total = df['Grand Total'].sum()
        course_mean = df['Courses'].mean()
        activity_mean = df['Grand Total'].mean()
    except Exception as e:
        return {"Error": f"Failed to create summary stats: {str(e)}"}
    figures = (user_count, course_total, activity_total, course_mean, activity_mean)
    return dict(zip(labels, figures))
def create_bar_chart(df: pd.DataFrame, x: str, y: str, title: str) -> Union["go.Figure", None]:
    """Build an NTU-styled bar chart of column *y* against column *x*.

    Args:
        df: Data to plot.
        x: Name of the column used for the x axis.
        y: Name of the column used for the bar heights.
        title: Chart title.

    Returns:
        The Plotly figure, or ``None`` when *df* is empty or the chart could
        not be built (the error is printed, not raised).
    """
    # The return annotation is quoted so it is not evaluated at definition
    # time; ``px.bar`` (used previously) is a function, not the figure type.
    try:
        if df.empty:
            return None
        fig = px.bar(df, x=x, y=y, title=title)
        fig.update_layout(
            plot_bgcolor='white',
            paper_bgcolor='white',
            font_color=NTU_BLUE
        )
        fig.update_traces(marker_color=NTU_BLUE)
        return fig
    except Exception as e:
        # Best effort: a broken chart must not take the whole dashboard down.
        print(f"Error creating bar chart: {str(e)}")
        return None
def create_scatter_plot(df: pd.DataFrame) -> Union["go.Figure", None]:
    """Build an NTU-styled scatter plot of ``Grand Total`` against ``Courses``.

    ``Username`` and ``Name`` are attached as hover data, so *df* needs all
    four columns.

    Returns:
        The Plotly figure, or ``None`` when *df* is empty or the plot could
        not be built (the error is printed, not raised).
    """
    # The return annotation is quoted so it is not evaluated at definition
    # time; ``px.scatter`` (used previously) is a function, not the figure type.
    try:
        if df.empty:
            return None
        fig = px.scatter(df, x='Courses', y='Grand Total', title='Courses vs. Activity Level',
                         hover_data=['Username', 'Name'])
        fig.update_layout(
            plot_bgcolor='white',
            paper_bgcolor='white',
            font_color=NTU_BLUE
        )
        fig.update_traces(marker_color=NTU_RED)
        return fig
    except Exception as e:
        # Best effort: a broken plot must not take the whole dashboard down.
        print(f"Error creating scatter plot: {str(e)}")
        return None
def _format_stat(value) -> str:
    """Render one summary value: two decimals for numbers, plain text otherwise."""
    try:
        return f"{value:.2f}"
    except (TypeError, ValueError):
        # e.g. the message stored under "Error" by create_summary_stats
        return str(value)


def update_insights(df: pd.DataFrame) -> List[Union[gr.components.Component, None]]:
    """Produce the five dashboard values for the processed user table.

    Args:
        df: Per-user summary table as returned by ``process_data``.

    Returns:
        A five-item list: summary Markdown, activity bar chart, courses bar
        chart, scatter plot and the user table. When *df* is empty or an
        error occurs, the first item is a Markdown message and the remaining
        four are ``None``.
    """
    try:
        if df.empty:
            return [gr.Markdown("No data available. Please upload and process a file first.")] + [None] * 4
        stats = create_summary_stats(df)
        # create_summary_stats may return {"Error": "<text>"}; _format_stat
        # keeps that text readable instead of failing on the ".2f" spec.
        stats_md = gr.Markdown("\n".join(f"**{k}**: {_format_stat(v)}" for k, v in stats.items()))
        users_activity_chart = create_bar_chart(df, 'Username', 'Grand Total', 'User Activity Levels')
        users_courses_chart = create_bar_chart(df, 'Username', 'Courses', 'Courses per User')
        scatter_plot = create_scatter_plot(df)
        user_table = gr.DataFrame(value=df)
        return [stats_md, users_activity_chart, users_courses_chart, scatter_plot, user_table]
    except Exception as e:
        error_msg = f"Error updating insights: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
        return [gr.Markdown(error_msg)] + [None] * 4
def process_and_update(file, progress=gr.Progress()):
    """Click handler: process the upload and refresh every output component.

    Args:
        file: Uploaded file object, forwarded to ``process_data``.
        progress: Progress tracker. It is declared on this function because
            this is the callable registered as the event handler; it is
            forwarded so the stage updates made inside ``process_data`` are
            reported through it.

    Returns:
        A seven-item list: result message, CSV location message, then the
        five dashboard values from ``update_insights``.
    """
    try:
        result_msg, csv_loc, df = process_data(file, progress)
        insights = update_insights(df)
        return [result_msg, csv_loc] + insights
    except Exception as e:
        error_msg = f"Error in process_and_update: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
        # Markdown error + 4 cleared components (three plots and the table)
        return [error_msg, "Processing failed"] + [gr.Markdown(error_msg)] + [None] * 4
# Custom theme: NTU blue for text and borders, NTU red for the primary button.
# The brand colours come from the NTU_* constants so they are defined once.
custom_theme = gr.themes.Base().set(
    body_background_fill="#E6F3FF",
    body_text_color=NTU_BLUE,
    button_primary_background_fill=NTU_RED,
    button_primary_background_fill_hover="#A5192F",
    button_primary_text_color="white",
    block_title_text_color=NTU_BLUE,
    block_label_background_fill="#E6F3FF",
    input_background_fill="white",
    input_border_color=NTU_BLUE,
    input_border_color_focus=NTU_RED,
)
# Inline CSS handed to gr.Blocks(css=...) below. It styles the secondary
# button (normal, hover and pressed states) and the `.app-title` class that
# the page heading is tagged with.
# NOTE(review): confirm `.gr-button-secondary` matches the class names the
# installed Gradio version actually emits for secondary buttons.
custom_css = """
.gr-button-secondary {
background-color: #F0F0F0;
color: #003D7C;
border: 1px solid #003D7C;
border-radius: 12px;
padding: 8px 16px;
font-size: 16px;
font-weight: bold;
cursor: pointer;
transition: background-color 0.3s, color 0.3s, border-color 0.3s;
}
.gr-button-secondary:hover {
background-color: #003D7C;
color: white;
border-color: #003D7C;
}
.gr-button-secondary:active {
transform: translateY(1px);
}
.app-title {
color: #003D7C;
font-size: 24px;
font-weight: bold;
text-align: center;
margin-bottom: 20px;
}
"""
def clear_outputs():
    """Return reset values for the seven components wired to the Clear button.

    The first two entries blank the text boxes; the remaining five clear the
    summary, the three plots and the table.
    """
    blank_text = ["", ""]
    blank_visuals = [None for _ in range(5)]
    return blank_text + blank_visuals
# Two-tab UI: the first tab takes the upload and shows status text, the
# second shows the summary, charts and table derived from the processed file.
with gr.Blocks(theme=custom_theme, css=custom_css) as iface:
    gr.Markdown("# Gradebook Data Processor", elem_classes=["app-title"])

    with gr.Tabs():
        with gr.TabItem("File Upload and Processing"):
            file_input = gr.File(label="Upload Excel File")
            with gr.Row():
                process_btn = gr.Button("Process Data", variant="primary")
                clear_btn = gr.Button("Clear", variant="secondary")
            output_msg = gr.Textbox(label="Processing Result")
            csv_location = gr.Textbox(label="CSV Files Location")

        with gr.TabItem("Data Insights Dashboard"):
            with gr.Row():
                summary_stats = gr.Markdown("Upload and process a file to see summary statistics.")
            with gr.Row():
                users_activity_chart = gr.Plot()
                users_courses_chart = gr.Plot()
            with gr.Row():
                scatter_plot = gr.Plot()
            with gr.Row():
                user_table = gr.DataFrame()

    # Both buttons write to the same seven components, in this order.
    dashboard_outputs = [
        output_msg,
        csv_location,
        summary_stats,
        users_activity_chart,
        users_courses_chart,
        scatter_plot,
        user_table,
    ]
    process_btn.click(process_and_update, inputs=[file_input], outputs=dashboard_outputs)
    clear_btn.click(clear_outputs, inputs=[], outputs=dashboard_outputs)

# Start the web server only when the file is executed as a script.
if __name__ == "__main__":
    iface.launch()