# main.py
import asyncio
import logging
import random
import uuid
from datetime import datetime, timedelta
from typing import Dict, Optional

import nltk
import spacy
import torch
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from psycopg2 import sql
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
from supabase import create_client
from supabase.lib.client_options import ClientOptions
from transformers import T5ForConditionalGeneration, T5Tokenizer

from models.LexRank import degree_centrality_scores

app = FastAPI(title="Kairos News API", version="1.0")

# Enable CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Database connection setup
url = ""
key = ""
opts = ClientOptions().replace(schema="articles")
supabase = create_client(url, key, options=opts)

# Load NLP and summarization models
nlp = spacy.load("pt_core_news_md")
model_embedding = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
token_name = "unicamp-dl/ptt5-base-portuguese-vocab"
model_name = "recogna-nlp/ptt5-base-summ"
tokenizer = T5Tokenizer.from_pretrained(token_name)
model_summ = T5ForConditionalGeneration.from_pretrained(model_name).to("cuda")

# In-memory database simulation
jobs_db: Dict[str, Dict] = {}


class PostRequest(BaseModel):
    query: str
    topic: str
    start_date: str  # Format: "YYYY/MM"
    end_date: str    # Format: "YYYY/MM"


class JobStatus(BaseModel):
    id: str
    status: str  # "processing", "completed", "failed"
    created_at: datetime
    completed_at: Optional[datetime]
    request: PostRequest
    result: Optional[Dict]


@app.post("/index", response_model=JobStatus)
async def create_job(request: PostRequest, background_tasks: BackgroundTasks):
    """Create a new processing job."""
    job_id = str(uuid.uuid4())

    # Store initial job data
    jobs_db[job_id] = {
        "status": "processing",
        "created_at": datetime.now(),
        "completed_at": None,
        "request": request.dict(),
        "result": None,
    }

    logging.info(f"Job {job_id} created with request: {request.query}")

    # Simulate background processing
    background_tasks.add_task(process_job, job_id)

    return {
        "id": job_id,
        "status": "processing",
        "created_at": jobs_db[job_id]["created_at"],
        "completed_at": None,
        "request": request,
        "result": None,
    }


@app.get("/loading", response_model=JobStatus)
async def get_job_status(id: str):
    """Check job status, with an artificial delay and random failures for demonstration."""
    if id not in jobs_db:
        raise HTTPException(status_code=404, detail="Job not found")

    job = jobs_db[id]

    # Add a small artificial delay while the job is still young
    elapsed = datetime.now() - job["created_at"]
    if elapsed < timedelta(seconds=3):
        await asyncio.sleep(1)

    # 10% chance of failure for demonstration
    if random.random() < 0.1 and job["status"] == "processing":
        job["status"] = "failed"
        job["result"] = {"error": "Random processing failure"}

    return {
        "id": id,
        "status": job["status"],
        "created_at": job["created_at"],
        "completed_at": job["completed_at"],
        "request": job["request"],
        "result": job["result"],
    }


async def process_job(job_id: str):
    """Background task to simulate processing."""
    await asyncio.sleep(random.uniform(3, 10))  # Random processing time

    if job_id in jobs_db:
        request = jobs_db[job_id]["request"]
        jobs_db[job_id]["status"] = "completed"
        jobs_db[job_id]["completed_at"] = datetime.now()
        jobs_db[job_id]["result"] = {
            "query": request["query"],
            "topic": request["topic"],
            "date_range": f"{request['start_date']} to {request['end_date']}",
            "analysis": f"Processed results for {request['query']}",
            "sources": ["Source A", "Source B", "Source C"],
            "summary": "This is a generated summary based on your query.",
        }


@app.get("/jobs")
async def list_jobs():
    """Debug endpoint to view all jobs."""
    return jobs_db
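

# --- Hedged sketch, not part of the original handlers -------------------
# The endpoints above only simulate results, while model_embedding,
# degree_centrality_scores, and model_summ are loaded but never called.
# The helper below shows one plausible way to wire them together:
# LexRank-style extractive sentence selection over embeddings, followed by
# an abstractive PTT5 pass. The function name and parameters (top_k,
# max_new_tokens, num_beams) are assumptions for illustration only.
import numpy as np
from sentence_transformers import util


def summarize(text: str, top_k: int = 5, max_new_tokens: int = 128) -> str:
    # Split into sentences with the spaCy Portuguese pipeline loaded above
    sentences = [s.text.strip() for s in nlp(text).sents if s.text.strip()]
    if len(sentences) > top_k:
        # Rank sentences by LexRank degree centrality over cosine similarities
        embeddings = model_embedding.encode(sentences, convert_to_tensor=True)
        cos_scores = util.cos_sim(embeddings, embeddings).cpu().numpy()
        centrality = degree_centrality_scores(cos_scores, threshold=None)
        top_idx = sorted(np.argsort(-centrality)[:top_k])  # keep document order
        sentences = [sentences[i] for i in top_idx]
    # Abstractive pass with the PTT5 summarization model
    inputs = tokenizer(
        " ".join(sentences), return_tensors="pt", truncation=True
    ).to(model_summ.device)
    with torch.no_grad():
        output_ids = model_summ.generate(
            **inputs, max_new_tokens=max_new_tokens, num_beams=4
        )
    return tokenizer.decode(output_ids[0], skip_special_tokens=True)
# A real process_job could then set result["summary"] = summarize(article_text)
# instead of the hard-coded placeholder string used in the simulation above.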