Spaces:
Sleeping
Sleeping
import pandas as pd | |
import streamlit as st | |
import csv | |
import io | |
import openpyxl # Add this import for Excel handling | |
from datetime import datetime | |
import re | |
def preprocess_csv(input_bytes):
    """Rewrite a raw CSV report so that no row has more than six fields.

    Kept for backward compatibility with CSV uploads. Every field after the
    fifth is re-joined with commas into a single sixth field, so that an
    unquoted trailing value containing commas does not spill into extra
    columns. Rows with five or fewer fields are written back unchanged.

    Args:
        input_bytes: Raw file content as ``bytes`` (UTF-8, with or without
            a byte-order mark).

    Returns:
        An ``io.StringIO`` containing the rewritten CSV, rewound to the start.

    Raises:
        UnicodeDecodeError: If ``input_bytes`` is not valid UTF-8.
    """
    # 'utf-8-sig' decodes plain UTF-8 exactly like 'utf-8' but also drops a
    # leading byte-order mark, which would otherwise end up inside the first
    # field of the first row.
    text = input_bytes.decode('utf-8-sig')
    output = io.StringIO()
    writer = csv.writer(output)
    for row in csv.reader(io.StringIO(text)):
        if len(row) > 5:
            # Fold everything past the fifth field back into one value.
            row = row[:5] + [','.join(row[5:])]
        writer.writerow(row)
    output.seek(0)  # Rewind so callers can read from the beginning.
    return output
def load_data(file):
    """Read a headerless scenario CSV into a DataFrame with fixed column names.

    Args:
        file: Path or file-like object accepted by ``pd.read_csv``.

    Returns:
        A DataFrame whose six columns are named, in order: functional area,
        scenario name, start datetime, end datetime, status, error message.
    """
    return pd.read_csv(
        file,
        header=None,  # The file has no header row; names are supplied below.
        names=[
            'Functional area',
            'Scenario name',
            'Start datetime',
            'End datetime',
            'Status',
            'Error message',
        ],
    )
def preprocess_xlsx(uploaded_file):
    """Build a scenario-level summary DataFrame from an uploaded Excel report.

    Timing and pass/fail data are read from the "Time Taken" sheet; error
    messages are read from the first sheet and attached by scenario name.
    The run date and the environment name are derived from the file name.

    Args:
        uploaded_file: File-like object with a ``name`` attribute
            (e.g. ``RI2211_batch_20250225_27031.xlsx``) that
            ``pd.ExcelFile`` can read.

    Returns:
        A DataFrame with the columns 'Functional area', 'Scenario name',
        'Status', 'Time spent' (seconds), 'Error message', 'Environment',
        'Time spent(m:s)', 'Start datetime' and 'End datetime'.

    Raises:
        KeyError: If an expected column is missing from either sheet.
        ValueError: If the workbook has no "Time Taken" sheet, or the
            eight digits in the file name do not form a valid date.
    """
    dtype_dict = {
        'Feature Name': 'string',
        'Scenario Name': 'string',
        'Total Time Taken (ms)': 'float64',
    }

    # The context manager closes the workbook handle once both sheets are read.
    with pd.ExcelFile(uploaded_file, engine='openpyxl') as excel_file:
        error_df = pd.read_excel(excel_file, sheet_name=0)
        df = pd.read_excel(excel_file, sheet_name='Time Taken', dtype=dtype_dict)

    # Cells may arrive as real booleans (which stringify to 'True'/'False')
    # or as text in any case, so normalise before mapping. Unrecognised
    # values still map to NaN, as before.
    failed_text = df['Failed Scenario'].astype(str).str.strip().str.upper()
    df['Failed Scenario'] = failed_text.map({'TRUE': True, 'FALSE': False})

    # The summary uses 'Scenario name' (lower-case n) while the sheet uses
    # 'Scenario Name'; rename so the merge key exists on both sides, and
    # cast to the same dtype as the key read from the "Time Taken" sheet.
    # NOTE(review): presumably the first sheet can hold several rows per
    # scenario (step-level data); keeping only the first non-empty message
    # per scenario stops the left merge from duplicating summary rows.
    error_messages = (
        error_df[['Scenario Name', 'Error message']]
        .rename(columns={'Scenario Name': 'Scenario name'})
        .dropna(subset=['Error message'])
        .drop_duplicates(subset='Scenario name')
        .astype({'Scenario name': 'string'})
    )

    # Extract the run date from the file name (an 8-digit block between
    # underscores, e.g. RI2211_batch_20250225_27031.xlsx).
    filename = uploaded_file.name
    date_match = re.search(r'_(\d{8})_', filename)
    if date_match:
        file_date = datetime.strptime(date_match.group(1), '%Y%m%d').date()
    else:
        st.warning(f"Could not extract date from filename: {filename}. Using current date.")
        file_date = datetime.now().date()

    # Known run-type markers mean the environment is the leading token;
    # otherwise fall back to the text before the first dot.
    if any(pattern in filename for pattern in ['_batch_', '_fin_', '_priority_', '_Puppeteer_']):
        environment = filename.split('_')[0]
    else:
        environment = filename.split('.')[0]

    result_df = pd.DataFrame({
        'Functional area': df['Feature Name'],
        'Scenario name': df['Scenario Name'],
        'Status': df['Failed Scenario'].map({True: 'FAILED', False: 'PASSED'}),
        'Time spent': df['Total Time Taken (ms)'] / 1000,  # ms -> seconds
    })

    result_df = result_df.merge(error_messages, on='Scenario name', how='left')
    result_df['Environment'] = environment

    # '%M' is minute-of-hour, so this display value wraps for durations of
    # an hour or more.
    result_df['Time spent(m:s)'] = pd.to_datetime(result_df['Time spent'], unit='s').dt.strftime('%M:%S')

    # The sheet carries no real start time, so every scenario starts at
    # midnight of the file date and ends after its measured duration.
    result_df['Start datetime'] = pd.to_datetime(file_date)
    result_df['End datetime'] = result_df['Start datetime'] + pd.to_timedelta(result_df['Time spent'], unit='s')
    return result_df
