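"""
RAG helpers for watsonx.ai and ChromaDB.

Hydrates a local ChromaDB collection from a watsonx.ai vector index asset,
runs proximity searches against it, and optionally reranks the results with
a cross-encoder model. Also initializes the foundation model used to answer
questions over the retrieved context.
"""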
import os
import random
import string
import json
import gzip
import chromadb
from ibm_watsonx_ai.client import APIClient
from ibm_watsonx_ai.foundation_models import ModelInference, Rerank
from ibm_watsonx_ai.foundation_models.embeddings.sentence_transformer_embeddings import SentenceTransformerEmbeddings
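# Data asset ID of the vector index stored in watsonx.ai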
VECTOR_DB = "c8af7dfa-bcad-46e5-b69d-cd85ce9315d1"
def get_credentials():
"""
Obtain credentials for Watsonx.ai from environment.
"""
return {
"url": "https://us-south.ml.cloud.ibm.com",
"apikey": os.getenv("IBM_API_KEY")
}
def rerank(client, documents, query, top_n):
"""
Rerank a list of documents given a query using the Rerank model.
Returns the documents in a new order (highest relevance first).
"""
reranker = Rerank(
model_id="cross-encoder/ms-marco-minilm-l-12-v2",
api_client=client,
params={
"return_options": {
"top_n": top_n
},
"truncate_input_tokens": 512
}
)
reranked_results = reranker.generate(query=query, inputs=documents)["results"]
# Build the new list of documents
new_documents = []
for result in reranked_results:
result_index = result["index"]
new_documents.append(documents[result_index])
return new_documents
def RAGinit():
"""
Initialize:
- Watsonx.ai Client
- Foundation Model
- Embeddings
- ChromaDB Collection
- Vector index properties
- Top N for query
Returns all objects/values needed by RAG_proximity_search.
"""
# Project/Space from environment
project_id = os.getenv("IBM_PROJECT_ID")
space_id = os.getenv("IBM_SPACE_ID")
# Watsonx.ai client
wml_credentials = get_credentials()
client = APIClient(credentials=wml_credentials, project_id=project_id)
# Model Inference
model_inference_params = {
"decoding_method": "greedy",
"max_new_tokens": 900,
"min_new_tokens": 0,
"repetition_penalty": 1
}
model = ModelInference(
model_id="ibm/granite-3-8b-instruct",
params=model_inference_params,
credentials=get_credentials(),
project_id=project_id,
space_id=space_id
)
# Vector index details
vector_index_id =VECTOR_DB
vector_index_details = client.data_assets.get_details(vector_index_id)
vector_index_properties = vector_index_details["entity"]["vector_index"]
# Decide how many results to return
top_n = 20 if vector_index_properties["settings"].get("rerank") \
else int(vector_index_properties["settings"]["top_k"])
# Embedding model
emb = SentenceTransformerEmbeddings('sentence-transformers/all-MiniLM-L6-v2')
# Hydrate ChromaDB with embeddings from the vector index
chroma_collection = _hydrate_chromadb(client, vector_index_id)
return client, model, emb, chroma_collection, vector_index_properties, top_n
def _hydrate_chromadb(client, vector_index_id):
"""
Helper function to retrieve the stored embedding data from Watsonx.ai,
then create (or reset) and populate a ChromaDB collection.
"""
data = client.data_assets.get_content(vector_index_id)
content = gzip.decompress(data)
stringified_vectors = content.decode("utf-8")
vectors = json.loads(stringified_vectors)
# Use a Persistent ChromaDB client (on-disk)
chroma_client = chromadb.PersistentClient(path="./chroma_db")
# Create or clear the collection
collection_name = "my_collection"
try:
chroma_client.delete_collection(name=collection_name)
except:
print("Collection didn't exist - nothing to do.")
collection = chroma_client.create_collection(name=collection_name)
# Prepare data for insertion
vector_embeddings = []
vector_documents = []
vector_metadatas = []
vector_ids = []
for vector in vectors:
embedding = vector["embedding"]
content = vector["content"]
metadata = vector["metadata"]
lines = metadata["loc"]["lines"]
vector_embeddings.append(embedding)
vector_documents.append(content)
clean_metadata = {
"asset_id": metadata["asset_id"],
"asset_name": metadata["asset_name"],
"url": metadata["url"],
"from": lines["from"],
"to": lines["to"]
}
vector_metadatas.append(clean_metadata)
# Generate unique ID
asset_id = metadata["asset_id"]
random_string = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
doc_id = f"{asset_id}:{lines['from']}-{lines['to']}-{random_string}"
vector_ids.append(doc_id)
# Add all data to the collection
collection.add(
embeddings=vector_embeddings,
documents=vector_documents,
metadatas=vector_metadatas,
ids=vector_ids
)
return collection
def RAG_proximity_search(question, client, model, emb, chroma_collection, vector_index_properties, top_n):
"""
Execute a proximity search in the ChromaDB collection for the given question.
Optionally rerank results if specified in the vector index properties.
Returns a concatenated string of best matching documents.
"""
# Embed query
query_vectors = emb.embed_query(question)
# Query top_n results from ChromaDB
query_result = chroma_collection.query(
query_embeddings=query_vectors,
n_results=top_n,
include=["documents", "metadatas", "distances"]
)
# Documents come back in ascending distance, so best match is index=0
documents = query_result["documents"][0]
# If rerank is enabled, reorder the documents
if vector_index_properties["settings"].get("rerank"):
documents = rerank(client, documents, question, vector_index_properties["settings"]["top_k"])
# Return them as a single string
return "\n".join(documents)