"""Module to define utility function""" | |
import os
import re
import json
import uuid
import time
import glob
import urllib.request
from urllib.parse import urlparse
from datetime import datetime, timedelta
from decimal import Decimal
import pandas as pd
import requests
import boto3
from lxml import etree
from googletrans import Translator
from transformers import pipeline
from PyPDF2 import PdfReader
from langdetect import detect
# AWS_ACCESS_KEY_ID = Secret.load("aws-access-key-id").get()
# AWS_SECRET_ACCESS_KEY = Secret.load("aws-access-secret-key").get()
AWS_ACCESS_KEY_ID = "AKIAQFXZMGHQYXKWUDWR"
AWS_SECRET_ACCESS_KEY = "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"
analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert")
translator = Translator()
with open('xpath.json', 'r', encoding='UTF-8') as f:
    xpath_dict = json.load(f)
with open('patterns.json', 'r', encoding='UTF-8') as f:
    patterns = json.load(f)
def get_client_connection():
    """
    Returns a client connection to DynamoDB.

    :return: DynamoDB client connection
    """
    dynamodb = boto3.client(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    return dynamodb
def update_reference(report):
    """
    Updates the reference in the 'reference_china' table in DynamoDB.

    Args:
        report (dict): A dictionary containing the report details.

    Returns:
        None
    """
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="reference_china",
        Key={
            'id': {'S': str(report['refID'])},
            'sourceID': {'S': str(report['sourceID'])}
        },
        UpdateExpression='SET link = :link, referenceID = :referenceID, LastModifiedDate = :LastModifiedDate',
        ExpressionAttributeValues={
            ':link': {'S': report['link']},
            ':referenceID': {'S': report['referenceID']},
            ':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")},
        }
    )
    print(response)
def download_files_from_s3(folder):
    """
    Downloads Parquet files from an S3 bucket and returns a concatenated DataFrame.

    Args:
        folder (str): The folder in the S3 bucket to download files from.

    Returns:
        pandas.DataFrame: A concatenated DataFrame containing the data from the downloaded Parquet files.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    response = client.list_objects_v2(Bucket='china-securities-report', Prefix=f"{folder}/")
    # 'Contents' is absent when the prefix matches no objects, so default to an empty list.
    for obj in response.get('Contents', []):
        key = obj['Key']
        if key.endswith('.parquet'):
            client.download_file('china-securities-report', key, key)
    file_paths = glob.glob(os.path.join(folder, '*.parquet'))
    return pd.concat([pd.read_parquet(file_path) for file_path in file_paths], ignore_index=True)
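# Usage sketch (requires read access to the 'china-securities-report' bucket; illustrative only):
#   df = download_files_from_s3('data')
#   print(df.shape)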
def extract_from_pdf_by_pattern(url, pattern):
    """
    Extracts text from a PDF file based on a given pattern.

    Args:
        url (str): The URL of the PDF file to extract text from.
        pattern (dict): A dictionary containing the pattern to match and the pages to extract text from.

    Returns:
        str: The extracted text from the PDF file, or an empty string if retrieval or parsing fails.
    """
    # Send a GET request to the URL and retrieve the PDF content
    try:
        response = requests.get(url, timeout=60)
        pdf_content = response.content
        # Save the PDF content to a local file
        with open("downloaded_file.pdf", "wb") as file:
            file.write(pdf_content)
        # Open the downloaded PDF file and extract the text
        with open("downloaded_file.pdf", "rb") as file:
            pdf_reader = PdfReader(file)
            extracted_text = ""
            if 'pages' in pattern:
                pages = pattern['pages']
            else:
                # Iterate over all page indices; the bare page count is not iterable.
                pages = range(len(pdf_reader.pages))
            for page in pages:
                text = pdf_reader.pages[page].extract_text()
                if 'keyword' in pattern and pattern['keyword'] in text:
                    text = text.split(pattern['keyword'], 1)[1].strip()
                else:
                    text = text.strip()
                extracted_text += text
    except Exception:
        extracted_text = ''
    # Temporarily tag newlines that follow sentence-ending punctuation, collapse all other
    # newlines into spaces, then restore the tagged ones so sentence breaks survive.
    return (extracted_text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n')
            .replace('\n', ' ')
            .replace('?-', '?\n').replace('!-', '!\n').replace('。-', '。\n'))
def get_reference_by_regex(pattern, text):
    """
    Finds all occurrences of a given regex pattern in the provided text.

    Args:
        pattern (str): The regex pattern to search for.
        text (str): The text to search within.

    Returns:
        list: A list of all matches found in the text.
    """
    return re.findall(pattern, text)
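# Usage sketch (illustrative):
#   >>> get_reference_by_regex(r'\d{4}-\d{2}-\d{2}', 'Issued 2024-01-05, amended 2024-02-10')
#   ['2024-01-05', '2024-02-10']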
def isnot_substring(list_a, string_to_check):
    """
    Check if any string in the given list is a substring of string_to_check.

    Args:
        list_a (list): A list of strings to check.
        string_to_check (str): The string to check for substrings.

    Returns:
        bool: True if none of the strings in list_a are substrings of string_to_check, False otherwise.
    """
    return not any(s in string_to_check for s in list_a)
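# Usage sketch (illustrative):
#   >>> isnot_substring(['foo', 'bar'], 'a bar chart')
#   False
#   >>> isnot_substring(['foo', 'bar'], 'a pie chart')
#   True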
def extract_reference(row):
    """
    Extracts reference information from a given row.

    Args:
        row (dict): A dictionary representing a row of data.

    Returns:
        None
    """
    try:
        pattern = next((elem for elem in patterns if elem['site'] == row['site']), None)
        extracted_text = extract_from_pdf_by_pattern(row['attachment'], pattern)
        reference_titles = re.findall(pattern['article_regex'], extracted_text)
        reference_dates = re.findall(pattern['date_regex'], extracted_text)
        reference_titles = [s.replace(' ', '') for s in reference_titles]
        reference_dates = [s.replace(' ', '') for s in reference_dates]
        print(reference_dates, reference_titles)
        if 'remove' in pattern:
            for remove_string in pattern['remove']:
                reference_titles = [s.replace(remove_string, '') for s in reference_titles]
        if len(reference_dates) > 0:
            for title, date in zip(reference_titles, reference_dates):
                try:
                    date = datetime.strptime(date, pattern['date_format'])
                except ValueError:
                    # Fall back to a sentinel date when the string does not match the format.
                    date = datetime(2006, 1, 1)
                dates = []
                if 'date_range' in pattern:
                    for i in range(pattern['date_range'] + 1):
                        dates.append((date + timedelta(days=i)).strftime('%Y-%m-%d'))
                        dates.append((date - timedelta(days=i)).strftime('%Y-%m-%d'))
                dates.append(date.strftime('%Y-%m-%d'))
                date = date.strftime('%Y-%m-%d')
                if 'split' in pattern:
                    for split_item in pattern['split']:
                        if 'exceptional_string' in split_item:
                            if split_item['string'] in title and isnot_substring(split_item['exceptional_string'], title):
                                title = re.split(split_item['string'], title)[split_item['index']]
                        else:
                            if split_item['string'] in title:
                                title = title.split(split_item['string'])[split_item['index']]
                # Compute the candidate set once instead of re-filtering in each branch.
                reference_df = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]
                if len(reference_df) == 0:
                    print("------------ = 0 ------------")
                    print(date, repr(title))
                elif len(reference_df) > 1:
                    print("------------ > 1 ------------")
                    print(date, repr(title))
                else:
                    print("------------ = 1 ------------")
                    row['referenceID'] = reference_df.iloc[0]['id']
                    row['link'] = reference_df.iloc[0]['link']
                    row['sourceID'] = row['id']
                    row['refID'] = uuid.uuid5(uuid.NAMESPACE_OID, str(row['sourceID']) + str(row['referenceID']))
                    print(date, repr(title), row['sourceID'], row['referenceID'])
                    update_reference(row)
        else:
            for title in reference_titles:
                if 'split' in pattern:
                    for split_item in pattern['split']:
                        if 'exceptional_string' in split_item:
                            if split_item['string'] in title and isnot_substring(split_item['exceptional_string'], title):
                                title = re.split(split_item['string'], title)[split_item['index']]
                        else:
                            if split_item['string'] in title:
                                title = title.split(split_item['string'])[split_item['index']]
                reference_df = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])]
                if len(reference_df) == 0:
                    print("------------ = 0 ------------")
                    print(repr(title))
                elif len(reference_df) > 1:
                    print("------------ > 1 ------------")
                    print(repr(title))
                else:
                    print("------------ = 1 ------------")
                    row['referenceID'] = reference_df.iloc[0]['id']
                    row['link'] = reference_df.iloc[0]['link']
                    row['sourceID'] = row['id']
                    row['refID'] = uuid.uuid5(uuid.NAMESPACE_OID, str(row['sourceID']) + str(row['referenceID']))
                    print(repr(title), row['sourceID'], row['referenceID'])
                    update_reference(row)
    except Exception as error:
        print(error)
def translate(text):
    """
    Translates the given text to English.

    Args:
        text (str): The text to be translated.

    Returns:
        str: The translated text in English.
    """
    return translator.translate(text, dest='en').text
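# Usage sketch (requires network access for googletrans; output is illustrative):
#   >>> translate('中国证券报')
#   'China Securities Journal'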
def datemodifier(date_string, date_format):
    """Date Modifier Function

    Takes a date string and its format, and normalizes the date to 'YYYY-MM-DD'.

    Args:
        date_string (str): The date string to be modified.
        date_format (str): The format of the date string.

    Returns:
        str: The modified date string in the format 'YYYY-MM-DD'.
        False: If the date string does not match the given format.
    """
    try:
        to_date = time.strptime(date_string, date_format)
        return time.strftime("%Y-%m-%d", to_date)
    except ValueError:
        return False
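# Usage sketch (illustrative):
#   >>> datemodifier('2024年01月05日', '%Y年%m月%d日')
#   '2024-01-05'
#   >>> datemodifier('not a date', '%Y-%m-%d')
#   False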
def fetch_url(url):
    """
    Fetches the content of a given URL.

    Args:
        url (str): The URL to fetch.

    Returns:
        str or None: The content of the URL if the request succeeds (status code 200),
        otherwise None. Request errors are caught and logged rather than raised.
    """
    try:
        response = requests.get(url, timeout=60)
        if response.status_code == 200:
            return response.text
        return None
    except requests.exceptions.RequestException as e:
        # ReadTimeout is a subclass of RequestException, so one handler covers both.
        print(f"An error occurred: {e}")
        return None
def translist(infolist):
    """
    Filter and transform a list of strings.

    Args:
        infolist (list): The input list of strings.

    Returns:
        list: The stripped strings, with empty entries removed.
    """
    return [s for s in (i.strip() for i in infolist) if s]
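# Usage sketch (illustrative):
#   >>> translist(['  hello ', '', '\n', 'world'])
#   ['hello', 'world']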
def encode(content):
    """
    Encodes the given content into a single string.

    Args:
        content (list): A list of elements to be encoded. Each element can be either a string or an `etree._Element` object.

    Returns:
        str: The encoded content as a single string.
    """
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)).\
                replace('\n', '').replace('\t', '').replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        text += line
    return text
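# Usage sketch (illustrative; note that all whitespace inside elements is removed):
#   page = etree.HTML('<div><p>Hello </p><p>world</p></div>')
#   encode(page.xpath('//p'))  # -> 'Helloworld'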
def encode_content(content):
    """
    Encodes the content by removing unnecessary characters and extracting a summary.

    Args:
        content (list): A list of elements representing the content.

    Returns:
        tuple: A tuple containing the encoded text and the summary.
    """
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)).\
                replace('\n', '').replace('\t', '').replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        if line != '':
            line = line + '\n'
        text += line
    # Truncate page boilerplate: '打印本页' means "Print this page".
    index = text.find('打印本页')
    if index != -1:
        text = text[:index]
    # Use the first two lines as the summary.
    try:
        summary = '\n'.join(text.split('\n')[:2])
    except Exception:
        summary = text
    return text, summary
def extract_from_pdf(url):
    """
    Extracts text from a PDF file given its URL.

    Args:
        url (str): The URL of the PDF file.

    Returns:
        tuple: A tuple containing the extracted text and a summary of the text.

    Raises:
        Exception: If there is an error during the extraction process.
    """
    # Send a GET request to the URL and retrieve the PDF content
    response = requests.get(url, timeout=60)
    pdf_content = response.content
    # Save the PDF content to a local file
    with open("downloaded_file.pdf", "wb") as file:
        file.write(pdf_content)
    # Open the downloaded PDF file and extract the text
    with open("downloaded_file.pdf", "rb") as file:
        pdf_reader = PdfReader(file)
        num_pages = len(pdf_reader.pages)
        extracted_text = ""
        for page in range(num_pages):
            text = pdf_reader.pages[page].extract_text()
            # Drop a leading page-number digit if present.
            if text and text[0].isdigit():
                text = text[1:]
            # Preserve newlines after sentence-ending punctuation while collapsing the rest.
            text = (text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n')
                    .replace('\n', '')
                    .replace('?-', '?\n').replace('!-', '!\n').replace('。-', '。\n'))
            if text != '':
                extracted_text += text
        try:
            summary = '\n'.join(extracted_text.split('\n')[:2])
        except Exception:
            summary = extracted_text
    return extracted_text, summary
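# Usage sketch (hypothetical URL; writes downloaded_file.pdf to the working directory):
#   full_text, summary = extract_from_pdf('http://www.example.gov.cn/annual_report.pdf')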
def get_db_connection():
    """Get DynamoDB connection.

    Returns:
        boto3.resource: The DynamoDB resource object representing the connection.
    """
    dynamodb = boto3.resource(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    return dynamodb
def sentiment_computation(content):
    """
    Compute the sentiment score and label for the given content.

    Parameters:
        content (str): The content for which sentiment needs to be computed.

    Returns:
        tuple: A tuple containing the sentiment score and label. The score is a float in
        [-100, 100] (positive probability minus negative probability, scaled by 100);
        the label is '+', '-', or '0' for the highest-scoring class.
    """
    label_dict = {
        "positive": "+",
        "negative": "-",
        "neutral": "0",
    }
    sentiment_score = 0
    maximum_value = 0
    # Truncate the input as a crude guard against FinBERT's 512-token limit.
    raw_sentiment = analyzer(content[:511], top_k=None)
    sentiment_label = None
    for sentiment_dict in raw_sentiment:
        value = sentiment_dict["score"]
        if value > maximum_value:
            sentiment_label = sentiment_dict["label"]
            maximum_value = value
        if sentiment_dict["label"] == "positive":
            sentiment_score += value
        elif sentiment_dict["label"] == "negative":
            sentiment_score -= value
    sentiment_score = sentiment_score * 100
    return sentiment_score, label_dict[sentiment_label]
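# Usage sketch (assumes the FinBERT pipeline loaded above; exact numbers are illustrative):
#   score, label = sentiment_computation("Quarterly profits rose sharply on strong demand.")
#   # e.g. score might be roughly 90 with label '+'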
def crawl(url, article):
    """
    Crawls the given URL and extracts relevant information from the webpage.

    Args:
        url (str): The URL of the webpage to crawl.
        article (dict): A dictionary to store the extracted information.

    Returns:
        None: If the length of the extracted content is less than 10 characters.
    """
    # Strip the subdomain so the registered domain can be looked up in xpath.json.
    domain = '.'.join(urlparse(url).netloc.split('.')[1:])
    req = urllib.request.urlopen(url)
    text = req.read()
    html_text = text.decode("utf-8")
    page = etree.HTML(html_text)
    contentCN, summary = encode_content(page.xpath(xpath_dict[domain]['content']))
    article['originSite'] = xpath_dict[domain]['siteCN']
    article['site'] = xpath_dict[domain]['site']
    article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title']))
    article['title'] = translate(article['titleCN'])
    if 'author' in xpath_dict[domain]:
        article['author'] = translate(encode(page.xpath(xpath_dict[domain]['author'])))
    else:
        article['author'] = ""
    article['contentCN'] = repr(contentCN)[1:-1].strip()
    if len(article['contentCN']) < 10:
        return None
    CONTENT_ENG = ''
    for element in contentCN.split("\n"):
        CONTENT_ENG += translate(element) + '\n'
    if detect(CONTENT_ENG) != 'en':
        # Retry sentence by sentence, discarding the failed first attempt.
        CONTENT_ENG = ''
        for element in contentCN.split("。"):
            CONTENT_ENG += translate(element) + '. '
    article['content'] = repr(CONTENT_ENG)[1:-1].strip()
    if 'subtitle' in xpath_dict[domain]:
        article['subtitle'] = translate(encode(page.xpath(xpath_dict[domain]['subtitle'])))
    else:
        article['subtitle'] = translate(summary)
    article['publishDate'] = datemodifier(encode(page.xpath(xpath_dict[domain]['publishdate'])), xpath_dict[domain]['datetime_format'])
    article['link'] = url
    article['attachment'] = ""
    article['sentimentScore'], article['sentimentLabel'] = sentiment_computation(CONTENT_ENG.replace("\n", ""))
    article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['titleCN'] + article['publishDate'])
    print(article['id'], article['site'])
    update_content(article)
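# Usage sketch (hypothetical URL; the registered domain must have an entry in xpath.json,
# and the caller is expected to supply 'category' since update_content reads it):
#   article = {'category': 'policy'}
#   crawl('http://www.example.gov.cn/zhengce/2024/article.html', article)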
def upsert_content(report):
    """
    Upserts the content of a report into the 'article_china' table in DynamoDB.

    Args:
        report (dict): A dictionary containing the report data.

    Returns:
        dict: The response from the DynamoDB put_item operation.
    """
    dynamodb = get_db_connection()
    table = dynamodb.Table('article_china')
    # Define the item data
    item = {
        'id': str(report['id']),
        'site': report['site'],
        'title': report['title'],
        'titleCN': report['titleCN'],
        'contentCN': report['contentCN'],
        'category': report['category'],
        'author': report['author'],
        'content': report['content'],
        'subtitle': report['subtitle'],
        'publishDate': report['publishDate'],
        'link': report['link'],
        'attachment': report['attachment'],
        # 'authorID': str(report['authorid']),
        # 'entityList': report['entitylist'],
        'sentimentScore': Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')),
        'sentimentLabel': report['sentimentLabel'],
        'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    }
    response = table.put_item(Item=item)
    print(response)
    return response
def delete_records(item):
    """
    Deletes a record from the 'article_test' table in DynamoDB.

    Args:
        item (dict): The item to be deleted, containing 'id' and 'site' keys.

    Returns:
        None
    """
    dynamodb_client = get_client_connection()
    dynamodb_client.delete_item(
        TableName="article_test",
        Key={
            'id': {'S': item['id']},
            'site': {'S': item['site']}
        }
    )
def update_content(report):
    """
    Updates the content of an article in the 'article_china' table in DynamoDB.

    Args:
        report (dict): A dictionary containing the report data.

    Returns:
        None
    """
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="article_china",
        Key={
            'id': {'S': str(report['id'])},
            'site': {'S': report['site']}
        },
        UpdateExpression='SET title = :title, titleCN = :titleCN, contentCN = :contentCN, category = :category, author = :author, content = :content, subtitle = :subtitle, publishDate = :publishDate, link = :link, attachment = :attachment, sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel, LastModifiedDate = :LastModifiedDate',
        ExpressionAttributeValues={
            ':title': {'S': report['title']},
            ':titleCN': {'S': report['titleCN']},
            ':contentCN': {'S': report['contentCN']},
            ':category': {'S': report['category']},
            ':author': {'S': report['author']},
            ':content': {'S': report['content']},
            ':subtitle': {'S': report['subtitle']},
            ':publishDate': {'S': report['publishDate']},
            ':link': {'S': report['link']},
            ':attachment': {'S': report['attachment']},
            ':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")},
            ':sentimentScore': {'N': str(Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')))},
            ':sentimentLabel': {'S': report['sentimentLabel']}
        }
    )
    print(response)
def update_content_sentiment(report):
    """
    Updates the sentiment score and label of an article in the 'article_test' DynamoDB table.

    Args:
        report (dict): A dictionary containing the report information.

    Returns:
        None
    """
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="article_test",
        Key={
            'id': {'S': report['id']},
            'site': {'S': report['site']}
        },
        UpdateExpression='SET sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel',
        ExpressionAttributeValues={
            ':sentimentScore': {'N': str(Decimal(str(report['sentimentscore'])).quantize(Decimal('0.01')))},
            ':sentimentLabel': {'S': report['sentimentlabel']}
        }
    )
    print(response)
# Module-level load: extract_reference() matches candidate titles against this DataFrame
# of previously crawled articles.
data = download_files_from_s3('data')