"""Streamlit entry point for the candidate MCQ assessment platform.

The app has two modes, selected through URL query parameters:

* ``?internal=...``      -- shows a button that generates a fresh candidate
  link and registers the new id in ``data/candidates.csv``.
* ``?candidate_id=<id>`` -- lets the candidate with that id sign up, take the
  test once, and have the result written back to the CSV and sent to Slack.
"""

import csv
import os
import uuid
from datetime import datetime

import streamlit as st

from settings import BASE_DIR, NUMBER_OF_TECHNICAL_QUESTIONS, NUMBER_OF_COMMON_QUESTIONS
from core.slack_notifier import SlackNotifier
from core.questions_loader_local import QuestionLoaderLocal
from presentation.layout import Layout

# Slack Webhook (read eagerly: a missing variable raises KeyError at import)
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]

# Single source of truth for the candidate registry path.
CANDIDATES_CSV = os.path.join(BASE_DIR, "data", "candidates.csv")

layout = Layout()


def call(candidate_id):
    """Render the test for ``candidate_id`` and handle its submission.

    Questions are loaded once per session (technology-specific questions
    followed by the common ones) and cached in ``st.session_state`` so that
    the same set is shown on every Streamlit rerun. When the candidate submits
    with every question answered, the percentage score is written to the
    candidates CSV and reported through Slack.

    Args:
        candidate_id: Id of the candidate taking the test.
    """
    # Check if questions are already loaded
    if 'questions' not in st.session_state:
        technical_path = os.path.join(
            BASE_DIR,
            "data/questions",
            st.session_state['technology'].lower(),
            "questions.csv",
        )
        questions = QuestionLoaderLocal(
            technical_path, NUMBER_OF_TECHNICAL_QUESTIONS
        ).fetch_questions()

        if not questions:
            st.markdown("Work in progress!!")
            return

        common_questions = QuestionLoaderLocal(
            os.path.join(BASE_DIR, "data/questions", "common", "questions.csv"),
            NUMBER_OF_COMMON_QUESTIONS,
        ).fetch_questions()
        # The loader's result is treated as possibly falsy above, so tolerate
        # an empty/None result here as well instead of raising TypeError.
        questions.extend(common_questions or [])

        # Store questions in session state to persist across interactions
        st.session_state['questions'] = questions

    # Retrieve the questions from session state
    questions = st.session_state['questions']

    score = 0
    total_questions = len(questions)
    answered_all = True

    for idx, question in enumerate(questions):
        # Section for each question with styling
        selected_option = layout.render_test_question(question, idx)

        if not selected_option:
            answered_all = False

        # Every correct answer is worth exactly one point.
        if selected_option == question['answer']:
            score += 1

    if st.button("Submit Test", use_container_width=True, type="primary"):
        if answered_all:
            st.session_state['test_started'] = False
            layout.render_completion_message(score, total_questions)

            result = (score / total_questions) * 100
            update_candidate_status(candidate_id, result)

            SlackNotifier(SLACK_WEBHOOK_URL).send_candidate_info(
                st.session_state['name'],
                st.session_state['email'],
                st.session_state['experience'],
                st.session_state['technology'],
                f"{result:.2f}%"
            )
        else:
            # Show a message asking the user to answer all questions
            st.warning("Please answer all questions before submitting.")


def generate_url(base_url='http://localhost:8501'):
    """Create a new candidate id and the test URL that carries it.

    Args:
        base_url: Scheme, host and port the app is served from. Defaults to
            the local Streamlit development address.

    Returns:
        A ``(url, candidate_id)`` tuple; ``candidate_id`` is a 32-character
        lowercase hex string.
    """
    candidate_id = uuid.uuid4().hex  # Generate a UUID and convert it to a hex string
    return f'{base_url}/?candidate_id={candidate_id}', candidate_id


def add_candidate_id_to_csv(candidate_id):
    """Append an "In Progress" row for ``candidate_id`` to the candidates CSV.

    The six columns between the id and the status are left empty; they are
    filled in by ``update_candidate_status`` once the test is completed.
    """
    # newline="" is required by the csv module; without it extra blank rows
    # are written on platforms that translate "\n" to "\r\n".
    with open(CANDIDATES_CSV, 'a', newline="", encoding="utf-8") as file:
        csv.writer(file).writerow([candidate_id, "", "", "", "", "", "", "In Progress"])


