import base64
import io
from typing import Any, Dict

import torch
from PIL import Image
from transformers.utils.import_utils import is_flash_attn_2_available

from colpali_engine.models import ColQwen2, ColQwen2Processor


class EndpointHandler:
    def __init__(self, path: str = ""):
        # Load the model in bfloat16 on the first GPU, using FlashAttention 2
        # when the package is installed.
        self.model = ColQwen2.from_pretrained(
            path,
            torch_dtype=torch.bfloat16,
            device_map="cuda:0",
            attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None,
        ).eval()
        self.processor = ColQwen2Processor.from_pretrained(path)

        print(f"Model and processor loaded {'with' if is_flash_attn_2_available() else 'without'} FA2")

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Expects exactly one of the following payloads under the "inputs" key:

        {
            "images": [
                "base64_encoded_image1",
                "base64_encoded_image2",
                ...
            ]
        }

        or

        {
            "queries": [
                "text1",
                "text2",
                ...
            ]
        }

        Returns embeddings for the provided input type.
        """
        data = data.get("inputs", {})
        input_keys = [key for key in ("images", "queries") if key in data]
        if len(input_keys) != 1:
            return {"error": "Exactly one of 'images' or 'queries' must be provided"}

        input_type = input_keys[0]
        inputs = data[input_type]

        if input_type == "images":
            # Accept a single base64 string as well as a list of them.
            if not isinstance(inputs, list):
                inputs = [inputs]

            if len(inputs) > 8:
                return {"error": "Send a maximum of 8 images at once. We recommend sending them one by one to improve load balancing."}

            # Decode each base64 string into an RGB PIL image.
            decoded_images = []
            for img_str in inputs:
                try:
                    img_data = base64.b64decode(img_str)
                    image = Image.open(io.BytesIO(img_data)).convert("RGB")
                    decoded_images.append(image)
                except Exception as e:
                    return {"error": f"Error decoding image: {str(e)}"}

            batch = self.processor.process_images(decoded_images).to(self.model.device)

        else:
            # Accept a single query string as well as a list of them.
            if not isinstance(inputs, list):
                inputs = [inputs]
            try:
                batch = self.processor.process_queries(inputs).to(self.model.device)
            except Exception as e:
                return {"error": f"Error processing text: {str(e)}"}

        # Forward pass without gradient tracking; the model returns one
        # multi-vector embedding (a sequence of token vectors) per input.
        with torch.inference_mode():
            embeddings = self.model(**batch).tolist()

        return {"embeddings": embeddings}
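

# A minimal local smoke test, assuming a ColQwen2 checkpoint such as
# "vidore/colqwen2-v1.0" is available; the endpoint runtime normally
# instantiates EndpointHandler itself, so this block is illustrative only.
if __name__ == "__main__":
    handler = EndpointHandler("vidore/colqwen2-v1.0")  # example checkpoint path

    # Encode a tiny in-memory image exactly the way a client would.
    buf = io.BytesIO()
    Image.new("RGB", (64, 64), color="white").save(buf, format="PNG")
    img_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")

    image_out = handler({"inputs": {"images": [img_b64]}})
    query_out = handler({"inputs": {"queries": ["a blank white page"]}})
    print(len(image_out["embeddings"]), len(query_out["embeddings"]))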