Spaces:
Running
Running
import os | |
import tempfile | |
import json | |
import numpy as np | |
import cv2 | |
from PIL import Image | |
from pdf2image import convert_from_bytes | |
from fastapi import FastAPI, UploadFile, File, HTTPException | |
from fastapi.responses import JSONResponse, StreamingResponse | |
import uvicorn | |
from fastapi import APIRouter, HTTPException, Path | |
# The Gemini API key is mandatory: fail at import time when it is absent so a
# misconfigured deployment is noticed immediately.
GENAI_API_KEY = os.environ.get("GENAI_API_KEY")
if not GENAI_API_KEY:
    raise Exception("GENAI_API_KEY not set in environment")

# Google GenAI client libraries (imported only after the key check passed).
from google import genai
from google.genai import types

# Shared GenAI client used by every model call in this module.
client = genai.Client(api_key=GENAI_API_KEY)

# All endpoints of this module are grouped under the "/check" prefix.
router = APIRouter(prefix="/check", tags=["check"])

# The graded results are persisted as a JSON file in the system temp directory.
TEMP_FOLDER = tempfile.gettempdir()
RESULT_FILE = os.path.join(TEMP_FOLDER, "result_cards.json")
############################################################## | |
# Preprocessing & Extraction Functions | |
############################################################## | |
def extract_json_from_output(output_str: str):
    """
    Extract a JSON object from a string that may contain extra text.

    The text between the first '{' and the last '}' is parsed first. If that
    fails (for example because trailing text contains another '}'), the
    function falls back to decoding the first complete JSON value that starts
    at the first '{'.

    Args:
        output_str: Raw text, typically a model response. ``None`` and other
            non-string values are tolerated.

    Returns:
        The decoded JSON value, or ``None`` when no JSON block is found or it
        cannot be decoded.
    """
    # A model response may carry no text at all; treat that as "nothing found"
    # instead of crashing on ``None.find``.
    if not isinstance(output_str, str):
        print("No JSON block found in the output.")
        return None

    start = output_str.find('{')
    end = output_str.rfind('}')
    if start == -1 or end == -1:
        print("No JSON block found in the output.")
        return None

    try:
        return json.loads(output_str[start:end + 1])
    except json.JSONDecodeError as e:
        # Keep the first error: it is the one reported if the fallback fails too.
        first_error = e

    try:
        # raw_decode stops at the end of the first complete JSON value, so any
        # trailing text (even text containing braces) is ignored.
        parsed, _ = json.JSONDecoder().raw_decode(output_str, start)
        return parsed
    except json.JSONDecodeError:
        print("Error decoding JSON:", first_error)
        return None
def parse_all_answers(image_input: Image.Image, model: str = "gemini-2.0-flash") -> str:
    """
    Extract the answers from an image of a 15-question answer sheet.

    Args:
        image_input: Image of the answer sheet; it is sent to the model
            together with the text prompt.
        model: Name of the GenAI model to query.

    Returns:
        The raw text of the model response, which is expected to contain the
        JSON described in the prompt. An empty string is returned when the
        response carries no text, so callers always receive a ``str``.
    """
    output_format = """
    Answer in the following JSON format. Do not write anything else:
    {
        "Answers": {
            "1": "<option or text>",
            "2": "<option or text>",
            "3": "<option or text>",
            "4": "<option or text>",
            "5": "<option or text>",
            "6": "<option or text>",
            "7": "<option or text>",
            "8": "<option or text>",
            "9": "<option or text>",
            "10": "<option or text>",
            "11": "<free-text answer>",
            "12": "<free-text answer>",
            "13": "<free-text answer>",
            "14": "<free-text answer>",
            "15": "<free-text answer>"
        }
    }
    """
    prompt = f"""
    You are an assistant that extracts answers from an image.
    The image is a screenshot of an answer sheet containing 15 questions.
    For questions 1 to 10, the answers are multiple-choice selections.
    For questions 11 to 15, the answers are free-text responses.
    Extract the answer for each question (1 to 15) and provide the result in JSON using the format below:
    {output_format}
    """
    response = client.models.generate_content(
        model=model,
        contents=[prompt, image_input]
    )
    # ``response.text`` is None when the response has no text part; normalise
    # it so the declared ``str`` return type holds for downstream parsing.
    return response.text or ""
