"""Module to upsert data into AstraDB""" import os import logging import time import tiktoken from pytz import timezone import pandas as pd from langchain_astradb import AstraDBVectorStore # from langchain_openai import AzureOpenAIEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import DataFrameLoader # from astrapy import DataAPIClient # from astrapy.info import CollectionVectorServiceOptions # from astrapy.exceptions import CollectionAlreadyExistsException # from astrapy.core.api import APIRequestError logging.basicConfig( format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s', datefmt="%Y-%m-%d %H:%M:%S", level=logging.ERROR) # from astrapy import AstraClient # ASTRA_DB_APPLICATION_TOKEN = os.environ['ASTRA_DB_APPLICATION_TOKEN'] # ASTRA_DB_API_ENDPOINT = os.environ['ASTRA_DB_API_ENDPOINT'] # COLLECTION_NAME = "article" # VECTOR_OPTIONS = CollectionVectorServiceOptions( # provider="azureOpenAI", # model_name="text-embedding-3-small", # authentication={"providerKey": "AZURE_OPENAI_API_KEY"}, # parameters={ # "resourceName": "openai-oe", # "deploymentId": "text-embedding-3-small", # }, # ) # client = DataAPIClient(token=ASTRA_DB_APPLICATION_TOKEN) # database = client.get_database(ASTRA_DB_API_ENDPOINT) # embedding = AzureOpenAIEmbeddings( # api_version="2024-07-01-preview", # azure_endpoint="https://openai-oe.openai.azure.com/") vstore = AstraDBVectorStore( # collection_vector_service_options=CollectionVectorServiceOptions( # provider="azureOpenAI", # model_name="text-embedding-3-small", # authentication={ # "providerKey": "AZURE_OPENAI_API_KEY", # }, # parameters={ # "resourceName": "openai-oe", # "deploymentId": "text-embedding-3-small", # }, # ), namespace="default_keyspace", collection_name="article", token=os.environ["ASTRA_DB_APPLICATION_TOKEN"], api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"], autodetect_collection=True) def token_length(text): """ Calculates length of encoded text using the tokenizer for the "text-embedding-3-small" model. Args: text (str): The input text to be tokenized and measured. Returns: int: The length of the encoded text. """ tokenizer = tiktoken.encoding_for_model("text-embedding-3-small") return len(tokenizer.encode(text)) def add_documents_with_retry(chunks, ids, max_retries=3): """ Attempts to add documents to the vstore with a specified number of retries. Parameters: chunks (list): The list of document chunks to be added. ids (list): The list of document IDs corresponding to the chunks. max_retries (int, optional): The maximum number of retry attempts. Default is 3. Raises: Exception: If the operation fails after the maximum number of retries, the exception is logged. """ for attempt in range(max_retries): try: vstore.add_documents(chunks, ids=ids) except (ConnectionError, TimeoutError) as e: logging.info("Attempt %d failed: %s", attempt + 1, e) if attempt < max_retries - 1: time.sleep(10) else: logging.error("Max retries reached. Operation failed.") logging.error(ids) print(ids) def vectorize(article): """ Process the given article. Parameters: article (DataFrame): The article to be processed. 
Returns: None """ article['id'] = str(article['id']) if isinstance(article, dict): article = [article] # Convert single dictionary to list of dictionaries df = pd.DataFrame(article) df = df[['id', 'publishDate', 'author', 'category', 'content', 'referenceid', 'site', 'title', 'link']] df['publishDate'] = pd.to_datetime(df['publishDate'], errors='coerce') df['publishDate'] = df['publishDate'].dt.tz_localize('UTC', ambiguous='NaT', nonexistent='NaT') df['publishDate'] = df['publishDate'].dt.tz_localize(None).dt.tz_localize(timezone('Etc/GMT+8')) documents = DataFrameLoader(df, page_content_column="content").load() text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=token_length, is_separator_regex=False, separators=["\n\n", "\n", "\t", "\\n"] # Logical separators ) chunks = text_splitter.split_documents(documents) ids = [] for index, chunk in enumerate(chunks): _id = f"{chunk.metadata['id']}-{str(index)}" ids.append(_id) try: add_documents_with_retry(chunks, ids) except (ConnectionError, TimeoutError, ValueError) as e: logging.error("Failed to add documents: %s", e) # def vectorize(article): # """ # Process the given article. # Parameters: # article (DataFrame): The article to be processed. # Returns: # None # """ # article['id'] = str(article['id']) # if isinstance(article, dict): # article = [article] # Convert single dictionary to list of dictionaries # df = pd.DataFrame(article) # df = df[['id','site','title','titleCN','category','author','content', # 'publishDate','link']] # df['publishDate'] = pd.to_datetime(df['publishDate']) # loader = DataFrameLoader(df, page_content_column="content") # documents = loader.load() # text_splitter = RecursiveCharacterTextSplitter( # chunk_size=800, # chunk_overlap=20, # length_function=len, # is_separator_regex=False, # ) # chunks = text_splitter.split_documents(documents) # ids = [] # for chunk in chunks: # _id = f"{chunk.metadata['id']}-{str(uuid.uuid5(uuid.NAMESPACE_OID,chunk.page_content))}" # ids.append(_id) # inserted_ids = vstore.add_documents(chunks, ids=ids) # print(inserted_ids) # logging.info(inserted_ids)
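

# Minimal usage sketch: shows the shape of input vectorize() expects.
# The field values below are illustrative assumptions, not real data;
# running this performs a live upsert and requires the
# ASTRA_DB_APPLICATION_TOKEN and ASTRA_DB_API_ENDPOINT environment variables.
if __name__ == "__main__":
    sample_article = {
        "id": 12345,  # hypothetical article ID
        "publishDate": "2024-07-01 08:30:00",
        "author": "Jane Doe",
        "category": "technology",
        "content": "Full article text goes here...",
        "referenceid": "ref-001",
        "site": "example-site",
        "title": "Sample headline",
        "link": "https://example.com/articles/12345",
    }
    vectorize(sample_article)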