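# Spaces: Sleeping
# (The line above appears to be page-header residue captured along with the file; it is not program code.)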
import hmac
import io
import json
import os
from typing import Optional

from fastapi import FastAPI, File, Header, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from json_flatten import flatten
from PIL import Image
from pydantic import BaseModel

from auth import isAuthorized
from utils import get_text, translate_text, chat_text
# Secrets are read once at import time from the environment.
# PASSCODE is what get_token() compares against; TOKEN is what it hands out and
# what the endpoints pass to isAuthorized().
PASSCODE = os.getenv("PASSCODE")
TOKEN = os.getenv("TOKEN")

# Fail fast when either secret is missing. An empty string is rejected as well:
# an empty PASSCODE would make an empty passcode match in get_token().
if not PASSCODE or not TOKEN:
    raise RuntimeError("Please set the PASSCODE and TOKEN environment variables.")
# The FastAPI application object; title and description are passed to FastAPI as metadata.
app = FastAPI(
    title="DOCUMANTICAI API",
    description="""
This API allows you to upload an image and get a formatted response with details and image information.
""",
)

# CORS is configured fully open: any origin, method and header, with credentials allowed.
# NOTE(review): the FastAPI CORS documentation says wildcard origins/methods/headers
# should not be combined with allow_credentials=True - confirm whether credentials
# are actually required, or list the allowed origins explicitly.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_headers": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
async def upload_image(fields: str, model: str, file: UploadFile = File(...), authorization: Optional[str] = Header(None)):
    """
    ### Endpoint Description:
    Extract form data from an uploaded image and return the extracted data in JSON format.

    #### Request Parameters:
    - `fields`: Passed unchanged to `get_text`. (Required)
    - `model`: Passed unchanged to `get_text`. (Required)
    - `file`: The image file to extract data from. (Required)
    - `authorization`: Optional header value, checked with `isAuthorized` against the configured token.

    #### Response:
    - Success: `{"response": <flattened extraction result>, "details": {"filename", "format", "size", "mode"}}`.
    - `401` with body `"Unauthorized"` when the authorization check fails.
    - `400` with body `{"error": <message>}` when any step raises.

    ### Notes:
    - The image should be in a supported format (e.g., PNG, JPEG).
    - The data extracted will vary depending on the image content.
    """
    try:
        authorized = isAuthorized(authorization, TOKEN)
        if not authorized:
            return JSONResponse(content="Unauthorized", status_code=401)

        # Decode the upload from memory.
        raw_bytes = await file.read()
        image = Image.open(io.BytesIO(raw_bytes))

        # Metadata echoed back to the caller next to the extraction result.
        image_details = {
            "filename": file.filename,
            "format": image.format,
            "size": image.size,  # (width, height)
            "mode": image.mode,
        }

        # get_text returns a JSON string; parse it into Python objects.
        raw_reply = get_text(image, image_details["filename"], model, fields)
        extracted = json.loads(raw_reply)

        # A reply shaped as parallel "fields"/"values" lists is turned into a
        # plain mapping; values are kept exactly as received.
        if "fields" in extracted and "values" in extracted:
            extracted = dict(zip(extracted["fields"], extracted["values"]))

        # Flatten the result before returning it.
        flattened = flatten(extracted)
        payload = {"response": flattened, "details": image_details}
        return JSONResponse(content=payload)
    except Exception as exc:
        # Every failure in the steps above is reported as a 400 carrying the message.
        return JSONResponse(content={"error": str(exc)}, status_code=400)
async def list_models():
    """
    ### Endpoint Description:
    List available models for text generation.

    #### Response:
    - `{"models": [...]}`, where each entry has an `id`, a `title` and a `description`.
    """
    # The catalogue is static. The descriptions are user-facing text, so the
    # Claude entry names its actual vendor and "preferred" is spelled correctly.
    models = [
        {
            "id": "gpt-4o-mini",
            "description": "Latest model by OpenAI preferred for accuracy",
            "title": "GPT-4.o Mini",
        },
        {
            "id": "gpt-4o",
            "description": "Latest model by OpenAI preferred for accuracy",
            "title": "GPT-4.o",
        },
        {
            "id": "deepseek-chat",
            "description": "Latest model by Deepseek preferred for speed",
            "title": "DeepSeek Chat",
        },
        {
            "id": "claude-3-5-sonnet-20241022",
            "description": "Latest model by Anthropic for both accuracy and speed",
            "title": "Claude 3.5 Sonnet 20241022",
        },
        {
            "id": "llama_llm_d",
            "description": "Latest model by Facebook for both accuracy and speed",
            "title": "Llama LLM D",
        },
        {
            "id": "llama_llm_o",
            "description": "Latest model by Facebook for both accuracy and speed",
            "title": "Llama LLM O",
        },
    ]
    return JSONResponse(content={"models": models})
class TranslateRequest(BaseModel):
    """Request body for the translate endpoint."""

    # Text to translate; handed to translate_text as its first argument.
    text: str
    # Language to translate into; handed to translate_text as its second argument.
    target_language: str
async def translate_text_endpoint(request: TranslateRequest, authorization: Optional[str] = Header(None)):
    """
    ### Endpoint Description:
    Translate text to a specified language.

    #### Request Body:
    - `text`: The text to translate. (Required)
    - `target_language`: The language to translate the text to. (Required)

    #### Response:
    - The parsed JSON reply of `translate_text` on success.
    - `401` with body `"Unauthorized"` when the authorization check fails.
    - `400` with body `{"error": <message>}` when any step raises.
    """
    try:
        if isAuthorized(authorization, TOKEN):
            # translate_text returns a JSON string; parse it before responding.
            raw_translation = translate_text(request.text, request.target_language)
            return JSONResponse(content=json.loads(raw_translation))
        return JSONResponse(content="Unauthorized", status_code=401)
    except Exception as exc:
        return JSONResponse(content={"error": str(exc)}, status_code=400)
# @app.get("/translate")
# NOTE(review): the route decorator above is commented out, and this definition
# rebinds the name `translate_text_endpoint` used by the request-body variant
# defined earlier in the file.
async def translate_text_endpoint(text: str, target_language: str, authorization: Optional[str] = Header(None)):
    """
    ### Endpoint Description:
    Translate text to a specified language.

    #### Request Parameters:
    - `text`: The text to translate. (Required)
    - `target_language`: The language to translate the text to. (Required)

    #### Response:
    - The parsed JSON reply of `translate_text` on success.
    - `401` with body `"Unauthorized"` when the authorization check fails.
    - `400` with body `{"error": <message>}` when any step raises.
    """
    try:
        allowed = isAuthorized(authorization, TOKEN)
        if not allowed:
            return JSONResponse(content="Unauthorized", status_code=401)

        # Raw reply is a JSON string; the original author's sample shows the keys
        # "translated_text" and "translated_language".
        raw_translation = translate_text(text, target_language)
        parsed_translation = json.loads(raw_translation)
        return JSONResponse(content=parsed_translation)
    except Exception as exc:
        return JSONResponse(content={"error": str(exc)}, status_code=400)
class ChatRequest(BaseModel):
    """Request body for the chat endpoint; all three fields are forwarded to `chat_text`."""

    # Forwarded to chat_text as its second argument.
    query: str
    # Forwarded to chat_text as its first argument.
    text: str
    # Optional; forwarded to chat_text as its third argument (None when omitted).
    history: Optional[str] = None
def chat_endpoint(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    ### Endpoint Description:
    Answer a query by calling `chat_text` with the request's text, query and history.

    #### Request Body:
    - `query`: Forwarded to `chat_text`. (Required)
    - `text`: Forwarded to `chat_text`. (Required)
    - `history`: Forwarded to `chat_text`. (Optional)

    #### Response:
    - `{"reply": <value returned by chat_text>}` on success.
    - `401` with body `"Unauthorized"` when the authorization check fails.
    """
    if isAuthorized(authorization, TOKEN):
        reply = chat_text(request.text, request.query, request.history)
        return {"reply": reply}
    return JSONResponse(content="Unauthorized", status_code=401)
# @app.get("/chat")
# NOTE(review): the route decorator above is commented out, and this definition
# rebinds the name `chat_endpoint` used by the request-body variant defined
# earlier in the file.
def chat_endpoint(query: str, text: str, history: Optional[str] = None, authorization: Optional[str] = Header(None)):
    """
    ### Endpoint Description:
    Answer a query by calling `chat_text` with the given text, query and history.

    #### Request Parameters:
    - `query`: Forwarded to `chat_text`. (Required)
    - `text`: Forwarded to `chat_text`. (Required)
    - `history`: Forwarded to `chat_text`. (Optional)
    - `authorization`: Optional header value, checked with `isAuthorized` against the configured token.

    #### Response:
    - `{"reply": <value returned by chat_text>}` on success.
    - `401` with body `"Unauthorized"` when the authorization check fails.
    """
    # Apply the same authorization gate as the request-body variant, so that
    # re-enabling this route cannot expose chat without a token.
    if not isAuthorized(authorization, TOKEN):
        return JSONResponse(content="Unauthorized", status_code=401)
    response_text = chat_text(text, query, history)
    return {"reply": response_text}
def get_token(passcode: str):
    """
    ### Endpoint Description:
    Exchange the passcode for the API token.

    #### Request Parameters:
    - `passcode`: Compared against the `PASSCODE` environment value. (Required)

    #### Response:
    - `{"token": <TOKEN>}` when the passcode matches.
    - `401` with body `"invalid passcode"` otherwise.
    """
    # Compare in constant time so response timing does not reveal how much of
    # the passcode was correct. Both sides are encoded to bytes because
    # hmac.compare_digest only accepts str arguments that are pure ASCII.
    passcode_matches = isinstance(passcode, str) and hmac.compare_digest(
        passcode.encode("utf-8"), PASSCODE.encode("utf-8")
    )
    if passcode_matches:
        return {"token": TOKEN}
    return JSONResponse(content="invalid passcode", status_code=401)