"""Utility Functions"""
|
import os |
|
import time |
|
from datetime import datetime |
|
from decimal import Decimal |
|
import requests |
|
import boto3 |
|
from lxml import etree |
|
from googletrans import Translator |
|
from transformers import pipeline |
|
from PyPDF2 import PdfReader |
|
|
|
# AWS credentials are read straight from the environment; a missing variable
# raises KeyError while this module is being imported.
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

# Sentiment pipeline backed by the ProsusAI/finbert model, built once at
# import time and reused by sentiment_computation().
analyzer = pipeline(task="sentiment-analysis", model="ProsusAI/finbert")

# Single googletrans client shared by every translation call in this module.
translator = Translator()
|
|
|
def translate(text):
    """Translate *text* into English with the module-level ``translator``.

    Returns the ``text`` attribute of the translation result.
    """
    translated = translator.translate(text, dest='en')
    return translated.text
|
|
|
def datemodifier(date_string, date_format):
    """Re-format a date string as ``YYYY-MM-DD``.

    Args:
        date_string: The date text to parse.
        date_format: ``time.strptime`` format describing *date_string*.

    Returns:
        The date formatted as ``%Y-%m-%d``, or ``False`` when *date_string*
        cannot be parsed with *date_format*.
    """
    try:
        parsed = time.strptime(date_string, date_format)
    except (ValueError, TypeError):
        # ValueError: the text does not match the format.
        # TypeError: a non-string (e.g. None) was passed in.
        # Anything else (KeyboardInterrupt, SystemExit, ...) must propagate.
        return False
    return time.strftime("%Y-%m-%d", parsed)
|
|
|
def fetch_url(url, timeout=30):
    """Download *url* with an HTTP GET and return the body as text.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The response text when the status code is 200, otherwise ``None``.

    Raises:
        requests.RequestException: Connection failures and timeouts are not
            caught here and propagate to the caller.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code == 200:
        return response.text
    return None
|
|
|
def translist(infolist):
    """Strip every entry of *infolist* and drop the ones left empty.

    Args:
        infolist: Iterable of strings (anything exposing ``strip()``).

    Returns:
        A new list with the stripped, non-empty entries in their original
        order.
    """
    # A stripped entry is kept exactly when it is non-empty; the former
    # ``isinstance(s, str) or len(s.strip()) > 0`` check was always true for
    # such entries and has been removed.
    stripped = (entry.strip() for entry in infolist)
    return [entry for entry in stripped if entry]
|
|
|
def encode(content):
    """Flatten the first entry of *content* into one text string.

    Only ``content[:1]`` is examined. An lxml element is serialized,
    re-parsed as HTML and reduced to its text nodes; those are cleaned with
    ``translist``, joined, and stripped of newlines, tabs, carriage returns
    and spaces. Any other entry is appended unchanged. Everything from the
    marker '打印本页' ("print this page") onwards is discarded.
    """
    text = ''
    for node in content[:1]:
        if not isinstance(node, etree._Element):
            text += node
            continue
        markup = etree.tostring(node).decode()
        fragments = etree.HTML(markup).xpath('//text()')
        flattened = ''.join(translist(fragments))
        for unwanted in ('\n', '\t', '\r', ' '):
            flattened = flattened.replace(unwanted, '')
        text += flattened.strip()

    # partition() yields the whole string as the head when the marker is
    # absent, so no explicit "not found" branch is needed.
    kept, _marker, _rest = text.partition('打印本页')
    return kept
|
|
|
def extract_from_pdf(url, timeout=30):
    """Download a PDF and return its text plus an English translation.

    Args:
        url: Address of the PDF document.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A ``(extracted_text, extracted_text_eng)`` tuple: the concatenated
        text of all pages and the concatenation of each page's English
        translation.

    Raises:
        requests.RequestException: Connection failures and timeouts
            propagate to the caller.
    """
    import io  # local import so this block does not rely on a new file-level import

    response = requests.get(url, timeout=timeout)

    # Parse straight from memory. Writing to a fixed "downloaded_file.pdf"
    # left a stray file in the working directory and let concurrent runs
    # overwrite each other's download.
    pdf_reader = PdfReader(io.BytesIO(response.content))

    originals = []
    translations = []
    for page in pdf_reader.pages:
        text = page.extract_text()
        # Drop a leading digit (presumably a page number - confirm).
        if text and text[0].isdigit():
            text = text[1:]
        if not text:
            # Nothing to keep or translate for this page.
            continue
        # The first line break becomes a space; every later one is removed.
        head, newline, tail = text.partition('\n')
        text = head + (' ' if newline else '') + tail.replace('\n', '')
        translations.append(translate(text))
        originals.append(text)
    return ''.join(originals), ''.join(translations)
|
|
|
def get_db_connection(region_name='us-east-1'):
    """Get dynamoDB connection

    Args:
        region_name: AWS region of the DynamoDB endpoint. Defaults to
            'us-east-1', the region that was previously hard-coded.

    Returns:
        A boto3 DynamoDB service resource created with the module-level
        AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY credentials.
    """
    return boto3.resource(
        service_name='dynamodb',
        region_name=region_name,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
|
|
|
def sentiment_computation(content):
    """Score the sentiment of *content* with the module-level ``analyzer``.

    Only the first 512 characters of *content* are passed to the analyzer.

    Returns:
        A ``(score, label)`` tuple. *score* is the "positive" score minus the
        "negative" score; *label* is "+", "-" or "0" for whichever class
        received the highest score.
    """
    symbols = {"positive": "+", "negative": "-", "neutral": "0"}
    distribution = analyzer(content[:512], return_all_scores=True)[0]

    polarity = 0
    top_label, top_score = None, 0
    for entry in distribution:
        score = entry["score"]
        label = entry["label"]
        # Track the class with the strictly highest score seen so far.
        if score > top_score:
            top_label, top_score = label, score
        # Neutral (or any other label) leaves the polarity untouched.
        if label == "positive":
            polarity += score
        elif label == "negative":
            polarity -= score
    return polarity, symbols[top_label]
|
|
|
def upsert_content(report):
    """Write *report* to the 'article_china' DynamoDB table and print the response.

    The item is built from the report's id, site, title, category, content,
    publishDate, url (stored as 'link'), sentimentScore (rounded to two
    decimals and stored as a string) and sentimentLabel, plus a
    'LastModifiedDate' timestamp taken from the local clock.
    """
    table = get_db_connection().Table('article_china')

    record = {'id': str(report['id'])}
    # These fields are copied over under the same key.
    for field in ('site', 'title', 'category', 'content', 'publishDate'):
        record[field] = report[field]
    record['link'] = report['url']

    rounded_score = Decimal(report['sentimentScore']).quantize(Decimal('0.01'))
    record['sentimentScore'] = str(rounded_score)
    record['sentimentLabel'] = report['sentimentLabel']
    record['LastModifiedDate'] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    response = table.put_item(Item=record)
    print(response)
|
|