colqwen2-v1.0 / handler.py
import base64
import io
from typing import Any, Dict, List

import torch
from PIL import Image
from colpali_engine.models import ColQwen2, ColQwen2Processor
from transformers.utils.import_utils import is_flash_attn_2_available


class EndpointHandler:
    def __init__(self, path=""):
        self.model = ColQwen2.from_pretrained(
            path,
            torch_dtype=torch.bfloat16,
            device_map="cuda:0",  # or "mps" if on Apple Silicon
            attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None,
        ).eval()
        self.processor = ColQwen2Processor.from_pretrained(path)  # , max_num_visual_tokens=8192)  # temporary
        # self.model = torch.compile(self.model)
        print(f"Model and processor loaded {'with' if is_flash_attn_2_available() else 'without'} FA2")

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Expects data in one of the following formats in the "inputs" key:

        {
            "images": [
                "base64_encoded_image1",
                "base64_encoded_image2",
                ...
            ]
        }

        xor

        {
            "queries": [
                "text1",
                "text2",
                ...
            ]
        }

        Returns embeddings for the provided input type.
        """
        # Input validation: exactly one of "images" or "queries" must be present
        data = data.get("inputs", {})
        input_keys = [key for key in ["images", "queries"] if key in data]
        if len(input_keys) != 1:
            return {"error": "Exactly one of 'images', 'queries' must be provided"}

        input_type = input_keys[0]
        inputs = data[input_type]

        if input_type == "images":
            if not isinstance(inputs, list):
                inputs = [inputs]
            if len(inputs) > 8:
                return {"message": "Send a maximum of 8 images at once. We recommend sending one by one to improve load balancing."}
            # Decode each image from base64 and convert to a PIL Image
            decoded_images = []
            for img_str in inputs:
                try:
                    img_data = base64.b64decode(img_str)
                    image = Image.open(io.BytesIO(img_data)).convert("RGB")
                    decoded_images.append(image)
                except Exception as e:
                    return {"error": f"Error decoding image: {str(e)}"}

            # Process the images using the processor
            batch = self.processor.process_images(decoded_images).to(self.model.device)

        # elif input_type == "processed_images":
        #     try:
        #         buffer = io.BytesIO(base64.b64decode(inputs))
        #         batch = torch.load(buffer, map_location=self.model.device)
        #     except Exception as e:
        #         return {"error": f"Error processing preprocessed images: {str(e)}"}
        else:  # text queries
            if not isinstance(inputs, list):
                inputs = [inputs]
            try:
                batch = self.processor.process_queries(inputs).to(self.model.device)
            except Exception as e:
                return {"error": f"Error processing text: {str(e)}"}

        # Forward pass through the model
        with torch.inference_mode():
            embeddings = self.model(**batch).tolist()

        return {"embeddings": embeddings}