Spaces:
Runtime error
Runtime error
from datetime import datetime | |
import logging,os | |
import fastapi | |
from fastapi import Body, Depends | |
import uvicorn | |
from fastapi import BackgroundTasks,HTTPException , status | |
from fastapi.responses import JSONResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi import FastAPI as Response | |
from sse_starlette.sse import EventSourceResponse | |
from starlette.responses import StreamingResponse | |
from starlette.requests import Request | |
from pydantic import BaseModel, Extra | |
from enum import Enum | |
from typing import List, Dict, Any, Generator, Optional, cast, Callable | |
from chromaIntf import ChromaIntf | |
import baseInfra.dropbox_handler as dbh | |
import traceback | |
from pathlib import Path | |
logger=logging.getLogger("root") | |
chromaIntf=ChromaIntf() | |
class PathRequest(BaseModel): | |
dir: str = "/" | |
class MetaD(BaseModel): | |
timestamp: Optional[str]= datetime.now().isoformat() | |
class Config: | |
allow_population_by_field_name = True | |
extra = Extra.allow | |
class DocWithMeta(BaseModel): | |
text: str = "" | |
metadata: Optional[MetaD] = MetaD() | |
async def catch_exceptions_middleware( | |
request: Request, call_next: Callable[[Request], Any] | |
) -> Response: | |
try: | |
#print("In exception cater middleware") | |
#print(request.headers) | |
#print(await request.body()) | |
return await call_next(request) | |
except Exception as e: | |
print(repr(e)) | |
print("from the catch exception") | |
traceback.print_exc() | |
return JSONResponse(content={"error": repr(e)}, status_code=500) | |
app = fastapi.FastAPI(title="Maya Persistence") | |
app.middleware("http")(catch_exceptions_middleware) | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
api_base="/api/v1" | |
async def get_matching_docs(inStr: str, kwargs: Dict [Any, Any] ,background_tasks:BackgroundTasks) -> Any: | |
""" | |
Gets the query embeddings and uses metadata appropriately and gets the matching docs for query | |
TODO: Add parameter for type of query and number of docs to return | |
TODO: Add parameter to return the source information as well | |
""" | |
logger.info("================================================") | |
logger.info("Received new request at /getMatchingDocs:"+inStr) | |
#retVal=await chromaIntf.getRelevantDocs(inStr,kwargs) | |
background_tasks.add_task(chromaIntf.getRelevantDocs,inStr,kwargs) | |
retVal="added to queue" | |
logger.info("Returning :"+retVal) | |
logger.info("================================================") | |
return retVal | |
async def get_matching_docs_sync(inStr: str, kwargs: Dict [Any, Any]) -> Any: | |
""" | |
Gets the query embeddings and uses metadata appropriately and gets the matching docs for query | |
This is synchronous version | |
TODO: Add parameter for type of query and number of docs to return | |
TODO: Add parameter to return the source information as well | |
""" | |
logger.info("================================================") | |
logger.info("Received new request at /getMatchingDocsSync:"+inStr) | |
retVal=await chromaIntf.getRelevantDocs(inStr,kwargs) | |
#background_tasks.add_task(chromaIntf.getRelevantDocs,inStr,kwargs) | |
#retVal="added to queue" | |
logger.info(f"Returning: {len(retVal)} results") | |
logger.info("================================================") | |
return retVal | |
async def add_text_document(inDoc: DocWithMeta ) -> Any: | |
""" | |
Add text and metadata to the db | |
""" | |
logger.info("================================================") | |
logger.info("Received new request at /addTextDocuments:"+inDoc.text) | |
retVal= await chromaIntf.addText(inDoc.text,inDoc.metadata) | |
logger.info(f"Returning: {len(retVal)} results") | |
logger.info("================================================") | |
return retVal | |
async def persist_db(): | |
return await chromaIntf.persist() | |
async def reset_db(): | |
return await dbh.restoreFolder("db") | |
def walk(path: PathRequest): | |
print("Received walk request for",path) | |
dirs=[] | |
fileList=[] | |
try: | |
for root, items, files in os.walk(path.dir,topdown=True): | |
for item in items: | |
dirs.append(item) | |
for filea in files: | |
fileList.append(filea) | |
except Exception: | |
print("got exception",Exception) | |
print("dirs ",dirs) | |
print("files ",fileList) | |
response= JSONResponse(content= {"dirs":dirs,"files":fileList}) | |
return response | |
async def list_docs(): | |
return await chromaIntf.listDocs() | |
print(__name__) | |
if __name__ == '__main__' or __name__ == "src.main": | |
uvicorn.run(f"{Path(__file__).stem}:app", host="0.0.0.0", port=8000,workers=1) | |
#uvicorn.run(app, host="0.0.0.0", port=8000) |