def parse_info(image_input: Image.Image, model: str = "gemini-2.0-flash") -> str:
    """
    Extract candidate information (name, number, country, level and paper)
    from an image.

    Args:
        image_input: Image containing the candidate details; it is sent to the
            model together with the text prompt.
        model: Name of the GenAI model to query.

    Returns:
        The raw text of the model response, which is expected to contain the
        JSON described in the prompt. An empty string is returned when the
        response carries no text, so callers always receive a ``str``.
    """
    output_format = """
    Answer in the following JSON format. Do not write anything else:
    {
        "Candidate Info": {
            "Name": "<name>",
            "Number": "<number>",
            "Country": "<country>",
            "Level": "<level>",
            "Paper": "<paper>"
        }
    }
    """
    prompt = f"""
    You are an assistant that extracts candidate information from an image.
    The image contains candidate details including name, candidate number, country, level and paper.
    Extract the information accurately and provide the result in JSON using the following format:
    {output_format}
    """
    response = client.models.generate_content(
        model=model,
        contents=[prompt, image_input]
    )
    # ``response.text`` is None when the response has no text part; normalise
    # it so the declared ``str`` return type holds for downstream parsing.
    return response.text or ""
def calculate_result(student_answers: dict, correct_answers: dict, total_questions: int = 15) -> dict:
    """
    Compare a student's answers with the answer key and compute the score.

    Both inputs are expected to carry a top-level "Answers" mapping whose keys
    are the question numbers as strings ("1", "2", ...). Answers are compared
    after stripping surrounding whitespace; the comparison is exact otherwise.

    Args:
        student_answers: Parsed student answers.
        correct_answers: Parsed answer key.
        total_questions: Number of questions to grade, starting at question 1.

    Returns:
        A dict with "Total Marks", "Total Questions", "Percentage" and
        "Detailed Results" (one entry per graded question).
    """
    def _answers_of(parsed) -> dict:
        # The model may emit null or a non-object under "Answers"; grade such
        # input as "no answers" instead of failing.
        answers = parsed.get("Answers") if isinstance(parsed, dict) else None
        return answers if isinstance(answers, dict) else {}

    def _normalise(value) -> str:
        # Extracted answers are not guaranteed to be strings (null, numbers),
        # so coerce before stripping.
        return "" if value is None else str(value).strip()

    student_all = _answers_of(student_answers)
    correct_all = _answers_of(correct_answers)

    marks = 0
    detailed = {}
    for q in map(str, range(1, total_questions + 1)):
        stud_ans = _normalise(student_all.get(q))
        corr_ans = _normalise(correct_all.get(q))
        is_correct = stud_ans == corr_ans
        if is_correct:
            marks += 1
        detailed[q] = {
            "Student": stud_ans,
            "Correct": corr_ans,
            "Result": "Correct" if is_correct else "Incorrect",
        }

    # Guard against a zero (or negative) question count to avoid dividing by zero.
    percentage = (marks / total_questions) * 100 if total_questions > 0 else 0.0
    return {
        "Total Marks": marks,
        "Total Questions": total_questions,
        "Percentage": percentage,
        "Detailed Results": detailed
    }
def load_answer_key(pdf_bytes: bytes) -> dict:
    """
    Build the answer key from a PDF by parsing the answers on its last page.

    Args:
        pdf_bytes: Content of the answer-key PDF.

    Returns:
        The parsed JSON answer key, or ``None`` when the PDF yields no pages
        or the model output contains no decodable JSON.
    """
    images = convert_from_bytes(pdf_bytes)
    if not images:
        # No rendered page means nothing to parse; report it the same way as
        # an unparsable key instead of raising IndexError on images[-1].
        return None
    # The answer key is taken from the last page of the document.
    answer_key_response = parse_all_answers(images[-1])
    return extract_json_from_output(answer_key_response)
############################################################## | |
# FastAPI Endpoints | |
############################################################## | |
def _crop_candidate_info(page: Image.Image):
    """Return the band of *page* between 10% and 25% of its height, or None if that band is empty."""
    # assumes pdf2image yields RGB pages — TODO confirm if another mode is configured
    pixels = np.array(page)
    h = pixels.shape[0]
    top = int(h * 0.10)
    bottom_margin = int(h * 0.75)
    # Rows top .. (h - bottom_margin) inclusive, full width: the same region
    # the previous filled-rectangle mask + bounding-box crop selected.
    band = pixels[top:h - bottom_margin + 1]
    if band.size == 0:
        return None
    return Image.fromarray(band)


