"""Crawl policy-interpretation articles from the People's Bank of China site.

For each article linked from the category index pages, the script extracts the
Chinese text, translates it to English, scores the English text with FinBERT
and upserts the result into the ``article_china`` DynamoDB table.  Crawling
stops once an article older than ``MAX_AGE_DAYS`` is encountered.
"""
import os
import time
import urllib.request
import uuid
from datetime import datetime, timedelta
from decimal import Decimal

import boto3
import requests
from googletrans import Translator
from lxml import etree
from PyPDF2 import PdfReader
from transformers import pipeline

analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert")
translator = Translator()

# Seconds to wait on any single HTTP request before giving up; without a
# timeout a stalled connection would hang the crawler forever.
REQUEST_TIMEOUT = 30

BASE_URL = "http://www.pbc.gov.cn"
CATEGORY_PATH = "/rmyh/3963412/3963426"

# Articles published more than this many days ago end the crawl.
MAX_AGE_DAYS = 183

# Upload file to dynamoDB
#
# SECURITY: credentials are read from the environment instead of being
# committed to the repository.  When the variables are unset the values are
# ``None`` and boto3 falls back to its default credential chain.  The key pair
# that used to be hard-coded here must be treated as compromised and rotated.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")

# Maps the classifier's label names to the symbols stored in DynamoDB.
_LABEL_SYMBOLS = {
    "positive": "+",
    "negative": "-",
    "neutral": "0",
}


