# importing pacakages import streamlit as st st.set_page_config(layout="wide") from utilities import ( load_local_css, set_header, ensure_project_dct_structure, store_hashed_password, verify_password, is_pswrd_flag_set, set_pswrd_flag, ) import os from datetime import datetime import pandas as pd import pickle import psycopg2 # import numbers from collections import OrderedDict import re from ppt_utils import create_ppt from constants import default_dct import time from log_application import log_message, delete_old_log_files import sqlite3 # setting page config load_local_css("styles.css") set_header() db_cred = None # --------------Functions----------------------# # # schema = db_cred["schema"] ##API DATA####################### # Function to load gold layer data # @st.cache_data(show_spinner=False) def load_gold_layer_data(table_name): # Fetch Table query = f""" SELECT * FROM {table_name}; """ # Execute the query and get the results results = query_excecuter_postgres( query, db_cred, insert=False, return_dataframe=True ) if results is not None and not results.empty: # Create a DataFrame gold_layer_df = results else: st.warning("No data found for the selected table.") st.stop() # Columns to be removed columns_to_remove = [ "clnt_nam", "crte_dt_tm", "crte_by_uid", "updt_dt_tm", "updt_by_uid", "campgn_id", "campgn_nam", "ad_id", "ad_nam", "tctc_id", "tctc_nam", "campgn_grp_id", "campgn_grp_nam", "ad_grp_id", "ad_grp_nam", ] # TEMP CODE gold_layer_df = gold_layer_df.rename( columns={ "imprssns_cnt": "mda_imprssns_cnt", "clcks_cnt": "mda_clcks_cnt", "vd_vws_cnt": "mda_vd_vws_cnt", } ) # Remove specific columns gold_layer_df = gold_layer_df.drop(columns=columns_to_remove, errors="ignore") # Convert columns to numeric or datetime as appropriate for col in gold_layer_df.columns: if ( col.startswith("rspns_mtrc_") or col.startswith("mda_") or col.startswith("exogenous_") or col.startswith("internal_") or col in ["spnd_amt"] ): gold_layer_df[col] = pd.to_numeric(gold_layer_df[col], errors="coerce") elif col == "rcrd_dt": gold_layer_df[col] = pd.to_datetime(gold_layer_df[col], errors="coerce") # Replace columns starting with 'mda_' to 'media_' gold_layer_df.columns = [ (col.replace("mda_", "media_") if col.startswith("mda_") else col) for col in gold_layer_df.columns ] # Identify non-numeric columns non_numeric_columns = gold_layer_df.select_dtypes(exclude=["number"]).columns allow_non_numeric_columns = ["rcrd_dt", "aggrgtn_lvl", "sub_chnnl_nam", "panl_nam"] # Remove non-numeric columns except for allowed non-numeric columns non_numeric_columns_to_remove = [ col for col in non_numeric_columns if col not in allow_non_numeric_columns ] gold_layer_df = gold_layer_df.drop( columns=non_numeric_columns_to_remove, errors="ignore" ) # Remove specific columns allow_columns = ["rcrd_dt", "aggrgtn_lvl", "sub_chnnl_nam", "panl_nam", "spnd_amt"] for col in gold_layer_df.columns: if ( col.startswith("rspns_mtrc_") or col.startswith("media_") or col.startswith("exogenous_") or col.startswith("internal_") ): allow_columns.append(col) gold_layer_df = gold_layer_df[allow_columns] # Rename columns gold_layer_df = gold_layer_df.rename( columns={ "rcrd_dt": "date", "sub_chnnl_nam": "channels", "panl_nam": "panel", "spnd_amt": "spends", } ) # Clean column values gold_layer_df["panel"] = ( gold_layer_df["panel"].astype(str).str.lower().str.strip().str.replace(" ", "_") ) gold_layer_df["channels"] = ( gold_layer_df["channels"] .astype(str) .str.lower() .str.strip() .str.replace(" ", "_") ) # Replace columns starting with 'rspns_mtrc_' to 'response_metric_' gold_layer_df.columns = [ ( col.replace("rspns_mtrc_", "response_metric_") if col.startswith("rspns_mtrc_") else col ) for col in gold_layer_df.columns ] # Get the minimum date from the main dataframe min_date = gold_layer_df["date"].min() # Get maximum dates for daily and weekly data max_date_daily = None max_date_weekly = None if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "daily"].empty: max_date_daily = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "daily"][ "date" ].max() if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].empty: max_date_weekly = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"][ "date" ].max() + pd.DateOffset(days=6) # Determine final maximum date if max_date_daily is not None and max_date_weekly is not None: final_max_date = max(max_date_daily, max_date_weekly) elif max_date_daily is not None: final_max_date = max_date_daily elif max_date_weekly is not None: final_max_date = max_date_weekly # Create a date range with daily frequency date_range = pd.date_range(start=min_date, end=final_max_date, freq="D") # Create a base DataFrame with all channels and all panels for each channel unique_channels = gold_layer_df["channels"].unique() unique_panels = gold_layer_df["panel"].unique() base_data = [ (channel, panel, date) for channel in unique_channels for panel in unique_panels for date in date_range ] base_df = pd.DataFrame(base_data, columns=["channels", "panel", "date"]) # Process weekly data to convert it to daily if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].empty: weekly_data = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].copy() daily_data = [] for index, row in weekly_data.iterrows(): week_start = pd.to_datetime(row["date"]) - pd.to_timedelta( pd.to_datetime(row["date"]).weekday(), unit="D" ) for i in range(7): daily_date = week_start + pd.DateOffset(days=i) new_row = row.copy() new_row["date"] = daily_date for col in new_row.index: if isinstance(new_row[col], numbers.Number): new_row[col] = new_row[col] / 7 daily_data.append(new_row) daily_data_df = pd.DataFrame(daily_data) daily_data_df["aggrgtn_lvl"] = "daily" gold_layer_df = pd.concat( [gold_layer_df[gold_layer_df["aggrgtn_lvl"] != "weekly"], daily_data_df], ignore_index=True, ) # Process monthly data to convert it to daily if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "monthly"].empty: monthly_data = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "monthly"].copy() daily_data = [] for index, row in monthly_data.iterrows(): month_start = pd.to_datetime(row["date"]).replace(day=1) next_month_start = (month_start + pd.DateOffset(months=1)).replace(day=1) days_in_month = (next_month_start - month_start).days for i in range(days_in_month): daily_date = month_start + pd.DateOffset(days=i) new_row = row.copy() new_row["date"] = daily_date for col in new_row.index: if isinstance(new_row[col], numbers.Number): new_row[col] = new_row[col] / days_in_month daily_data.append(new_row) daily_data_df = pd.DataFrame(daily_data) daily_data_df["aggrgtn_lvl"] = "daily" gold_layer_df = pd.concat( [gold_layer_df[gold_layer_df["aggrgtn_lvl"] != "monthly"], daily_data_df], ignore_index=True, ) # Remove aggrgtn_lvl column gold_layer_df = gold_layer_df.drop(columns=["aggrgtn_lvl"], errors="ignore") # Group by 'panel', and 'date' gold_layer_df = gold_layer_df.groupby(["channels", "panel", "date"]).sum() # Merge gold_layer_df to base_df on channels, panel and date gold_layer_df_cleaned = pd.merge( base_df, gold_layer_df, on=["channels", "panel", "date"], how="left" ) # Pivot the dataframe and rename columns pivot_columns = [ col for col in gold_layer_df_cleaned.columns if col not in ["channels", "panel", "date"] ] gold_layer_df_cleaned = gold_layer_df_cleaned.pivot_table( index=["date", "panel"], columns="channels", values=pivot_columns, aggfunc="sum" ).reset_index() # Flatten the columns gold_layer_df_cleaned.columns = [ "_".join(col).strip() if col[1] else col[0] for col in gold_layer_df_cleaned.columns.values ] # Replace columns ending with '_all' to '_total' gold_layer_df_cleaned.columns = [ col.replace("_all", "_total") if col.endswith("_all") else col for col in gold_layer_df_cleaned.columns ] # Clean panel column values gold_layer_df_cleaned["panel"] = ( gold_layer_df_cleaned["panel"] .astype(str) .str.lower() .str.strip() .str.replace(" ", "_") ) # Drop all columns that end with '_total' except those starting with 'response_metric_' cols_to_drop = [ col for col in gold_layer_df_cleaned.columns if col.endswith("_total") and not col.startswith("response_metric_") ] gold_layer_df_cleaned.drop(columns=cols_to_drop, inplace=True) return gold_layer_df_cleaned def check_valid_name(): if ( not st.session_state["project_name_box"] .lower() .startswith(defualt_project_prefix) ): st.session_state["disable_create_project"] = True with warning_box: st.warning("Project Name should follow naming conventions") st.session_state["warning"] = ( "Project Name should follow naming conventions!" ) with warning_box: st.warning("Project Name should follow naming conventions") st.button("Reset Name", on_click=reset_project_text_box, key="2") if st.session_state["project_name_box"] == defualt_project_prefix: with warning_box: st.warning("Cannot Name only with Prefix") st.session_state["warning"] = "Cannot Name only with Prefix" st.session_state["disable_create_project"] = True if st.session_state["project_name_box"] in user_projects: with warning_box: st.warning("Project already exists please enter new name") st.session_state["warning"] = "Project already exists please enter new name" st.session_state["disable_create_project"] = True else: st.session_state["disable_create_project"] = False def query_excecuter_postgres( query, db_path=None, params=None, insert=True, insert_retrieve=False, db_cred=None, ): """ Executes a SQL query on a SQLite database, handling both insert and select operations. Parameters: query (str): The SQL query to be executed. db_path (str): Path to the SQLite database file. params (tuple, optional): Parameters to pass into the SQL query for parameterized execution. insert (bool, default=True): Flag to determine if the query is an insert operation (default) or a select operation. insert_retrieve (bool, default=False): Flag to determine if the query should insert and then return the inserted ID. """ try: # Construct a cross-platform path to the database db_dir = os.path.join("db") os.makedirs(db_dir, exist_ok=True) # Make sure the directory exists db_path = os.path.join(db_dir, "imp_db.db") # Establish connection to the SQLite database conn = sqlite3.connect(db_path) except sqlite3.Error as e: st.warning(f"Unable to connect to the SQLite database: {e}") st.stop() # Create a cursor object to interact with the database c = conn.cursor() try: # Execute the query with or without parameters if params: params = tuple(params) query = query.replace("IN (?)", f"IN ({','.join(['?' for _ in params])})") c.execute(query, params) else: c.execute(query) if not insert: # If not an insert operation, fetch and return the results results = c.fetchall() return results elif insert_retrieve: # If insert and retrieve operation, commit and return the last inserted row ID conn.commit() return c.lastrowid else: # For standard insert operations, commit the transaction conn.commit() except Exception as e: st.write(f"Error executing query: {e}") finally: conn.close() # Function to check if the input contains any SQL keywords def contains_sql_keywords_check(user_input): sql_keywords = [ "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE", "GRANT", "REVOKE", "UNION", "JOIN", "WHERE", "HAVING", "EXEC", "TRUNCATE", "REPLACE", "MERGE", "DECLARE", "SHOW", "FROM", ] pattern = "|".join(re.escape(keyword) for keyword in sql_keywords) return re.search(pattern, user_input, re.IGNORECASE) # def get_table_names(schema): # query = f""" # SELECT table_name # FROM information_schema.tables # WHERE table_schema = '{schema}' # AND table_type = 'BASE TABLE' # AND table_name LIKE '%_mmo_gold'; # """ # table_names = query_excecuter_postgres(query, db_cred, insert=False) # table_names = [table[0] for table in table_names] # return table_names def update_summary_df(): """ Updates the 'project_summary_df' in the session state with the latest project summary information based on the most recent updates. This function executes a SQL query to retrieve project metadata from a database and stores the result in the session state. Uses: - query_excecuter_postgres(query, params=params, insert=False): A function that executes the provided SQL query on a PostgreSQL database. Modifies: - st.session_state['project_summary_df']: Updates the dataframe with columns: 'Project Number', 'Project Name', 'Last Modified Page', 'Last Modified Time'. """ query = f""" WITH LatestUpdates AS ( SELECT prj_id, page_nam, updt_dt_tm, ROW_NUMBER() OVER (PARTITION BY prj_id ORDER BY updt_dt_tm DESC) AS rn FROM mmo_project_meta_data ) SELECT p.prj_id, p.prj_nam AS prj_nam, lu.page_nam, lu.updt_dt_tm FROM LatestUpdates lu RIGHT JOIN mmo_projects p ON lu.prj_id = p.prj_id WHERE p.prj_ownr_id = ? AND lu.rn = 1 """ params = (st.session_state["emp_id"],) # Parameters for the SQL query # Execute the query and retrieve project summary data project_summary = query_excecuter_postgres( query, db_cred, params=params, insert=False ) # Update the session state with the project summary dataframe st.session_state["project_summary_df"] = pd.DataFrame( project_summary, columns=[ "Project Number", "Project Name", "Last Modified Page", "Last Modified Time", ], ) st.session_state["project_summary_df"] = st.session_state[ "project_summary_df" ].sort_values(by=["Last Modified Time"], ascending=False) def reset_project_text_box(): st.session_state["project_name_box"] = defualt_project_prefix st.session_state["disable_create_project"] = True def query_excecuter_sqlite( insert_projects_query, insert_meta_data_query, db_path=None, params_projects=None, params_meta=None, ): """ Executes the project insert and associated metadata insert in an SQLite database. Parameters: insert_projects_query (str): SQL query for inserting into the mmo_projects table. insert_meta_data_query (str): SQL query for inserting into the mmo_project_meta_data table. db_path (str): Path to the SQLite database file. params_projects (tuple, optional): Parameters for the mmo_projects table insert. params_meta (tuple, optional): Parameters for the mmo_project_meta_data table insert. Returns: bool: True if successful, False otherwise. """ try: # Construct a cross-platform path to the database db_dir = os.path.join("db") os.makedirs(db_dir, exist_ok=True) # Make sure the directory exists db_path = os.path.join(db_dir, "imp_db.db") # Establish connection to the SQLite database conn = sqlite3.connect(db_path) cursor = conn.cursor() # Execute the first insert query into the mmo_projects table cursor.execute(insert_projects_query, params_projects) # Get the last inserted project ID prj_id = cursor.lastrowid # Modify the parameters for the metadata table with the inserted prj_id params_meta = (prj_id,) + params_meta # Execute the second insert query into the mmo_project_meta_data table cursor.execute(insert_meta_data_query, params_meta) # Commit the transaction conn.commit() except sqlite3.Error as e: st.warning(f"Error executing query: {e}") return False finally: # Close the connection conn.close() return True def new_project(): """ Cleans the project name input and inserts project data into the SQLite database, updating session state and triggering UI rerun if successful. """ # Define a dictionary containing project data project_dct = default_dct.copy() gold_layer_df = pd.DataFrame() if str(api_name).strip().lower() != "na": try: gold_layer_df = load_gold_layer_data(api_name) except Exception as e: st.toast( "Failed to load gold layer data. Please check the gold layer structure and connection.", icon="⚠️", ) log_message( "error", f"Error loading gold layer data: {str(e)}", "Home", ) project_dct["data_import"]["gold_layer_df"] = gold_layer_df # Get current time for database insertion inserted_time = datetime.now().isoformat() # Define SQL queries for inserting project and metadata into the SQLite database insert_projects_query = """ INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_dt_tm, crte_by_uid) VALUES (?, ?, ?, ?, ?); """ insert_meta_data_query = """ INSERT INTO mmo_project_meta_data (prj_id, page_nam, file_nam, pkl_obj, crte_dt_tm, crte_by_uid, updt_dt_tm) VALUES (?, ?, ?, ?, ?, ?, ?); """ # Get current time for metadata update updt_dt_tm = datetime.now().isoformat() # Serialize project_dct using pickle project_pkl = pickle.dumps(project_dct) # Prepare data for database insertion projects_data = ( st.session_state["emp_id"], # prj_ownr_id project_name, # prj_nam ",".join(matching_user_id), # alwd_emp_id inserted_time, # crte_dt_tm st.session_state["emp_id"], # crte_by_uid ) project_meta_data = ( "Home", # page_nam "project_dct", # file_nam project_pkl, # pkl_obj inserted_time, # crte_dt_tm st.session_state["emp_id"], # crte_by_uid updt_dt_tm, # updt_dt_tm ) # Execute the insertion query for SQLite success = query_excecuter_sqlite( insert_projects_query, insert_meta_data_query, params_projects=projects_data, params_meta=project_meta_data, ) if success: st.success("Project Created") update_summary_df() else: st.error("Failed to create project.") def validate_password(user_input): # List of SQL keywords to check for sql_keywords = [ "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE", "GRANT", "REVOKE", "UNION", "JOIN", "WHERE", "HAVING", "EXEC", "TRUNCATE", "REPLACE", "MERGE", "DECLARE", "SHOW", "FROM", ] # Create a regex pattern for SQL keywords pattern = "|".join(re.escape(keyword) for keyword in sql_keywords) # Check if input contains any SQL keywords if re.search(pattern, user_input, re.IGNORECASE): return "SQL keyword detected." # Password validation criteria if len(user_input) < 8: return "Password should be at least 8 characters long." if not re.search(r"[A-Z]", user_input): return "Password should contain at least one uppercase letter." if not re.search(r"[0-9]", user_input): return "Password should contain at least one digit." if not re.search(r"[a-z]", user_input): return "Password should contain at least one lowercase letter." if not re.search(r'[!@#$%^&*(),.?":{}|<>]', user_input): return "Password should contain at least one special character." # If all checks pass return "Valid input." def fetch_and_process_projects(emp_id): query = f""" WITH ProjectAccess AS ( SELECT p.prj_id, p.prj_nam, p.alwd_emp_id, u.emp_nam AS project_owner FROM mmo_projects p JOIN mmo_users u ON p.prj_ownr_id = u.emp_id ) SELECT pa.prj_id, pa.prj_nam, pa.project_owner FROM ProjectAccess pa WHERE pa.alwd_emp_id LIKE ? ORDER BY pa.prj_id; """ params = (f"%{emp_id}%",) results = query_excecuter_postgres(query, db_cred, params=params, insert=False) # Process the results to create the desired dictionary structure clone_project_dict = {} for row in results: project_id, project_name, project_owner = row if project_owner not in clone_project_dict: clone_project_dict[project_owner] = [] clone_project_dict[project_owner].append( {"project_name": project_name, "project_id": project_id} ) return clone_project_dict def get_project_id_from_dict(projects_dict, owner_name, project_name): if owner_name in projects_dict: for project in projects_dict[owner_name]: if project["project_name"] == project_name: return project["project_id"] return None # def fetch_project_metadata(prj_id): # query = f""" # SELECT # prj_id, page_nam, file_nam, pkl_obj, dshbrd_ts # FROM # mmo_project_meta_data # WHERE # prj_id = ?; # """ # params = (prj_id,) # return query_excecuter_postgres(query, db_cred, params=params, insert=False) def fetch_project_metadata(prj_id): # Query to select project metadata query = """ SELECT prj_id, page_nam, file_nam, pkl_obj, dshbrd_ts FROM mmo_project_meta_data WHERE prj_id = ?; """ params = (prj_id,) return query_excecuter_postgres(query, db_cred, params=params, insert=False) # def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id): # query = f""" # INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm) # VALUES (?, ?, ?, ?, NOW()) # RETURNING prj_id; # """ # params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id) # result = query_excecuter_postgres( # query, db_cred, params=params, insert=True, insert_retrieve=True # ) # return result[0][0] # def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id): # # Query to insert a new project # insert_query = """ # INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm) # VALUES (?, ?, ?, ?, DATETIME('now')); # """ # params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id) # # Execute the insert query # query_excecuter_postgres(insert_query, db_cred, params=params, insert=True) # # Retrieve the last inserted prj_id # retrieve_id_query = "SELECT last_insert_rowid();" # result = query_excecuter_postgres(retrieve_id_query, db_cred, insert_retrieve=True) # return result[0][0] def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id): # Query to insert a new project insert_query = """ INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm) VALUES (?, ?, ?, ?, DATETIME('now')); """ params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id) # Execute the insert query and retrieve the last inserted prj_id directly last_inserted_id = query_excecuter_postgres( insert_query, params=params, insert_retrieve=True ) return last_inserted_id def insert_project_metadata(new_prj_id, metadata, created_emp_id): # query = f""" # INSERT INTO mmo_project_meta_data ( # prj_id, page_nam, crte_dt_tm, file_nam, pkl_obj, dshbrd_ts, crte_by_uid # ) # VALUES (?, ?, NOW(), ?, ?, ?, ?); # """ query = """ INSERT INTO mmo_project_meta_data ( prj_id, page_nam, crte_dt_tm, file_nam, pkl_obj, dshbrd_ts, crte_by_uid ) VALUES (?, ?, DATETIME('now'), ?, ?, ?, ?); """ for row in metadata: params = (new_prj_id, row[1], row[2], row[3], row[4], created_emp_id) query_excecuter_postgres(query, db_cred, params=params, insert=True) # def delete_projects_by_ids(prj_ids): # # Ensure prj_ids is a tuple to use with the IN clause # prj_ids_tuple = tuple(prj_ids) # # Query to delete project metadata # delete_metadata_query = f""" # DELETE FROM mmo_project_meta_data # WHERE prj_id IN ?; # """ # delete_projects_query = f""" # DELETE FROM mmo_projects # WHERE prj_id IN ?; # """ # try: # # Delete from metadata table # query_excecuter_postgres( # delete_metadata_query, db_cred, params=(prj_ids_tuple,), insert=True # ) # # Delete from projects table # query_excecuter_postgres( # delete_projects_query, db_cred, params=(prj_ids_tuple,), insert=True # ) # except Exception as e: # st.write(f"Error deleting projects: {e}") def delete_projects_by_ids(prj_ids): # Ensure prj_ids is a tuple to use with the IN clause prj_ids_tuple = tuple(prj_ids) # Dynamically generate placeholders for SQLite placeholders = ", ".join(["?"] * len(prj_ids_tuple)) # Query to delete project metadata with dynamic placeholders delete_metadata_query = f""" DELETE FROM mmo_project_meta_data WHERE prj_id IN ({placeholders}); """ delete_projects_query = f""" DELETE FROM mmo_projects WHERE prj_id IN ({placeholders}); """ try: # Delete from metadata table query_excecuter_postgres( delete_metadata_query, db_cred, params=prj_ids_tuple, insert=True ) # Delete from projects table query_excecuter_postgres( delete_projects_query, db_cred, params=prj_ids_tuple, insert=True ) except Exception as e: st.write(f"Error deleting projects: {e}") def fetch_users_with_access(prj_id): # Query to get allowed employee IDs for the project get_allowed_emps_query = """ SELECT alwd_emp_id FROM mmo_projects WHERE prj_id = ?; """ # Fetch the allowed employee IDs allowed_emp_ids_result = query_excecuter_postgres( get_allowed_emps_query, db_cred, params=(prj_id,), insert=False ) if not allowed_emp_ids_result: return [] # Extract the allowed employee IDs (Assuming alwd_emp_id is a comma-separated string) allowed_emp_ids_str = allowed_emp_ids_result[0][0] allowed_emp_ids = allowed_emp_ids_str.split(",") # Convert to tuple for the IN clause allowed_emp_ids_tuple = tuple(allowed_emp_ids) # Query to get user details for the allowed employee IDs get_users_query = """ SELECT emp_id, emp_nam, emp_typ FROM mmo_users WHERE emp_id IN ({}); """.format( ",".join("?" * len(allowed_emp_ids_tuple)) ) # Dynamically construct the placeholder list # Fetch user details user_details = query_excecuter_postgres( get_users_query, db_cred, params=allowed_emp_ids_tuple, insert=False ) return user_details # def update_project_access(prj_id, user_names, new_user_ids): # # Convert the list of new user IDs to a comma-separated string # new_user_ids_str = ",".join(new_user_ids) # # Query to update the alwd_emp_id for the specified project # update_access_query = f""" # UPDATE mmo_projects # SET alwd_emp_id = ? # WHERE prj_id = ?; # """ # # Execute the update query # query_excecuter_postgres( # update_access_query, db_cred, params=(new_user_ids_str, prj_id), insert=True # ) # st.success(f"Project {prj_id} access updated successfully") def fetch_user_ids_from_dict(user_dict, user_names): user_ids = [] # Iterate over the user_dict to find matching user names for user_id, details in user_dict.items(): if details[0] in user_names: user_ids.append(user_id) return user_ids def update_project_access(prj_id, user_names, user_dict): # Fetch the new user IDs based on the provided user names from the dictionary new_user_ids = fetch_user_ids_from_dict(user_dict, user_names) # Convert the list of new user IDs to a comma-separated string new_user_ids_str = ",".join(new_user_ids) # Query to update the alwd_emp_id for the specified project update_access_query = f""" UPDATE mmo_projects SET alwd_emp_id = ? WHERE prj_id = ?; """ # Execute the update query query_excecuter_postgres( update_access_query, db_cred, params=(new_user_ids_str, prj_id), insert=True ) st.write(f"Project {prj_id} access updated successfully") def validate_emp_id(): if st.session_state.sign_up not in st.session_state["unique_ids"].keys(): st.warning("You dont have access to the tool please contact admin") # -------------------Front END-------------------------# st.header("Manage Projects") unique_users_query = f""" SELECT DISTINCT emp_id, emp_nam, emp_typ FROM mmo_users; """ if "unique_ids" not in st.session_state: unique_users_result = query_excecuter_postgres( unique_users_query, db_cred, insert=False ) # retrieves all the users who has access to MMO TOOL if len(unique_users_result) == 0: st.warning("No users data present in db, please contact admin!") st.stop() st.session_state["unique_ids"] = { emp_id: (emp_nam, emp_type) for emp_id, emp_nam, emp_type in unique_users_result } if "toggle" not in st.session_state: st.session_state["toggle"] = 0 if "emp_id" not in st.session_state: reset_password = st.radio( "Select An Option", options=["Login", "Reset Password"], index=st.session_state["toggle"], horizontal=True, ) if reset_password == "Login": emp_id = st.text_input("Employee id").lower() # emp id password = st.text_input("Password", max_chars=15, type="password") login_button = st.button("Login", use_container_width=True) else: emp_id = st.text_input( "Employee id", key="sign_up", on_change=validate_emp_id ).lower() current_password = st.text_input( "Enter Current Password and Press Enter to Validate", max_chars=15, type="password", key="current_password", ) if emp_id: if emp_id not in st.session_state["unique_ids"].keys(): st.write("Invalid id!") st.stop() else: if not is_pswrd_flag_set(emp_id): if verify_password(emp_id, current_password): st.success("Your password key has been successfully validated!") elif ( not verify_password(emp_id, current_password) and len(current_password) > 1 ): st.write("Wrong Password Key Please Try Again") st.stop() elif verify_password(emp_id, current_password): st.success("Your password has been successfully validated!") elif ( not verify_password(emp_id, current_password) and len(current_password) > 1 ): st.write("Wrong Password Please Try Again") st.stop() new_password = st.text_input( "Enter New Password", max_chars=15, type="password", key="new_password" ) st.markdown( "**Password must be at least 8 to 15 characters long and contain at least one uppercase letter, one lowercase letter, one digit, and one special character. No SQL commands allowed.**" ) validation_result = validate_password(new_password) confirm_new_password = st.text_input( "Confirm New Password", max_chars=15, type="password", key="confirm_new_password", ) reset_button = st.button("Reset Password", use_container_width=True) if reset_button: validation_result = validate_password(new_password) if validation_result != "Valid input.": st.warning(validation_result) st.stop() elif new_password != confirm_new_password: st.warning( "The new password and confirmation password do not match. Please try again." ) st.stop() else: store_hashed_password(emp_id, confirm_new_password) set_pswrd_flag(emp_id) st.success("Password Reset Successful!") with st.spinner("Redirecting to Login"): time.sleep(3) st.session_state["toggle"] = 0 st.rerun() st.stop() if login_button: if emp_id not in st.session_state["unique_ids"].keys() or len(password) == 0: st.warning("invalid id or password!") st.stop() if not is_pswrd_flag_set(emp_id): st.warning("Reset password to continue") with st.spinner("Redirecting"): st.session_state["toggle"] = 1 time.sleep(2) st.rerun() st.stop() elif verify_password(emp_id, password): with st.spinner("Loading Saved Projects"): st.session_state["emp_id"] = emp_id update_summary_df() # function call to fetch user saved projects st.session_state["clone_project_dict"] = fetch_and_process_projects( st.session_state["emp_id"] ) if "project_dct" in st.session_state: del st.session_state["project_dct"] st.session_state["project_name"] = None delete_old_log_files() st.rerun() if ( len(st.session_state["emp_id"]) == 0 or st.session_state["emp_id"] not in st.session_state["unique_ids"].keys() ): st.stop() else: st.warning("Invalid user name or password") st.stop() if st.button("Logout"): if "emp_id" in st.session_state: del st.session_state["emp_id"] st.rerun() if st.session_state["emp_id"] in st.session_state["unique_ids"].keys(): if "project_name" not in st.session_state: st.session_state["project_name"] = None cols1 = st.columns([2, 1]) st.session_state["username"] = st.session_state["unique_ids"][ st.session_state["emp_id"] ][0] with cols1[0]: st.markdown(f"**Welcome {st.session_state['username']}**") with cols1[1]: st.markdown(f"**Current Project: {st.session_state['project_name']}**") st.markdown( """ Enter project number in the text below and click on load project to load the project. """ ) st.markdown("Select Project") # st.write(type(st.session_state.keys)) if len(st.session_state["project_summary_df"]) != 0: # Display an editable data table using Streamlit's data editor component table = st.dataframe( st.session_state["project_summary_df"], use_container_width=True, hide_index=True, ) project_number = st.selectbox( "Enter Project number", options=st.session_state["project_summary_df"]["Project Number"], ) log_message( "info", f"Project number {project_number} selected by employee {st.session_state['emp_id']}.", "Home", ) project_col = st.columns(2) # if "load_project_key" not in st.session_state: # st.session_state["load_project_key"] = None\ def load_project_fun(): st.session_state["project_name"] = ( st.session_state["project_summary_df"] .loc[ st.session_state["project_summary_df"]["Project Number"] == project_number, "Project Name", ] .values[0] ) # fetching project name from project number stored in summary df project_dct_query = f""" SELECT pkl_obj FROM mmo_project_meta_data WHERE prj_id = ? AND file_nam = ?; """ # Execute the query and retrieve the result project_dct_retrieved = query_excecuter_postgres( project_dct_query, db_cred, params=(project_number, "project_dct"), insert=False, ) # retrieves project dict (meta data) stored in db st.session_state["project_dct"] = pickle.loads( project_dct_retrieved[0][0] ) # converting bytes data to original objet using pickle st.session_state["project_number"] = project_number keys_to_keep = [ "unique_ids", "emp_id", "project_dct", "project_name", "project_number", "username", "project_summary_df", "clone_project_dict", ] # Clear all keys in st.session_state except the ones to keep for key in list(st.session_state.keys()): if key not in keys_to_keep: del st.session_state[key] ensure_project_dct_structure(st.session_state["project_dct"], default_dct) if st.button( "Load Project", use_container_width=True, key="load_project_key", on_click=load_project_fun, ): st.success("Project Loded") # st.rerun() # refresh the page # st.write(st.session_state['project_dct']) if "radio_box_index" not in st.session_state: st.session_state["radio_box_index"] = 0 projct_radio = st.radio( "Select Options", [ "Create New Project", "Modify Project Access", "Clone Saved Projects", "Delete Projects", ], horizontal=True, index=st.session_state["radio_box_index"], ) if projct_radio == "Modify Project Access": with st.expander("Modify Project Access"): project_number_for_access = st.selectbox( "Select Project Number", st.session_state["project_summary_df"]["Project Number"], ) with st.spinner("Loading"): users_who_has_access = fetch_users_with_access( project_number_for_access ) users_name_who_has_access = [user[1] for user in users_who_has_access] modified_users_for_access_options = [ details[0] for user_id, details in st.session_state["unique_ids"].items() if user_id != st.session_state["emp_id"] ] users_name_who_has_access = [ name for name in users_name_who_has_access if name in modified_users_for_access_options ] modified_users_for_access = st.multiselect( "Select or deselect users to grant or revoke access, then click the 'Modify Access' button to submit changes.", options=modified_users_for_access_options, default=users_name_who_has_access, ) if st.button("Modify Access", use_container_width=True): with st.spinner("Modifying Access"): update_project_access( project_number_for_access, modified_users_for_access, st.session_state["unique_ids"], ) if projct_radio == "Create New Project": with st.expander("Create New Project", expanded=False): st.session_state["is_create_project_open"] = True unique_users = [ user[0] for user in st.session_state["unique_ids"].values() ] # fetching unique users who has access to the tool user_projects = list( set(st.session_state["project_summary_df"]["Project Name"]) ) # fetching corressponding user's projects st.markdown( """ To create a new project, follow the instructions below: 1. **Project Name**: - It should start with the client name, followed by the username. - It should not contain special characters except for underscores (`_`) and should not contain spaces. - Example format: `__` 2. **Select User**: Select the user you want to give access to this project. 3. **Create New Project**: Click **Create New Project** once the above details are entered. **Example**: - For a client named "ClientA" and a user named "UserX" with a project named "NewCampaign", the project name should be: `ClientA_UserX_NewCampaign` """ ) project_col1 = st.columns(3) with project_col1[0]: # API_tables = get_table_names(schema) # load API files slection_tables = ["NA"] api_name = st.selectbox("Select API data", slection_tables, index=0) # data availabe through API # api_path = API_path_dict[api_name] with project_col1[1]: defualt_project_prefix = f"{api_name.split('_mmo_')[0]}_{st.session_state['unique_ids'][st.session_state['emp_id']][0]}_".replace( " ", "_" ).lower() if "project_name_box" not in st.session_state: st.session_state["project_name_box"] = defualt_project_prefix project_name = st.text_input( "Enter Project Name", key="project_name_box" ) warning_box = st.empty() with project_col1[2]: allowed_users = st.multiselect( "Select Users who can access to this Project", [val for val in unique_users], ) allowed_users = list(allowed_users) matching_user_id = [] if len(allowed_users) > 0: # converting the selection to comma seperated values to store in db for emp_id, details in st.session_state["unique_ids"].items(): for name in allowed_users: if name in details: matching_user_id.append(emp_id) break st.button( "Reset Project Name", on_click=reset_project_text_box, help="", use_container_width=True, ) create = st.button( "Create New Project", use_container_width=True, help="Project Name should follow naming convention", ) if create: if not project_name.lower().startswith(defualt_project_prefix): with warning_box: st.warning("Project Name should follow naming convention") st.stop() if project_name == defualt_project_prefix: with warning_box: st.warning("Cannot name only with prefix") st.stop() if project_name in user_projects: with warning_box: st.warning("Project already exists please enter new name") st.stop() if not ( 2 <= len(project_name) <= 50 and bool(re.match("^[A-Za-z0-9_]*$", project_name)) ): # Store the warning message details in session state with warning_box: st.warning( "Please provide a valid project name (2-50 characters, only A-Z, a-z, 0-9, and _)." ) st.stop() if contains_sql_keywords_check(project_name): with warning_box: st.warning( "Input contains SQL keywords. Please avoid using SQL commands." ) st.stop() else: pass with st.spinner("Creating Project"): new_project() with warning_box: st.write("Project Created") st.session_state["radio_box_index"] = 1 log_message( "info", f"Employee {st.session_state['emp_id']} created new project {project_name}.", "Home", ) st.rerun() if projct_radio == "Clone Saved Projects": with st.expander("Clone Saved Projects", expanded=False): if len(st.session_state["clone_project_dict"]) == 0: st.warning("You dont have access to any saved projects") st.stop() cols = st.columns(2) with cols[0]: owners = list(st.session_state["clone_project_dict"].keys()) owner_name = st.selectbox("Select Owner", owners) with cols[1]: project_names = [ project["project_name"] for project in st.session_state["clone_project_dict"][owner_name] ] project_name_owner = st.selectbox( "Select a saved Project available for you", project_names, ) defualt_project_prefix = f"{project_name_owner.split('_')[0]}_{st.session_state['unique_ids'][st.session_state['emp_id']][0]}_".replace( " ", "_" ).lower() user_projects = list( set(st.session_state["project_summary_df"]["Project Name"]) ) cloned_project_name = st.text_input( "Enter Project Name", value=defualt_project_prefix, ) warning_box = st.empty() if st.button( "Load Project", use_container_width=True, key="load_project_button_key" ): if not cloned_project_name.lower().startswith(defualt_project_prefix): with warning_box: st.warning("Project Name should follow naming conventions") st.stop() if cloned_project_name == defualt_project_prefix: with warning_box: st.warning("Cannot Name only with Prefix") st.stop() if cloned_project_name in user_projects: with warning_box: st.warning("Project already exists please enter new name") st.stop() with st.spinner("Cloning Project"): old_prj_id = get_project_id_from_dict( st.session_state["clone_project_dict"], owner_name, project_name_owner, ) old_metadata = fetch_project_metadata(old_prj_id) new_prj_id = create_new_project( st.session_state["emp_id"], cloned_project_name, "", st.session_state["emp_id"], ) insert_project_metadata( new_prj_id, old_metadata, st.session_state["emp_id"] ) update_summary_df() st.success("Project Cloned") st.rerun() if projct_radio == "Delete Projects": if len(st.session_state["project_summary_df"]) != 0: with st.expander("Delete Projects", expanded=True): delete_projects = st.multiselect( "Select all the projects number who want to delete", st.session_state["project_summary_df"]["Project Number"], ) st.warning( "Projects will be permanently deleted. Other users will not be able to clone them if they have not already done so." ) if st.button("Delete Projects", use_container_width=True): if len(delete_projects) > 0: with st.spinner("Deleting Projects"): delete_projects_by_ids(delete_projects) update_summary_df() st.success("Projects Deleted") st.rerun() else: st.warning("Please select atleast one project number to delete") if projct_radio == "Download Project PPT": try: ppt = create_ppt( st.session_state["project_name"], st.session_state["username"], "panel", # new ) if ppt is not False: st.download_button( "Download", data=ppt.getvalue(), file_name=st.session_state["project_name"] + " Project Summary.pptx", use_container_width=True, ) else: st.warning("Please make some progress before downloading PPT.") except Exception as e: st.warning("PPT Download Faild ") # new log_message( log_type="error", message=f"Error in PPT build: {e}", page_name="Home" )