gavinzli committed
Commit 693e166 · 1 Parent(s): 1269de7

Add reference ID extraction and implement retry logic for document addition

controllers/utils.py CHANGED
@@ -18,11 +18,11 @@ from dotenv import load_dotenv
 from deep_translator import GoogleTranslator, exceptions
 from langdetect import detect
 from lxml import etree
-from PyPDF2 import PdfReader
+import PyPDF2
 from transformers import pipeline
 
 from controllers.summarizer import summarize
-from controllers.vectorizer import vectorize
+from controllers.vectorizer import vectorize, openai_vectorize
 
 load_dotenv()
 
@@ -47,7 +47,7 @@ def datemodifier(date_string, date_format):
     """Date Modifier Function
 
     This function takes a date string and a date format as input and modifies the date string
-    according to the specified format. It returns the modified date string in the format 'YYYY-MM-DD'.
+    according to the specified format. It returns modified date string in the format 'YYYY-MM-DD'.
 
     Args:
         date_string (str): The date string to be modified.
@@ -70,7 +70,8 @@ def encode(content):
     Encodes the given content into a single string.
 
     Args:
-        content (list): A list of elements to be encoded. Each element can be either a string or an `etree._Element` object.
+        content (list): A list of elements to be encoded.
+        Each element can be either a string or an `etree._Element` object.
 
     Returns:
         str: The encoded content as a single string.
@@ -138,7 +139,8 @@ def fetch_url(url):
         otherwise None.
 
     Raises:
-        requests.exceptions.RequestException: If there is an error while making the request or if the response status code is not 200.
+        requests.exceptions.RequestException:
+            If there is an error while making the request or if the response status code is not 200.
     """
     try:
         response = requests.get(url, timeout=60)
@@ -161,7 +163,7 @@ def translate(text):
     Returns:
         str: The translated text in English.
     """
-    for i in range(5):
+    for _ in range(5):
         try:
             return GoogleTranslator(source='auto', target='en').translate(text)
         except exceptions.RequestError:
@@ -178,7 +180,9 @@ def sentiment_computation(content):
         content (str): The content for which sentiment needs to be computed.
 
     Returns:
-        tuple: A tuple containing the sentiment score and label. The sentiment score is a float representing the overall sentiment score of the content. The sentiment label is a string representing the sentiment label ('+', '-', or '0').
+        tuple: A tuple containing the sentiment score and label.
+        The sentiment score is a float representing the overall sentiment score of the content.
+        The sentiment label is a string representing the sentiment label ('+', '-', or '0').
 
     """
     label_dict = {
@@ -230,22 +234,29 @@ def update_content(report):
     """
     print("Updating content for %s", report['id'])
    dynamodb = get_client_connection()
-    response = dynamodb.update_item(
+    dynamodb.update_item(
        TableName="article_china",
        Key={
            'id': {
                'S': str(report['id'])
-            },
-            'site': {
-                'S': report['site']
            }
+            # 'site': {
+            #     'S': report['site']
+            # }
        },
        UpdateExpression=
-        'SET title = :title, titleCN = :titleCN, contentCN = :contentCN, category = :category, author = :author, content = :content, subtitle = :subtitle, publishDate = :publishDate, link = :link, attachment = :attachment, sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel, LastModifiedDate = :LastModifiedDate',
+        'SET title = :title, site = :site, titleCN = :titleCN, contentCN = :contentCN, \
+            category = :category, author = :author, content = :content, subtitle = :subtitle, \
+            publishDate = :publishDate, link = :link, attachment = :attachment, \
+            sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel, \
+            LastModifiedDate = :LastModifiedDate',
        ExpressionAttributeValues={
            ':title': {
                'S': report['title']
            },
+            ':site': {
+                'S': report['site']
+            },
            ':titleCN': {
                'S': report['titleCN']
            },
@@ -287,6 +298,7 @@ def update_content(report):
            }
        })
    vectorize(report)
+    openai_vectorize(report)
 
 
 def update_reference(report):
@@ -334,7 +346,7 @@ def download_files_from_s3(folder):
        folder (str): The folder in the S3 bucket to download files from.
 
    Returns:
-        pandas.DataFrame: A concatenated DataFrame containing the data from the downloaded Parquet files.
+        pandas.DataFrame: A concatenated DataFrame containing data from downloaded Parquet files.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
@@ -360,7 +372,7 @@ def extract_from_pdf_by_pattern(url, pattern):
 
    Args:
        url (str): The URL of the PDF file to extract text from.
-        pattern (dict): A dictionary containing the pattern to match and the pages to extract text from.
+        pattern (dict): A dictionary containing pattern to match and the pages to extract text from.
 
    Returns:
        str: The extracted text from the PDF file.
@@ -379,7 +391,7 @@ def extract_from_pdf_by_pattern(url, pattern):
 
        # Open the downloaded PDF file and extract the text
        with open("downloaded_file.pdf", "rb") as file:
-            pdf_reader = PdfReader(file)
+            pdf_reader = PyPDF2.PdfReader(file)
            extracted_text = ""
            if 'pages' in pattern:
                pages = pattern['pages']
@@ -392,7 +404,8 @@ def extract_from_pdf_by_pattern(url, pattern):
                    else:
                        text = text.strip()
                    extracted_text += text
-    except (requests.exceptions.RequestException, requests.exceptions.ReadTimeout) as e:
+    except (requests.exceptions.RequestException, requests.exceptions.ReadTimeout,
+            PyPDF2.errors.PdfReadError) as e:
        logging.error(e)
        extracted_text = ''
    return extracted_text.replace('?\n', '?-\n').replace(
@@ -423,7 +436,7 @@ def isnot_substring(list_a, string_to_check):
        string_to_check (str): The string to check for substrings.
 
    Returns:
-        bool: True if none of the strings in list_a are substrings of string_to_check, False otherwise.
+        bool: True if none of strings in list_a are substrings of string_to_check, False otherwise.
    """
    return all(s not in string_to_check for s in list_a)
 
@@ -454,6 +467,7 @@ def extract_reference(row):
                s.replace(remove_string, '') for s in reference_titles
            ]
            if len(reference_dates) > 0:
+                reference_ids = []
                for title, date in zip(reference_titles, reference_dates):
                    try:
                        date = datetime.strptime(date, pattern['date_format'])
@@ -497,6 +511,7 @@ def extract_reference(row):
                                            & (data['site'] == row['site']) &
                                            (data['publishdate'].isin(dates))]
                        row['referenceID'] = reference_df.iloc[0]['id']
+                        reference_ids.append(row['referenceID'])
                        row['link'] = reference_df.iloc[0]['link']
                        row['sourceID'] = row['id']
                        row['refID'] = uuid.uuid5(
@@ -505,7 +520,9 @@ def extract_reference(row):
                        logging.info("%s - %s - %s - %s",
                                     date, repr(title), row['sourceID'], row['referenceID'])
                        update_reference(row)
+                return reference_ids
            else:
+                reference_ids = []
                for title in reference_titles:
                    if 'split' in pattern:
                        for split_item in pattern['split']:
@@ -533,6 +550,7 @@ def extract_reference(row):
                        reference_df = data[(data['titleCN'].str.contains(title))
                                            & (data['site'] == row['site'])]
                        row['referenceID'] = reference_df.iloc[0]['id']
+                        reference_ids.append(row['referenceID'])
                        row['link'] = reference_df.iloc[0]['link']
                        row['sourceID'] = row['id']
                        row['refID'] = uuid.uuid5(
@@ -541,6 +559,7 @@ def extract_reference(row):
                        logging.info("%s - %s - %s", repr(title), row['sourceID'],
                                     row['referenceID'])
                        update_reference(row)
+                return reference_ids
    except (ValueError, KeyError, TypeError) as error:
        logging.error(error)
        return None
@@ -584,26 +603,25 @@ def extract_from_pdf(url):
        file.write(pdf_content)
 
    # Open the downloaded PDF file and extract the text
-    with open("downloaded_file.pdf", "rb") as file:
-        pdf_reader = PdfReader(file)
-        num_pages = len(pdf_reader.pages)
-        extracted_text = ""
-        for page in range(num_pages):
-            text = pdf_reader.pages[page].extract_text()
-            if text and text[0].isdigit():
-                text = text[1:]
-            # first_newline_index = text.find('。\n')
-            # text = text[:first_newline_index+1].replace('\n', '') + text[first_newline_index+1:]
-            text = text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace(
-                '。\n', '。-\n').replace('\n', '').replace('?-', '?\n').replace(
-                '!-', '!\n').replace('。-', '。\n')
-            if text != '':
-                extracted_text += text
-    try:
-        summary = '\n'.join(extracted_text.split('\n')[:2])
-    except (ValueError, KeyError, TypeError) as e:
-        logging.error(e)
-        summary = extracted_text
+    extracted_text = ""
+    try:
+        with open("downloaded_file.pdf", "rb") as file:
+            pdf_reader = PyPDF2.PdfReader(file)
+            num_pages = len(pdf_reader.pages)
+            for page in range(num_pages):
+                text = pdf_reader.pages[page].extract_text()
+                if text and text[0].isdigit():
+                    text = text[1:]
+                # first_newline_index = text.find('。\n')
+                text = text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace(
+                    '。\n', '。-\n').replace('\n', '').replace('?-', '?\n').replace(
+                    '!-', '!\n').replace('。-', '。\n')
+                if text != '':
+                    extracted_text += text
+        summary = '\n'.join(extracted_text.split('\n')[:2])
+    except (ValueError, KeyError, TypeError, PyPDF2.errors.PdfReadError) as e:
+        logging.error(e)
+        summary = extracted_text
    return extracted_text, summary
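The hunks above also change how PDF failures surface: a corrupt or truncated download now raises PyPDF2.errors.PdfReadError inside the helpers, where it is caught and logged, so a caller of extract_from_pdf receives whatever text was accumulated (possibly empty strings) instead of an uncaught exception. A minimal, hypothetical caller-side sketch (the URL is illustrative, not from the repository):

    # Hypothetical usage sketch, not part of the commit.
    from controllers.utils import extract_from_pdf

    text, summary = extract_from_pdf("https://example.com/some-report.pdf")  # illustrative URL
    if not text:
        # A PdfReadError (or request failure) was logged inside the helper and the
        # function returned empty strings rather than raising.
        print("PDF could not be parsed or contained no text")

The same file now writes site as a regular SET attribute in update_content instead of using it as part of the DynamoDB key, and extract_reference collects and returns the matched reference IDs rather than discarding them; source/eastmoney.py below consumes that return value.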
 
controllers/vectorizer.py CHANGED
@@ -2,12 +2,15 @@
 import os
 import logging
 import uuid
+import time
 
+import tiktoken
 import pandas as pd
 from langchain_astradb import AstraDBVectorStore
 from langchain_openai import AzureOpenAIEmbeddings
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from langchain_community.document_loaders import DataFrameLoader
+from astrapy.info import CollectionVectorServiceOptions
 
 logging.basicConfig(
     format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
@@ -27,6 +30,94 @@ vstore = AstraDBVectorStore(embedding=embedding,
                             token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
                             api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"])
 
+openai_vstore = AstraDBVectorStore(
+    collection_vector_service_options=CollectionVectorServiceOptions(
+        provider="azureOpenAI",
+        model_name="text-embedding-3-small",
+        authentication={
+            "providerKey": "AZURE_OPENAI_API_KEY",
+        },
+        parameters={
+            "resourceName": "openai-oe",
+            "deploymentId": "text-embedding-3-small",
+        },
+    ),
+    namespace="default_keyspace",
+    collection_name="text_embedding_3_small",
+    token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
+    api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"])
+
+def token_length(text):
+    """
+    Calculates length of encoded text using the tokenizer for the "text-embedding-3-small" model.
+
+    Args:
+        text (str): The input text to be tokenized and measured.
+
+    Returns:
+        int: The length of the encoded text.
+    """
+    tokenizer = tiktoken.encoding_for_model("text-embedding-3-small")
+    return len(tokenizer.encode(text))
+
+def add_documents_with_retry(chunks, ids, max_retries=3):
+    """
+    Attempts to add documents to the vstore with a specified number of retries.
+
+    Parameters:
+        chunks (list): The list of document chunks to be added.
+        ids (list): The list of document IDs corresponding to the chunks.
+        max_retries (int, optional): The maximum number of retry attempts. Default is 3.
+
+    Raises:
+        Exception: If the operation fails after the maximum number of retries, the exception is logged.
+    """
+    for attempt in range(max_retries):
+        try:
+            vstore.add_documents(chunks, ids=ids)
+        except (ConnectionError, TimeoutError) as e:
+            logging.info("Attempt %d failed: %s", attempt + 1, e)
+            if attempt < max_retries - 1:
+                time.sleep(0.5)
+            else:
+                logging.error("Max retries reached. Operation failed.")
+                logging.error(ids)
+
+def openai_vectorize(article):
+    """
+    Process the given article.
+
+    Parameters:
+        article (DataFrame): The article to be processed.
+
+    Returns:
+        None
+    """
+    article['id'] = str(article['id'])
+    if isinstance(article, dict):
+        article = [article]  # Convert single dictionary to list of dictionaries
+    df = pd.DataFrame(article)
+    df = df[['id', 'publishDate', 'author', 'category',
+             'content', 'referenceid', 'site', 'title', 'link']]
+    df['publishDate'] = pd.to_datetime(df['publishDate'])
+    documents = DataFrameLoader(df, page_content_column="content").load()
+    text_splitter = RecursiveCharacterTextSplitter(
+        chunk_size=1000,
+        chunk_overlap=200,
+        length_function=token_length,
+        is_separator_regex=False,
+        separators=["\n\n", "\n", "\t", ".", "?"]  # Logical separators
+    )
+    chunks = text_splitter.split_documents(documents)
+    ids = []
+    for index, chunk in enumerate(chunks):
+        _id = f"{chunk.metadata['id']}-{str(index)}"
+        ids.append(_id)
+    try:
+        add_documents_with_retry(chunks, ids)
+    except (ConnectionError, TimeoutError, ValueError) as e:
+        logging.error("Failed to add documents: %s", e)
+
 def vectorize(article):
     """
     Process the given article.
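For orientation, a hedged usage sketch of the new embedding path (field names mirror the columns openai_vectorize selects; the values and the call site are invented, and the Astra DB / Azure OpenAI environment variables read at import time are assumed to be set):

    # Hypothetical example, not part of the commit; all values are placeholders.
    from controllers.vectorizer import openai_vectorize

    article = {
        'id': 'abc-123',
        'publishDate': '2024-06-01',
        'author': 'Research Desk',
        'category': 'Macroeconomy',
        'content': 'Full article text ...',
        'referenceid': [],           # filled by extract_reference() when a match is found
        'site': 'eastmoney',
        'title': 'Example title',
        'link': 'https://example.com/article',
    }
    openai_vectorize(article)        # splits the content into chunks of up to ~1000 tokens and
                                     # stores them under IDs of the form "<article id>-<chunk index>"

Note that, as committed, add_documents_with_retry does not break out of its loop after a successful vstore.add_documents call, so a success is still followed by the remaining attempts; a return or break on success would avoid the repeated writes.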
source/eastmoney.py CHANGED
@@ -91,7 +91,9 @@ def _crawl(url, article, retries=3):
                                    article['titleCN'] + article['publishDate'])
     article['sentimentScore'], article[
         'sentimentLabel'] = sentiment_computation(contentcn.replace("\n", ""))
-    extract_reference(article)
+    reference_id = extract_reference(article)
+    if reference_id:
+        article['referenceid'] = reference_id
     update_content(article)
 
 @task(name = "Data Collection - eastmoney", log_prints = True)
@@ -136,7 +138,8 @@ def crawl(delta):
             i = i + 1
             for article in reportinfo['data']:
                 try:
-                    url = f"https://data.eastmoney.com/report/zw_macresearch.jshtml?encodeUrl={article['encodeUrl']}"
+                    domain = "https://data.eastmoney.com"
+                    url = f"{domain}/report/zw_macresearch.jshtml?encodeUrl={article['encodeUrl']}"
                     _crawl(url, article)
                 except (urllib.error.URLError, json.JSONDecodeError, KeyError) as error:
                     logger.error(error)
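Taken together with the changes above, the reference IDs now flow end to end: _crawl stores the list returned by extract_reference under the lowercase referenceid key, which is one of the columns openai_vectorize reads back when update_content embeds the article. A condensed, hypothetical restatement of that flow (names come from the diffs; the body is simplified for illustration):

    # Simplified illustration only, not literal repository code.
    from controllers.utils import extract_reference, update_content

    def handle(article):
        reference_id = extract_reference(article)   # matched IDs; [] if none, None on parse errors
        if reference_id:                            # both None and [] are skipped
            article['referenceid'] = reference_id   # the column openai_vectorize() reads back
        update_content(article)                     # DynamoDB update, then vectorize() + openai_vectorize()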