File size: 6,311 Bytes
f7c75d0
f187aae
 
 
53f9c43
 
 
 
 
f7c75d0
 
 
53f9c43
f7c75d0
f187aae
2b130d1
 
 
 
f187aae
2b130d1
 
a709932
 
 
 
 
2b130d1
f187aae
2b130d1
 
 
 
 
 
 
 
 
f7c75d0
 
2b130d1
f7c75d0
 
 
53f9c43
2b130d1
 
 
f7c75d0
 
 
53f9c43
2b130d1
 
 
 
f187aae
2b130d1
 
 
 
 
 
 
 
 
 
f7c75d0
f187aae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7c75d0
f187aae
 
 
 
 
 
 
f7c75d0
 
53f9c43
f187aae
 
 
 
 
f7c75d0
f187aae
 
 
 
 
 
 
 
 
 
 
 
f7c75d0
f187aae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7c75d0
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import os
import csv
import uuid
from datetime import datetime
import streamlit as st
from settings import BASE_DIR, NUMBER_OF_TECHNICAL_QUESTIONS, NUMBER_OF_COMMON_QUESTIONS
from core.slack_notifier import SlackNotifier
from core.questions_loader_local import QuestionLoaderLocal
from presentation.layout import Layout

# Slack webhook URL handed to SlackNotifier in call(). It is read when the
# module is imported; indexing os.environ raises KeyError if the variable is unset.
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]
# Single Layout instance used by call() and main() for all UI rendering.
layout = Layout()

def call(candidate_id):
    """Render the test questions and process the submission.

    On the first run of a session the technical questions for the chosen
    technology are loaded, followed by the common questions, and the combined
    list is cached in ``st.session_state['questions']``. Every run renders all
    questions and tallies the score; when "Submit Test" is pressed with every
    question answered, the result is written to the candidates CSV and sent
    to Slack.

    Args:
        candidate_id: Id of the candidate whose CSV row is updated on submit.
    """
    if 'questions' not in st.session_state:
        technical_csv = os.path.join(
            BASE_DIR, "data/questions", st.session_state['technology'].lower(), "questions.csv"
        )
        technical_loader = QuestionLoaderLocal(technical_csv, NUMBER_OF_TECHNICAL_QUESTIONS)
        loaded = technical_loader.fetch_questions()

        # No technical questions available for this technology: nothing to show.
        if not loaded:
            st.markdown("Work in progress!!")
            return

        common_csv = os.path.join(BASE_DIR, "data/questions", "common", "questions.csv")
        common_loader = QuestionLoaderLocal(common_csv, NUMBER_OF_COMMON_QUESTIONS)
        loaded.extend(common_loader.fetch_questions())

        # Cache the combined list so later runs of this session reuse it.
        st.session_state['questions'] = loaded

    quiz = st.session_state['questions']
    correct = 0
    question_count = len(quiz)
    unanswered = False

    for position, item in enumerate(quiz):
        choice = layout.render_test_question(item, position)
        if not choice:
            unanswered = True
        # Each correct answer is worth exactly one point.
        if choice == item['answer']:
            correct += 1

    if not st.button("Submit Test", use_container_width=True, type="primary"):
        return

    if unanswered:
        # Refuse the submission until every question has a selection.
        st.warning("Please answer all questions before submitting.")
        return

    st.session_state['test_started'] = False
    layout.render_completion_message(correct, question_count)
    percentage = (correct / question_count) * 100
    update_candidate_status(candidate_id, percentage)
    notifier = SlackNotifier(SLACK_WEBHOOK_URL)
    notifier.send_candidate_info(
        st.session_state['name'],
        st.session_state['email'],
        st.session_state['experience'],
        st.session_state['technology'],
        f"{percentage:.2f}%"
    )

def generate_url(base_url="http://localhost:8501"):
    """Create a fresh candidate id and the test URL that embeds it.

    Args:
        base_url: Scheme and host (optionally with a trailing slash) under
            which the app is served. Defaults to the local address that was
            previously hard-coded, so existing callers are unaffected.

    Returns:
        A ``(url, candidate_id)`` tuple, where ``candidate_id`` is the
        32-character hex form of a random UUID4.
    """
    # Named candidate_id rather than id so the builtin id() is not shadowed.
    candidate_id = uuid.uuid4().hex
    return f"{base_url.rstrip('/')}/?candidate_id={candidate_id}", candidate_id

def add_candidate_id_to_csv(candidate_id):
    """Append a placeholder row for a new candidate to ``data/candidates.csv``.

    The row has eight columns: the id, six empty fields that
    update_candidate_status() fills in later, and the status "In Progress".

    Args:
        candidate_id: Id to store in the first column.
    """
    csv_path = os.path.join(BASE_DIR, "data", "candidates.csv")
    # newline='' lets the csv module control line endings; without it,
    # platforms that translate "\n" write a blank line after every row.
    with open(csv_path, 'a', newline='') as file:
        csv.writer(file).writerow([candidate_id, "", "", "", "", "", "", "In Progress"])

