import pandas as pd import streamlit as st import matplotlib.pyplot as plt import numpy as np from pre import preprocess_uploaded_file from jira_integration import ( render_jira_login, get_current_sprint, get_regression_board, get_sprint_issues, calculate_points, create_regression_task, generate_task_content, calculate_story_points, get_project_metadata, get_field_dependencies, get_dependent_field_value, get_boards, get_functional_area_values ) from datetime import datetime, timedelta import plotly.express as px import plotly.graph_objects as go import os from dotenv import load_dotenv import json import logging # Inject CSS to shrink metric font sizes and padding to prevent ellipsis overflow if __name__ == "__main__": st.markdown(""" """, unsafe_allow_html=True) load_dotenv() JIRA_SERVER = os.getenv("JIRA_SERVER") # Initialize session state variables if 'filtered_scenarios_df' not in st.session_state: st.session_state.filtered_scenarios_df = None if 'task_content' not in st.session_state: st.session_state.task_content = None if 'total_story_points' not in st.session_state: st.session_state.total_story_points = 0 if 'completed_points' not in st.session_state: st.session_state.completed_points = 0 if 'current_page' not in st.session_state: st.session_state.current_page = "analysis" if 'task_df' not in st.session_state: st.session_state.task_df = None if 'task_environment' not in st.session_state: st.session_state.task_environment = None if 'last_task_key' not in st.session_state: st.session_state.last_task_key = None if 'last_task_url' not in st.session_state: st.session_state.last_task_url = None if 'show_success' not in st.session_state: st.session_state.show_success = False # Get logger from jira_integration logger = logging.getLogger("multiple") # Function to capture button clicks with manual callback def handle_task_button_click(summary, description, formatted_env, filtered_df): logger.info("=== Task button clicked - Starting callback function ===") try: logger.info(f"Summary: {summary}") logger.info(f"Description length: {len(description)}") logger.info(f"Environment: {formatted_env}") logger.info(f"DataFrame shape: {filtered_df.shape}") # Import here to avoid circular imports from jira_integration import create_regression_task logger.info("Imported create_regression_task function") # Call the actual function with st.spinner("Creating task in Jira..."): logger.info("About to call create_regression_task function") task = create_regression_task( project_key="RS", summary=summary, description=description, environment=formatted_env, filtered_scenarios_df=filtered_df ) logger.info(f"create_regression_task returned: {task}") if task: logger.info(f"Task created successfully: {task.key}") # Store task information in session state st.session_state.last_task_key = task.key st.session_state.last_task_url = f"{JIRA_SERVER}/browse/{task.key}" st.session_state.show_success = True # Display success message and task details st.success("✅ Task created successfully!") st.markdown( f"""
""", unsafe_allow_html=True ) # Clear task content st.session_state.task_content = None # Add button to create another task if st.button("Create Another Task", key="create_another"): # Clear all task-related state st.session_state.task_content = None st.session_state.last_task_key = None st.session_state.last_task_url = None st.session_state.show_success = False st.rerun() logger.info("Task creation process completed successfully") return True else: logger.error("Task creation failed (returned None)") st.error("❌ Task creation failed. Please check the error messages and try again.") return False except Exception as e: logger.exception(f"Error in handle_task_button_click: {str(e)}") st.error(f"❌ Error creating task: {str(e)}") import traceback error_trace = traceback.format_exc() logger.error(f"Full traceback: {error_trace}") st.error(error_trace) return False finally: logger.info("=== Ending handle_task_button_click function ===") # Define the function to perform analysis def perform_analysis(uploaded_dataframes): # Concatenate all dataframes into a single dataframe combined_data = pd.concat(uploaded_dataframes, ignore_index=True) # Display debugging information # st.write("Combined data shape:", combined_data.shape) # st.write("Unique functional areas in combined data:", combined_data['Functional area'].nunique()) # st.write("Sample of combined data:", combined_data.head()) # Display scenarios with status "failed" grouped by functional area failed_scenarios = combined_data[combined_data['Status'] == 'FAILED'] passed_scenarios = combined_data[combined_data['Status'] == 'PASSED'] # Display total count of failures fail_count = len(failed_scenarios) st.markdown(f"Failing scenarios Count: {fail_count}") # Display total count of Passing pass_count = len(passed_scenarios) st.markdown(f"Passing scenarios Count: {pass_count}") # Use radio buttons for selecting status selected_status = st.radio("Select a status", ['Failed', 'Passed']) # Determine which scenarios to display based on selected status if selected_status == 'Failed': unique_areas = np.append(failed_scenarios['Functional area'].unique(), "All") selected_scenarios = failed_scenarios elif selected_status == 'Passed': unique_areas = np.append(passed_scenarios['Functional area'].unique(), "All") selected_scenarios = passed_scenarios else: selected_scenarios = None if selected_scenarios is not None: st.markdown(f"### Scenarios with status '{selected_status}' grouped by functional area:") # Select a range of functional areas to filter scenarios selected_functional_areas = st.multiselect("Select functional areas", unique_areas, ["All"]) if "All" in selected_functional_areas: filtered_scenarios = selected_scenarios else: filtered_scenarios = selected_scenarios[selected_scenarios['Functional area'].isin(selected_functional_areas)] if not selected_functional_areas: # Check if the list is empty st.error("Please select at least one functional area.") else: # Display count of filtered scenarios st.write(f"Number of filtered scenarios: {len(filtered_scenarios)}") # Calculate the average time spent for each functional area average_time_spent_seconds = filtered_scenarios.groupby('Functional area')['Time spent'].mean().reset_index() # Convert average time spent from seconds to minutes and seconds format average_time_spent_seconds['Time spent'] = pd.to_datetime(average_time_spent_seconds['Time spent'], unit='s').dt.strftime('%M:%S') # Group by functional area and get the start datetime for sorting start_datetime_group = filtered_scenarios.groupby('Functional area')['Start datetime'].min().reset_index() end_datetime_group = filtered_scenarios.groupby('Functional area')['End datetime'].max().reset_index() # Calculate the total time spent for each functional area (difference between end and start datetime) total_time_spent_seconds = (end_datetime_group['End datetime'] - start_datetime_group['Start datetime']).dt.total_seconds() # Convert total time spent from seconds to minutes and seconds format total_time_spent_seconds = pd.to_datetime(total_time_spent_seconds, unit='s').dt.strftime('%M:%S') # Merge the average_time_spent_seconds with start_datetime_group and end_datetime_group average_time_spent_seconds = average_time_spent_seconds.merge(start_datetime_group, on='Functional area') average_time_spent_seconds = average_time_spent_seconds.merge(end_datetime_group, on='Functional area') average_time_spent_seconds['Total Time Spent'] = total_time_spent_seconds # Filter scenarios based on selected functional area if selected_status == 'Failed': # Define columns in the exact order they appear in the table columns_to_keep = [ 'Environment', 'Functional area', 'Scenario Name', 'Error Message', 'Failed Step', 'Time spent(m:s)', 'Start datetime' ] # Check if Failed Step column exists if 'Failed Step' in filtered_scenarios.columns: grouped_filtered_scenarios = filtered_scenarios[columns_to_keep].copy() else: columns_to_keep.remove('Failed Step') grouped_filtered_scenarios = filtered_scenarios[columns_to_keep].copy() elif selected_status == 'Passed': grouped_filtered_scenarios = filtered_scenarios[[ 'Environment', 'Functional area', 'Scenario Name', 'Time spent(m:s)' ]].copy() else: grouped_filtered_scenarios = None # Only proceed if we have data if grouped_filtered_scenarios is not None: # Reset the index to start from 1 grouped_filtered_scenarios.index = range(1, len(grouped_filtered_scenarios) + 1) st.dataframe(grouped_filtered_scenarios) # Task creation section: always show button placeholder with tooltip, enabling only when conditions are met can_create_task = ( 'jira_client' in st.session_state and st.session_state.jira_client and selected_status == 'Failed' and len(selected_functional_areas) == 1 and "All" not in selected_functional_areas ) col1, col2, col3 = st.columns([1, 2, 1]) with col2: if st.session_state.show_success and st.session_state.last_task_key: st.success("✅ Task created successfully!") st.markdown( f""" """, unsafe_allow_html=True ) if st.button("Create Another Task", key="create_another", use_container_width=True): st.session_state.task_content = None st.session_state.last_task_key = None st.session_state.last_task_url = None st.session_state.show_success = False st.rerun() else: help_text = ( "Requires: Jira login, 'Failed' status selected, " "and exactly one functional area (not 'All')." ) if st.button( "📝 Log Jira Task", disabled=not can_create_task, use_container_width=True, help=help_text ) and can_create_task: environment = filtered_scenarios['Environment'].iloc[0] task_df = grouped_filtered_scenarios.copy() expected_columns = [ 'Environment', 'Functional area', 'Scenario Name', 'Error Message', 'Failed Step', 'Time spent(m:s)', 'Start datetime' ] missing_columns = [col for col in expected_columns if col not in task_df.columns] if missing_columns: st.error(f"Missing required columns: {', '.join(missing_columns)}") st.error("Please ensure your data includes all required columns") return summary, description = generate_task_content(task_df) if summary and description: handle_task_button_click(summary, description, environment, task_df) # Check if selected_status is 'Failed' and show bar graph if selected_status != 'Passed': # Create and display bar graph of errors by functional area st.write(f"### Bar graph showing number of '{selected_status}' scenarios in each functional area:") error_counts = grouped_filtered_scenarios['Functional area'].value_counts() # Only create the graph if there are errors to display if not error_counts.empty: plt.figure(figsize=(12, 10)) bars = plt.bar(error_counts.index, error_counts.values) plt.xlabel('Functional Area') plt.ylabel('Number of Failures') plt.title(f"Number of '{selected_status}' scenarios by Functional Area") plt.xticks(rotation=45, ha='right', fontsize=10) # Set y-axis limits and ticks for consistent interval of 1 y_max = max(error_counts.values) + 1 plt.ylim(0, y_max) plt.yticks(range(0, y_max, 1), fontsize=10) # Display individual numbers on y-axis for bar in bars: height = bar.get_height() # Annotate bar height, defaulting to 0 if conversion fails try: # Ensure numeric conversion in case of string 'NaN' h_int = int(float(height)) except Exception: h_int = 0 plt.text( bar.get_x() + bar.get_width() / 2, height, str(h_int), ha='center', va='bottom' ) # Reduce font size of individual numbers plt.tight_layout() # Add this line to adjust layout st.pyplot(plt) else: st.info(f"No '{selected_status}' scenarios found to display in the graph.") pass def display_story_points_stats(force_refresh=False): """Display story points statistics from current sprint with caching""" if not st.session_state.jira_client: return # Initialize cache if 'sprint_data_cache' not in st.session_state: st.session_state.sprint_data_cache = None if 'last_sprint_fetch' not in st.session_state: st.session_state.last_sprint_fetch = None now = datetime.now() cache_expiry = 300 # 5 minutes refresh_needed = ( force_refresh or st.session_state.sprint_data_cache is None or (st.session_state.last_sprint_fetch and (now - st.session_state.last_sprint_fetch).total_seconds() > cache_expiry) ) if refresh_needed: if force_refresh: with st.spinner("Fetching sprint data..."): board = get_regression_board("RS") if not board: return sprint = get_current_sprint(board['id']) if not sprint: return issues = get_sprint_issues(board['id'], sprint.id, board['estimation_field']) if not issues: return _, total_points, completed_points, in_progress_points = calculate_points( issues, board['estimation_field'] ) st.session_state.sprint_data_cache = { 'sprint_name': sprint.name, 'total_points': total_points, 'completed_points': completed_points, 'in_progress_points': in_progress_points } st.session_state.last_sprint_fetch = now else: # Fetch data silently without spinner board = get_regression_board("RS") if not board: return sprint = get_current_sprint(board['id']) if not sprint: return issues = get_sprint_issues(board['id'], sprint.id, board['estimation_field']) if not issues: return _, total_points, completed_points, in_progress_points = calculate_points( issues, board['estimation_field'] ) st.session_state.sprint_data_cache = { 'sprint_name': sprint.name, 'total_points': total_points, 'completed_points': completed_points, 'in_progress_points': in_progress_points } st.session_state.last_sprint_fetch = now # Display cached sprint data if st.session_state.sprint_data_cache: sprint_data = st.session_state.sprint_data_cache # Use markdown with custom HTML for a compact, non-truncating display metrics_html = f"""