import requests import uuid import time import urllib.request from datetime import datetime, timedelta from decimal import Decimal import boto3 import os from lxml import etree from googletrans import Translator from transformers import pipeline from PyPDF2 import PdfReader analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert") AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] translator = Translator() def datemodifier(date_string): """Date Modifier Function""" try: to_date = time.strptime(date_string,"%Y-%m-%d %H:%M:%S") return time.strftime("%Y-%m-%d",to_date) except: return False def datemodifier_gov(date_string): """Date Modifier Function""" try: to_date = time.strptime(date_string,"%Y-%m-%d-%H:%M:%S") return time.strftime("%Y-%m-%d",to_date) except: return False def fetch_url(url): response = requests.get(url) if response.status_code == 200: return response.text else: return None def translist(infolist): """Translist Function""" out = list(filter(lambda s: s and (isinstance (s,str) or len(s.strip()) > 0), [i.strip() for i in infolist])) return out def encode(content): """Encode Function""" text = '' for element in content: if isinstance(element, etree._Element): subelement = etree.tostring(element).decode() subpage = etree.HTML(subelement) tree = subpage.xpath('//text()') line = ''.join(translist(tree)).\ replace('\n','').replace('\t','').replace('\r','').replace(' ','').replace('\u3000',' ').replace('\xa0','').strip() else: line = element text += line return text def extract_from_pdf(url): # Send a GET request to the URL and retrieve the PDF content response = requests.get(url) pdf_content = response.content # Save the PDF content to a local file with open("downloaded_file.pdf", "wb") as f: f.write(pdf_content) # Open the downloaded PDF file and extract the text with open("downloaded_file.pdf", "rb") as f: pdf_reader = PdfReader(f) num_pages = len(pdf_reader.pages) extracted_text = "" for page in range(num_pages): text = pdf_reader.pages[page].extract_text() if text and text[0].isdigit(): text = text[1:] first_newline_index = text.find('\n') text = text[:first_newline_index+1].replace('\n', ' ') + text[first_newline_index+1:].replace('\n', '') extracted_text += text return extracted_text def get_db_connection(): """Get dynamoDB connection""" dynamodb = boto3.resource( service_name='dynamodb', region_name='us-east-1', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY ) return dynamodb def upsert_content(report): """Upsert the content records""" dynamodb = get_db_connection() table = dynamodb.Table('article_china') # Define the item data item = { 'id': str(report['id']), 'site': report['site'], 'title': report['title'], # 'originalSite': report['originalSite'], # 'originalTitle': report['originalTitle'], # 'originalContent': report['originalContent'], 'category': report['category'], # 'author': report['author'], 'content': report['content'], 'publishDate': report['publishDate'], 'link': report['url'], # 'attachment': report['reporturl'], # 'authorID': str(report['authorid']), 'sentimentScore': str(Decimal(report['sentimentScore']).quantize(Decimal('0.01'))), 'sentimentLabel': report['sentimentLabel'], 'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S") } response = table.put_item(Item=item) print(response) # categoryu_urls = ["https://www.ndrc.gov.cn/xxgk/zcfb/fzggwl/", "https://www.ndrc.gov.cn/xxgk/zcfb/ghxwj/","https://www.ndrc.gov.cn/xxgk/zcfb/ghwb/","https://www.ndrc.gov.cn/xxgk/zcfb/gg/","https://www.ndrc.gov.cn/xxgk/zcfb/tz/","https://www.ndrc.gov.cn/xxgk/zcfb/pifu/","https://www.ndrc.gov.cn/xxgk/zcfb/qt/"] # for categoryu_url in categoryu_urls: # req = urllib.request.urlopen(categoryu_url) # text = req.read() # html_text = text.decode("utf-8") # page = etree.HTML(html_text) # articlelist = page.xpath("//div[contains(@class, 'list')]/ul/li[not(@class = 'empty')]") # for article in articlelist: # if isinstance(article, etree._Element): # subelement = etree.tostring(article).decode() # subpage = etree.HTML(subelement) # date = subpage.xpath("//span/text()")[0] # parsed_datetime = datetime.strptime(time.strftime("%Y-%m-%d", time.strptime(date,"%Y/%m/%d")), "%Y-%m-%d") # if parsed_datetime > (datetime.today() - timedelta(days=183)): # urls = subpage.xpath("//a[contains(@target, '_blank')]/@href") # for url in urls: # try: # article = {} # if "/jd/jd" in url: # url = url.replace("../../", "https://www.ndrc.gov.cn/xxgk/") # else: # url = url.replace("./", categoryu_url) # req = urllib.request.urlopen(url) # text = req.read() # html_text = text.decode("utf-8") # page = etree.HTML(html_text) # attachment_urls = page.xpath("//div[contains(@class, 'attachment_r')]//a/@href") # for attachment_url in attachment_urls: # if ".pdf" in attachment_url: # pdf_url = url.rsplit('/', 1)[0] + attachment_url.replace('./','/') # pdf_content = extract_from_pdf(pdf_url) # article['originalContent'] = pdf_content # content_eng = '' # for element in article['originalContent'].split("。"): # content_eng += translator.translate(element, dest='en').text + ' ' # article['content'] = content_eng # article['site'] = "National Development and Reform Commission" # article['originalSite'] = "国家发展和改革委员会" # article['originalTitle'] = page.xpath("//meta[@name = 'ArticleTitle']/@content")[0] # article['title'] = translator.translate(article['originalTitle'], dest='en').text # article['url'] = url # article['category']= "Policy Release" # article['publishDate'] = datemodifier(page.xpath("//meta[@name = 'PubDate']/@content")[0]) # article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['title']+article['publishDate']) # label_dict = { # "positive": "+", # "negative": "-", # "neutral": "0", # } # sentiment_score = 0 # maximum_value = 0 # raw_sentiment = analyzer(article['content'][:512], return_all_scores=True) # sentiment_label = None # for sentiment_dict in raw_sentiment[0]: # value = sentiment_dict["score"] # if value > maximum_value: # sentiment_label = sentiment_dict["label"] # maximum_value = value # if sentiment_dict["label"] == "positive": # sentiment_score = sentiment_score + value # if sentiment_dict["label"] == "negative": # sentiment_score = sentiment_score - value # else: # sentiment_score = sentiment_score + 0 # article['sentimentScore'] = sentiment_score # article['sentimentLabel'] = label_dict[sentiment_label] # upsert_content(article) # except Exception as error: # print(error) i = 0 while i > -1: if i == 0: categoryu_url = "https://www.ndrc.gov.cn/xxgk/jd/jd/index.html" else: categoryu_url = f"https://www.ndrc.gov.cn/xxgk/jd/jd/index_{i}.html" i = i + 1 req = urllib.request.urlopen(categoryu_url) text = req.read() html_text = text.decode("utf-8") page = etree.HTML(html_text) articlelist = page.xpath("//div[contains(@class, 'list')]/ul/li[not(@class = 'empty')]") for article in articlelist: if isinstance(article, etree._Element): subelement = etree.tostring(article).decode() subpage = etree.HTML(subelement) date = subpage.xpath("//span/text()")[0] parsed_datetime = datetime.strptime(time.strftime("%Y-%m-%d", time.strptime(date,"%Y/%m/%d")), "%Y-%m-%d") if parsed_datetime < (datetime.today() - timedelta(days=183)): i = -1 else: urls = subpage.xpath("//a[contains(@target, '_blank')]/@href") for url in urls: try: article = {} if "https://www.gov.cn" in url: req = urllib.request.urlopen(url) text = req.read() html_text = text.decode("utf-8") page = etree.HTML(html_text) article['originalContent'] = encode(page.xpath("//div[contains(@id, 'UCAP-CONTENT')]//p")) content_eng = '' for element in article['originalContent'].split("。"): content_eng += translator.translate(element, dest='en').text + ' ' article['content'] = content_eng article['site'] = "State Council" article['originalSite'] = "国务院" article['originalTitle'] = page.xpath("//title/text()")[0] article['title'] = translator.translate(article['originalTitle'], dest='en').text article['url'] = url article['category']= "Policy Release" article['publishDate'] = datemodifier_gov(page.xpath("//meta[@name = 'firstpublishedtime']/@content")[0]) elif "/zcfb/tz/" in url: url = url.replace("../../zcfb/tz/", "https://www.ndrc.gov.cn/xxgk/zcfb/tz/") req = urllib.request.urlopen(url) text = req.read() html_text = text.decode("utf-8") page = etree.HTML(html_text) article['originalContent'] = encode(page.xpath("//div[contains(@class, 'TRS_Editor')]//p")) content_eng = '' for element in article['originalContent'].split("。"): content_eng += translator.translate(element, dest='en').text + ' ' article['content'] = content_eng article['site'] = "National Development and Reform Commission" article['originalSite'] = "国家发展和改革委员会" article['originalTitle'] = page.xpath("//meta[@name = 'ArticleTitle']/@content")[0] article['title'] = translator.translate(article['originalTitle'], dest='en').text article['url'] = url article['category']= "Policy Release" article['publishDate'] = datemodifier(page.xpath("//meta[@name = 'PubDate']/@content")[0]) else: url = url.replace("../../", "https://www.ndrc.gov.cn/xxgk/jd/jd/") url = url.replace("./", "https://www.ndrc.gov.cn/xxgk/jd/jd/") req = urllib.request.urlopen(url) text = req.read() html_text = text.decode("utf-8") page = etree.HTML(html_text) article['originalContent'] = encode(page.xpath("//div[contains(@class, 'TRS_Editor')]//p")) content_eng = '' for element in article['originalContent'].split("。"): content_eng += translator.translate(element, dest='en').text + ' ' article['content'] = content_eng article['site'] = "National Development and Reform Commission" article['originalSite'] = "国家发展和改革委员会" article['originalTitle'] = page.xpath("//meta[@name = 'ArticleTitle']/@content")[0] article['title'] = translator.translate(article['originalTitle'], dest='en').text article['url'] = url article['category']= "Policy Interpretation" article['publishDate'] = datemodifier(page.xpath("//meta[@name = 'PubDate']/@content")[0]) article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['title']+article['publishDate']) label_dict = { "positive": "+", "negative": "-", "neutral": "0", } sentiment_score = 0 maximum_value = 0 raw_sentiment = analyzer(article['content'][:512], return_all_scores=True) sentiment_label = None for sentiment_dict in raw_sentiment[0]: value = sentiment_dict["score"] if value > maximum_value: sentiment_label = sentiment_dict["label"] maximum_value = value if sentiment_dict["label"] == "positive": sentiment_score = sentiment_score + value if sentiment_dict["label"] == "negative": sentiment_score = sentiment_score - value else: sentiment_score = sentiment_score + 0 article['sentimentScore'] = sentiment_score article['sentimentLabel'] = label_dict[sentiment_label] upsert_content(article) except Exception as error: print(error)