def datemodifier(date_string):
    """Convert a ``YYYY-mm-dd HH:MM:SS`` timestamp to ``YYYY-mm-dd``.

    Returns ``False`` when ``date_string`` is not a string or does not match
    the expected format.
    """
    try:
        to_date = time.strptime(date_string, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: not a string.
        return False
    return time.strftime("%Y-%m-%d", to_date)


def fetch_url(url):
    """Return the body of ``url`` as text, or ``None`` unless the status is 200."""
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    if response.status_code == 200:
        return response.text
    return None


def translist(infolist):
    """Strip every item of ``infolist`` and drop the ones that end up empty."""
    stripped = (item.strip() for item in infolist)
    return [item for item in stripped if item]


def encode(content):
    """Flatten the first node of ``content`` into one whitespace-free string.

    ``content`` is a list as returned by an lxml ``xpath`` call.  An element
    node is serialised, its text nodes are joined and newlines, tabs, carriage
    returns and spaces are removed; any other item is used as-is.  Everything
    from the marker '打印本页' onwards is cut off.  Returns ``''`` for an
    empty list.
    """
    text = ''
    # NOTE(review): only the first item is processed (``content[:1]``), so for
    # an xpath that selects several <p> nodes the rest are ignored - confirm
    # this is intended before widening it.
    for element in content[:1]:
        if isinstance(element, etree._Element):
            # Re-parse the serialised element so '//text()' is scoped to it.
            subpage = etree.HTML(etree.tostring(element).decode())
            line = ''.join(translist(subpage.xpath('//text()')))
            for unwanted in ('\n', '\t', '\r', ' '):
                line = line.replace(unwanted, '')
            line = line.strip()
        else:
            line = element
        text += line
    index = text.find('打印本页')
    if index != -1:
        text = text[:index]
    return text


def extract_from_pdf(url):
    """Download the PDF at ``url`` and return ``(text, english_text)``.

    The PDF is written to ``downloaded_file.pdf`` in the working directory and
    read back page by page.  Per page, a leading digit is dropped (presumably
    a page number - TODO confirm), the first newline becomes a space and all
    later newlines are removed.  Each page is translated to English
    separately.
    """
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    with open("downloaded_file.pdf", "wb") as f:
        f.write(response.content)

    extracted_text = ""
    extracted_text_eng = ""
    with open("downloaded_file.pdf", "rb") as f:
        pdf_reader = PdfReader(f)
        for pdf_page in pdf_reader.pages:
            text = pdf_page.extract_text()
            if text and text[0].isdigit():
                text = text[1:]
            # ``split`` is the index just past the first newline (0 if none).
            split = text.find('\n') + 1
            text = text[:split].replace('\n', ' ') + text[split:].replace('\n', '')
            extracted_text_eng += translator.translate(text, dest='en').text
            extracted_text += text
    return extracted_text, extracted_text_eng


def get_db_connection():
    """Return a boto3 DynamoDB resource for region ``us-east-1``."""
    return boto3.resource(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )


def upsert_content(report):
    """Write ``report`` to the ``article_china`` table and print the response.

    ``report`` must provide ``id``, ``site``, ``title``, ``category``,
    ``content``, ``publishDate``, ``url``, ``sentimentScore`` and
    ``sentimentLabel``.  The score is stored as a string rounded to two
    decimal places; other keys of ``report`` are not persisted.
    """
    table = get_db_connection().Table('article_china')
    score = Decimal(report['sentimentScore']).quantize(Decimal('0.01'))
    item = {
        'id': str(report['id']),
        'site': report['site'],
        'title': report['title'],
        'category': report['category'],
        'content': report['content'],
        'publishDate': report['publishDate'],
        'link': report['url'],
        'sentimentScore': str(score),
        'sentimentLabel': report['sentimentLabel'],
        'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
    }
    response = table.put_item(Item=item)
    print(response)


def _category_page_url(page_index):
    """Return the URL of the ``page_index``-th (0-based) category index page."""
    if page_index == 0:
        return f"{BASE_URL}{CATEGORY_PATH}/index.html"
    # The site numbers follow-up pages from 2: index_2.html, index_3.html, ...
    return f"{BASE_URL}{CATEGORY_PATH}/index_{page_index + 1}.html"


def _summarize_sentiment(label_scores):
    """Return ``(score, symbol)`` for one classifier result.

    ``label_scores`` is a list of ``{"label": ..., "score": ...}`` dicts.  The
    score is the positive score minus the negative score; the symbol belongs
    to the highest-scoring label.
    """
    sentiment_score = 0
    top_label = None
    top_value = 0
    for entry in label_scores:
        value = entry["score"]
        if value > top_value:
            top_label = entry["label"]
            top_value = value
        if entry["label"] == "positive":
            sentiment_score += value
        elif entry["label"] == "negative":
            sentiment_score -= value
    return sentiment_score, _LABEL_SYMBOLS[top_label]


def _process_article(url, cutoff):
    """Fetch, translate, score and upsert the article at ``url``.

    Returns ``True`` without storing anything when the article was published
    before ``cutoff``, otherwise ``False`` after the upsert.
    """
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    response.encoding = 'utf-8'
    page = etree.HTML(response.text)

    # Check the age first so articles that will be discarded are not translated.
    publish_date = datemodifier(
        page.xpath("//meta[@name = '页面生成时间']/@content")[0])
    if not publish_date:
        raise ValueError(f"unparsable publish date for {url}")
    if datetime.strptime(publish_date, "%Y-%m-%d") < cutoff:
        return True

    article = {}
    article['originalContent'] = encode(page.xpath(
        "//div[@class='mainw950']//td[@class='content']/font[@class='zoom1']//p"))
    # Translate sentence by sentence, splitting on the Chinese full stop.
    article['content'] = ''.join(
        translator.translate(sentence, dest='en').text + ' '
        for sentence in article['originalContent'].split("。"))
    article['site'] = "The People's Bank of China"
    article['originalSite'] = "中国人民银行"
    article['originalTitle'] = page.xpath("//title/text()")[0]
    article['title'] = translator.translate(
        article['originalTitle'], dest='en').text
    article['url'] = url
    article['category'] = "Policy Interpretation"
    article['publishDate'] = publish_date
    # Deterministic id: the same title and date always yield the same record.
    article['id'] = uuid.uuid5(
        uuid.NAMESPACE_OID, article['title'] + article['publishDate'])

    # Only the first 512 characters of the translation are classified.
    raw_sentiment = analyzer(article['content'][:512], return_all_scores=True)
    score, symbol = _summarize_sentiment(raw_sentiment[0])
    article['sentimentScore'] = score
    article['sentimentLabel'] = symbol
    upsert_content(article)
    return False


def crawl():
    """Walk the category index pages until an article older than the cutoff.

    The crawl also stops when an index page cannot be fetched, does not return
    HTTP 200, cannot be parsed or contains no article links; previously any of
    these made the loop crash or run forever.  A failure on a single article
    is printed and the crawl moves on.
    """
    cutoff = datetime.today() - timedelta(days=MAX_AGE_DAYS)
    page_index = 0
    reached_cutoff = False
    while not reached_cutoff:
        index_url = _category_page_url(page_index)
        page_index += 1
        try:
            response = requests.get(index_url, timeout=REQUEST_TIMEOUT)
        except requests.RequestException as error:
            print(error)
            break
        if response.status_code != 200:
            print(f"Stopping: {index_url} returned HTTP {response.status_code}")
            break
        page = etree.HTML(response.text)
        if page is None:
            break
        links = page.xpath(
            "//td[contains(@height,'22')]//a[contains(@target, '_blank')]/@href")
        if not links:
            # Past the last index page.
            break
        for link in links:
            if not link.startswith("/rmyh/"):
                continue
            try:
                # Finish the current page even after the cutoff is reached;
                # remaining newer articles are still stored.
                if _process_article(BASE_URL + link, cutoff):
                    reached_cutoff = True
            except Exception as error:  # best effort: skip the broken article
                print(error)


# The crawl deliberately runs whenever this module is executed or imported.
crawl()