"""Module to upsert data into AstraDB""" import os import logging import time import tiktoken import pandas as pd from langchain_astradb import AstraDBVectorStore from langchain_openai import AzureOpenAIEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import DataFrameLoader from astrapy.info import CollectionVectorServiceOptions logging.basicConfig( format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s', datefmt="%Y-%m-%d %H:%M:%S", level=logging.ERROR) ASTRA_DB_APPLICATION_TOKEN = os.environ['ASTRA_DB_APPLICATION_TOKEN'] ASTRA_DB_API_ENDPOINT = os.environ['ASTRA_DB_API_ENDPOINT'] embedding = AzureOpenAIEmbeddings( api_version="2024-07-01-preview", azure_endpoint="https://openai-oe.openai.azure.com/") vstore = AstraDBVectorStore( collection_vector_service_options=CollectionVectorServiceOptions( provider="azureOpenAI", model_name="text-embedding-3-small", authentication={ "providerKey": "AZURE_OPENAI_API_KEY", }, parameters={ "resourceName": "openai-oe", "deploymentId": "text-embedding-3-small", }, ), namespace="default_keyspace", collection_name="article", token=os.environ["ASTRA_DB_APPLICATION_TOKEN"], api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"]) def token_length(text): """ Calculates length of encoded text using the tokenizer for the "text-embedding-3-small" model. Args: text (str): The input text to be tokenized and measured. Returns: int: The length of the encoded text. """ tokenizer = tiktoken.encoding_for_model("text-embedding-3-small") return len(tokenizer.encode(text)) def add_documents_with_retry(chunks, ids, max_retries=3): """ Attempts to add documents to the vstore with a specified number of retries. Parameters: chunks (list): The list of document chunks to be added. ids (list): The list of document IDs corresponding to the chunks. max_retries (int, optional): The maximum number of retry attempts. Default is 3. Raises: Exception: If the operation fails after the maximum number of retries, the exception is logged. """ for attempt in range(max_retries): try: vstore.add_documents(chunks, ids=ids) except (ConnectionError, TimeoutError) as e: logging.info("Attempt %d failed: %s", attempt + 1, e) if attempt < max_retries - 1: time.sleep(10) else: logging.error("Max retries reached. Operation failed.") logging.error(ids) print(ids) def vectorize(article): """ Process the given article. Parameters: article (DataFrame): The article to be processed. Returns: None """ article['id'] = str(article['id']) if isinstance(article, dict): article = [article] # Convert single dictionary to list of dictionaries df = pd.DataFrame(article) df = df[['id', 'publishDate', 'author', 'category', 'content', 'referenceid', 'site', 'title', 'link']] df['publishDate'] = pd.to_datetime(df['publishDate']) documents = DataFrameLoader(df, page_content_column="content").load() text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=token_length, is_separator_regex=False, separators=["\n\n", "\n", "\t", "\\n"] # Logical separators ) chunks = text_splitter.split_documents(documents) ids = [] for index, chunk in enumerate(chunks): _id = f"{chunk.metadata['id']}-{str(index)}" ids.append(_id) try: add_documents_with_retry(chunks, ids) except (ConnectionError, TimeoutError, ValueError) as e: logging.error("Failed to add documents: %s", e) # def vectorize(article): # """ # Process the given article. # Parameters: # article (DataFrame): The article to be processed. 
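# Minimal usage sketch (not part of the pipeline): shows the dict shape that
# vectorize() expects, inferred from the columns it selects. The field values
# below are hypothetical, and the AstraDB/Azure environment variables must be
# set before the module-level clients above can initialise.
if __name__ == "__main__":
    sample_article = {
        "id": 12345,                       # hypothetical article ID
        "publishDate": "2024-01-01",
        "author": "Jane Doe",
        "category": "news",
        "content": "Full article text to be chunked and embedded...",
        "referenceid": "ref-001",
        "site": "example-site",
        "title": "Sample title",
        "link": "https://example.com/article/12345",
    }
    vectorize(sample_article)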