|
import requests |
|
import uuid |
|
import time |
|
import json |
|
import urllib.request |
|
from lxml import etree |
|
from googletrans import Translator |
|
import boto3 |
|
import os |
|
from datetime import datetime, timedelta |
|
from decimal import Decimal |
|
from transformers import pipeline |
|
|
|
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] |
|
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] |
|
|
|
analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert") |
|
|
|
translator = Translator() |
|
|
|
def datemodifier(date_string): |
|
"""Date Modifier Function""" |
|
try: |
|
to_date = time.strptime(date_string,"%Y-%m-%d %H:%M:%S.%f") |
|
return time.strftime("%Y-%m-%d",to_date) |
|
except: |
|
return False |
|
|
|
def fetch_url(url): |
|
response = requests.get(url) |
|
if response.status_code == 200: |
|
return response.text |
|
else: |
|
return None |
|
|
|
def translist(infolist): |
|
"""Translist Function""" |
|
out = list(filter(lambda s: s and |
|
(isinstance (s,str) or len(s.strip()) > 0), [i.strip() for i in infolist])) |
|
return out |
|
|
|
def encode(content): |
|
"""Encode Function""" |
|
text = '' |
|
for element in content: |
|
if isinstance(element, etree._Element): |
|
subelement = etree.tostring(element).decode() |
|
subpage = etree.HTML(subelement) |
|
tree = subpage.xpath('//text()') |
|
line = ''.join(translist(tree)).\ |
|
replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip() |
|
else: |
|
line = element |
|
text += line |
|
return text |
|
|
|
def get_db_connection(): |
|
"""Get dynamoDB connection""" |
|
dynamodb = boto3.resource( |
|
service_name='dynamodb', |
|
region_name='us-east-1', |
|
aws_access_key_id=AWS_ACCESS_KEY_ID, |
|
aws_secret_access_key=AWS_SECRET_ACCESS_KEY |
|
) |
|
return dynamodb |
|
|
|
def upsert_content(report): |
|
"""Upsert the content records""" |
|
dynamodb = get_db_connection() |
|
table = dynamodb.Table('article_china') |
|
|
|
|
|
item = { |
|
'id': str(report['id']), |
|
'site': report['site'], |
|
'title': report['title'], |
|
|
|
|
|
|
|
'category': "Macroeconomic Research", |
|
'author': report['author'], |
|
'content': report['content'], |
|
'publishDate': report['publishDate'], |
|
'link': report['url'], |
|
'attachment': report['reporturl'], |
|
'authorID': str(report['authorid']), |
|
'sentimentScore': str(Decimal(report['sentimentScore']).quantize(Decimal('0.01'))), |
|
'sentimentLabel': report['sentimentLabel'], |
|
'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S") |
|
} |
|
response = table.put_item(Item=item) |
|
print(response) |
|
|
|
today = datetime.today().strftime('%Y-%m-%d') |
|
beginDate = (datetime.today() - timedelta(days=183)).strftime('%Y-%m-%d') |
|
i = 0 |
|
while i > -1: |
|
url = "https://reportapi.eastmoney.com/report/jg" |
|
params = { |
|
"cb": "datatable8544623", |
|
"pageSize": "100", |
|
"beginTime": beginDate, |
|
"endTime": today, |
|
"pageNo": i, |
|
"qType": "3", |
|
} |
|
url = url + "?" + "&".join(f"{key}={value}" for key, value in params.items()) |
|
print(url) |
|
content = fetch_url(url) |
|
if content: |
|
start_index = content.find("(") |
|
if start_index != -1: |
|
result = content[start_index + 1: -1] |
|
else: |
|
result = content |
|
reportinfo = json.loads(result) |
|
if reportinfo["size"] > 0: |
|
i = i + 1 |
|
for report in reportinfo['data']: |
|
try: |
|
url = f"https://data.eastmoney.com/report/zw_macresearch.jshtml?encodeUrl={report['encodeUrl']}" |
|
req = urllib.request.urlopen(url) |
|
text = req.read() |
|
html_text = text.decode("utf-8") |
|
page = etree.HTML(html_text) |
|
content = encode(page.xpath("//div[contains(@class, 'ctx-content')]//p")) |
|
reporturl = encode(page.xpath("//a[contains(@class, 'pdf-link')]/@href")) |
|
report['url'] = url |
|
if report['orgSName'] == "''": |
|
report['site'] = translator.translate(report['orgSName'], dest='en').text |
|
report['originalSite'] = report['orgSName'] |
|
else: |
|
report['site'] = translator.translate(report['orgName'], dest='en').text |
|
report['originalSite'] = report['orgSName'] |
|
report['reporturl'] = reporturl |
|
report['originalTitle'] = report['title'] |
|
report['title'] = translator.translate(report['title'], dest='en').text |
|
report['author'] = translator.translate(report['researcher'], dest='en').text |
|
report['originalAuthor'] = report['researcher'] |
|
report['originalContent'] = content |
|
content_eng = '' |
|
for element in article['originalContent'].split("。"): |
|
content_eng += translator.translate(element, dest='en').text + ' ' |
|
article['content'] = content_eng |
|
report['authorid'] = uuid.uuid5(uuid.NAMESPACE_OID, report['author']) |
|
report['publishDate'] = datemodifier(report['publishDate']) |
|
report['id'] = uuid.uuid5(uuid.NAMESPACE_OID, report['title']+report['publishDate']) |
|
label_dict = { |
|
"positive": "+", |
|
"negative": "-", |
|
"neutral": "0", |
|
} |
|
sentiment_score = 0 |
|
maximum_value = 0 |
|
raw_sentiment = analyzer(report['content'][:512], return_all_scores=True) |
|
sentiment_label = None |
|
for sentiment_dict in raw_sentiment[0]: |
|
value = sentiment_dict["score"] |
|
if value > maximum_value: |
|
sentiment_label = sentiment_dict["label"] |
|
maximum_value = value |
|
if sentiment_dict["label"] == "positive": |
|
sentiment_score = sentiment_score + value |
|
if sentiment_dict["label"] == "negative": |
|
sentiment_score = sentiment_score - value |
|
else: |
|
sentiment_score = sentiment_score + 0 |
|
report['sentimentScore'] = sentiment_score |
|
report['sentimentLabel'] = label_dict[sentiment_label] |
|
upsert_content(report) |
|
except Exception as error: |
|
print(error) |
|
else: |
|
print(reportinfo) |
|
i = -1 |
|
else: |
|
print("Failed to fetch URL:", url) |
|
|