Spaces:
Sleeping
Sleeping
File size: 6,311 Bytes
f7c75d0 f187aae 53f9c43 f7c75d0 53f9c43 f7c75d0 f187aae 2b130d1 f187aae 2b130d1 a709932 2b130d1 f187aae 2b130d1 f7c75d0 2b130d1 f7c75d0 53f9c43 2b130d1 f7c75d0 53f9c43 2b130d1 f187aae 2b130d1 f7c75d0 f187aae f7c75d0 f187aae f7c75d0 53f9c43 f187aae f7c75d0 f187aae f7c75d0 f187aae f7c75d0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import os
import csv
import uuid
from datetime import datetime
import streamlit as st
from settings import BASE_DIR, NUMBER_OF_TECHNICAL_QUESTIONS, NUMBER_OF_COMMON_QUESTIONS
from core.slack_notifier import SlackNotifier
from core.questions_loader_local import QuestionLoaderLocal
from presentation.layout import Layout
# Slack webhook URL, read from the environment when this module is imported.
# The subscript lookup raises KeyError if SLACK_WEBHOOK_URL is not set.
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]
# Module-level Layout instance whose render_* methods the functions below call.
layout = Layout()
def call(candidate_id):
    """Render the test for the current session and handle its submission.

    On the first run of a session the technology-specific questions are
    loaded, the common questions are appended, and the combined list is
    cached in ``st.session_state['questions']``. If no technology-specific
    questions come back, a "Work in progress!!" notice is shown and the
    function returns.

    Every question is rendered through ``layout.render_test_question``; each
    selection equal to the question's ``'answer'`` earns one point. When
    "Submit Test" is pressed with every question answered, the percentage
    score is passed to ``update_candidate_status`` and sent through
    ``SlackNotifier``; otherwise a warning asks the candidate to answer all
    questions.

    Args:
        candidate_id: ID of the candidate taking the test; forwarded to
            ``update_candidate_status`` on submission.
    """
    session = st.session_state

    # Build the question list only once; later reruns reuse the cached copy.
    if 'questions' not in session:
        questions_dir = os.path.join(BASE_DIR, "data/questions")
        technical_csv = os.path.join(
            questions_dir, session['technology'].lower(), "questions.csv"
        )
        loaded = QuestionLoaderLocal(
            technical_csv, NUMBER_OF_TECHNICAL_QUESTIONS
        ).fetch_questions()
        if not loaded:
            st.markdown("Work in progress!!")
            return

        common_csv = os.path.join(questions_dir, "common", "questions.csv")
        loaded.extend(
            QuestionLoaderLocal(common_csv, NUMBER_OF_COMMON_QUESTIONS).fetch_questions()
        )
        session['questions'] = loaded

    quiz = session['questions']
    question_count = len(quiz)
    correct = 0
    unanswered = False

    for position, item in enumerate(quiz):
        choice = layout.render_test_question(item, position)
        if not choice:
            unanswered = True
        # One point per correct answer.
        if choice == item['answer']:
            correct += 1

    if not st.button("Submit Test", use_container_width=True, type="primary"):
        return

    if unanswered:
        # Show a message asking the user to answer all questions
        st.warning("Please answer all questions before submitting.")
        return

    session['test_started'] = False
    layout.render_completion_message(correct, question_count)
    percentage = (correct / question_count) * 100
    update_candidate_status(candidate_id, percentage)
    SlackNotifier(SLACK_WEBHOOK_URL).send_candidate_info(
        session['name'],
        session['email'],
        session['experience'],
        session['technology'],
        f"{percentage:.2f}%",
    )
def generate_url(base_url="http://localhost:8501"):
    """Create a fresh candidate ID and the test URL that carries it.

    Args:
        base_url: Root URL of the running app. Defaults to the local address
            this function previously hard-coded. Trailing slashes are
            ignored.

    Returns:
        A ``(url, candidate_id)`` tuple. ``candidate_id`` is the hex form of
        a random UUID4 and ``url`` is ``<base_url>/?candidate_id=<id>``.
    """
    # Named candidate_id rather than id to avoid shadowing the builtin id().
    candidate_id = uuid.uuid4().hex
    root = base_url.rstrip("/")
    return f"{root}/?candidate_id={candidate_id}", candidate_id
def add_candidate_id_to_csv(candidate_id):
    """Append an "In Progress" row for a new candidate to candidates.csv.

    The row has eight columns: the candidate ID, six empty placeholders and
    the status "In Progress".

    Args:
        candidate_id: ID to store in the first column of the new row.
    """
    csv_path = os.path.join(BASE_DIR, "data", "candidates.csv")
    # newline="" leaves line endings to the csv writer; without it, platforms
    # that translate "\n" on write end up with "\r\r\n" and blank rows.
    with open(csv_path, "a", newline="") as file:
        csv.writer(file).writerow(
            [candidate_id, "", "", "", "", "", "", "In Progress"]
        )
