"""Utility functions for fetching, translating, scoring and storing articles."""
import io
import os
import time
from datetime import datetime
from decimal import Decimal

import boto3
import requests
from googletrans import Translator
from lxml import etree
from PyPDF2 import PdfReader
from transformers import pipeline

# AWS credentials are read from the environment only. Never commit key
# material to this file -- not even inside a comment.
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

# Seconds to wait for a remote server before giving up; without a timeout
# ``requests`` may block indefinitely.
REQUEST_TIMEOUT = 30

analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert")
translator = Translator()

# Characters removed from scraped text by ``encode``.
_REMOVED_CHARS = str.maketrans('', '', '\n\t\r ')

# Classifier label -> compact symbol returned by ``sentiment_computation``.
_SENTIMENT_SYMBOLS = {
    "positive": "+",
    "negative": "-",
    "neutral": "0",
}


def translate(text):
    """Return *text* translated to English by the module-level translator."""
    return translator.translate(text, dest='en').text


def datemodifier(date_string, date_format):
    """Reformat a date string as ``YYYY-MM-DD``.

    Args:
        date_string: The date text to parse.
        date_format: The ``time.strptime`` format describing *date_string*.

    Returns:
        The reformatted date string, or ``False`` when *date_string* cannot
        be parsed with *date_format*.
    """
    try:
        parsed = time.strptime(date_string, date_format)
    except (ValueError, TypeError):
        # ValueError: text does not match the format.
        # TypeError: a non-string (e.g. None) was passed in.
        return False
    return time.strftime("%Y-%m-%d", parsed)


def fetch_url(url):
    """Fetch *url* with a GET request.

    Returns:
        The response body as text when the status code is 200, else ``None``.
    """
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    if response.status_code == 200:
        return response.text
    return None


def translist(infolist):
    """Strip every item of *infolist* and drop the ones that end up empty."""
    stripped = (item.strip() for item in infolist)
    return [item for item in stripped if item]


def encode(content):
    """Flatten the first item of *content* into a whitespace-free string.

    Only the first item is used. If it is an lxml element, all of its text
    nodes are joined; otherwise the item itself is taken as the text.
    Newlines, tabs, carriage returns and spaces are removed from element
    text, and everything from the marker '打印本页' onwards is cut off.

    Args:
        content: A sequence whose first item is an lxml element or a string.

    Returns:
        The cleaned text, or ``''`` when *content* is empty.
    """
    if not content:
        return ''
    element = content[0]
    if isinstance(element, etree._Element):
        # Re-parse the serialized element so ``//text()`` only sees this
        # element's own subtree rather than the whole source document.
        markup = etree.tostring(element).decode()
        fragments = etree.HTML(markup).xpath('//text()')
        text = ''.join(translist(fragments)).translate(_REMOVED_CHARS).strip()
    else:
        text = '' + element
    # Drop the marker and anything after it.
    return text.partition('打印本页')[0]


def extract_from_pdf(url):
    """Download the PDF at *url* and extract its text.

    For each page, a leading digit is removed, the first newline becomes a
    space and every later newline is deleted. Pages with no extractable text
    are skipped.

    Args:
        url: Location of the PDF document.

    Returns:
        A ``(extracted_text, extracted_text_eng)`` tuple: the cleaned text of
        all pages concatenated, and its page-by-page English translation.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    # Parse straight from memory: a fixed on-disk file name would be shared
    # (and overwritten) by concurrent callers and left behind afterwards.
    pdf_reader = PdfReader(io.BytesIO(response.content))

    original_parts = []
    english_parts = []
    for page in pdf_reader.pages:
        text = page.extract_text() or ''
        if not text:
            continue
        if text[0].isdigit():
            text = text[1:]
        head, newline, tail = text.partition('\n')
        if newline:
            text = head + ' ' + tail.replace('\n', '')
        english_parts.append(translate(text))
        original_parts.append(text)
    return ''.join(original_parts), ''.join(english_parts)


def get_db_connection():
    """Return a boto3 DynamoDB resource for ``us-east-1``.

    The resource is built with the credentials held in the module-level
    ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` constants.
    """
    dynamodb = boto3.resource(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    return dynamodb


def sentiment_computation(content):
    """Score the sentiment of *content* with the module-level analyzer.

    Only the first 512 characters of *content* are passed to the model.

    Returns:
        A ``(sentiment_score, sentiment_label)`` tuple. The score is the
        "positive" score minus the "negative" score; the label is "+", "-"
        or "0" for whichever class scored highest.
    """
    # NOTE(review): ``return_all_scores`` is kept because the code below
    # indexes the nested ``[0]`` result it produces -- confirm the result
    # shape before switching to a different pipeline argument.
    raw_sentiment = analyzer(content[:512], return_all_scores=True)
    scores = {entry["label"]: entry["score"] for entry in raw_sentiment[0]}
    sentiment_label = max(scores, key=scores.get)
    sentiment_score = scores.get("positive", 0) - scores.get("negative", 0)
    return sentiment_score, _SENTIMENT_SYMBOLS[sentiment_label]


def upsert_content(report):
    """Write one article record to the ``article_china`` DynamoDB table.

    Args:
        report: Mapping with the keys ``id``, ``site``, ``title``,
            ``category``, ``content``, ``publishDate``, ``url``,
            ``sentimentScore`` and ``sentimentLabel``.

    The sentiment score is stored as a string rounded to two decimal places,
    and ``LastModifiedDate`` is set to the current local time. The
    ``put_item`` response is printed.
    """
    dynamodb = get_db_connection()
    table = dynamodb.Table('article_china')
    # Fields such as originalSite, originalTitle, originalContent, author,
    # attachment and authorID are currently not persisted.
    item = {
        'id': str(report['id']),
        'site': report['site'],
        'title': report['title'],
        'category': report['category'],
        'content': report['content'],
        'publishDate': report['publishDate'],
        'link': report['url'],
        'sentimentScore': str(
            Decimal(report['sentimentScore']).quantize(Decimal('0.01'))),
        'sentimentLabel': report['sentimentLabel'],
        'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
    }
    response = table.put_item(Item=item)
    print(response)