"""Utility functions for crawling, translating, and storing news articles."""

import os
import json
import uuid
import time
import urllib.request
from urllib.parse import urlparse
from datetime import datetime
from decimal import Decimal

import requests
import boto3
from lxml import etree
from googletrans import Translator
from transformers import pipeline
from PyPDF2 import PdfReader

# AWS credentials are read from environment variables.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")

# FinBERT sentiment model shared by sentiment_computation().
analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert")

# Shared Google Translate client used by translate().
translator = Translator()

# Per-domain XPath expressions and site metadata used by crawl().
with open('xpath.json', 'r', encoding='UTF-8') as f:
    xpath_dict = json.load(f)
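
# The contents of xpath.json are not shown in this module. Judging from the lookups in
# crawl() below, each entry is assumed to be keyed by a registrable domain and to look
# roughly like the following (the domain and XPath values here are placeholders):
#
#     "example.gov.cn": {
#         "site": "Example Site (English name)",
#         "siteCN": "Example Site (Chinese name)",
#         "title": "//h1/text()",
#         "content": "//div[@class='article']",
#         "publishdate": "//span[@class='date']/text()",
#         "datetime_format": "%Y-%m-%d",
#         "author": "//span[@class='author']/text()",   # optional
#         "subtitle": "//h2/text()"                      # optional
#     }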


def translate(text):
    """Translate text into English using the shared googletrans client."""
    return translator.translate(text, dest='en').text


def datemodifier(date_string, date_format):
    """Parse date_string with date_format and return it as YYYY-MM-DD, or False on failure."""
    try:
        to_date = time.strptime(date_string, date_format)
        return time.strftime("%Y-%m-%d", to_date)
    except ValueError:
        return False
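
# For example (assuming a Chinese-style date format in the source page):
#     datemodifier('2024年01月05日', '%Y年%m月%d日')  ->  '2024-01-05'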


def fetch_url(url):
    """Fetch a URL and return the response body, or None on a non-200 status."""
    response = requests.get(url)
    if response.status_code == 200:
        return response.text
    return None


def translist(infolist):
    """Strip whitespace from each extracted text node and drop empty strings."""
    return [i.strip() for i in infolist if i.strip()]


def encode(content):
    """Flatten a list of lxml elements and strings into a single line of text."""
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            # Re-parse the element and pull out all of its text nodes.
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)).replace('\n', '').replace('\t', '') \
                .replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        text += line
    return text


def encode_content(content):
    """Flatten scraped elements into multi-line text and build a two-line summary."""
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            # Re-parse the element and pull out all of its text nodes.
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)).replace('\n', '').replace('\t', '') \
                .replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        text += line + '\n'
    # Drop everything from the "打印本页" ("Print this page") footer onwards.
    index = text.find('打印本页')
    if index != -1:
        text = text[:index]
    # Use the first two lines as the summary.
    summary = '\n'.join(text.split('\n')[:2])
    return text, summary


def extract_from_pdf(url):
    """Download a PDF and return its full text plus a two-line summary."""
    response = requests.get(url)
    pdf_content = response.content

    # Save the PDF locally, then read it back with PyPDF2.
    with open("downloaded_file.pdf", "wb") as f:
        f.write(pdf_content)

    with open("downloaded_file.pdf", "rb") as f:
        pdf_reader = PdfReader(f)
        num_pages = len(pdf_reader.pages)
        extracted_text = ""
        for page in range(num_pages):
            text = pdf_reader.pages[page].extract_text()
            # Drop a leading page number and merge the first line into the rest.
            if text and text[0].isdigit():
                text = text[1:]
            first_newline_index = text.find('\n')
            text = text[:first_newline_index + 1].replace('\n', ' ') + text[first_newline_index + 1:]
            extracted_text += text
    # Use the first two lines as the summary.
    summary = '\n'.join(extracted_text.split('\n')[:2])
    return extracted_text, summary


def get_db_connection():
    """Get DynamoDB connection"""
    dynamodb = boto3.resource(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    return dynamodb


def sentiment_computation(content):
    """Run FinBERT over the content and return (signed score, label symbol)."""
    label_dict = {
        "positive": "+",
        "negative": "-",
        "neutral": "0",
    }
    sentiment_score = 0
    maximum_value = 0
    sentiment_label = None
    # With top_k=None the pipeline returns one {"label", "score"} dict per class;
    # only the first 512 characters of the content are scored.
    raw_sentiment = analyzer(content[:512], top_k=None)
    for sentiment_dict in raw_sentiment:
        value = sentiment_dict["score"]
        # Keep the label with the highest probability.
        if value > maximum_value:
            sentiment_label = sentiment_dict["label"]
            maximum_value = value
        # Positive probability adds to the score, negative subtracts, neutral is ignored.
        if sentiment_dict["label"] == "positive":
            sentiment_score += value
        elif sentiment_dict["label"] == "negative":
            sentiment_score -= value
    return sentiment_score, label_dict[sentiment_label]


def crawl(url, article):
    """Crawl a single article URL, fill in the article dict, and upsert it to DynamoDB."""
    domain = '.'.join(urlparse(url).netloc.split('.')[1:])
    req = urllib.request.urlopen(url)
    text = req.read()
    html_text = text.decode("utf-8")
    page = etree.HTML(html_text)
    contentCN, summary = encode_content(page.xpath(xpath_dict[domain]['content']))
    article['originSite'] = xpath_dict[domain]['siteCN']
    article['site'] = xpath_dict[domain]['site']
    article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title']))
    article['title'] = translate(article['titleCN'])
    if 'author' in xpath_dict[domain]:
        article['author'] = translate(encode(page.xpath(xpath_dict[domain]['author'])))
    else:
        article['author'] = ""
    article['contentCN'] = repr(contentCN)
    if len(article['contentCN']) < 10:
        return None
    # Translate the Chinese content line by line.
    content_eng = ''
    for element in contentCN.split("\n"):
        content_eng += translate(element) + '\n'
    article['content'] = repr(content_eng)
    if 'subtitle' in xpath_dict[domain]:
        article['subtitle'] = translate(encode(page.xpath(xpath_dict[domain]['subtitle'])))
    else:
        article['subtitle'] = translate(summary)
    article['publishDate'] = datemodifier(
        encode(page.xpath(xpath_dict[domain]['publishdate'])),
        xpath_dict[domain]['datetime_format'])
    article['link'] = url
    article['attachment'] = ""
    article['sentimentScore'], article['sentimentLabel'] = \
        sentiment_computation(content_eng.replace("\n", ""))
    # Deterministic ID derived from the English title and publish date.
    article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['title'] + article['publishDate'])
    upsert_content(article)


def upsert_content(report):
    """Upsert the content record into the article_test table."""
    dynamodb = get_db_connection()
    table = dynamodb.Table('article_test')
    item = {
        'id': str(report['id']),
        'site': report['site'],
        'title': report['title'],
        'titleCN': report['titleCN'],
        'contentCN': report['contentCN'],
        'category': report['category'],
        'author': report['author'],
        'content': report['content'],
        'subtitle': report['subtitle'],
        'publishDate': report['publishDate'],
        'link': report['link'],
        'attachment': report['attachment'],
        'sentimentScore': Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')),
        'sentimentLabel': report['sentimentLabel'],
        'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    }
    response = table.put_item(Item=item)
    print(response)
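
# Note: 'category' is read from the report above but never set by crawl(), so the caller
# is assumed to populate it before crawling. Judging from the Key arguments used in
# delete_records() and update_content() below, the article_test table is assumed to use
# 'id' as its partition key and 'site' as its sort key.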


def get_client_connection():
    """Get DynamoDB client connection"""
    dynamodb = boto3.client(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    return dynamodb


def delete_records(item):
    """Delete a record from the article_test table by id and site."""
    dynamodb_client = get_client_connection()
    dynamodb_client.delete_item(
        TableName="article_test",
        Key={
            'id': {'S': item['id']},
            'site': {'S': item['site']}
        }
    )


def update_content(report):
    """Update the sentiment fields of an existing record."""
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="article_test",
        Key={
            'id': {'S': report['id']},
            'site': {'S': report['site']}
        },
        UpdateExpression='SET sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel',
        ExpressionAttributeValues={
            ':sentimentScore': {'N': str(Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')))},
            ':sentimentLabel': {'S': report['sentimentLabel']}
        }
    )
    print(response)
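
# Example (hypothetical) usage -- the real entry point is not part of this module.
# The URL and category below are placeholders; the domain must have an entry in
# xpath.json, and 'category' must be set by the caller because upsert_content()
# reads it from the article dict:
#
#     article = {'category': 'policy'}
#     crawl('https://www.example.gov.cn/news/2024/article.html', article)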