def read_candidate_csv():
    """Return every candidate row as a dict keyed by the CSV header names."""
    with open(CANDIDATES_CSV, 'r', newline="", encoding="utf-8") as file:
        return list(csv.DictReader(file))


def is_candidate_test_completed(candidate_id, candidates):
    """Return True if ``candidate_id`` has a row whose status is "Test Completed".

    The id comparison is case-insensitive.
    """
    wanted = candidate_id.lower()
    return any(
        row["id"].lower() == wanted and row["status"] == "Test Completed"
        for row in candidates
    )


def is_valid_candidate_id(candidate_id, candidates):
    """Return True if ``candidate_id`` is registered in ``candidates``.

    Both sides are lower-cased so that the check agrees with
    ``is_candidate_test_completed``; previously only the stored id was
    lower-cased, which rejected any URL containing upper-case hex digits.
    """
    wanted = candidate_id.lower()
    return any(row["id"].lower() == wanted for row in candidates)


def update_candidate_status(candidate_id, result):
    """Record the candidate's details and result and mark the test completed.

    The whole CSV is read, the matching row is updated in memory from
    ``st.session_state`` and the file is rewritten.

    Args:
        candidate_id: Id of the candidate whose row should be updated
            (matched case-insensitively, like the validation helpers).
        result: Score to store in the result column.
    """
    wanted = candidate_id.lower()

    with open(CANDIDATES_CSV, 'r', newline="", encoding="utf-8") as file:
        reader = csv.reader(file)
        header = next(reader)  # Save the header
        rows = []
        for row in reader:
            # ``row`` is an empty list for blank lines; skip the match but
            # keep the row so the rewrite does not alter unrelated content.
            if row and row[0].lower() == wanted:
                row[1] = st.session_state['name']
                row[2] = st.session_state['email']
                row[3] = st.session_state['experience']
                row[4] = st.session_state['technology']
                row[5] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                row[6] = result
                row[7] = "Test Completed"
            rows.append(row)

    with open(CANDIDATES_CSV, 'w', newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        writer.writerow(header)  # Write the header
        writer.writerows(rows)


def main():
    """Route the request based on the URL query parameters and render the page."""
    query_params = st.query_params

    if not query_params:
        st.markdown("Invalid URL")
        return

    candidate_id = query_params.get("candidate_id", None)
    internal = query_params.get("internal", None)

    # Set page config with custom title and layout
    st.set_page_config(page_title="Candidate MCQ Platform", layout="wide")
    layout.render_header()

    if internal:
        if st.button('Generate URL'):
            url, new_candidate_id = generate_url()
            # Register the id before showing the link so that a failed write
            # never leaves the operator holding a URL that cannot validate.
            add_candidate_id_to_csv(new_candidate_id)
            st.write(f"Generated URL: {url}")
    elif candidate_id:
        candidates = read_candidate_csv()

        if not is_valid_candidate_id(candidate_id, candidates):
            st.markdown("Invalid URL")
            return

        print("Candidate ID:", candidate_id)

        if is_candidate_test_completed(candidate_id, candidates):
            st.markdown("Test has been completed! Great job on completing it. Thank you for your effort and dedication.")
            return

        if 'test_started' not in st.session_state:
            st.session_state['test_started'] = False

        if not st.session_state['test_started']:
            st.title("Welcome to the Candidate Assessment Platform")
            name, email, experience, technology, submit = layout.render_signup_form()

            if name and email:
                st.session_state['name'] = name
                st.session_state['email'] = email
                st.session_state['experience'] = experience
                st.session_state['technology'] = technology
                st.session_state['test_wip'] = True

            if submit:
                # call() reads these keys unconditionally; starting the test
                # without them would crash the next rerun with a KeyError.
                required_keys = ('name', 'email', 'experience', 'technology')
                if all(key in st.session_state for key in required_keys):
                    st.session_state['test_started'] = True
                    st.rerun()
                else:
                    st.warning("Please enter your name and email before starting the test.")

            layout.render_instructions()
        else:
            call(candidate_id)
    else:
        st.markdown("Invalid URL")


if __name__ == "__main__":
    main()