Spaces:
Running
Running
File size: 5,962 Bytes
d062f19 fbfcb17 d062f19 fbfcb17 d062f19 aab9adc d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 fbfcb17 d062f19 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# check.py
import os
import tempfile
import json
import numpy as np
import cv2
from PIL import Image
from pdf2image import convert_from_bytes
from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from google import genai
router = APIRouter(prefix="/check", tags=["check"])
# GenAI client
GENAI_API_KEY = os.getenv("GENAI_API_KEY")
if not GENAI_API_KEY:
raise Exception("GENAI_API_KEY not set in environment")
client = genai.Client(api_key=GENAI_API_KEY)
# Temp storage for results
TEMP_FOLDER = tempfile.gettempdir()
RESULT_FILE = os.path.join(TEMP_FOLDER, "result_cards.json")
def extract_json_from_output(output_str: str):
start = output_str.find("{")
end = output_str.rfind("}")
if start == -1 or end == -1:
return None
try:
return json.loads(output_str[start : end + 1])
except json.JSONDecodeError:
return None
def parse_all_answers(image_input: Image.Image) -> str:
output_format = """
Answer in the following JSON format. Do not write anything else:
{ "Answers": { "1": "<…>", …, "15": "<…>" } }
"""
prompt = f"""
You are an assistant that extracts answers from an image of a 15-question sheet.
Provide ONLY JSON in this format:
{output_format}
"""
response = client.models.generate_content(
model="gemini-2.0-flash", contents=[prompt, image_input]
)
return response.text
def parse_info(image_input: Image.Image) -> str:
output_format = """
Answer in the following JSON format. Do not write anything else:
{ "Candidate Info": { "Name": "<…>", "Number": "<…>", "Country": "<…>", "Level": "<…>", "Paper": "<…>" } }
"""
prompt = f"""
You are an assistant that extracts candidate info from an image.
Provide ONLY JSON in this format:
{output_format}
"""
response = client.models.generate_content(
model="gemini-2.0-flash", contents=[prompt, image_input]
)
return response.text
def calculate_result(student_answers: dict, correct_answers: dict) -> dict:
student_all = (student_answers or {}).get("Answers", {})
correct_all = (correct_answers or {}).get("Answers", {})
total = 15
marks = 0
detailed = {}
for q in map(str, range(1, total + 1)):
stud = (student_all.get(q) or "").strip()
corr = (correct_all.get(q) or "").strip()
ok = stud == corr
detailed[q] = {"Student": stud, "Correct": corr, "Result": "Correct" if ok else "Incorrect"}
if ok:
marks += 1
return {"Total Marks": marks, "Total Questions": total, "Percentage": marks / total * 100, "Detailed Results": detailed}
def load_answer_key(pdf_bytes: bytes) -> dict:
images = convert_from_bytes(pdf_bytes)
last_page = images[-1]
resp = parse_all_answers(last_page)
return extract_json_from_output(resp)
@router.post("/process", summary="Grade student sheets (Paper K only)")
async def process_pdfs(
student_pdf: UploadFile = File(..., description="Student sheets PDF"),
paper_k_pdf: UploadFile = File(..., description="Answer key PDF for Paper K"),
):
try:
stud_bytes = await student_pdf.read()
key_bytes = await paper_k_pdf.read()
answer_key = load_answer_key(key_bytes)
if answer_key is None:
raise HTTPException(400, detail="Could not parse Paper K answer key.")
student_pages = convert_from_bytes(stud_bytes)
all_results = []
for idx, page in enumerate(student_pages, start=1):
# crop candidate-info
cv = cv2.cvtColor(np.array(page), cv2.COLOR_RGB2BGR)
h, w = cv.shape[:2]
mask = np.zeros((h, w), dtype="uint8")
top, bottom = int(h * 0.10), int(h * 0.75)
cv2.rectangle(mask, (0, top), (w, h - bottom), 255, -1)
crop = cv2.bitwise_and(cv, cv, mask=mask)
coords = cv2.findNonZero(mask)
if coords is None:
continue
x, y, mw, mh = cv2.boundingRect(coords)
cand_img = Image.fromarray(cv2.cvtColor(crop[y : y + mh, x : x + mw], cv2.COLOR_BGR2RGB))
# parse candidate info
info_txt = parse_info(cand_img)
candidate_info = extract_json_from_output(info_txt) or {}
# parse student answers
stud_txt = parse_all_answers(page)
stud_answers = extract_json_from_output(stud_txt)
if stud_answers is None:
raise HTTPException(400, detail=f"Failed to parse answers on page {idx}.")
# grade
result = calculate_result(stud_answers, answer_key)
all_results.append(
{
"Student Index": idx,
"Candidate Info": candidate_info.get("Candidate Info", {}),
"Student Answers": stud_answers,
"Correct Answer Key": answer_key,
"Result": result,
}
)
# write file
with open(RESULT_FILE, "w", encoding="utf-8") as f:
json.dump({"results": all_results}, f, indent=2)
return JSONResponse(content={"results": all_results})
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, detail=str(e))
@router.get("/download", summary="Download latest grading results")
async def download_results():
if not os.path.exists(RESULT_FILE):
raise HTTPException(404, detail="No results available. Run /check/process first.")
return StreamingResponse(
open(RESULT_FILE, "rb"),
media_type="application/json",
headers={"Content-Disposition": "attachment; filename=result_cards.json"},
)
@router.get("/health", summary="Health check")
async def health_check():
return {"status": "healthy"}
@router.get("/version", summary="Service version")
async def version_check():
return {"version": "1.0.0"}
|