def fill_missing_data(data, column_index, value):
    """Replace missing values in one column, selected by position, in place.

    Args:
        data: DataFrame to modify.
        column_index: Zero-based position of the column to fill.
        value: Replacement for missing entries in that column.

    Returns:
        The same ``data`` object, after modification.
    """
    filled_column = data.iloc[:, column_index].fillna(value)
    data.iloc[:, column_index] = filled_column
    return data
# Convert an underscore-separated string to capitalised, concatenated words
def to_camel_case(s):
    """Join the underscore-separated parts of ``s`` with each part capitalised.

    Each part goes through ``str.capitalize``, so its first character is
    upper-cased and the remaining characters are lower-cased.
    """
    return ''.join(map(str.capitalize, s.split('_')))
# Turn an uploaded report (CSV or XLSX) into the common scenario DataFrame
def preprocess_uploaded_file(uploaded_file):
    """Load an uploaded report and return it as a scenario-level DataFrame.

    Files whose name ends in ``.xlsx`` (case-insensitive) are handed to
    ``preprocess_xlsx``, which already returns the derived columns. Any
    other file is treated as a headerless CSV: it is normalised, loaded,
    and given 'Time spent', 'Time spent(m:s)' and 'Environment' columns.

    Args:
        uploaded_file: Uploaded file object exposing ``name`` and ``read()``.

    Returns:
        The processed DataFrame.
    """
    upload_name = uploaded_file.name
    with st.spinner(f'Processing {upload_name}...'):
        if upload_name.lower().endswith('.xlsx'):
            return preprocess_xlsx(uploaded_file)

        # Original CSV processing: normalise the row width, then load.
        normalised_csv = preprocess_csv(uploaded_file.read()).getvalue()
        data = load_data(io.StringIO(normalised_csv))
        data = fill_missing_data(data, 4, 0)

        # Unparseable timestamps become NaT instead of raising.
        for column in ('Start datetime', 'End datetime'):
            data[column] = pd.to_datetime(data[column], dayfirst=True, errors='coerce')

        elapsed = data['End datetime'] - data['Start datetime']
        data['Time spent'] = elapsed.dt.total_seconds()
        data['Time spent(m:s)'] = pd.to_datetime(data['Time spent'], unit='s').dt.strftime('%M:%S')

        # The environment is whatever precedes '_Puppeteer' in the file name.
        data['Environment'] = upload_name.split('_Puppeteer')[0]
        return data
def add_app_description():
    """Render the app title and, when enabled in the sidebar, its description.

    The title is always shown. When the 'Show App Description' sidebar
    checkbox is ticked, an expander lists the usage instructions, the
    feature summary, and a link to the external Scenario Processor.
    """
    st.markdown(
        '<p style="font-family:Roboto, sans-serif; color:#004E7C; font-size: 42px;">DataLink Compare</p>',
        unsafe_allow_html=True,
    )

    if not st.sidebar.checkbox('Show App Description', value=False):
        return

    instruction_lines = (
        "1. Upload your CSV or XLSX file using the file uploader on the sidebar.",
        "2. Choose between 'Multi', 'Compare', 'Weekly', and 'Multi-Env Compare' mode using the dropdown on the sidebar.",
        "3. In 'Multi' mode, you can upload and analyze multiple files for individual environments.",
        "4. In 'Compare' mode, you can upload two files to compare them.",
    )
    feature_lines = (
        "- View statistics of passing and failing scenarios.",
        "- Filter scenarios by functional area and status.",
        "- Calculate average time spent for each functional area.",
        "- Display bar graphs showing the number of failed scenarios.",
        "- Identify consistent failures, new failures, and changes in passing scenarios.",
    )

    with st.expander('Show App Description'):
        st.markdown("Welcome to DataLink Compare. This tool allows you to analyze batch run reports and provides insights into their statuses, processing times, and more. You can also compare two files to identify differences and similarities between them.")

        st.markdown("### Instructions:")
        for text in instruction_lines:
            st.write(text)

        st.markdown("### Features:")
        for text in feature_lines:
            st.write(text)

        # Link to the companion Scenario Processor tool (opens in a new tab).
        st.markdown(
            '<p style="font-size: 14px;"><a href="https://scenarioswitcher.negadan77.workers.dev/" target="_blank">Open Scenario Processor</a></p>',
            unsafe_allow_html=True,
        )