anubhav77's picture
v0.1.3
d462b73
raw
history blame
4.98 kB
from datetime import datetime
import logging,os
import fastapi
from fastapi import Body, Depends
import uvicorn
from fastapi import BackgroundTasks,HTTPException , status
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI as Response
from sse_starlette.sse import EventSourceResponse
from starlette.responses import StreamingResponse
from starlette.requests import Request
from pydantic import BaseModel, Extra
from enum import Enum
from typing import List, Dict, Any, Generator, Optional, cast, Callable
from chromaIntf import ChromaIntf
import baseInfra.dropbox_handler as dbh
import traceback
from pathlib import Path
logger=logging.getLogger("root")
chromaIntf=ChromaIntf()
class PathRequest(BaseModel):
dir: str = "/"
class MetaD(BaseModel):
timestamp: Optional[str]= datetime.now().isoformat()
class Config:
allow_population_by_field_name = True
extra = Extra.allow
class DocWithMeta(BaseModel):
text: str = ""
metadata: Optional[MetaD] = MetaD()
async def catch_exceptions_middleware(
request: Request, call_next: Callable[[Request], Any]
) -> Response:
try:
#print("In exception cater middleware")
#print(request.headers)
#print(await request.body())
return await call_next(request)
except Exception as e:
print(repr(e))
print("from the catch exception")
traceback.print_exc()
return JSONResponse(content={"error": repr(e)}, status_code=500)
app = fastapi.FastAPI(title="Maya Persistence")
app.middleware("http")(catch_exceptions_middleware)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
api_base="/api/v1"
@app.post(api_base+"/getMatchingDocs")
async def get_matching_docs(inStr: str, kwargs: Dict [Any, Any] ,background_tasks:BackgroundTasks) -> Any:
"""
Gets the query embeddings and uses metadata appropriately and gets the matching docs for query
TODO: Add parameter for type of query and number of docs to return
TODO: Add parameter to return the source information as well
"""
logger.info("================================================")
logger.info("Received new request at /getMatchingDocs:"+inStr)
#retVal=await chromaIntf.getRelevantDocs(inStr,kwargs)
background_tasks.add_task(chromaIntf.getRelevantDocs,inStr,kwargs)
retVal="added to queue"
logger.info("Returning :"+retVal)
logger.info("================================================")
return retVal
@app.post(api_base+"/getMatchingDocsSync")
async def get_matching_docs_sync(inStr: str, kwargs: Dict [Any, Any]) -> Any:
"""
Gets the query embeddings and uses metadata appropriately and gets the matching docs for query
This is synchronous version
TODO: Add parameter for type of query and number of docs to return
TODO: Add parameter to return the source information as well
"""
logger.info("================================================")
logger.info("Received new request at /getMatchingDocsSync:"+inStr)
retVal=await chromaIntf.getRelevantDocs(inStr,kwargs)
#background_tasks.add_task(chromaIntf.getRelevantDocs,inStr,kwargs)
#retVal="added to queue"
logger.info(f"Returning: {len(retVal)} results")
logger.info("================================================")
return retVal
@app.post(api_base+"/addTextDocument")
async def add_text_document(inDoc: DocWithMeta ) -> Any:
"""
Add text and metadata to the db
"""
logger.info("================================================")
logger.info("Received new request at /addTextDocuments:"+inDoc.text)
retVal= await chromaIntf.addText(inDoc.text,inDoc.metadata)
logger.info(f"Returning: {len(retVal)} results")
logger.info("================================================")
return retVal
@app.get(api_base+"/persist")
async def persist_db():
return await chromaIntf.persist()
@app.get(api_base+"/reset")
async def reset_db():
return await dbh.restoreFolder("db")
@app.post(api_base+"/walk")
def walk(path: PathRequest):
print("Received walk request for",path)
dirs=[]
fileList=[]
try:
for root, items, files in os.walk(path.dir,topdown=True):
for item in items:
dirs.append(item)
for filea in files:
fileList.append(filea)
except Exception:
print("got exception",Exception)
print("dirs ",dirs)
print("files ",fileList)
response= JSONResponse(content= {"dirs":dirs,"files":fileList})
return response
@app.get(api_base+"/list")
async def list_docs():
return await chromaIntf.listDocs()
print(__name__)
if __name__ == '__main__' or __name__ == "src.main":
uvicorn.run(f"{Path(__file__).stem}:app", host="0.0.0.0", port=8000,workers=1)
#uvicorn.run(app, host="0.0.0.0", port=8000)