Spaces:
Sleeping
Sleeping
import os | |
import csv | |
import uuid | |
from datetime import datetime | |
import streamlit as st | |
from settings import BASE_DIR, NUMBER_OF_TECHNICAL_QUESTIONS, NUMBER_OF_COMMON_QUESTIONS | |
from core.slack_notifier import SlackNotifier | |
from core.questions_loader_local import QuestionLoaderLocal | |
from presentation.layout import Layout | |
# Slack Webhook | |
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"] | |
layout = Layout() | |
def call(candidate_id): | |
# Check if questions are already loaded | |
if 'questions' not in st.session_state: | |
# Define questions | |
questions = QuestionLoaderLocal( | |
os.path.join(BASE_DIR, "data/questions", st.session_state['technology'].lower(), "questions.csv"), | |
NUMBER_OF_TECHNICAL_QUESTIONS | |
).fetch_questions() | |
if not questions: | |
st.markdown("Work in progress!!") | |
return | |
common_questions = QuestionLoaderLocal( | |
os.path.join(BASE_DIR, "data/questions", "common", "questions.csv"), | |
NUMBER_OF_COMMON_QUESTIONS | |
).fetch_questions() | |
questions.extend(common_questions) | |
# Store questions in session state to persist across interactions | |
st.session_state['questions'] = questions | |
# Retrieve the questions from session state | |
questions = st.session_state['questions'] | |
score = 0 | |
total_questions = len(questions) | |
answered_all = True | |
for idx, question in enumerate(questions): | |
# Section for each question with styling | |
selected_option = layout.render_test_question(question, idx) | |
if not selected_option: | |
answered_all = False | |
# Checking for correct answer and assigning points based on difficulty | |
if selected_option == question['answer']: | |
score += 1 | |
if st.button("Submit Test", use_container_width=True, type="primary"): | |
if answered_all: | |
st.session_state['test_started'] = False | |
layout.render_completion_message(score, total_questions) | |
result = (score / total_questions) * 100 | |
update_candidate_status(candidate_id, result) | |
SlackNotifier(SLACK_WEBHOOK_URL).send_candidate_info( | |
st.session_state['name'], | |
st.session_state['email'], | |
st.session_state['experience'], | |
st.session_state['technology'], | |
f"{result:.2f}%" | |
) | |
else: | |
# Show a message asking the user to answer all questions | |
st.warning("Please answer all questions before submitting.") | |
def generate_url(): | |
id = uuid.uuid4().hex # Generate a UUID and convert it to a hex string | |
return f'http://localhost:8501/?candidate_id={id}', id | |
def add_candidate_id_to_csv(candidate_id): | |
with open(os.path.join(BASE_DIR, "data", "candidates.csv"), 'a') as file: | |
csv.writer(file).writerow([candidate_id,"","","","","","","In Progress"]) | |
def read_candidate_csv(): | |
with open(os.path.join(BASE_DIR, "data", "candidates.csv"), 'r') as file: | |
reader = csv.DictReader(file) | |
return list(reader) | |
def is_candidate_test_completed(candidate_id, candidates): | |
for row in candidates: | |
if row["id"].lower() == candidate_id.lower() and row["status"] == "Test Completed": | |
return True | |
return False | |
def is_valid_candidate_id(candidate_id, candidates): | |
for row in candidates: | |
if row["id"].lower() == candidate_id: | |
return True | |
return False | |
def update_candidate_status(candidate_id, result): | |
with open(os.path.join(BASE_DIR, "data", "candidates.csv"), 'r') as file: | |
reader = csv.reader(file) | |
header = next(reader) # Save the header | |
rows = [] | |
for row in reader: | |
if row[0] == candidate_id: | |
row[1] = st.session_state['name'] | |
row[2] = st.session_state['email'] | |
row[3] = st.session_state['experience'] | |
row[4] = st.session_state['technology'] | |
row[5] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
row[6] = result | |
row[7] = "Test Completed" | |
rows.append(row) | |
with open(os.path.join(BASE_DIR, "data", "candidates.csv"), 'w') as file: | |
writer = csv.writer(file) | |
writer.writerow(header) # Write the header | |
writer.writerows(rows) | |
def main(): | |
query_params = st.query_params | |
if not query_params: | |
st.markdown("Invalid URL") | |
return | |
candidate_id = query_params.get("candidate_id", None) | |
internal = query_params.get("internal", None) | |
# Set page config with custom title and layout | |
st.set_page_config(page_title="Candidate MCQ Platform", layout="wide") | |
layout.render_header() | |
if internal: | |
if st.button('Generate URL'): | |
url, candidate_id = generate_url() | |
st.write(f"Generated URL: {url}") | |
add_candidate_id_to_csv(candidate_id) | |
elif candidate_id: | |
candidates = read_candidate_csv() | |
if not is_valid_candidate_id(candidate_id, candidates): | |
st.markdown("Invalid URL") | |
return | |
print("Candidate ID:", candidate_id) | |
if is_candidate_test_completed(candidate_id, candidates): | |
st.markdown("Test has been completed! Great job on completing it. Thank you for your effort and dedication.") | |
return | |
if 'test_started' not in st.session_state: | |
st.session_state['test_started'] = False | |
if not st.session_state['test_started']: | |
st.title("Welcome to the Candidate Assessment Platform") | |
name, email, experience, technology, submit = layout.render_signup_form() | |
if name and email: | |
st.session_state['name'] = name | |
st.session_state['email'] = email | |
st.session_state['experience'] = experience | |
st.session_state['technology'] = technology | |
st.session_state['test_wip'] = True | |
if submit: | |
st.session_state['test_started'] = True | |
st.rerun() | |
layout.render_instructions() | |
else: | |
call(candidate_id) | |
else: | |
st.markdown("Invalid URL") | |
if __name__ == "__main__": | |
main() | |