"""HTTP API for the "Maya Persistence" service.

Exposes FastAPI endpoints under ``/api/v1`` that delegate to a module-level
``ChromaIntf`` instance (document lookup, insertion, persistence, listing)
and to ``baseInfra.dropbox_handler`` (restoring the ``db`` folder).
"""
from datetime import datetime
from enum import Enum
import logging
import os
from pathlib import Path
import traceback
from typing import List, Dict, Any, Generator, Optional, cast, Callable

import fastapi
from fastapi import Body, Depends
from fastapi import BackgroundTasks, HTTPException, status
# NOTE(review): this alias binds the name ``Response`` to the FastAPI
# *application* class, not to an HTTP response type. It is kept only so that
# no existing import is removed; do not use it as a response annotation.
from fastapi import FastAPI as Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Extra, Field
from sse_starlette.sse import EventSourceResponse
from starlette.requests import Request
from starlette.responses import Response as StarletteResponse
from starlette.responses import StreamingResponse
import uvicorn

from chromaIntf import ChromaIntf
import baseInfra.dropbox_handler as dbh

logger = logging.getLogger("root")
chromaIntf = ChromaIntf()


class PathRequest(BaseModel):
    """Request body for ``/walk``: the directory to enumerate."""

    dir: str = "/"


class MetaD(BaseModel):
    """Document metadata; arbitrary extra fields are accepted and kept."""

    # default_factory makes the timestamp reflect when the model instance is
    # created. A plain ``= datetime.now().isoformat()`` default would be
    # evaluated once at import time and shared by every later request.
    timestamp: Optional[str] = Field(
        default_factory=lambda: datetime.now().isoformat()
    )

    class Config:
        allow_population_by_field_name = True
        extra = Extra.allow


class DocWithMeta(BaseModel):
    """Request body for ``/addTextDocument``: text plus optional metadata."""

    text: str = ""
    # Build a fresh MetaD per document so the default timestamp is current
    # rather than frozen at import time.
    metadata: Optional[MetaD] = Field(default_factory=MetaD)


async def catch_exceptions_middleware(
    request: Request, call_next: Callable[[Request], Any]
) -> StarletteResponse:
    """Convert any unhandled exception from downstream into a JSON 500.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response, or a ``JSONResponse`` with status 500 whose
        body is ``{"error": repr(exception)}`` when the handler raised.
    """
    try:
        return await call_next(request)
    except Exception as e:
        # Top-level boundary: report the failure on stdout/stderr and answer
        # with a JSON error instead of letting the exception propagate.
        print(repr(e))
        print("from the catch exception")
        traceback.print_exc()
        return JSONResponse(content={"error": repr(e)}, status_code=500)


app = fastapi.FastAPI(title="Maya Persistence")
app.middleware("http")(catch_exceptions_middleware)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

api_base = "/api/v1"


@app.post(api_base + "/getMatchingDocs")
async def get_matching_docs(
    inStr: str, kwargs: Dict[Any, Any], background_tasks: BackgroundTasks
) -> Any:
    """Queue a matching-documents lookup for the query and return at once.

    The lookup (``chromaIntf.getRelevantDocs``) is scheduled as a background
    task; its result is not returned to the caller. Use
    ``/getMatchingDocsSync`` to receive the matching documents.

    TODO: Add parameter for type of query and number of docs to return
    TODO: Add parameter to return the source information as well
    """
    logger.info("================================================")
    logger.info("Received new request at /getMatchingDocs:%s", inStr)
    background_tasks.add_task(chromaIntf.getRelevantDocs, inStr, kwargs)
    retVal = "added to queue"
    logger.info("Returning :%s", retVal)
    logger.info("================================================")
    return retVal


@app.post(api_base + "/getMatchingDocsSync")
async def get_matching_docs_sync(inStr: str, kwargs: Dict[Any, Any]) -> Any:
    """Look up the documents matching the query and return them.

    Awaits ``chromaIntf.getRelevantDocs(inStr, kwargs)`` and returns its
    result directly. This is the synchronous counterpart of
    ``/getMatchingDocs``.

    TODO: Add parameter for type of query and number of docs to return
    TODO: Add parameter to return the source information as well
    """
    logger.info("================================================")
    logger.info("Received new request at /getMatchingDocsSync:%s", inStr)
    retVal = await chromaIntf.getRelevantDocs(inStr, kwargs)
    logger.info(f"Returning: {len(retVal)} results")
    logger.info("================================================")
    return retVal


@app.post(api_base + "/addTextDocument")
async def add_text_document(inDoc: DocWithMeta) -> Any:
    """Add the document's text and metadata to the db.

    Returns whatever ``chromaIntf.addText`` returns.
    """
    logger.info("================================================")
    # The log line names this endpoint (the original said "/addTextDocuments",
    # which does not exist).
    logger.info("Received new request at /addTextDocument:%s", inDoc.text)
    retVal = await chromaIntf.addText(inDoc.text, inDoc.metadata)
    logger.info(f"Returning: {len(retVal)} results")
    logger.info("================================================")
    return retVal


@app.get(api_base + "/persist")
async def persist_db():
    """Return the result of ``chromaIntf.persist()``."""
    return await chromaIntf.persist()


@app.get(api_base + "/reset")
async def reset_db():
    """Return the result of ``dbh.restoreFolder("db")``."""
    return await dbh.restoreFolder("db")


@app.post(api_base + "/walk")
def walk(path: PathRequest):
    """List every directory and file name found beneath ``path.dir``.

    Names are collected recursively with ``os.walk`` and are bare names, not
    full paths, so entries from different sub-directories are mixed together.

    NOTE(review): the directory comes straight from the request body, so this
    endpoint can enumerate any location the process can read.

    Returns:
        ``JSONResponse`` with body ``{"dirs": [...], "files": [...]}``.
    """
    print("Received walk request for", path)
    dirs = []
    fileList = []
    try:
        for _root, items, files in os.walk(path.dir, topdown=True):
            dirs.extend(items)
            fileList.extend(files)
    except Exception as exc:
        # Best-effort listing: report the actual error (the original printed
        # the Exception class itself) and return whatever was collected.
        print("got exception", exc)
    print("dirs ", dirs)
    print("files ", fileList)
    return JSONResponse(content={"dirs": dirs, "files": fileList})


@app.get(api_base + "/list")
async def list_docs():
    """Return the result of ``chromaIntf.listDocs()``."""
    return await chromaIntf.listDocs()


print(__name__)
if __name__ == '__main__' or __name__ == "src.main":
    # The app is passed as an import string ("<this file's stem>:app").
    # Alternative: uvicorn.run(app, host="0.0.0.0", port=8000)
    uvicorn.run(f"{Path(__file__).stem}:app", host="0.0.0.0", port=8000, workers=1)