def read_candidate_csv():
    """Load every row of ``data/candidates.csv``.

    Returns:
        A list with one dict per data row, keyed by the names in the file's
        header row (as produced by ``csv.DictReader``).
    """
    csv_path = os.path.join(BASE_DIR, "data", "candidates.csv")
    # newline='' is what the csv module expects, so that line endings and
    # newlines embedded in quoted fields are parsed correctly.
    with open(csv_path, 'r', newline='') as file:
        return list(csv.DictReader(file))

def is_candidate_test_completed(candidate_id, candidates):
    """Tell whether the candidate already has a completed test on record.

    Args:
        candidate_id: Id to look for; compared case-insensitively.
        candidates: Iterable of row dicts with "id" and "status" keys.

    Returns:
        True if some row matches the id and has status "Test Completed",
        otherwise False.
    """
    return any(
        entry["id"].lower() == candidate_id.lower()
        and entry["status"] == "Test Completed"
        for entry in candidates
    )

def is_valid_candidate_id(candidate_id, candidates):
    """Tell whether ``candidate_id`` belongs to a known candidate.

    The comparison is case-insensitive on both sides, matching
    is_candidate_test_completed(). Previously only the stored id was
    lowercased, so an id containing any uppercase character was always
    rejected.

    Args:
        candidate_id: Id taken from the request; ``None`` or an empty string
            is never valid.
        candidates: Iterable of row dicts with an "id" key.

    Returns:
        True if some row's id equals ``candidate_id`` ignoring case,
        otherwise False.
    """
    if not candidate_id:
        return False
    wanted = candidate_id.lower()
    return any(row["id"].lower() == wanted for row in candidates)

def update_candidate_status(candidate_id, result):
    """Record a finished test on the candidate's row in ``data/candidates.csv``.

    The whole file is read, the matching row is filled in from
    ``st.session_state`` (name, email, experience, technology), stamped with
    the current local time and ``result``, marked "Test Completed", and the
    file is then rewritten in full.

    Args:
        candidate_id: Id of the row to update; matched case-insensitively,
            in line with is_candidate_test_completed().
        result: Score to store in the result column.
    """
    csv_path = os.path.join(BASE_DIR, "data", "candidates.csv")
    wanted = candidate_id.lower()

    # newline='' on both opens lets the csv module own the line endings, so
    # the rewrite cannot introduce blank lines between rows.
    with open(csv_path, 'r', newline='') as file:
        reader = csv.reader(file)
        header = next(reader, None)
        if header is None:
            # Empty file: there is no row to update and nothing to rewrite.
            return
        rows = []
        for row in reader:
            if not row:
                # Blank lines parse as empty rows; drop them instead of
                # failing on row[0].
                continue
            if row[0].lower() == wanted:
                # Pad short rows so the eight expected columns all exist.
                row.extend([""] * (8 - len(row)))
                row[1] = st.session_state['name']
                row[2] = st.session_state['email']
                row[3] = st.session_state['experience']
                row[4] = st.session_state['technology']
                row[5] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                row[6] = result
                row[7] = "Test Completed"
            rows.append(row)

    with open(csv_path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(header)
        writer.writerows(rows)

def main():
    """Route the request according to its query parameters.

    - No query parameters: show "Invalid URL".
    - ``internal`` set: offer a button that generates a candidate URL and
      registers the new id in the candidates CSV.
    - ``candidate_id`` set: validate the id, refuse a repeat attempt, then
      show either the signup form or the test itself.
    - Anything else: show "Invalid URL".
    """
    query_params = st.query_params
    if not query_params:
        st.markdown("Invalid URL")
        return

    candidate_id = query_params.get("candidate_id", None)
    internal = query_params.get("internal", None)
    # Set page config with custom title and layout
    st.set_page_config(page_title="Candidate MCQ Platform", layout="wide")
    layout.render_header()

    if internal:
        if st.button('Generate URL'):
            url, candidate_id = generate_url()
            st.write(f"Generated URL: {url}")
            add_candidate_id_to_csv(candidate_id)
        return

    if not candidate_id:
        st.markdown("Invalid URL")
        return

    candidates = read_candidate_csv()
    if not is_valid_candidate_id(candidate_id, candidates):
        st.markdown("Invalid URL")
        return
    print("Candidate ID:", candidate_id)
    if is_candidate_test_completed(candidate_id, candidates):
        st.markdown("Test has been completed! Great job on completing it. Thank you for your effort and dedication.")
        return

    if 'test_started' not in st.session_state:
        st.session_state['test_started'] = False

    if st.session_state['test_started']:
        call(candidate_id)
        return

    st.title("Welcome to the Candidate Assessment Platform")
    name, email, experience, technology, submit = layout.render_signup_form()
    if name and email:
        st.session_state['name'] = name
        st.session_state['email'] = email
        st.session_state['experience'] = experience
        st.session_state['technology'] = technology
        st.session_state['test_wip'] = True
    if submit:
        # call() reads these keys from the session; starting the test without
        # them would fail with a KeyError, so ask for the details instead.
        required_keys = ("name", "email", "experience", "technology")
        if all(key in st.session_state for key in required_keys):
            st.session_state['test_started'] = True
            st.rerun()
        else:
            st.warning("Please enter your name and email before starting the test.")
    layout.render_instructions()
# Entry point: run the app only when this module is executed as __main__.
if __name__ == "__main__":
    main()