File size: 5,466 Bytes
97226b8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
from transformers import AutoTokenizer, AutoModel, AutoImageProcessor
from sentence_transformers import SentenceTransformer
import torch
import torch.nn.functional as F
from PIL import Image
import requests
import os
import json
import math
import re
import pandas as pd
import numpy as np
from omeka_s_api_client import OmekaSClient,OmekaSClientError
from typing import List, Dict, Any, Union
import io
from dotenv import load_dotenv
# Load environment variables from a ".env" file located in the current
# working directory (not necessarily the directory of this module).
load_dotenv(os.path.join(os.getcwd(), ".env"))
# Hugging Face access token; None when the HF_TOKEN variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Nomic vision model and its image processor, instantiated once at import time.
# NOTE(review): trust_remote_code=True allows custom code shipped with the
# model repository to run locally -- confirm the repository is trusted.
processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5")
vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True)
# Nomic text model, also instantiated at import time. HF_TOKEN is passed here,
# whereas the vision loaders above are called without a token.
text_model = SentenceTransformer("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True, token=HF_TOKEN)
def image_url_to_pil(url: str, max_size=(512, 512)) -> Image.Image:
    """Download an image and return it as a size-bounded RGB PIL image.

    Example:
        image_blobs = df["image_url"].apply(image_url_to_pil).tolist()

    Args:
        url: Address of the image to download.
        max_size: ``(width, height)`` bound handed to ``Image.thumbnail``,
            which shrinks the image in place to fit within it.

    Returns:
        The downloaded image, converted to RGB and thumbnailed to ``max_size``.

    Raises:
        requests.RequestException: If the request fails, exceeds the
            5-second timeout, or ``raise_for_status()`` rejects the
            response status.
        OSError: If Pillow cannot decode the downloaded bytes.
    """
    # The full body is required to decode the image, so streaming brings no
    # benefit. The context manager releases the connection even when
    # raise_for_status() raises before the body has been read.
    with requests.get(url, timeout=5) as response:
        response.raise_for_status()
        payload = response.content
    image = Image.open(io.BytesIO(payload)).convert("RGB")
    image.thumbnail(max_size, Image.Resampling.LANCZOS)
    return image
def generate_img_embed(images_urls, batch_size=20):
    """Generate L2-normalized image embeddings in batches to limit memory use.

    Each batch of URLs is downloaded with ``image_url_to_pil``, run through
    the module-level ``processor`` and ``vision_model``, and the hidden state
    at token index 0 of ``last_hidden_state`` is normalized and collected.

    Args:
        images_urls (list): List of image URLs.
        batch_size (int): Number of images to process at once.

    Returns:
        numpy.ndarray: The per-batch embeddings stacked row-wise, one row
        per input URL.

    Raises:
        ValueError: If ``images_urls`` is empty (``np.vstack`` has nothing
            to stack).
    """
    all_embeddings = []
    for start in range(0, len(images_urls), batch_size):
        batch_urls = images_urls[start:start + batch_size]
        images = [image_url_to_pil(image_url) for image_url in batch_urls]
        inputs = processor(images, return_tensors="pt")
        # Inference only: disabling autograd avoids building (and keeping
        # alive) a computation graph for every batch.
        with torch.no_grad():
            img_emb = vision_model(**inputs).last_hidden_state
            img_embeddings = F.normalize(img_emb[:, 0], p=2, dim=1)
        all_embeddings.append(img_embeddings.numpy())
    return np.vstack(all_embeddings)
def generate_text_embed(sentences: List, batch_size=64):
    """Encode sentences with the text model, one slice at a time.

    Args:
        sentences (List): Text strings to encode.
        batch_size (int): How many sentences are handed to the model per call.

    Returns:
        The per-slice outputs of ``text_model.encode`` stacked row-wise with
        ``np.vstack``.
    """
    batch_starts = range(0, len(sentences), batch_size)
    return np.vstack([
        text_model.encode(sentences[start:start + batch_size])
        for start in batch_starts
    ])
def add_concatenated_text_field_exclude_keys(item_dict, keys_to_exclude=None, text_field_key="text", pair_separator=" - "):
    """Store a "key: value" summary of ``item_dict`` under ``text_field_key``.

    The summary is prefixed with ``"search_document: "`` and joins every
    retained pair with ``pair_separator``. The dictionary is modified in
    place and also returned.

    Args:
        item_dict: Dictionary to summarize.
        keys_to_exclude: Optional iterable of keys to leave out of the
            summary. ``text_field_key`` is always left out as well.
        text_field_key: Key under which the summary is written.
        pair_separator: String placed between consecutive pairs.

    Returns:
        The same ``item_dict`` object, with the summary field set.

    Raises:
        TypeError: If ``item_dict`` is not a dictionary.
    """
    if not isinstance(item_dict, dict):
        raise TypeError("Input must be a dictionary.")

    # The output field itself never takes part in the summary.
    skipped = {text_field_key, *(() if keys_to_exclude is None else keys_to_exclude)}

    def _has_content(candidate):
        # None, NaN floats and empty str/list/tuple/dict carry no information.
        if candidate is None:
            return False
        if isinstance(candidate, float) and math.isnan(candidate):
            return False
        if isinstance(candidate, (str, list, tuple, dict)) and len(candidate) == 0:
            return False
        return True

    rendered = [
        ": ".join((str(name), str(content)))
        for name, content in item_dict.items()
        if name not in skipped and _has_content(content)
    ]
    item_dict[text_field_key] = "search_document: " + pair_separator.join(rendered)
    return item_dict
def prepare_df_atlas(df: pd.DataFrame, id_col='id', images_col='images_urls') -> pd.DataFrame:
    """Normalize a DataFrame in place so that every column holds strings.

    Steps, in order:
      1. If ``id_col`` is missing, it is created from the row positions
         ("0", "1", ...).
      2. Each cell of ``images_col`` that is a list is reduced to its first
         element; an empty list becomes ``""``. Other values are kept.
      3. Every column is cast to ``str``.

    Args:
        df: DataFrame to normalize. It is modified in place.
        id_col: Name of the identifier column to ensure.
        images_col: Name of the column holding an image URL or a list of
            image URLs.

    Returns:
        The same ``df`` object, after modification.

    Raises:
        KeyError: If ``images_col`` is not a column of ``df``.
    """
    if id_col not in df.columns:
        df[id_col] = [str(position) for position in range(len(df))]

    def _first_image(cell):
        # An item without any image arrives as an empty list; indexing it
        # would raise IndexError, so it is mapped to an empty string instead.
        if isinstance(cell, list):
            return cell[0] if cell else ""
        return cell

    df[images_col] = df[images_col].apply(_first_image)

    # Force every column to string to avoid mixed or unusual dtypes.
    for col in df.columns:
        df[col] = df[col].astype(str)
    return df
def remove_key_value_from_dict(input_dict, key_to_remove='text'):
    """Remove ``key_to_remove`` from a dictionary or from a list of dictionaries.

    This name used to be defined twice in the module, so the second
    (single-dictionary) definition silently replaced the first
    (list-of-dictionaries) one. Both behaviours are now served by this one
    function, each keeping its original semantics:

    * dictionary: the key is deleted in place and the same object is returned;
    * list: a new list of copies is returned, each without the key; the
      original list and its dictionaries are left unmodified.

    Args:
        input_dict: A dictionary, or a list of dictionaries.
        key_to_remove: Key to drop. Missing keys are ignored.

    Returns:
        The mutated dictionary, or a new list of stripped copies.

    Raises:
        TypeError: If ``input_dict`` is neither a dictionary nor a list.
    """
    if isinstance(input_dict, dict):
        input_dict.pop(key_to_remove, None)
        return input_dict

    if isinstance(input_dict, list):
        stripped = []
        for entry in input_dict:
            entry_copy = entry.copy()  # Copy so the caller's dictionaries stay intact.
            entry_copy.pop(key_to_remove, None)
            stripped.append(entry_copy)
        return stripped

    raise TypeError("Input must be a dictionary or a list of dictionaries.")