"""Utility functions for crawling, translating, scoring, and storing China securities reports."""

import os
import re
import json
import uuid
import time
import glob
import urllib.request
from urllib.parse import urlparse
from datetime import datetime, timedelta
from decimal import Decimal

import pandas as pd
import requests
import boto3
from lxml import etree
from googletrans import Translator
from transformers import pipeline
from PyPDF2 import PdfReader

AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert")

translator = Translator()

# Note: the XPath map and the PDF extraction patterns are currently both read
# from xpath.json; crawl() indexes `xpath_dict` by domain, while
# extract_reference() iterates `patterns` as a list of per-site pattern dicts.
with open('xpath.json', 'r', encoding='UTF-8') as f:
    xpath_dict = json.load(f)

with open('xpath.json', 'r', encoding='UTF-8') as f:
    patterns = json.load(f)
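
# Illustrative shape of an xpath_dict entry, inferred from how crawl() reads
# it; the domain and XPath values below are made-up examples, and any fields
# beyond those accessed in this module are assumptions, not a documented schema:
#
#   xpath_dict["example.gov.cn"] = {
#       "site": "EXAMPLE", "siteCN": "示例网站",
#       "title": "//h1//text()", "content": "//div[@id='zoom']",
#       "publishdate": "//span[@class='date']//text()",
#       "datetime_format": "%Y-%m-%d",   # optional keys: "author", "subtitle"
#   }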


def get_client_connection():
    """Get a low-level DynamoDB client."""
    dynamodb = boto3.client(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    return dynamodb


def update_reference(report):
    """Update the link and referenceID fields of a reference_china record."""
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="reference_china",
        Key={
            'id': {'S': str(report['refID'])},
            'sourceID': {'S': report['sourceID']}
        },
        UpdateExpression='SET link = :link, referenceID = :referenceID, LastModifiedDate = :LastModifiedDate',
        ExpressionAttributeValues={
            ':link': {'S': report['link']},
            ':referenceID': {'S': report['referenceID']},
            ':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")},
        }
    )
    print(response)


def download_files_from_s3(folder):
    """Download the parquet files under an S3 prefix and concatenate them into one DataFrame."""
    if not os.path.exists(folder):
        os.makedirs(folder)
    client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    # list_objects_v2 returns at most 1,000 keys per call; a paginator would be
    # needed if the prefix ever holds more objects than that.
    response = client.list_objects_v2(Bucket='china-securities-report', Prefix=f"{folder}/")
    for obj in response['Contents']:
        key = obj['Key']
        if key.endswith('.parquet'):
            # The key already carries the folder prefix, so this writes into `folder`.
            client.download_file('china-securities-report', key, key)
    file_paths = glob.glob(os.path.join(folder, '*.parquet'))
    return pd.concat([pd.read_parquet(file_path) for file_path in file_paths], ignore_index=True)


def extract_from_pdf_by_pattern(url, pattern):
    """Download a PDF and extract its text, guided by an optional per-site pattern."""
    try:
        response = requests.get(url, timeout=60)
        pdf_content = response.content

        with open("downloaded_file.pdf", "wb") as file:
            file.write(pdf_content)

        with open("downloaded_file.pdf", "rb") as file:
            pdf_reader = PdfReader(file)
            extracted_text = ""
            if 'pages' in pattern:
                pages = pattern['pages']
            else:
                pages = range(len(pdf_reader.pages))
            for page in pages:
                text = pdf_reader.pages[page].extract_text()
                if 'keyword' in pattern and pattern['keyword'] in text:
                    # Keep only the text after the keyword marker.
                    text = text.split(pattern['keyword'], 1)[1].strip()
                else:
                    text = text.strip()
                extracted_text += text
    except Exception:
        extracted_text = ''
    # Tag sentence-final line breaks (?, !, 。) so they survive the removal of
    # layout line breaks, then restore them.
    return extracted_text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n').replace('\n', ' ').replace('?-', '?\n').replace('!-', '!\n').replace('。-', '。\n')


def get_reference_by_regex(pattern, text):
    """Return all matches of `pattern` in `text`."""
    return re.findall(pattern, text)


def isnot_substring(list_a, string_to_check):
    """Return True if none of the strings in `list_a` occurs in `string_to_check`."""
    for s in list_a:
        if s in string_to_check:
            return False
    return True
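

# Illustrative pattern entry, inferred from the keys read in
# extract_from_pdf_by_pattern() and extract_reference(); the values below are
# made up for demonstration and are not from a documented schema:
#
#   {"site": "EXAMPLE",
#    "pages": [0, 1],                       # optional: pages to read, else all
#    "keyword": "引用文件",                  # optional: keep text after this marker
#    "article_regex": "《(.*?)》",           # titles of cited documents
#    "date_regex": r"\d{4}-\d{2}-\d{2}", "date_format": "%Y-%m-%d",
#    "date_range": 2,                       # optional: +/- days for date matching
#    "remove": ["的通知"],                   # optional: strings stripped from titles
#    "split": [{"string": "：", "index": 1}]}  # optional: title splitting rules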


def extract_reference(row):
    """Match references cited in a row's PDF attachment against the article DataFrame."""
    try:
        pattern = next((elem for elem in patterns if elem['site'] == row['site']), None)
        extracted_text = extract_from_pdf_by_pattern(row['attachment'], pattern)
        reference_titles = re.findall(pattern['article_regex'], extracted_text)
        reference_dates = re.findall(pattern['date_regex'], extracted_text)
        reference_titles = [s.replace(' ', '') for s in reference_titles]
        reference_dates = [s.replace(' ', '') for s in reference_dates]
        print(reference_dates, reference_titles)
        if 'remove' in pattern:
            for remove_string in pattern['remove']:
                reference_titles = [s.replace(remove_string, '') for s in reference_titles]
        for title, date in zip(reference_titles, reference_dates):
            print(title, date)
            try:
                date = datetime.strptime(date, pattern['date_format'])
            except ValueError:
                # Fall back to a sentinel date when the extracted string does not parse.
                date = datetime(2006, 1, 1)
            dates = []
            if 'date_range' in pattern:
                # Accept publish dates within +/- date_range days of the extracted date.
                for i in range(pattern['date_range'] + 1):
                    dates.append((date + timedelta(days=i)).strftime('%Y-%m-%d'))
                    dates.append((date - timedelta(days=i)).strftime('%Y-%m-%d'))
            dates.append(date.strftime('%Y-%m-%d'))
            date = date.strftime('%Y-%m-%d')
            if 'split' in pattern:
                for split_item in pattern['split']:
                    if 'exceptional_string' in split_item:
                        if split_item['string'] in title and isnot_substring(split_item['exceptional_string'], title):
                            title = re.split(split_item['string'], title)[split_item['index']]
                    else:
                        if split_item['string'] in title:
                            title = title.split(split_item['string'])[split_item['index']]
            matches = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]
            if len(matches) == 0:
                print("------------ = 0 ------------")
                print(date, repr(title))
            elif len(matches) > 1:
                print("------------ > 1 ------------")
                print(date, repr(title))
            else:
                print("------------ = 1 ------------")
                row['referenceID'] = matches.iloc[0]['id']
                row['link'] = matches.iloc[0]['link']
                row['sourceID'] = row['id_x']
                row['refID'] = uuid.uuid5(uuid.NAMESPACE_OID, str(row['sourceID']) + str(row['referenceID']))
                print(date, repr(title), row['sourceID'], row['referenceID'])
    except Exception as error:
        print(error)


def translate(text):
    """Translate text to English with googletrans."""
    return translator.translate(text, dest='en').text


def datemodifier(date_string, date_format):
    """Parse `date_string` with `date_format` and return it as YYYY-MM-DD, or False on failure."""
    try:
        to_date = time.strptime(date_string, date_format)
        return time.strftime("%Y-%m-%d", to_date)
    except ValueError:
        return False
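
# Illustrative calls: datemodifier('2024年01月02日', '%Y年%m月%d日') returns
# '2024-01-02', while an unparseable string returns False.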


def fetch_url(url):
    """Fetch a URL and return the response body, or None on a non-200 status."""
    response = requests.get(url, timeout=60)
    if response.status_code == 200:
        return response.text
    return None


def translist(infolist):
    """Strip each entry and drop empty or whitespace-only strings."""
    return [i.strip() for i in infolist if i and i.strip()]


def encode(content):
    """Flatten a list of lxml elements and strings into one cleaned string."""
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            # Re-serialize the element and collect all of its text nodes.
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)).\
                replace('\n', '').replace('\t', '').replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        text += line
    return text


def encode_content(content):
    """Flatten mixed lxml/text content into cleaned text plus a two-line summary."""
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)).\
                replace('\n', '').replace('\t', '').replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        if line != '':
            line = line + '\n'
        text += line
    # Drop everything from the "打印本页" ("print this page") footer onwards.
    index = text.find('打印本页')
    if index != -1:
        text = text[:index]
    # The first two lines serve as the summary.
    summary = '\n'.join(text.split('\n')[:2])
    return text, summary
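
# Illustrative behaviour (hypothetical content): nodes rendering as
# "标题", "正文第一段", "打印本页", "关闭窗口" yield
# text == "标题\n正文第一段\n" and summary == "标题\n正文第一段".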


def extract_from_pdf(url):
    """Download a PDF and return its full text plus a two-line summary."""
    response = requests.get(url, timeout=60)
    pdf_content = response.content

    with open("downloaded_file.pdf", "wb") as file:
        file.write(pdf_content)

    with open("downloaded_file.pdf", "rb") as file:
        pdf_reader = PdfReader(file)
        num_pages = len(pdf_reader.pages)
        extracted_text = ""
        for page in range(num_pages):
            text = pdf_reader.pages[page].extract_text()
            if text and text[0].isdigit():
                # Strip a leading page number.
                text = text[1:]
            # Tag sentence-final line breaks (?, !, 。) so they survive the
            # removal of layout line breaks, then restore them.
            text = text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n').replace('\n', '').replace('?-', '?\n').replace('!-', '!\n').replace('。-', '。\n')
            print(text)
            if text != '':
                extracted_text += text
    summary = '\n'.join(extracted_text.split('\n')[:2])
    return extracted_text, summary


def get_db_connection():
    """Get a DynamoDB resource handle (table-level API)."""
    dynamodb = boto3.resource(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    return dynamodb


def sentiment_computation(content):
    """Score sentiment with FinBERT and return (signed score, label symbol)."""
    label_dict = {
        "positive": "+",
        "negative": "-",
        "neutral": "0",
    }
    sentiment_score = 0
    maximum_value = 0
    # FinBERT accepts a limited input length, so only the leading characters are scored.
    raw_sentiment = analyzer(content[:511], top_k=None)
    sentiment_label = None
    for sentiment_dict in raw_sentiment:
        value = sentiment_dict["score"]
        if value > maximum_value:
            sentiment_label = sentiment_dict["label"]
            maximum_value = value
        if sentiment_dict["label"] == "positive":
            sentiment_score += value
        elif sentiment_dict["label"] == "negative":
            sentiment_score -= value
    return sentiment_score, label_dict[sentiment_label]
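
# Illustrative call (numbers are made up): sentiment_computation("Profits rose
# sharply this quarter.") might return (0.87, '+'); broadly neutral text
# returns a score near 0 and the label '0'.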


def crawl(url, article):
    """Crawl one article page, populate the `article` dict, and upsert it to DynamoDB."""
    domain = '.'.join(urlparse(url).netloc.split('.')[1:])
    req = urllib.request.urlopen(url)
    text = req.read()
    html_text = text.decode("utf-8")
    page = etree.HTML(html_text)
    contentCN, summary = encode_content(page.xpath(xpath_dict[domain]['content']))
    article['originSite'] = xpath_dict[domain]['siteCN']
    article['site'] = xpath_dict[domain]['site']
    article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title']))
    article['title'] = translate(article['titleCN'])
    if 'author' in xpath_dict[domain]:
        article['author'] = translate(encode(page.xpath(xpath_dict[domain]['author'])))
    else:
        article['author'] = ""
    article['contentCN'] = repr(contentCN)[1:-1].strip()
    # Skip pages whose extracted body is too short to be a real article.
    if len(article['contentCN']) < 10:
        return None
    CONTENT_ENG = ''
    for element in contentCN.split("\n"):
        CONTENT_ENG += translate(element) + '\n'
    article['content'] = repr(CONTENT_ENG)[1:-1].strip()
    if 'subtitle' in xpath_dict[domain]:
        article['subtitle'] = translate(encode(page.xpath(xpath_dict[domain]['subtitle'])))
    else:
        article['subtitle'] = translate(summary)
    article['publishDate'] = datemodifier(encode(page.xpath(xpath_dict[domain]['publishdate'])), xpath_dict[domain]['datetime_format'])
    article['link'] = url
    article['attachment'] = ""
    article['sentimentScore'], article['sentimentLabel'] = sentiment_computation(CONTENT_ENG.replace("\n", ""))
    # Deterministic ID derived from the Chinese title and publish date.
    article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['titleCN'] + article['publishDate'])
    print(article['id'], article['site'])
    update_content(article)
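
# Illustrative usage (hypothetical URL). update_content() reads a 'category'
# attribute, so the caller is expected to seed the dict with one:
#
#   crawl('https://www.example.gov.cn/news/20240102/1.html', {'category': 'policy'})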


def upsert_content(report):
    """Upsert one article record into the article_china table."""
    dynamodb = get_db_connection()
    table = dynamodb.Table('article_china')
    item = {
        'id': str(report['id']),
        'site': report['site'],
        'title': report['title'],
        'titleCN': report['titleCN'],
        'contentCN': report['contentCN'],
        'category': report['category'],
        'author': report['author'],
        'content': report['content'],
        'subtitle': report['subtitle'],
        'publishDate': report['publishDate'],
        'link': report['link'],
        'attachment': report['attachment'],
        # DynamoDB stores numbers as Decimal; round the score to two places.
        'sentimentScore': Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')),
        'sentimentLabel': report['sentimentLabel'],
        'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    }
    response = table.put_item(Item=item)
    print(response)


def delete_records(item):
    """Delete one record from the article_test table."""
    dynamodb_client = get_client_connection()
    dynamodb_client.delete_item(
        TableName="article_test",
        Key={
            'id': {'S': item['id']},
            'site': {'S': item['site']}
        }
    )


def update_content(report):
    """Upsert the full set of article attributes in the article_china table."""
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="article_china",
        Key={
            'id': {'S': str(report['id'])},
            'site': {'S': report['site']}
        },
        UpdateExpression='SET title = :title, titleCN = :titleCN, contentCN = :contentCN, category = :category, author = :author, content = :content, subtitle = :subtitle, publishDate = :publishDate, link = :link, attachment = :attachment, sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel, LastModifiedDate = :LastModifiedDate',
        ExpressionAttributeValues={
            ':title': {'S': report['title']},
            ':titleCN': {'S': report['titleCN']},
            ':contentCN': {'S': report['contentCN']},
            ':category': {'S': report['category']},
            ':author': {'S': report['author']},
            ':content': {'S': report['content']},
            ':subtitle': {'S': report['subtitle']},
            ':publishDate': {'S': report['publishDate']},
            ':link': {'S': report['link']},
            ':attachment': {'S': report['attachment']},
            ':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")},
            ':sentimentScore': {'N': str(Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')))},
            ':sentimentLabel': {'S': report['sentimentLabel']}
        }
    )
    print(response)


def update_content_sentiment(report):
    """Update only the sentiment fields of an article_test record."""
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="article_test",
        Key={
            'id': {'S': report['id']},
            'site': {'S': report['site']}
        },
        UpdateExpression='SET sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel',
        ExpressionAttributeValues={
            ':sentimentScore': {'N': str(Decimal(str(report['sentimentscore'])).quantize(Decimal('0.01')))},
            ':sentimentLabel': {'S': report['sentimentlabel']}
        }
    )
    print(response)


# Module-level DataFrame of previously collected articles, consumed by
# extract_reference(); loaded at import time.
data = download_files_from_s3('data')