candidate-platform / core /questions_loader_aws.py
shaileshjadhavSS
Solved start test button issue and Added python, django, Data engineering and common questions
2b130d1
raw
history blame
2.26 kB
import csv
import io

import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
class QuestionLoaderAWS:
    """
    Loads multiple-choice questions stored as CSV files in an S3 bucket.

    The file for a technology is read from the key
    ``questions/<technology>/questions.csv`` inside the configured bucket.
    """

    def __init__(self, bucket_name, aws_access_key, aws_secret_key):
        """
        Initializes the QuestionLoader with AWS credentials and bucket name.
        :param bucket_name: The name of the S3 bucket.
        :param aws_access_key: AWS access key ID.
        :param aws_secret_key: AWS secret access key.
        """
        self.bucket_name = bucket_name
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key
        )

    def fetch_questions(self, technology):
        """
        Fetches the questions for the given technology from the S3 bucket.
        :param technology: The technology (e.g., Python, Django) to fetch questions for.
        :return: A list of dictionaries, where each dictionary represents a question
                 with the keys question, option1..option4, answer and difficulty
                 (difficulty is lower-cased).
        :raises FileNotFoundError: If the bucket has no questions file for the technology.
        :raises PermissionError: If AWS credentials are missing or incomplete.
        :raises RuntimeError: For any other failure while fetching or parsing the file.
        """
        file_key = f"questions/{technology}/questions.csv"
        try:
            response = self.s3_client.get_object(Bucket=self.bucket_name, Key=file_key)
            # 'utf-8-sig' strips a leading byte-order mark when present, so the
            # first header is read as "question" rather than "\ufeffquestion".
            content = response['Body'].read().decode('utf-8-sig')
            return self._parse_questions(content)
        except self.s3_client.exceptions.NoSuchKey as e:
            raise FileNotFoundError(f"No questions found for technology: {technology}") from e
        except (NoCredentialsError, PartialCredentialsError) as e:
            raise PermissionError("Invalid AWS credentials. Please check your .env file.") from e
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback.
            raise RuntimeError(f"Failed to fetch questions: {str(e)}") from e

    @staticmethod
    def _parse_questions(content):
        """
        Parses decoded CSV text into a list of question dictionaries.
        :param content: The full CSV text, including the header row.
        :return: A list with one dictionary per CSV record.
        :raises KeyError: If an expected column is missing from the header.
        """
        # Feed the csv module an untranslated text stream (newline='') instead of
        # pre-split lines; splitting first discards line breaks that sit inside
        # quoted fields, which garbles multi-line question text.
        csv_reader = csv.DictReader(io.StringIO(content, newline=''))
        return [
            {
                "question": row["question"],
                "option1": row["option1"],
                "option2": row["option2"],
                "option3": row["option3"],
                "option4": row["option4"],
                "answer": row["answer"],
                "difficulty": row["difficulty"].lower()
            }
            for row in csv_reader
        ]