"""Module to upsert data into AstraDB"""
import os
import logging
import uuid
import time
import tiktoken
import pandas as pd
from langchain_astradb import AstraDBVectorStore
from langchain_openai import AzureOpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DataFrameLoader
from astrapy.info import CollectionVectorServiceOptions

logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO)

ASTRA_DB_APPLICATION_TOKEN = os.environ['ASTRA_DB_APPLICATION_TOKEN']
ASTRA_DB_API_ENDPOINT = os.environ['ASTRA_DB_API_ENDPOINT']
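
# Embedding client and vector store that compute embeddings locally (client-side)
# with Azure OpenAI before the vectors are written to Astra DB.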
embedding = AzureOpenAIEmbeddings(
api_version="2024-07-01-preview",
azure_endpoint="https://openai-oe.openai.azure.com/")
vstore = AstraDBVectorStore(embedding=embedding,
namespace="default_keyspace",
collection_name="FinFast_China",
token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"])
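
# Vector store backed by Astra DB's server-side "vectorize" integration: the database
# calls the Azure OpenAI "text-embedding-3-small" deployment itself, so no embedding
# model needs to run locally for this collection.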
openai_vstore = AstraDBVectorStore(
collection_vector_service_options=CollectionVectorServiceOptions(
provider="azureOpenAI",
model_name="text-embedding-3-small",
authentication={
"providerKey": "AZURE_OPENAI_API_KEY",
},
parameters={
"resourceName": "openai-oe",
"deploymentId": "text-embedding-3-small",
},
),
namespace="default_keyspace",
collection_name="text_embedding_3_small",
token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"])
def token_length(text):
"""
Calculates length of encoded text using the tokenizer for the "text-embedding-3-small" model.
Args:
text (str): The input text to be tokenized and measured.
Returns:
int: The length of the encoded text.
"""
tokenizer = tiktoken.encoding_for_model("text-embedding-3-small")
return len(tokenizer.encode(text))
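
# token_length is used below as the chunking length_function, so chunk_size and
# chunk_overlap are measured in tokens rather than characters. As an illustrative,
# approximate check: tiktoken maps "text-embedding-3-small" to the cl100k_base
# encoding, so token_length("hello world") is expected to return 2.
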
def add_documents_with_retry(chunks, ids, max_retries=3):
"""
Attempts to add documents to the vstore with a specified number of retries.
Parameters:
chunks (list): The list of document chunks to be added.
ids (list): The list of document IDs corresponding to the chunks.
max_retries (int, optional): The maximum number of retry attempts. Default is 3.
    Note:
        If the operation still fails after the maximum number of retries, the error
        and the affected document IDs are logged; no exception is re-raised.
    """
    for attempt in range(max_retries):
        try:
            openai_vstore.add_documents(chunks, ids=ids)
            return  # Success: stop retrying.
except (ConnectionError, TimeoutError) as e:
logging.info("Attempt %d failed: %s", attempt + 1, e)
if attempt < max_retries - 1:
time.sleep(0.5)
else:
logging.error("Max retries reached. Operation failed.")
logging.error(ids)
def openai_vectorize(article):
"""
Process the given article.
Parameters:
article (DataFrame): The article to be processed.
Returns:
None
"""
article['id'] = str(article['id'])
if isinstance(article, dict):
article = [article] # Convert single dictionary to list of dictionaries
df = pd.DataFrame(article)
df = df[['id', 'publishDate', 'author', 'category',
'content', 'referenceid', 'site', 'title', 'link']]
df['publishDate'] = pd.to_datetime(df['publishDate'])
documents = DataFrameLoader(df, page_content_column="content").load()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=token_length,
is_separator_regex=False,
separators=["\n\n", "\n", "\t", ".", "?"] # Logical separators
)
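    # Because length_function=token_length, chunk_size and chunk_overlap above are
    # measured in tokens; the separators keep splits at paragraph, line, tab, and
    # sentence boundaries where possible.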
chunks = text_splitter.split_documents(documents)
ids = []
    for index, chunk in enumerate(chunks):
        # Deterministic ID: "<article id>-<chunk index>".
        ids.append(f"{chunk.metadata['id']}-{index}")
try:
add_documents_with_retry(chunks, ids)
except (ConnectionError, TimeoutError, ValueError) as e:
logging.error("Failed to add documents: %s", e)
def vectorize(article):
"""
Process the given article.
Parameters:
article (DataFrame): The article to be processed.
Returns:
None
"""
article['id'] = str(article['id'])
if isinstance(article, dict):
article = [article] # Convert single dictionary to list of dictionaries
df = pd.DataFrame(article)
    df = df[['id', 'site', 'title', 'titleCN', 'category', 'author', 'content',
             'publishDate', 'link']]
df['publishDate'] = pd.to_datetime(df['publishDate'])
loader = DataFrameLoader(df, page_content_column="content")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=20,
length_function=len,
is_separator_regex=False,
)
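    # Here chunk sizes are measured in characters (length_function=len) and the
    # splitter falls back to its default separators.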
chunks = text_splitter.split_documents(documents)
ids = []
    for chunk in chunks:
        # Deterministic ID derived from the article ID and a UUID5 of the chunk text.
        chunk_uuid = uuid.uuid5(uuid.NAMESPACE_OID, chunk.page_content)
        ids.append(f"{chunk.metadata['id']}-{chunk_uuid}")
inserted_ids = vstore.add_documents(chunks, ids=ids)
    logging.info("Inserted %d chunks: %s", len(inserted_ids), inserted_ids)
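

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): the field names mirror the columns
    # selected above, the values are hypothetical, and running this requires the
    # ASTRA_DB_* and Azure OpenAI credentials expected by the clients defined at the
    # top of this module.
    sample_article = {
        "id": 12345,
        "publishDate": "2024-01-01T08:00:00",
        "author": "Example Author",
        "category": "macro",
        "content": "Example article body text to be chunked and embedded.",
        "referenceid": None,
        "site": "example-news",
        "title": "Example title",
        "titleCN": "示例标题",
        "link": "https://example.com/articles/12345",
    }
    openai_vectorize(sample_article)  # server-side embedding collection
    vectorize(sample_article)         # client-side embedding collection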