"""Module to upsert data into AstraDB""" |
|
import os |
|
import logging |
|
import time |
|
|
|
import tiktoken |
|
from pytz import timezone |
|
import pandas as pd |
|
from langchain_astradb import AstraDBVectorStore |
|
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain_community.document_loaders import DataFrameLoader |
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s', |
|
datefmt="%Y-%m-%d %H:%M:%S", |
|
level=logging.ERROR) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
vstore = AstraDBVectorStore( |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace="default_keyspace", |
|
collection_name="article", |
|
token=os.environ["ASTRA_DB_APPLICATION_TOKEN"], |
|
api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"], |
|
autodetect_collection=True) |
|
|
|


def token_length(text):
    """
    Calculate the length of the encoded text using the tokenizer for the
    "text-embedding-3-small" model.

    Args:
        text (str): The input text to be tokenized and measured.

    Returns:
        int: The number of tokens in the encoded text.
    """
    tokenizer = tiktoken.encoding_for_model("text-embedding-3-small")
    return len(tokenizer.encode(text))


def add_documents_with_retry(chunks, ids, max_retries=3):
    """
    Attempts to add documents to the vstore with a specified number of retries.

    Parameters:
        chunks (list): The list of document chunks to be added.
        ids (list): The list of document IDs corresponding to the chunks.
        max_retries (int, optional): The maximum number of retry attempts. Default is 3.

    Note:
        If the operation still fails after the maximum number of retries, the
        failure is logged along with the affected IDs; no exception is raised.
    """
    for attempt in range(max_retries):
        try:
            vstore.add_documents(chunks, ids=ids)
            return  # Success: stop retrying so the chunks are not upserted again.
        except (ConnectionError, TimeoutError) as e:
            logging.info("Attempt %d failed: %s", attempt + 1, e)
            if attempt < max_retries - 1:
                time.sleep(10)
            else:
                logging.error("Max retries reached. Operation failed for ids: %s", ids)


def vectorize(article):
    """
    Chunk the given article and upsert the resulting documents into AstraDB.

    Parameters:
        article (dict): The article to be processed.

    Returns:
        None
    """
    article['id'] = str(article['id'])
    if isinstance(article, dict):
        article = [article]
    df = pd.DataFrame(article)
    df = df[['id', 'publishDate', 'author', 'category',
             'content', 'referenceid', 'site', 'title', 'link']]
    # Parse the publish date and label it with the Etc/GMT+8 (UTC-8) timezone.
    df['publishDate'] = pd.to_datetime(df['publishDate'], errors='coerce')
    df['publishDate'] = df['publishDate'].dt.tz_localize('UTC', ambiguous='NaT', nonexistent='NaT')
    df['publishDate'] = df['publishDate'].dt.tz_localize(None).dt.tz_localize(timezone('Etc/GMT+8'))
    documents = DataFrameLoader(df, page_content_column="content").load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=token_length,
        is_separator_regex=False,
        separators=["\n\n", "\n", "\t", "\\n"]
    )
    chunks = text_splitter.split_documents(documents)
    # Derive a deterministic ID for each chunk from the article ID and chunk index.
    ids = []
    for index, chunk in enumerate(chunks):
        _id = f"{chunk.metadata['id']}-{index}"
        ids.append(_id)
    try:
        add_documents_with_retry(chunks, ids)
    except (ConnectionError, TimeoutError, ValueError) as e:
        logging.error("Failed to add documents: %s", e)