"""Scrape news pages from the Ministry of Finance of China, translate each
article into English, score it with FinBERT, and upsert the result into
DynamoDB."""
import os
import time
import urllib.request
import uuid
from datetime import datetime, timedelta
from decimal import Decimal

import boto3
import requests
from googletrans import Translator
from lxml import etree
from PyPDF2 import PdfReader
from transformers import pipeline

AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert")
translator = Translator()


def datemodifier(date_string):
    """Normalize a 'YYYY-MM-DD HH:MM:SS' timestamp to 'YYYY-MM-DD'."""
    try:
        to_date = time.strptime(date_string, "%Y-%m-%d %H:%M:%S")
        return time.strftime("%Y-%m-%d", to_date)
    except ValueError:
        return False


def fetch_url(url):
    """Return the response body for `url`, or None on a non-200 status."""
    response = requests.get(url)
    if response.status_code == 200:
        return response.text
    return None


def translist(infolist):
    """Strip whitespace from each text node and drop empty entries."""
    return [i.strip() for i in infolist if i and i.strip()]


def encode(content):
    """Flatten a list of lxml elements (or plain strings) into a single
    whitespace-free string."""
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)).replace('\n', '').replace(
                '\t', '').replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        text += line
    return text


def extract_from_pdf(url):
    """Download the PDF at `url` and return (original text, English text)."""
    # Send a GET request to the URL and retrieve the PDF content
    response = requests.get(url)
    # Save the PDF content to a local file
    with open("downloaded_file.pdf", "wb") as f:
        f.write(response.content)
    # Open the downloaded PDF file and extract the text page by page
    extracted_text = ""
    extracted_text_eng = ""
    with open("downloaded_file.pdf", "rb") as f:
        pdf_reader = PdfReader(f)
        for page in pdf_reader.pages:
            text = page.extract_text()
            # Drop a leading page number, join the heading line with a space,
            # and strip the remaining line breaks.
            if text and text[0].isdigit():
                text = text[1:]
            first_newline_index = text.find('\n')
            text = (text[:first_newline_index + 1].replace('\n', ' ')
                    + text[first_newline_index + 1:].replace('\n', ''))
            extracted_text_eng += translator.translate(text, dest='en').text
            extracted_text += text
    return extracted_text, extracted_text_eng


def get_db_connection():
    """Get a DynamoDB connection."""
    return boto3.resource(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )


def upsert_content(report):
    """Upsert one article record into the article_china table."""
    dynamodb = get_db_connection()
    table = dynamodb.Table('article_china')
    item = {
        'id': str(report['id']),
        'site': report['site'],
        'title': report['title'],
        # Original-language fields, not persisted for now:
        # 'originalSite': report['originalSite'],
        # 'originalTitle': report['originalTitle'],
        # 'originalContent': report['originalContent'],
        'category': report['category'],
        # 'author': report['author'],
        'content': report['content'],
        'publishDate': report['publishDate'],
        'link': report['url'],
        # 'attachment': report['reporturl'],
        # 'authorID': str(report['authorid']),
        'sentimentScore': str(Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01'))),
        'sentimentLabel': report['sentimentLabel'],
        'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    }
    response = table.put_item(Item=item)
    print(response)
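# A minimal usage sketch for upsert_content, kept commented out so the script
# never writes example rows; every value below is a hypothetical placeholder,
# not real scraped data.
# example_report = {
#     'id': uuid.uuid5(uuid.NAMESPACE_OID, 'Example title' + '2024-01-01'),
#     'site': 'Ministry of Finance',
#     'title': 'Example title',
#     'category': 'Financial News',
#     'content': 'Translated English body ...',
#     'publishDate': '2024-01-01',
#     'url': 'https://www.mof.gov.cn/zhengwuxinxi/caizhengxinwen/example.htm',
#     'sentimentScore': 0.42,
#     'sentimentLabel': '+',
# }
# upsert_content(example_report)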
def crawl_category(base_url, category):
    """Walk the paginated article listing at `base_url`, scrape every article
    published within the last 183 days, and upsert each one tagged with
    `category`."""
    i = 0
    while i > -1:
        # Page 0 lives at base_url itself; later pages at index_<n>.htm.
        if i == 0:
            category_url = base_url
        else:
            category_url = f"{base_url}index_{i}.htm"
        i = i + 1
        req = urllib.request.urlopen(category_url)
        html_text = req.read().decode("utf-8")
        page = etree.HTML(html_text)
        articlelist = page.xpath("//div[contains(@class, 'xwfb_listerji')]/ul/li[not(@class = 'clear')]")
        for article in articlelist:
            if isinstance(article, etree._Element):
                subelement = etree.tostring(article).decode()
                subpage = etree.HTML(subelement)
                date = subpage.xpath("//span/text()")[0]
                parsed_datetime = datetime.strptime(date, "%Y-%m-%d")
                if parsed_datetime < (datetime.today() - timedelta(days=183)):
                    # Listings are newest-first, so an article older than six
                    # months means there is nothing left to crawl.
                    i = -1
                else:
                    urls = subpage.xpath("//a[contains(@target, '_blank')]/@href")
                    for url in urls:
                        try:
                            article = {}
                            # Resolve relative links against the section root
                            # and the category directory (not the paginated
                            # index_<n>.htm URL, which would yield dead links).
                            url = url.replace("../", "https://www.mof.gov.cn/zhengwuxinxi/")
                            url = url.replace("./", base_url)
                            req = urllib.request.urlopen(url)
                            html_text = req.read().decode("utf-8")
                            page = etree.HTML(html_text)
                            article['originalContent'] = encode(page.xpath("//div[contains(@class, 'TRS_Editor')]//p"))
                            # Translate sentence by sentence, split on the
                            # Chinese full stop, to keep each request short.
                            content_eng = ''
                            for element in article['originalContent'].split("。"):
                                content_eng += translator.translate(element, dest='en').text + ' '
                            article['content'] = content_eng
                            article['site'] = "Ministry of Finance"
                            article['originalSite'] = "财政部"
                            article['originalTitle'] = page.xpath("//meta[@name = 'ArticleTitle']/@content")[0]
                            article['title'] = translator.translate(article['originalTitle'], dest='en').text
                            article['url'] = url
                            article['category'] = category
                            article['publishDate'] = datemodifier(page.xpath("//meta[@name = 'PubDate']/@content")[0])
                            article['id'] = uuid.uuid5(uuid.NAMESPACE_OID,
                                                       article['title'] + article['publishDate'])
                            # FinBERT over the first 512 characters: keep the
                            # argmax class as the label and compute a net
                            # score of P(positive) - P(negative).
                            label_dict = {
                                "positive": "+",
                                "negative": "-",
                                "neutral": "0",
                            }
                            sentiment_score = 0
                            maximum_value = 0
                            raw_sentiment = analyzer(article['content'][:512], return_all_scores=True)
                            sentiment_label = None
                            for sentiment_dict in raw_sentiment[0]:
                                value = sentiment_dict["score"]
                                if value > maximum_value:
                                    sentiment_label = sentiment_dict["label"]
                                    maximum_value = value
                                if sentiment_dict["label"] == "positive":
                                    sentiment_score += value
                                elif sentiment_dict["label"] == "negative":
                                    sentiment_score -= value
                            article['sentimentScore'] = sentiment_score
                            article['sentimentLabel'] = label_dict[sentiment_label]
                            upsert_content(article)
                        except Exception as error:
                            print(error)
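# To make the scoring concrete: if FinBERT returned the hypothetical
# distribution
#   [{'label': 'positive', 'score': 0.70},
#    {'label': 'neutral',  'score': 0.20},
#    {'label': 'negative', 'score': 0.10}],
# then sentiment_score = 0.70 - 0.10 = 0.60 and the argmax class 'positive'
# is stored as '+' via label_dict.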
# Crawl each Ministry of Finance category; "Policy Release" stays disabled.
crawl_category("https://www.mof.gov.cn/zhengwuxinxi/caizhengxinwen/", "Financial News")
# crawl_category("https://www.mof.gov.cn/zhengwuxinxi/zhengcefabu/", "Policy Release")
crawl_category("https://www.mof.gov.cn/zhengwuxinxi/zhengcejiedu/", "Policy Interpretation")
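# extract_from_pdf is not wired into the crawl yet; the attachment URL below
# is a hypothetical illustration of how report PDFs could be pulled in
# alongside the commented-out 'attachment' field in upsert_content.
# original_text, english_text = extract_from_pdf(
#     "https://www.mof.gov.cn/zhengwuxinxi/caizhengxinwen/example.pdf")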