import pandas as pd import streamlit as st import csv import io import openpyxl # Add this import for Excel handling from datetime import datetime import re def preprocess_csv(input_bytes): # Keep this for backward compatibility with CSV files text = input_bytes.decode() # Decode bytes to text output = io.StringIO() writer = csv.writer(output) for row in csv.reader(io.StringIO(text)): # Read text as csv if len(row) > 5: row = row[0:5] + [','.join(row[5:])] # Combine extra fields into one writer.writerow(row) output.seek(0) # go to the start of the StringIO object return output def load_data(file): column_names = [ 'Functional area', 'Scenario name', 'Start datetime', 'End datetime', 'Status', 'Error message' ] data = pd.read_csv(file, header=None, names=column_names) return data @st.cache_data def preprocess_xlsx(uploaded_file): """Process Excel file with step-level data and convert to scenario-level summary""" # Define data types for columns dtype_dict = { 'Feature Name': 'string', 'Scenario Name': 'string', 'Total Time Taken (ms)': 'float64', 'Failed Scenario': 'string' } # Attempt fast streaming read (read_only) for performance fast_excel = pd.ExcelFile(uploaded_file, engine='openpyxl') # Read first sheet (error messages) error_df = pd.read_excel(fast_excel, sheet_name=0) # Read 'Time Taken' sheet in fast mode df = pd.read_excel(fast_excel, sheet_name='Time Taken', dtype=object) # If the sheet appears truncated (e.g., only header row) or missing expected columns, retry in full mode if df.shape[0] <= 1 or 'Total Time Taken (ms)' not in df.columns: st.warning("Fast Excel read produced incomplete data; retrying in full mode.") slow_excel = pd.ExcelFile( uploaded_file, engine='openpyxl', engine_kwargs={ 'read_only': False, 'data_only': True, 'keep_links': False } ) # Reload both sheets in full mode error_df = pd.read_excel(slow_excel, sheet_name=0) df = pd.read_excel(slow_excel, sheet_name='Time Taken', dtype=object) # Print column names and sample values for debugging # st.write("Excel columns:", df.columns.tolist()) # st.write("Sample data from Time Taken sheet:", df.head()) # st.write("Unique Feature Names:", df['Feature Name'].unique()) # st.write("Feature Name count:", df['Feature Name'].nunique()) # # Check for any empty or NaN values in Feature Name # empty_features = df['Feature Name'].isna().sum() # st.write(f"Empty Feature Names: {empty_features}") # Convert specific columns after reading as object df['Total Time Taken (ms)'] = pd.to_numeric(df['Total Time Taken (ms)'], errors='coerce').fillna(0).astype('float64') df['Failed Scenario'] = df['Failed Scenario'].astype(str).str.upper() # Ensure Feature Name and Scenario Name are strings, handling potential NaNs read as objects df['Feature Name'] = df['Feature Name'].astype(str).fillna('Unknown') df['Scenario Name'] = df['Scenario Name'].astype(str) df['Status'] = df['Failed Scenario'].map( lambda x: 'FAILED' if x in ['TRUE', 'YES', 'Y', '1'] else 'PASSED' ) # Count failed and passed scenarios failed_count = (df['Status'] == 'FAILED').sum() passed_count = (df['Status'] == 'PASSED').sum() # Extract error messages from the first sheet # Find rows with FAILED result and group by Scenario Name to get the error message if 'Result' in error_df.columns: failed_steps = error_df[error_df['Result'] == 'FAILED'].copy() # If there are failed steps, get the error messages if not failed_steps.empty: # Group by Scenario Name and get the first error message and step for each scenario error_messages = failed_steps.groupby('Scenario Name').agg({ 'Error Message': 'first', 'Step': 'first' # Capture the step where it failed }).reset_index() else: # Create empty DataFrame with required columns error_messages = pd.DataFrame(columns=['Scenario Name', 'Error Message', 'Step']) else: # If Result column doesn't exist, create empty DataFrame error_messages = pd.DataFrame(columns=['Scenario Name', 'Error Message', 'Step']) # Extract date from filename (e.g., RI2211_batch_20250225_27031.xlsx) filename = uploaded_file.name date_match = re.search(r'_(\d{8})_', filename) if date_match: date_str = date_match.group(1) file_date = datetime.strptime(date_str, '%Y%m%d').date() else: st.warning(f"Could not extract date from filename: {filename}. Using current date.") file_date = datetime.now().date() # Extract environment from filename if any(pattern in filename for pattern in ['_batch_', '_fin_', '_priority_', '_Puppeteer_']): environment = filename.split('_')[0] else: environment = filename.split('.')[0] # Create result dataframe result_df = pd.DataFrame({ 'Functional area': df['Feature Name'], 'Scenario Name': df['Scenario Name'], 'Status': df['Status'], 'Time spent': df['Total Time Taken (ms)'] / 1000 # Convert ms to seconds }) # Fill any NaN values in Functional area result_df['Functional area'] = result_df['Functional area'].fillna('Unknown') # Merge error messages with result dataframe if not error_messages.empty: result_df = result_df.merge(error_messages[['Scenario Name', 'Error Message', 'Step']], on='Scenario Name', how='left') # Add environment column result_df['Environment'] = environment # Calculate formatted time spent (coerce non-numeric values) _secs = pd.to_numeric(result_df['Time spent'], errors='coerce') result_df['Time spent(m:s)'] = pd.to_datetime(_secs, unit='s', errors='coerce').dt.strftime('%M:%S') result_df['Start datetime'] = pd.to_datetime(file_date) result_df['End datetime'] = result_df['Start datetime'] + pd.to_timedelta(result_df['Time spent'], unit='s') # Add failed step information if available if 'Step' in result_df.columns: result_df['Failed Step'] = result_df['Step'] result_df.drop('Step', axis=1, inplace=True) # Extract start time from the first sheet before_steps = error_df[error_df['Step'].str.contains('before', case=False, na=False)] if not before_steps.empty: # Get the first 'before' step for each scenario before_steps['Time Stamp'] = pd.to_datetime(before_steps['Time Stamp'], format='%H:%M:%S', errors='coerce') start_times = before_steps.groupby('Scenario Name').agg({'Time Stamp': 'first'}).reset_index() # Store the timestamps in a variable for efficient reuse result_df = result_df.merge(start_times, on='Scenario Name', how='left') result_df.rename(columns={'Time Stamp': 'Scenario Start Time'}, inplace=True) scenario_start_times = result_df['Scenario Start Time'] # Combine time and date strings, then parse with explicit format to prevent mismatches combined = scenario_start_times.dt.strftime('%H:%M:%S') + ' ' + file_date.strftime('%Y-%m-%d') # Let pandas infer the datetime format and coerce invalid parses result_df['Start datetime'] = pd.to_datetime( combined, infer_datetime_format=True, errors='coerce' ) # Print counts for debugging # st.write(f"Processed data - Failed: {len(result_df[result_df['Status'] == 'FAILED'])}, Passed: {len(result_df[result_df['Status'] == 'PASSED'])}") # st.write(f"Unique functional areas in processed data: {result_df['Functional area'].nunique()}") # st.write(f"Unique functional areas: {result_df['Functional area'].unique()}") # Debugging: Print the columns of the first sheet # st.write("Columns in the first sheet:", error_df.columns.tolist()) # st.write("Sample data from the first sheet:", error_df.head()) return result_df def fill_missing_data(data, column_index, value): data.iloc[:, column_index] = data.iloc[:, column_index].fillna(value) return data # Define a function to convert a string to camel case def to_camel_case(s): parts = s.split('_') return ''.join([part.capitalize() for part in parts]) # Define the function to preprocess a file (CSV or XLSX) def preprocess_uploaded_file(uploaded_file): # Commenting out the spinner to disable it # with st.spinner(f'Processing {uploaded_file.name}...'): # Determine file type based on extension if uploaded_file.name.lower().endswith('.xlsx'): data = preprocess_xlsx(uploaded_file) else: # Original CSV processing file_content = uploaded_file.read() processed_output = preprocess_csv(file_content) processed_file = io.StringIO(processed_output.getvalue()) data = load_data(processed_file) data = fill_missing_data(data, 4, 0) data['Start datetime'] = pd.to_datetime(data['Start datetime'], dayfirst=True, errors='coerce') data['End datetime'] = pd.to_datetime(data['End datetime'], dayfirst=True, errors='coerce') data['Time spent'] = (data['End datetime'] - data['Start datetime']).dt.total_seconds() ## Format time spent for CSV branch (coerce non-numeric values) _secs = pd.to_numeric(data['Time spent'], errors='coerce') data['Time spent(m:s)'] = pd.to_datetime(_secs, unit='s', errors='coerce').dt.strftime('%M:%S') # Extract environment name from filename filename = uploaded_file.name environment = filename.split('_Puppeteer')[0] # Add environment column to the dataframe data['Environment'] = environment return data def add_app_description(): app_title = '
DataLink Compare
' st.markdown(app_title, unsafe_allow_html=True) is_selected = st.sidebar.checkbox('Show App Description', value=False) if is_selected: with st.expander('Show App Description'): st.markdown("Welcome to DataLink Compare. This tool allows you to analyze batch run reports and provides insights into their statuses, processing times, and more. You can also compare two files to identify differences and similarities between them.") st.markdown("### Instructions:") st.write("1. Upload your CSV or XLSX file using the file uploader on the sidebar.") st.write("2. Choose between 'Multi', 'Compare', 'Weekly', and 'Multi-Env Compare' mode using the dropdown on the sidebar.") st.write("3. In 'Multi' mode, you can upload and analyze multiple files for individual environments.") st.write("4. In 'Compare' mode, you can upload two files to compare them.") st.markdown("### Features:") st.write("- View statistics of passing and failing scenarios.") st.write("- Filter scenarios by functional area and status.") st.write("- Calculate average time spent for each functional area.") st.write("- Display bar graphs showing the number of failed scenarios.") st.write("- Identify consistent failures, new failures, and changes in passing scenarios.") # Add the new link here link_html = '' st.markdown(link_html, unsafe_allow_html=True)