BananaSauce's picture
updated for task
611c02e
import pandas as pd
import streamlit as st
import csv
import io
import openpyxl # Add this import for Excel handling
from datetime import datetime
import re
def preprocess_csv(input_bytes):
# Keep this for backward compatibility with CSV files
text = input_bytes.decode() # Decode bytes to text
output = io.StringIO()
writer = csv.writer(output)
for row in csv.reader(io.StringIO(text)): # Read text as csv
if len(row) > 5:
row = row[0:5] + [','.join(row[5:])] # Combine extra fields into one
writer.writerow(row)
output.seek(0) # go to the start of the StringIO object
return output
def load_data(file):
column_names = [
'Functional area',
'Scenario name',
'Start datetime',
'End datetime',
'Status',
'Error message'
]
data = pd.read_csv(file, header=None, names=column_names)
return data
@st.cache_data
def preprocess_xlsx(uploaded_file):
"""Process Excel file with step-level data and convert to scenario-level summary"""
# Define data types for columns
dtype_dict = {
'Feature Name': 'string',
'Scenario Name': 'string',
'Total Time Taken (ms)': 'float64',
'Failed Scenario': 'string'
}
try:
# Read both the first sheet for error messages and "Time Taken" sheet
excel_file = pd.ExcelFile(uploaded_file, engine='openpyxl')
# Read detailed step data from first sheet (contains error messages)
error_df = pd.read_excel(excel_file, sheet_name=0)
# Read time taken data from the "Time Taken" sheet
df = pd.read_excel(
excel_file,
sheet_name='Time Taken',
dtype=dtype_dict
)
# Convert Failed Scenario column to boolean after reading
# Handle different possible values (TRUE/FALSE, True/False, etc.)
df['Failed Scenario'] = df['Failed Scenario'].astype(str).str.upper()
# Replace 'NAN' string with empty string to avoid conversion issues
df['Failed Scenario'] = df['Failed Scenario'].replace('NAN', '')
df['Status'] = df['Failed Scenario'].map(
lambda x: 'FAILED' if x in ['TRUE', 'YES', 'Y', '1'] else 'PASSED'
)
# Count failed and passed scenarios
failed_count = (df['Status'] == 'FAILED').sum()
passed_count = (df['Status'] == 'PASSED').sum()
# Extract error messages from the first sheet
# Find rows with FAILED result and group by Scenario Name to get the error message
if 'Result' in error_df.columns:
failed_steps = error_df[error_df['Result'] == 'FAILED'].copy()
# If there are failed steps, get the error messages
if not failed_steps.empty:
# Group by Scenario Name and get the first error message and step for each scenario
error_messages = failed_steps.groupby('Scenario Name').agg({
'Error Message': 'first',
'Step': 'first' # Capture the step where it failed
}).reset_index()
else:
# Create empty DataFrame with required columns
error_messages = pd.DataFrame(columns=['Scenario Name', 'Error Message', 'Step'])
else:
# If Result column doesn't exist, create empty DataFrame
error_messages = pd.DataFrame(columns=['Scenario Name', 'Error Message', 'Step'])
# Extract date from filename (e.g., RI2211_batch_20250225_27031.xlsx)
filename = uploaded_file.name
date_match = re.search(r'_(\d{8})_', filename)
if date_match:
date_str = date_match.group(1)
file_date = datetime.strptime(date_str, '%Y%m%d').date()
else:
st.warning(f"Could not extract date from filename: {filename}. Using current date.")
file_date = datetime.now().date()
# Extract environment from filename
if any(pattern in filename for pattern in ['_batch_', '_fin_', '_priority_', '_Puppeteer_']):
# Get everything before _batch, _fin, or _priority
if '_batch_' in filename:
environment = filename.split('_batch_')[0]
elif '_fin_' in filename:
environment = filename.split('_fin_')[0]
elif '_priority_' in filename:
environment = filename.split('_priority_')[0]
elif '_Puppeteer_' in filename:
environment = filename.split('_Puppeteer_')[0]
else:
environment = filename.split('.')[0]
# Create result dataframe
result_df = pd.DataFrame({
'Functional area': df['Feature Name'],
'Scenario Name': df['Scenario Name'],
'Status': df['Status'],
'Time spent': df['Total Time Taken (ms)'] / 1000 # Convert ms to seconds
})
# Fill any NaN values in Functional area
result_df['Functional area'] = result_df['Functional area'].fillna('Unknown')
# Ensure Time spent is a numeric value and handle NaN
result_df['Time spent'] = pd.to_numeric(result_df['Time spent'], errors='coerce')
result_df['Time spent'] = result_df['Time spent'].fillna(0)
# Merge error messages with result dataframe
if not error_messages.empty:
result_df = result_df.merge(error_messages[['Scenario Name', 'Error Message', 'Step']],
on='Scenario Name', how='left')
# Add environment column
result_df['Environment'] = environment
# Calculate formatted time spent
result_df['Time spent(m:s)'] = pd.to_datetime(result_df['Time spent'], unit='s').dt.strftime('%M:%S')
result_df['Start datetime'] = pd.to_datetime(file_date)
result_df['End datetime'] = result_df['Start datetime'] + pd.to_timedelta(result_df['Time spent'], unit='s')
# Add failed step information if available
if 'Step' in result_df.columns:
result_df['Failed Step'] = result_df['Step']
result_df.drop('Step', axis=1, inplace=True)
# Extract start time from the first sheet
before_steps = error_df[error_df['Step'].str.contains('before', case=False, na=False)].copy()
if not before_steps.empty:
# Get the first 'before' step for each scenario
before_steps.loc[:, 'Time Stamp'] = pd.to_datetime(before_steps['Time Stamp'], format='%H:%M:%S', errors='coerce')
start_times = before_steps.groupby('Scenario Name').agg({'Time Stamp': 'first'}).reset_index()
# Store the timestamps in a variable for efficient reuse
result_df = result_df.merge(start_times, on='Scenario Name', how='left')
result_df.rename(columns={'Time Stamp': 'Scenario Start Time'}, inplace=True)
# Convert Scenario Start Time to datetime if it's not already
result_df['Scenario Start Time'] = pd.to_datetime(result_df['Scenario Start Time'], errors='coerce')
# Combine the date from the filename with the time stamp
result_df['Start datetime'] = pd.to_datetime(
result_df['Scenario Start Time'].dt.strftime('%H:%M:%S') + ' ' + file_date.strftime('%Y-%m-%d'),
errors='coerce'
)
return result_df
except Exception as e:
st.error(f"Error processing Excel file: {str(e)}")
# Log more detailed error information
import traceback
st.error(f"Detailed error: {traceback.format_exc()}")
# Return empty DataFrame with expected columns to avoid further errors
return pd.DataFrame(columns=[
'Functional area', 'Scenario Name', 'Status', 'Time spent',
'Time spent(m:s)', 'Environment', 'Start datetime', 'End datetime'
])
def fill_missing_data(data, column_index, value):
data.iloc[:, column_index] = data.iloc[:, column_index].fillna(value)
return data
# Define a function to convert a string to camel case
def to_camel_case(s):
parts = s.split('_')
return ''.join([part.capitalize() for part in parts])
# Define the function to preprocess a file (CSV or XLSX)
def preprocess_uploaded_file(uploaded_file):
try:
# Determine file type based on extension
if uploaded_file.name.lower().endswith('.xlsx'):
data = preprocess_xlsx(uploaded_file)
else:
# Original CSV processing
file_content = uploaded_file.read()
processed_output = preprocess_csv(file_content)
processed_file = io.StringIO(processed_output.getvalue())
data = load_data(processed_file)
data = fill_missing_data(data, 4, 0)
data['Start datetime'] = pd.to_datetime(data['Start datetime'], dayfirst=True, errors='coerce')
data['End datetime'] = pd.to_datetime(data['End datetime'], dayfirst=True, errors='coerce')
data['Time spent'] = (data['End datetime'] - data['Start datetime']).dt.total_seconds()
data['Time spent(m:s)'] = pd.to_datetime(data['Time spent'], unit='s').dt.strftime('%M:%S')
# Extract environment name from filename
filename = uploaded_file.name
environment = filename.split('_Puppeteer')[0]
# Add environment column to the dataframe
data['Environment'] = environment
# Make sure all required columns exist and have proper values
if data is not None and not data.empty:
# Ensure Time spent is numeric
if 'Time spent' in data.columns:
data['Time spent'] = pd.to_numeric(data['Time spent'], errors='coerce')
data['Time spent'] = data['Time spent'].fillna(0)
# Replace any NaN string values
for col in data.columns:
if data[col].dtype == 'object':
data[col] = data[col].replace('NaN', '').replace('nan', '')
return data
except Exception as e:
st.error(f"Error processing {uploaded_file.name}: {str(e)}")
# Provide more detailed error information
import traceback
st.error(f"Detailed error: {traceback.format_exc()}")
# Return empty DataFrame with expected columns to avoid cascading errors
return pd.DataFrame(columns=[
'Functional area', 'Scenario Name', 'Status', 'Time spent',
'Time spent(m:s)', 'Environment', 'Start datetime', 'End datetime'
])
def add_app_description():
app_title = '<p style="font-family:Roboto, sans-serif; color:#004E7C; font-size: 42px;">DataLink Compare</p>'
st.markdown(app_title, unsafe_allow_html=True)
is_selected = st.sidebar.checkbox('Show App Description', value=False)
if is_selected:
with st.expander('Show App Description'):
st.markdown("Welcome to DataLink Compare. This tool allows you to analyze batch run reports and provides insights into their statuses, processing times, and more. You can also compare two files to identify differences and similarities between them.")
st.markdown("### Instructions:")
st.write("1. Upload your CSV or XLSX file using the file uploader on the sidebar.")
st.write("2. Choose between 'Multi', 'Compare', 'Weekly', and 'Multi-Env Compare' mode using the dropdown on the sidebar.")
st.write("3. In 'Multi' mode, you can upload and analyze multiple files for individual environments.")
st.write("4. In 'Compare' mode, you can upload two files to compare them.")
st.markdown("### Features:")
st.write("- View statistics of passing and failing scenarios.")
st.write("- Filter scenarios by functional area and status.")
st.write("- Calculate average time spent for each functional area.")
st.write("- Display bar graphs showing the number of failed scenarios.")
st.write("- Identify consistent failures, new failures, and changes in passing scenarios.")
# Add the new link here
link_html = '<p style="font-size: 14px;"><a href="https://scenarioswitcher.negadan77.workers.dev/" target="_blank">Open Scenario Processor</a></p>'
st.markdown(link_html, unsafe_allow_html=True)