def read_candidate_csv():
    """Load every row of candidates.csv as a dictionary keyed by the header.

    Returns:
        A list with one dict per data row, as produced by ``csv.DictReader``.
    """
    csv_path = os.path.join(BASE_DIR, "data", "candidates.csv")
    # newline="" hands raw line endings to the csv reader so that line breaks
    # inside quoted fields are parsed as data instead of being translated.
    with open(csv_path, "r", newline="") as file:
        return list(csv.DictReader(file))
def is_candidate_test_completed(candidate_id, candidates):
    """Report whether the candidate already has a "Test Completed" row.

    Args:
        candidate_id: ID to look for; compared case-insensitively.
        candidates: Iterable of row mappings with "id" and "status" keys.

    Returns:
        True if any row matches the ID and has status "Test Completed",
        otherwise False.
    """
    return any(
        entry["id"].lower() == candidate_id.lower()
        and entry["status"] == "Test Completed"
        for entry in candidates
    )
def is_valid_candidate_id(candidate_id, candidates):
    """Report whether the candidate ID appears in the candidate rows.

    Each stored ID is lowercased before comparison, while ``candidate_id`` is
    compared exactly as given, so an ID containing uppercase characters never
    matches.

    Args:
        candidate_id: ID to look for.
        candidates: Iterable of row mappings with an "id" key.

    Returns:
        True if some row's lowercased "id" equals ``candidate_id``,
        otherwise False.
    """
    return any(entry["id"].lower() == candidate_id for entry in candidates)
def update_candidate_status(candidate_id, result):
    """Record the finished test for a candidate in candidates.csv.

    The file is read in full, every row whose first column matches
    ``candidate_id`` (case-insensitively, like the lookup helpers in this
    file) is filled in from ``st.session_state`` and marked
    "Test Completed", and the file is then rewritten with its header.

    Args:
        candidate_id: ID of the candidate whose row(s) should be updated.
        result: Score to store in the result column.
    """
    csv_path = os.path.join(BASE_DIR, "data", "candidates.csv")

    # newline="" on both opens leaves line endings to the csv module, so the
    # rewrite does not introduce blank rows on platforms that translate "\n".
    with open(csv_path, "r", newline="") as file:
        reader = csv.reader(file)
        header = next(reader, None)  # Save the header
        # Blank lines parse as empty rows; drop them so row[0] is always safe.
        rows = [row for row in reader if row]

    if header is None:
        # Empty file: there is no row to update and nothing worth rewriting.
        return

    wanted_id = candidate_id.lower()
    for row in rows:
        if row[0].lower() != wanted_id:
            continue
        # Pad short rows so all eight columns can be assigned.
        row.extend([""] * (8 - len(row)))
        row[1] = st.session_state['name']
        row[2] = st.session_state['email']
        row[3] = st.session_state['experience']
        row[4] = st.session_state['technology']
        row[5] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        row[6] = result
        row[7] = "Test Completed"

    with open(csv_path, "w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(header)  # Write the header
        writer.writerows(rows)
def main():
    """Entry point: route the request based on the URL query parameters.

    - No query parameters: show "Invalid URL".
    - ``internal`` set: offer a button that generates a candidate URL and
      registers the new ID in candidates.csv.
    - ``candidate_id`` set: validate the ID, stop if the test is already
      completed, then show either the signup form with instructions or the
      test itself, depending on ``st.session_state['test_started']``.
    - Anything else: show "Invalid URL".
    """
    query_params = st.query_params
    if not query_params:
        st.markdown("Invalid URL")
        return

    candidate_id = query_params.get("candidate_id", None)
    internal = query_params.get("internal", None)

    # Set page config with custom title and layout
    st.set_page_config(page_title="Candidate MCQ Platform", layout="wide")
    layout.render_header()

    if internal:
        if st.button('Generate URL'):
            url, candidate_id = generate_url()
            st.write(f"Generated URL: {url}")
            add_candidate_id_to_csv(candidate_id)
    elif candidate_id:
        candidates = read_candidate_csv()
        if not is_valid_candidate_id(candidate_id, candidates):
            st.markdown("Invalid URL")
            return

        print("Candidate ID:", candidate_id)

        if is_candidate_test_completed(candidate_id, candidates):
            st.markdown("Test has been completed! Great job on completing it. Thank you for your effort and dedication.")
            return

        if 'test_started' not in st.session_state:
            st.session_state['test_started'] = False

        if not st.session_state['test_started']:
            st.title("Welcome to the Candidate Assessment Platform")
            name, email, experience, technology, submit = layout.render_signup_form()

            details_provided = bool(name and email)
            if details_provided:
                st.session_state['name'] = name
                st.session_state['email'] = email
                st.session_state['experience'] = experience
                st.session_state['technology'] = technology
                st.session_state['test_wip'] = True

            if submit:
                if details_provided:
                    st.session_state['test_started'] = True
                    st.rerun()
                else:
                    # Starting without these details would leave the session
                    # keys that call() reads (e.g. 'technology') unset.
                    st.warning("Please enter your name and email to start the test.")

            layout.render_instructions()
        else:
            call(candidate_id)
    else:
        st.markdown("Invalid URL")
# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    main()
|