|
"""Module to upsert data into AstraDB""" |
|
import os |
|
import logging |
|
import uuid |
|
|
|
import pandas as pd |
|
from langchain_astradb import AstraDBVectorStore |
|
from langchain_openai import AzureOpenAIEmbeddings |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain_community.document_loaders import DataFrameLoader |
|
|
|
# Configure root logging once at import: timestamped INFO-level records
# that include the emitting function's name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(funcName)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
|
|
|
# Required AstraDB connection settings; reading with [] (not .get) makes
# the module fail fast with KeyError if either variable is missing.
ASTRA_DB_APPLICATION_TOKEN = os.environ['ASTRA_DB_APPLICATION_TOKEN']
ASTRA_DB_API_ENDPOINT = os.environ['ASTRA_DB_API_ENDPOINT']

# Embedding client used to vectorize article chunks before upserting.
embedding = AzureOpenAIEmbeddings(
    api_version="2024-07-01-preview",
    azure_endpoint="https://openai-oe.openai.azure.com/")

# Vector-store handle shared by vectorize(). Uses the module-level
# constants above instead of re-reading os.environ a second time
# (the original duplicated the environment lookups).
vstore = AstraDBVectorStore(embedding=embedding,
                            namespace="default_keyspace",
                            collection_name="FinFast_China",
                            token=ASTRA_DB_APPLICATION_TOKEN,
                            api_endpoint=ASTRA_DB_API_ENDPOINT)
|
|
|
def vectorize(article):
    """
    Chunk an article, embed it, and upsert the chunks into AstraDB.

    Parameters:
        article (dict): Column-name -> value(s) mapping for one article
            (it is passed to ``pd.DataFrame``); must contain the keys
            selected below, including a unique ``id``.
            NOTE(review): the original docstring said DataFrame — the
            ``pd.DataFrame(article)`` call suggests dict-like; confirm
            against callers.

    Returns:
        None
    """
    # Normalize the id to a string so it can be embedded in chunk ids.
    article['id'] = str(article['id'])
    df = pd.DataFrame(article)
    # Keep only the columns persisted as chunk metadata (plus content).
    # The original code had a second, contradictory selection referencing
    # non-existent columns ('publishdate', 'referenceid') that always
    # raised KeyError; it has been removed. 'publishDate' (camelCase) is
    # the real column, as the to_datetime call below confirms.
    df = df[['id', 'site', 'title', 'titleCN', 'category', 'author',
             'content', 'publishDate', 'link']]
    df['publishDate'] = pd.to_datetime(df['publishDate'])

    loader = DataFrameLoader(df, page_content_column="content")
    documents = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=20,
        length_function=len,
        is_separator_regex=False,
    )
    chunks = text_splitter.split_documents(documents)

    # Deterministic chunk ids: "<article id>-<UUID5 of chunk text>", so
    # re-running the pipeline upserts instead of inserting duplicates.
    ids = [
        f"{chunk.metadata['id']}-{uuid.uuid5(uuid.NAMESPACE_OID, chunk.page_content)}"
        for chunk in chunks
    ]
    inserted_ids = vstore.add_documents(chunks, ids=ids)
    logging.info(inserted_ids)
|
|