|
import logging |
|
from fastapi import FastAPI, HTTPException, BackgroundTasks |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from pydantic import BaseModel |
|
from typing import Dict, Optional, List, Any |
|
import uuid |
|
from datetime import datetime |
|
from contextlib import asynccontextmanager |
|
|
|
from models.embedding import EmbeddingModel |
|
from models.summarization import SummarizationModel |
|
from models.nlp import NLPModel |
|
from database.query import DatabaseService |
|
from database.query_processor import QueryProcessor |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", |
|
handlers=[logging.StreamHandler()] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
embedding_model = None |
|
summarization_model = None |
|
nlp_model = None |
|
db_service = None |
|
|
|
@asynccontextmanager |
|
async def lifespan(app: FastAPI): |
|
global embedding_model, summarization_model, nlp_model, db_service |
|
|
|
|
|
logger.info("Initializing models...") |
|
try: |
|
embedding_model = EmbeddingModel() |
|
summarization_model = SummarizationModel() |
|
nlp_model = NLPModel() |
|
db_service = DatabaseService() |
|
logger.info("All models initialized successfully") |
|
except Exception as e: |
|
logger.error(f"Model initialization failed: {str(e)}") |
|
raise |
|
|
|
yield |
|
|
|
|
|
logger.info("Shutting down application...") |
|
if db_service: |
|
try: |
|
await db_service.close() |
|
logger.info("Database connection closed successfully") |
|
except Exception as e: |
|
logger.error(f"Error closing database connection: {str(e)}") |
|
|
|
app = FastAPI( |
|
title="Kairos News API", |
|
version="1.0", |
|
lifespan=lifespan |
|
) |
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=["*"], |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
|
|
jobs_db: Dict[str, Dict] = {} |
|
|
|
class PostRequest(BaseModel): |
|
query: str |
|
topic: Optional[str] = None |
|
start_date: Optional[str] = None |
|
end_date: Optional[str] = None |
|
|
|
class JobStatus(BaseModel): |
|
id: str |
|
status: str |
|
created_at: datetime |
|
completed_at: Optional[datetime] = None |
|
request: PostRequest |
|
result: Optional[Dict[str, Any]] = None |
|
|
|
@app.post("/index", response_model=JobStatus) |
|
async def create_job(request: PostRequest, background_tasks: BackgroundTasks): |
|
job_id = str(uuid.uuid4()) |
|
logger.info(f"Creating new job {job_id} with request: {request.dict()}") |
|
|
|
jobs_db[job_id] = { |
|
"id": job_id, |
|
"status": "processing", |
|
"created_at": datetime.now(), |
|
"completed_at": None, |
|
"request": request.dict(), |
|
"result": None |
|
} |
|
|
|
background_tasks.add_task( |
|
process_job, |
|
job_id, |
|
request, |
|
embedding_model, |
|
summarization_model, |
|
nlp_model, |
|
db_service |
|
) |
|
|
|
logger.info(f"Job {job_id} created and processing started") |
|
return jobs_db[job_id] |
|
|
|
@app.get("/loading", response_model=JobStatus) |
|
async def get_job_status(id: str): |
|
logger.info(f"Checking status for job {id}") |
|
if id not in jobs_db: |
|
logger.warning(f"Job {id} not found") |
|
raise HTTPException(status_code=404, detail="Job not found") |
|
|
|
logger.info(f"Returning status for job {id}: {jobs_db[id]['status']}") |
|
return jobs_db[id] |
|
|
|
async def process_job( |
|
job_id: str, |
|
request: PostRequest, |
|
embedding_model: EmbeddingModel, |
|
summarization_model: SummarizationModel, |
|
nlp_model: NLPModel, |
|
db_service: DatabaseService |
|
): |
|
try: |
|
logger.info(f"Starting processing for job {job_id}") |
|
|
|
processor = QueryProcessor( |
|
embedding_model=embedding_model, |
|
summarization_model=summarization_model, |
|
nlp_model=nlp_model, |
|
db_service=db_service |
|
) |
|
|
|
logger.debug(f"Processing query: {request.query}") |
|
result = await processor.process( |
|
query=request.query, |
|
topic=request.topic, |
|
start_date=request.start_date, |
|
end_date=request.end_date |
|
) |
|
|
|
jobs_db[job_id].update({ |
|
"status": "completed", |
|
"completed_at": datetime.now(), |
|
"result": result if result else {"message": "No results found"} |
|
}) |
|
logger.info(f"Job {job_id} completed successfully") |
|
|
|
except Exception as e: |
|
logger.error(f"Error processing job {job_id}: {str(e)}", exc_info=True) |
|
jobs_db[job_id].update({ |
|
"status": "failed", |
|
"completed_at": datetime.now(), |
|
"result": {"error": str(e)} |
|
}) |
|
logger.info(f"Job {job_id} marked as failed") |