def _grade_student_page(index: int, page: Image.Image, answer_key: dict):
    """Extract candidate info and answers from one page and grade them; None if the page is skipped."""
    cand_img = _crop_candidate_info(page)
    if cand_img is None:
        return None

    # Candidate details are read from the cropped band, answers from the full page.
    cand_info = extract_json_from_output(parse_info(cand_img)) or {}
    stud_answers = extract_json_from_output(parse_all_answers(page)) or {}

    return {
        "Student Index": index,
        "Candidate Info": cand_info.get("Candidate Info", {}),
        "Student Answers": stud_answers,
        "Correct Answer Key": answer_key,
        "Result": calculate_result(stud_answers, answer_key)
    }


# NOTE(review): no route decorator is visible on this handler in this file —
# confirm where it is registered on `router`.
async def process_pdfs(
    original_pdf: UploadFile = File(..., description="PDF with all student answer sheets (one page per student)"),
    paper_k_pdf: UploadFile = File(..., description="Answer key PDF for Paper K")
):
    """
    Grade every page of the student PDF against the Paper K answer key.

    The answer key is parsed from ``paper_k_pdf``; each page of
    ``original_pdf`` is treated as one student's sheet. The results are
    written to ``RESULT_FILE`` and also returned in the response.

    Returns:
        JSONResponse with ``{"results": [...]}``, one entry per graded page.

    Raises:
        HTTPException: status 500 with the error message as detail when any
            step fails (including an unparsable answer key).
    """
    try:
        student_pdf_bytes = await original_pdf.read()
        paper_k_bytes = await paper_k_pdf.read()

        # NOTE(review): the PDF conversion and model calls below are blocking
        # calls made inside an async handler — consider a thread pool.
        answer_key_k = load_answer_key(paper_k_bytes)
        if answer_key_k is None:
            raise RuntimeError("Failed to parse Paper K answer key.")

        # Each page of the student PDF is one student's answer sheet.
        student_images = convert_from_bytes(student_pdf_bytes)
        all_results = []
        for idx, page in enumerate(student_images, start=1):
            graded = _grade_student_page(idx, page, answer_key_k)
            if graded is not None:
                all_results.append(graded)

        # Persist the results so they can be downloaded later.
        with open(RESULT_FILE, "w", encoding="utf-8") as f:
            json.dump({"results": all_results}, f, indent=2)
        return JSONResponse(content={"results": all_results})
    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it as a 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is kept in the server log.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def download_results():
    """
    Return the result JSON file stored in the temporary folder as a download.

    Returns:
        StreamingResponse streaming ``RESULT_FILE`` as an attachment named
        ``result_cards.json``.

    Raises:
        HTTPException: status 404 when the result file does not exist.
    """
    # Open directly instead of checking for existence first: this avoids the
    # window in which the file could disappear between the check and the open.
    try:
        result_file = open(RESULT_FILE, "rb")
    except FileNotFoundError:
        raise HTTPException(
            status_code=404,
            detail="Result file not found. Please run /process first."
        ) from None

    def _stream_and_close():
        # Close the handle once streaming finishes (or the generator is
        # closed), rather than leaving it to garbage collection.
        with result_file:
            yield from result_file

    return StreamingResponse(
        _stream_and_close(),
        media_type="application/json",
        headers={"Content-Disposition": "attachment; filename=result_cards.json"}
    )
async def root():
    """Return a welcome message together with a short usage description of the API."""
    usage_steps = (
        "POST two PDFs to /process:",
        "(1) original answer sheet PDF,",
        "(2) Paper K answer-key PDF.",
        "Then GET /download to retrieve the graded results.",
    )
    payload = {"message": "Welcome to the Student Result Card API (Paper K only)."}
    # The steps are joined with single spaces into one usage sentence.
    payload["usage"] = " ".join(usage_steps)
    return payload
# Development entry point: only runs when this file is executed directly.
if __name__ == "__main__":
    # Serves the application given by the import string "main:app" on port
    # 8000 with reload enabled.
    # NOTE(review): no `app` object is defined in the visible part of this
    # file — confirm that "main:app" resolves to the intended application.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)