Muhammad Abdur Rahman Saad
Update utils.py
a6b6592
raw
history blame
24.1 kB
"""Module to define utility function"""
import os
import re
import json
import uuid
import time
import glob
import urllib.request
from urllib.parse import urlparse
from datetime import datetime, timedelta
from decimal import Decimal
import pandas as pd
import requests
import boto3
from lxml import etree
from dotenv import load_dotenv
from googletrans import Translator
from transformers import pipeline
from PyPDF2 import PdfReader
from langdetect import detect
# load_dotenv()
# AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
# AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
AWS_ACCESS_KEY_ID = "AKIAQFXZMGHQYXKWUDWR"
AWS_SECRET_ACCESS_KEY = "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"
analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert")
translator = Translator()
with open('xpath.json', 'r', encoding='UTF-8') as f:
xpath_dict = json.load(f)
with open('patterns.json', 'r', encoding='UTF-8') as f:
patterns = json.load(f)
def get_client_connection():
"""
Returns a client connection to DynamoDB.
:return: DynamoDB client connection
"""
dynamodb = boto3.client(
service_name='dynamodb',
region_name='us-east-1',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)
return dynamodb
def update_reference(report):
"""
Updates the reference in the 'reference_china' table in DynamoDB.
Args:
report (dict): A dictionary containing the report details.
Returns:
None
"""
dynamodb = get_client_connection()
response = dynamodb.update_item(
TableName="reference_china",
Key={
'id': {'S': str(report['refID'])},
'sourceID': {'S': str(report['sourceID'])}
},
UpdateExpression='SET link = :link, referenceID = :referenceID, LastModifiedDate = :LastModifiedDate',
ExpressionAttributeValues={
':link': {'S': report['link']},
':referenceID': {'S': report['referenceID']},
':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")},
}
)
print(response)
def download_files_from_s3(folder):
"""
Downloads Parquet files from an S3 bucket and returns a concatenated DataFrame.
Args:
folder (str): The folder in the S3 bucket to download files from.
Returns:
pandas.DataFrame: A concatenated DataFrame containing the data from the downloaded Parquet files.
"""
if not os.path.exists(folder):
os.makedirs(folder)
client = boto3.client(
's3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
response = client.list_objects_v2(Bucket='china-securities-report', Prefix=f"{folder}/")
for obj in response['Contents']:
key = obj['Key']
if key.endswith('.parquet'):
client.download_file('china-securities-report', key, key)
file_paths = glob.glob(os.path.join(folder, '*.parquet'))
return pd.concat([pd.read_parquet(file_path) for file_path in file_paths], ignore_index=True)
def extract_from_pdf_by_pattern(url, pattern):
"""
Extracts text from a PDF file based on a given pattern.
Args:
url (str): The URL of the PDF file to extract text from.
pattern (dict): A dictionary containing the pattern to match and the pages to extract text from.
Returns:
str: The extracted text from the PDF file.
Raises:
Exception: If there is an error while retrieving or processing the PDF file.
"""
# Send a GET request to the URL and retrieve the PDF content
try:
response = requests.get(url, timeout=60)
pdf_content = response.content
# Save the PDF content to a local file
with open("downloaded_file.pdf", "wb") as file:
file.write(pdf_content)
# Open the downloaded PDF file and extract the text
with open("downloaded_file.pdf", "rb") as file:
pdf_reader = PdfReader(file)
extracted_text = ""
if 'pages' in pattern:
pages = pattern['pages']
else:
pages = len(pdf_reader.pages)
for page in pages:
text = pdf_reader.pages[page].extract_text()
if 'keyword' in pattern and pattern['keyword'] in text:
text = text.split(pattern['keyword'], 1)[1].strip()
else:
text = text.strip()
extracted_text += text
except:
extracted_text = ''
return extracted_text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n').replace('\n',' ').replace('?-','?\n').replace('!-','!\n').replace('。-','。\n')
def get_reference_by_regex(pattern, text):
"""
Finds all occurrences of a given regex pattern in the provided text.
Args:
pattern (str): The regex pattern to search for.
text (str): The text to search within.
Returns:
list: A list of all matches found in the text.
"""
return re.findall(pattern, text)
def isnot_substring(list_a, string_to_check):
"""
Check if any string in the given list is a substring of the string_to_check.
Args:
list_a (list): A list of strings to check.
string_to_check (str): The string to check for substrings.
Returns:
bool: True if none of the strings in list_a are substrings of string_to_check, False otherwise.
"""
for s in list_a:
if s in string_to_check:
return False
return True
def extract_reference(row):
"""
Extracts reference information from a given row.
Args:
row (dict): A dictionary representing a row of data.
Returns:
None
"""
try:
pattern = next((elem for elem in patterns if elem['site'] == row['site']), None)
extracted_text = extract_from_pdf_by_pattern(row['attachment'],pattern)
reference_titles = re.findall(pattern['article_regex'], extracted_text)
reference_dates = re.findall(pattern['date_regex'], extracted_text)
reference_titles = [s.replace(' ', '') for s in reference_titles]
reference_dates = [s.replace(' ', '') for s in reference_dates]
print(reference_dates, reference_titles)
if 'remove' in pattern:
for remove_string in pattern['remove']:
reference_titles = [s.replace(remove_string, '') for s in reference_titles]
if len(reference_dates) > 0:
for title, date in zip(reference_titles, reference_dates):
try:
date = datetime.strptime(date, pattern['date_format'])
except:
date = datetime(2006, 1, 1)
dates = []
if 'date_range' in pattern:
for i in range(pattern['date_range'] + 1):
dates.append((date + timedelta(days=i)).strftime('%Y-%m-%d'))
dates.append((date - timedelta(days=i)).strftime('%Y-%m-%d'))
dates.append(date.strftime('%Y-%m-%d'))
date = date.strftime('%Y-%m-%d')
if 'split' in pattern:
for split_item in pattern['split']:
if 'exceptional_string' in split_item:
if split_item['string'] in title and isnot_substring(split_item['exceptional_string'], title):
title = re.split(split_item['string'], title)[split_item['index']]
else:
if split_item['string'] in title:
title = title.split(split_item['string'])[split_item['index']]
if len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]) == 0:
print("------------ = 0 ------------")
print(date, repr(title))
elif len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]) > 1:
print("------------ > 1 ------------")
print(date, repr(title))
else:
print("------------ = 1 ------------")
reference_df = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]
row['referenceID'] = reference_df.iloc[0]['id']
row['link'] = reference_df.iloc[0]['link']
row['sourceID'] = row['id']
row['refID'] = uuid.uuid5(uuid.NAMESPACE_OID, str(row['sourceID'])+str(row['referenceID']))
print(date, repr(title), row['sourceID'],row['referenceID'])
update_reference(row)
else:
for title in reference_titles:
if 'split' in pattern:
for split_item in pattern['split']:
if 'exceptional_string' in split_item:
if split_item['string'] in title and isnot_substring(split_item['exceptional_string'], title):
title = re.split(split_item['string'], title)[split_item['index']]
else:
if split_item['string'] in title:
title = title.split(split_item['string'])[split_item['index']]
if len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])]) == 0:
print("------------ = 0 ------------")
print(repr(title))
elif len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])]) > 1:
print("------------ > 1 ------------")
print(repr(title))
else:
print("------------ = 1 ------------")
reference_df = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])]
row['referenceID'] = reference_df.iloc[0]['id']
row['link'] = reference_df.iloc[0]['link']
row['sourceID'] = row['id']
row['refID'] = uuid.uuid5(uuid.NAMESPACE_OID, str(row['sourceID'])+str(row['referenceID']))
print(repr(title), row['sourceID'],row['referenceID'])
update_reference(row)
except Exception as error:
print(error)
def translate(text):
"""
Translates the given text to English.
Args:
text (str): The text to be translated.
Returns:
str: The translated text in English.
"""
return translator.translate(text, dest='en').text
def datemodifier(date_string, date_format):
"""Date Modifier Function
This function takes a date string and a date format as input and modifies the date string
according to the specified format. It returns the modified date string in the format 'YYYY-MM-DD'.
Args:
date_string (str): The date string to be modified.
date_format (str): The format of the date string.
Returns:
str: The modified date string in the format 'YYYY-MM-DD'.
False: If an error occurs during the modification process.
"""
try:
to_date = time.strptime(date_string,date_format)
return time.strftime("%Y-%m-%d",to_date)
except:
return False
def fetch_url(url):
"""
Fetches the content of a given URL.
Args:
url (str): The URL to fetch.
Returns:
str or None: The content of the URL if the request is successful (status code 200),
otherwise None.
Raises:
requests.exceptions.RequestException: If there is an error while making the request or if the response status code is not 200.
"""
try:
response = requests.get(url, timeout=60)
if response.status_code == 200:
return response.text
else:
return None
except requests.exceptions.RequestException or requests.exceptions.ReadTimeout as e:
print(f"An error occurred: {e}") # Optional: handle or log the error in some way
return None
def translist(infolist):
"""
Filter and transform a list of strings.
Args:
infolist (list): The input list of strings.
Returns:
list: The filtered and transformed list of strings.
"""
out = list(filter(lambda s: s and
(isinstance(s, str) or len(s.strip()) > 0), [i.strip() for i in infolist]))
return out
def encode(content):
"""
Encodes the given content into a single string.
Args:
content (list): A list of elements to be encoded. Each element can be either a string or an `etree._Element` object.
Returns:
str: The encoded content as a single string.
"""
text = ''
for element in content:
if isinstance(element, etree._Element):
subelement = etree.tostring(element).decode()
subpage = etree.HTML(subelement)
tree = subpage.xpath('//text()')
line = ''.join(translist(tree)).\
replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip()
else:
line = element
text += line
return text
def encode_content(content):
"""
Encodes the content by removing unnecessary characters and extracting a summary.
Args:
content (list): A list of elements representing the content.
Returns:
tuple: A tuple containing the encoded text and the summary.
"""
text = ''
for element in content:
if isinstance(element, etree._Element):
subelement = etree.tostring(element).decode()
subpage = etree.HTML(subelement)
tree = subpage.xpath('//text()')
line = ''.join(translist(tree)).\
replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip()
else:
line = element
if line != '':
line = line + '\n'
text += line
index = text.find('打印本页')
if index != -1:
text = text[:index]
try:
summary = '\n'.join(text.split('\n')[:2])
except:
summary = text
return text, summary
def extract_from_pdf(url):
"""
Extracts text from a PDF file given its URL.
Args:
url (str): The URL of the PDF file.
Returns:
tuple: A tuple containing the extracted text and a summary of the text.
Raises:
Exception: If there is an error during the extraction process.
"""
# Send a GET request to the URL and retrieve the PDF content
response = requests.get(url, timeout=60)
pdf_content = response.content
# Save the PDF content to a local file
with open("downloaded_file.pdf", "wb") as file:
file.write(pdf_content)
# Open the downloaded PDF file and extract the text
with open("downloaded_file.pdf", "rb") as file:
pdf_reader = PdfReader(file)
num_pages = len(pdf_reader.pages)
extracted_text = ""
for page in range(num_pages):
text = pdf_reader.pages[page].extract_text()
if text and text[0].isdigit():
text = text[1:]
# first_newline_index = text.find('。\n')
# text = text[:first_newline_index+1].replace('\n', '') + text[first_newline_index+1:]
text = text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n').replace('\n','').replace('?-','?\n').replace('!-','!\n').replace('。-','。\n')
if text != '':
extracted_text += text
try:
summary = '\n'.join(extracted_text.split('\n')[:2])
except:
summary = text
return extracted_text, summary
def get_db_connection():
"""Get dynamoDB connection.
Returns:
boto3.resource: The DynamoDB resource object representing the connection.
"""
dynamodb = boto3.resource(
service_name='dynamodb',
region_name='us-east-1',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)
return dynamodb
def sentiment_computation(content):
"""
Compute the sentiment score and label for the given content.
Parameters:
content (str): The content for which sentiment needs to be computed.
Returns:
tuple: A tuple containing the sentiment score and label. The sentiment score is a float representing the overall sentiment score of the content. The sentiment label is a string representing the sentiment label ('+', '-', or '0').
"""
label_dict = {
"positive": "+",
"negative": "-",
"neutral": "0",
}
sentiment_score = 0
maximum_value = 0
raw_sentiment = analyzer(content[:511], top_k=None)
sentiment_label = None
for sentiment_dict in raw_sentiment:
value = sentiment_dict["score"]
if value > maximum_value:
sentiment_label = sentiment_dict["label"]
maximum_value = value
if sentiment_dict["label"] == "positive":
sentiment_score = sentiment_score + value
if sentiment_dict["label"] == "negative":
sentiment_score = sentiment_score - value
else:
sentiment_score = sentiment_score + 0
sentiment_score = sentiment_score * 100
return sentiment_score, label_dict[sentiment_label]
def crawl(url, article):
"""
Crawls the given URL and extracts relevant information from the webpage.
Args:
url (str): The URL of the webpage to crawl.
article (dict): A dictionary to store the extracted information.
Returns:
None: If the length of the extracted content is less than 10 characters.
Raises:
None
"""
domain = '.'.join(urlparse(url).netloc.split('.')[1:])
req = urllib.request.urlopen(url)
text = req.read()
html_text = text.decode("utf-8")
page = etree.HTML(html_text)
contentCN, summary = encode_content(page.xpath(xpath_dict[domain]['content']))
article['originSite'] = xpath_dict[domain]['siteCN']
article['site'] = xpath_dict[domain]['site']
article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title']))
article['title'] = translate(article['titleCN'])
if 'author' in xpath_dict[domain]:
article['author'] = translate(encode(page.xpath(xpath_dict[domain]['author'])))
else:
article['author'] = ""
article['contentCN'] = repr(contentCN)[1:-1].strip()
if len(article['contentCN']) < 10:
return None
CONTENT_ENG = ''
for element in contentCN.split("\n"):
CONTENT_ENG += translate(element) + '\n'
if detect(CONTENT_ENG) != 'en':
for element in contentCN.split("。"):
CONTENT_ENG += translate(element) + '. '
article['content'] = repr(CONTENT_ENG)[1:-1].strip()
if 'subtitle' in xpath_dict[domain]:
article['subtitle'] = translate(encode(page.xpath(xpath_dict[domain]['subtitle'])))
else:
article['subtitle'] = translate(summary)
article['publishDate'] = datemodifier(encode(page.xpath(xpath_dict[domain]['publishdate'])), xpath_dict[domain]['datetime_format'])
article['link'] = url
article['attachment'] = ""
article['sentimentScore'], article['sentimentLabel'] = sentiment_computation(CONTENT_ENG.replace("\n",""))
article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['titleCN']+article['publishDate'])
print(article['id'], article['site'] )
update_content(article)
def upsert_content(report):
"""
Upserts the content of a report into the 'article_china' table in DynamoDB.
Args:
report (dict): A dictionary containing the report data.
Returns:
dict: The response from the DynamoDB put_item operation.
"""
dynamodb = get_db_connection()
table = dynamodb.Table('article_china')
# Define the item data
item = {
'id': str(report['id']),
'site': report['site'],
'title': report['title'],
'titleCN': report['titleCN'],
'contentCN': report['contentCN'],
'category': report['category'],
'author': report['author'],
'content': report['content'],
'subtitle': report['subtitle'],
'publishDate': report['publishDate'],
'link': report['link'],
'attachment': report['attachment'],
# 'authorID': str(report['authorid']),
# 'entityList': report['entitylist'],
'sentimentScore': Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')),
'sentimentLabel': report['sentimentLabel'],
'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
}
response = table.put_item(Item=item)
print(response)
def delete_records(item):
"""
Deletes a record from the 'article_test' table in DynamoDB.
Args:
item (dict): The item to be deleted, containing 'id' and 'site' keys.
Returns:
None
"""
dynamodb_client = get_client_connection()
dynamodb_client.delete_item(
TableName="article_test",
Key={
'id': {'S': item['id']},
'site': {'S': item['site']}
}
)
def update_content(report):
"""
Updates the content of an article in the 'article_china' table in DynamoDB.
Args:
report (dict): A dictionary containing the report data.
Returns:
None
"""
dynamodb = get_client_connection()
response = dynamodb.update_item(
TableName="article_china",
Key={
'id': {'S': str(report['id'])},
'site': {'S': report['site']}
},
UpdateExpression='SET title = :title, titleCN = :titleCN, contentCN = :contentCN, category = :category, author = :author, content = :content, subtitle = :subtitle, publishDate = :publishDate, link = :link, attachment = :attachment, sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel, LastModifiedDate = :LastModifiedDate',
ExpressionAttributeValues={
':title': {'S': report['title']},
':titleCN': {'S': report['titleCN']},
':contentCN': {'S': report['contentCN']},
':category': {'S': report['category']},
':author': {'S': report['author']},
':content': {'S': report['content']},
':subtitle': {'S': report['subtitle']},
':publishDate': {'S': report['publishDate']},
':link': {'S': report['link']},
':attachment': {'S': report['attachment']},
':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")},
':sentimentScore': {'N': str(Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')))},
':sentimentLabel': {'S': report['sentimentLabel']}
}
)
print(response)
def update_content_sentiment(report):
"""
Updates the sentiment score and label of an article in the 'article_test' DynamoDB table.
Args:
report (dict): A dictionary containing the report information.
Returns:
None
"""
dynamodb = get_client_connection()
response = dynamodb.update_item(
TableName="article_test",
Key={
'id': {'S': report['id']},
'site': {'S': report['site']}
},
UpdateExpression='SET sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel',
ExpressionAttributeValues={
':sentimentScore': {'N': str(Decimal(str(report['sentimentscore'])).quantize(Decimal('0.01')))},
':sentimentLabel': {'S': report['sentimentlabel']}
}
)
print(response)
data = download_files_from_s3('data')