"""Module to upsert data into AstraDB""" import os import logging import uuid import pandas as pd from langchain_astradb import AstraDBVectorStore from langchain_openai import AzureOpenAIEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import DataFrameLoader logging.basicConfig( format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s', datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO) ASTRA_DB_APPLICATION_TOKEN = os.environ['ASTRA_DB_APPLICATION_TOKEN'] ASTRA_DB_API_ENDPOINT = os.environ['ASTRA_DB_API_ENDPOINT'] embedding = AzureOpenAIEmbeddings( api_version="2024-07-01-preview", azure_endpoint="https://openai-oe.openai.azure.com/") vstore = AstraDBVectorStore(embedding=embedding, namespace="default_keyspace", collection_name="FinFast_China", token=os.environ["ASTRA_DB_APPLICATION_TOKEN"], api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"]) def vectorize(article): """ Process the given article. Parameters: article (DataFrame): The article to be processed. Returns: None """ article['id'] = str(article['id']) df = pd.DataFrame(article) df = df[['id','site','title','titleCN','category','author','content', 'publishDate','link']] df = df[['id', 'publishdate', 'author', 'category', 'content', 'referenceid', 'site', 'title', 'link']] # df['sentimentScore'] = df['sentimentScore'].round(2) # df['sentimentScore'] = df['sentimentScore'].astype(float) df['publishDate'] = pd.to_datetime(df['publishDate']) loader = DataFrameLoader(df, page_content_column="content") documents = loader.load() text_splitter = RecursiveCharacterTextSplitter( chunk_size=800, chunk_overlap=20, length_function=len, is_separator_regex=False, ) chunks = text_splitter.split_documents(documents) ids = [] for chunk in chunks: _id = f"{chunk.metadata['id']}-{str(uuid.uuid5(uuid.NAMESPACE_OID,chunk.page_content))}" ids.append(_id) inserted_ids = vstore.add_documents(chunks, ids=ids) logging.info(inserted_ids)