"""Module to upsert data into AstraDB""" import os import logging import time import tiktoken import pandas as pd from langchain_astradb import AstraDBVectorStore from langchain_openai import AzureOpenAIEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import DataFrameLoader from astrapy import DataAPIClient from astrapy.info import CollectionVectorServiceOptions from astrapy.exceptions import CollectionAlreadyExistsException from astrapy.core.api import APIRequestError logging.basicConfig( format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s', datefmt="%Y-%m-%d %H:%M:%S", level=logging.ERROR) # from astrapy import AstraClient ASTRA_DB_APPLICATION_TOKEN = os.environ['ASTRA_DB_APPLICATION_TOKEN'] ASTRA_DB_API_ENDPOINT = os.environ['ASTRA_DB_API_ENDPOINT'] COLLECTION_NAME = "article" VECTOR_OPTIONS = CollectionVectorServiceOptions( provider="azureOpenAI", model_name="text-embedding-3-small", authentication={"providerKey": "AZURE_OPENAI_API_KEY"}, parameters={ "resourceName": "openai-oe", "deploymentId": "text-embedding-3-small", }, ) client = DataAPIClient(token=ASTRA_DB_APPLICATION_TOKEN) database = client.get_database(ASTRA_DB_API_ENDPOINT) embedding = AzureOpenAIEmbeddings( api_version="2024-07-01-preview", azure_endpoint="https://openai-oe.openai.azure.com/") try: # Try to create the collection database.create_collection( COLLECTION_NAME, dimension=1536, # Default dimension for text-embedding-3-small metric="cosine", service=VECTOR_OPTIONS ) logging.info("Created new collection '%s'", COLLECTION_NAME) except (CollectionAlreadyExistsException, APIRequestError) as e: logging.info("Collection '%s' already exists. Error Message: %s", COLLECTION_NAME, e) collection = database.get_collection(COLLECTION_NAME) vstore = AstraDBVectorStore( collection_name=COLLECTION_NAME, namespace="default_keyspace", embedding=embedding, token=ASTRA_DB_APPLICATION_TOKEN, api_endpoint=ASTRA_DB_API_ENDPOINT) # vstore = AstraDBVectorStore( # collection_vector_service_options=CollectionVectorServiceOptions( # provider="azureOpenAI", # model_name="text-embedding-3-small", # authentication={ # "providerKey": "AZURE_OPENAI_API_KEY", # }, # parameters={ # "resourceName": "openai-oe", # "deploymentId": "text-embedding-3-small", # }, # ), # namespace="default_keyspace", # collection_name="article", # token=os.environ["ASTRA_DB_APPLICATION_TOKEN"], # api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"]) def token_length(text): """ Calculates length of encoded text using the tokenizer for the "text-embedding-3-small" model. Args: text (str): The input text to be tokenized and measured. Returns: int: The length of the encoded text. """ tokenizer = tiktoken.encoding_for_model("text-embedding-3-small") return len(tokenizer.encode(text)) def add_documents_with_retry(chunks, ids, max_retries=3): """ Attempts to add documents to the vstore with a specified number of retries. Parameters: chunks (list): The list of document chunks to be added. ids (list): The list of document IDs corresponding to the chunks. max_retries (int, optional): The maximum number of retry attempts. Default is 3. Raises: Exception: If the operation fails after the maximum number of retries, the exception is logged. """ for attempt in range(max_retries): try: vstore.add_documents(chunks, ids=ids) except (ConnectionError, TimeoutError) as e: logging.info("Attempt %d failed: %s", attempt + 1, e) if attempt < max_retries - 1: time.sleep(10) else: logging.error("Max retries reached. 
Operation failed.") logging.error(ids) print(ids) def vectorize(article): """ Process the given article. Parameters: article (DataFrame): The article to be processed. Returns: None """ article['id'] = str(article['id']) if isinstance(article, dict): article = [article] # Convert single dictionary to list of dictionaries df = pd.DataFrame(article) df = df[['id', 'publishDate', 'author', 'category', 'content', 'referenceid', 'site', 'title', 'link']] df['publishDate'] = pd.to_datetime(df['publishDate']) documents = DataFrameLoader(df, page_content_column="content").load() text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=token_length, is_separator_regex=False, separators=["\n\n", "\n", "\t", "\\n"] # Logical separators ) chunks = text_splitter.split_documents(documents) ids = [] for index, chunk in enumerate(chunks): _id = f"{chunk.metadata['id']}-{str(index)}" ids.append(_id) try: add_documents_with_retry(chunks, ids) except (ConnectionError, TimeoutError, ValueError) as e: logging.error("Failed to add documents: %s", e) # def vectorize(article): # """ # Process the given article. # Parameters: # article (DataFrame): The article to be processed. # Returns: # None # """ # article['id'] = str(article['id']) # if isinstance(article, dict): # article = [article] # Convert single dictionary to list of dictionaries # df = pd.DataFrame(article) # df = df[['id','site','title','titleCN','category','author','content', # 'publishDate','link']] # df['publishDate'] = pd.to_datetime(df['publishDate']) # loader = DataFrameLoader(df, page_content_column="content") # documents = loader.load() # text_splitter = RecursiveCharacterTextSplitter( # chunk_size=800, # chunk_overlap=20, # length_function=len, # is_separator_regex=False, # ) # chunks = text_splitter.split_documents(documents) # ids = [] # for chunk in chunks: # _id = f"{chunk.metadata['id']}-{str(uuid.uuid5(uuid.NAMESPACE_OID,chunk.page_content))}" # ids.append(_id) # inserted_ids = vstore.add_documents(chunks, ids=ids) # print(inserted_ids) # logging.info(inserted_ids)