gavinzli's picture
Refactor error handling and improve logging in utils.py; update vectorization process in vectorizer.py; adjust variable naming in eastmoney.py
c39d841
raw
history blame
2.29 kB
"""Module to upsert data into AstraDB"""
import os
import logging
import uuid
import pandas as pd
from langchain_astradb import AstraDBVectorStore
from langchain_openai import AzureOpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DataFrameLoader
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO)
ASTRA_DB_APPLICATION_TOKEN = os.environ['ASTRA_DB_APPLICATION_TOKEN']
ASTRA_DB_API_ENDPOINT = os.environ['ASTRA_DB_API_ENDPOINT']
embedding = AzureOpenAIEmbeddings(
api_version="2024-07-01-preview",
azure_endpoint="https://openai-oe.openai.azure.com/")
vstore = AstraDBVectorStore(embedding=embedding,
namespace="default_keyspace",
collection_name="FinFast_China",
token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"])
def vectorize(article):
"""
Process the given article.
Parameters:
article (DataFrame): The article to be processed.
Returns:
None
"""
article['id'] = str(article['id'])
df = pd.DataFrame(article)
df = df[['id','site','title','titleCN','category','author','content',
'publishDate','link']]
df = df[['id', 'publishdate', 'author', 'category',
'content', 'referenceid', 'site', 'title', 'link']]
# df['sentimentScore'] = df['sentimentScore'].round(2)
# df['sentimentScore'] = df['sentimentScore'].astype(float)
df['publishDate'] = pd.to_datetime(df['publishDate'])
loader = DataFrameLoader(df, page_content_column="content")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=20,
length_function=len,
is_separator_regex=False,
)
chunks = text_splitter.split_documents(documents)
ids = []
for chunk in chunks:
_id = f"{chunk.metadata['id']}-{str(uuid.uuid5(uuid.NAMESPACE_OID,chunk.page_content))}"
ids.append(_id)
inserted_ids = vstore.add_documents(chunks, ids=ids)
logging.info(inserted_ids)