"""Module to upsert data into AstraDB"""
import os
import logging
# import uuid
import time
import tiktoken
import pandas as pd
from langchain_astradb import AstraDBVectorStore
from langchain_openai import AzureOpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DataFrameLoader
from astrapy.info import CollectionVectorServiceOptions
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO)
ASTRA_DB_APPLICATION_TOKEN = os.environ['ASTRA_DB_APPLICATION_TOKEN']
ASTRA_DB_API_ENDPOINT = os.environ['ASTRA_DB_API_ENDPOINT']
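# Client-side embedding model; it reads AZURE_OPENAI_API_KEY from the environment by
# default and is only referenced by the commented-out embedding-based store below.
# The active store relies on Astra DB's server-side vectorization instead.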
embedding = AzureOpenAIEmbeddings(
    api_version="2024-07-01-preview",
    azure_endpoint="https://openai-oe.openai.azure.com/")
# vstore = AstraDBVectorStore(embedding=embedding,
# namespace="default_keyspace",
# collection_name="FinFast_China",
# token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
# api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"])
vstore = AstraDBVectorStore(
    collection_vector_service_options=CollectionVectorServiceOptions(
        provider="azureOpenAI",
        model_name="text-embedding-3-small",
        authentication={
            "providerKey": "AZURE_OPENAI_API_KEY",
        },
        parameters={
            "resourceName": "openai-oe",
            "deploymentId": "text-embedding-3-small",
        },
    ),
    namespace="default_keyspace",
    collection_name="article",
    token=ASTRA_DB_APPLICATION_TOKEN,
    api_endpoint=ASTRA_DB_API_ENDPOINT)
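# Note: with collection_vector_service_options set, Astra DB computes embeddings
# server-side ("vectorize") through the configured Azure OpenAI deployment, so no
# client-side embedding object is passed to the store. "providerKey" is understood
# here to name the Azure OpenAI credential registered with Astra DB, not a local
# environment variable.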


def token_length(text):
    """
    Calculate the length of the encoded text using the tokenizer for the
    "text-embedding-3-small" model.

    Args:
        text (str): The input text to be tokenized and measured.

    Returns:
        int: The number of tokens in the encoded text.
    """
    tokenizer = tiktoken.encoding_for_model("text-embedding-3-small")
    return len(tokenizer.encode(text))


def add_documents_with_retry(chunks, ids, max_retries=3):
    """
    Attempt to add documents to the vector store, retrying on transient failures.

    Parameters:
        chunks (list): The list of document chunks to be added.
        ids (list): The list of document IDs corresponding to the chunks.
        max_retries (int, optional): The maximum number of retry attempts. Default is 3.

    If the operation still fails after the maximum number of retries, the failure
    and the affected ids are logged; no exception is raised.
    """
    for attempt in range(max_retries):
        try:
            vstore.add_documents(chunks, ids=ids)
            return  # Success: stop retrying so the documents are not inserted again.
        except (ConnectionError, TimeoutError) as e:
            logging.warning("Attempt %d failed: %s", attempt + 1, e)
            if attempt < max_retries - 1:
                time.sleep(0.5)
            else:
                logging.error("Max retries reached. Operation failed for ids: %s", ids)


def vectorize(article):
    """
    Split the given article into chunks and upsert them into the vector store.

    Parameters:
        article (dict): The article to be processed.

    Returns:
        None
    """
    article['id'] = str(article['id'])
    if isinstance(article, dict):
        article = [article]  # Convert a single dictionary to a list of dictionaries
    df = pd.DataFrame(article)
    df = df[['id', 'publishDate', 'author', 'category',
             'content', 'referenceid', 'site', 'title', 'link']]
    df['publishDate'] = pd.to_datetime(df['publishDate'])
    documents = DataFrameLoader(df, page_content_column="content").load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=token_length,
        is_separator_regex=False,
        separators=["\n\n", "\n", "\t", ".", "?"]  # Logical separators
    )
    chunks = text_splitter.split_documents(documents)
    ids = []
    for index, chunk in enumerate(chunks):
        _id = f"{chunk.metadata['id']}-{index}"
        ids.append(_id)
    try:
        add_documents_with_retry(chunks, ids)
    except (ConnectionError, TimeoutError, ValueError) as e:
        logging.error("Failed to add documents: %s", e)
# def vectorize(article):
# """
# Process the given article.
# Parameters:
# article (DataFrame): The article to be processed.
# Returns:
# None
# """
# article['id'] = str(article['id'])
# if isinstance(article, dict):
# article = [article] # Convert single dictionary to list of dictionaries
# df = pd.DataFrame(article)
# df = df[['id','site','title','titleCN','category','author','content',
# 'publishDate','link']]
# df['publishDate'] = pd.to_datetime(df['publishDate'])
# loader = DataFrameLoader(df, page_content_column="content")
# documents = loader.load()
# text_splitter = RecursiveCharacterTextSplitter(
# chunk_size=800,
# chunk_overlap=20,
# length_function=len,
# is_separator_regex=False,
# )
# chunks = text_splitter.split_documents(documents)
# ids = []
# for chunk in chunks:
# _id = f"{chunk.metadata['id']}-{str(uuid.uuid5(uuid.NAMESPACE_OID,chunk.page_content))}"
# ids.append(_id)
# inserted_ids = vstore.add_documents(chunks, ids=ids)
# print(inserted_ids)
# logging.info(inserted_ids)
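

# Illustrative usage sketch (not part of the original module): the field values below
# are placeholders, assumed only to show the shape of the article dict that
# vectorize() expects. Running it requires the ASTRA_DB_* environment variables above.
if __name__ == "__main__":
    sample_article = {
        "id": 12345,                      # placeholder id; cast to str inside vectorize
        "publishDate": "2024-01-01",      # parsed with pd.to_datetime
        "author": "Example Author",
        "category": "Example Category",
        "content": "Example article body that will be split into chunks and upserted.",
        "referenceid": "ref-001",
        "site": "example-site",
        "title": "Example Title",
        "link": "https://example.com/article",
    }
    vectorize(sample_article)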