"""Utilis Functions""" import os import json import uuid import time import urllib.request from urllib.parse import urlparse from datetime import datetime from decimal import Decimal import requests import boto3 from lxml import etree from googletrans import Translator from transformers import pipeline from PyPDF2 import PdfReader # AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] # AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] AWS_ACCESS_KEY_ID="AKIAQFXZMGHQYXKWUDWR" AWS_SECRET_ACCESS_KEY="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" analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert") translator = Translator() with open('xpath.json', 'r', encoding='UTF-8') as f: xpath_dict = json.load(f) def translate(text): return translator.translate(text, dest='en').text def datemodifier(date_string, date_format): """Date Modifier Function""" try: to_date = time.strptime(date_string,date_format) return time.strftime("%Y-%m-%d",to_date) except: return False def fetch_url(url): response = requests.get(url) if response.status_code == 200: return response.text else: return None def translist(infolist): """Translist Function""" out = list(filter(lambda s: s and (isinstance (s,str) or len(s.strip()) > 0), [i.strip() for i in infolist])) return out def encode(content): """Encode Function""" text = '' for element in content: if isinstance(element, etree._Element): subelement = etree.tostring(element).decode() subpage = etree.HTML(subelement) tree = subpage.xpath('//text()') line = ''.join(translist(tree)).\ replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip() else: line = element text += line return text # def encode(content): # """Encode Function""" # text = '' # for element in content: # if isinstance(element, etree._Element): # subelement = etree.tostring(element).decode() # subpage = etree.HTML(subelement) # tree = subpage.xpath('//text()') # line = ''.join(translist(tree)).\ # replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip() # else: # line = element # text += line # index = text.find('打印本页') # if index != -1: # text = text[:index] def encode_content(content): """Encode Function""" text = '' for element in content: if isinstance(element, etree._Element): subelement = etree.tostring(element).decode() subpage = etree.HTML(subelement) tree = subpage.xpath('//text()') line = ''.join(translist(tree)).\ replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip() else: line = element line = line + '\n' text += line index = text.find('打印本页') if index != -1: text = text[:index] try: summary = '\n'.join(text.split('\n')[:2]) except: summary = text return text, summary def extract_from_pdf(url): # Send a GET request to the URL and retrieve the PDF content response = requests.get(url) pdf_content = response.content # Save the PDF content to a local file with open("downloaded_file.pdf", "wb") as f: f.write(pdf_content) # Open the downloaded PDF file and extract the text with open("downloaded_file.pdf", "rb") as f: pdf_reader = PdfReader(f) num_pages = len(pdf_reader.pages) extracted_text = "" for page in range(num_pages): text = pdf_reader.pages[page].extract_text() if text and text[0].isdigit(): text = text[1:] first_newline_index = text.find('\n') text = text[:first_newline_index+1].replace('\n', ' ') + text[first_newline_index+1:] extracted_text += text try: summary = '\n'.join(extracted_text.split('\n')[:2]) except: summary = text return extracted_text, summary def get_db_connection(): """Get dynamoDB connection""" dynamodb = boto3.resource( service_name='dynamodb', region_name='us-east-1', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY ) return dynamodb def sentiment_computation(content): label_dict = { "positive": "+", "negative": "-", "neutral": "0", } sentiment_score = 0 maximum_value = 0 raw_sentiment = analyzer(content[:512], top_k=None) sentiment_label = None for sentiment_dict in raw_sentiment: value = sentiment_dict["score"] if value > maximum_value: sentiment_label = sentiment_dict["label"] maximum_value = value if sentiment_dict["label"] == "positive": sentiment_score = sentiment_score + value if sentiment_dict["label"] == "negative": sentiment_score = sentiment_score - value else: sentiment_score = sentiment_score + 0 return sentiment_score, label_dict[sentiment_label] def crawl(url, article): domain = '.'.join(urlparse(url).netloc.split('.')[1:]) req = urllib.request.urlopen(url) text = req.read() html_text = text.decode("utf-8") page = etree.HTML(html_text) contentCN, summary = encode_content(page.xpath(xpath_dict[domain]['content'])) article['originSite'] = xpath_dict[domain]['siteCN'] article['site'] = xpath_dict[domain]['site'] article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title'])) article['title'] = translate(article['titleCN']) if 'author' in xpath_dict[domain]: article['author'] = translate(encode(page.xpath(xpath_dict[domain]['author']))) else: article['author'] = "" article['contentCN'] = repr(contentCN) if len(article['contentCN']) < 10: return None CONTENT_ENG = '' for element in contentCN.split("\n"): CONTENT_ENG += translate(element) + '\n' article['content'] = repr(CONTENT_ENG) if 'subtitle' in xpath_dict[domain]: article['subtitle'] = translate(encode(page.xpath(xpath_dict[domain]['subtitle']))) else: article['subtitle'] = translate(summary) article['publishDate'] = datemodifier(encode(page.xpath(xpath_dict[domain]['publishdate'])), xpath_dict[domain]['datetime_format']) article['link'] = url article['attachment'] = "" article['sentimentScore'], article['sentimentLabel'] = sentiment_computation(CONTENT_ENG.replace("\n","")) article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['title']+article['publishDate']) upsert_content(article) def upsert_content(report): """Upsert the content records""" dynamodb = get_db_connection() table = dynamodb.Table('article_test') # Define the item data item = { 'id': str(report['id']), 'site': report['site'], 'title': report['title'], 'titleCN': report['titleCN'], 'site': report['site'], 'contentCN': report['contentCN'], 'category': report['category'], 'author': report['author'], 'content': report['content'], 'subtitle': report['subtitle'], 'publishDate': report['publishDate'], 'link': report['link'], 'attachment': report['attachment'], # 'authorID': str(report['authorid']), # 'entityList': report['entitylist'], 'sentimentScore': Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')), 'sentimentLabel': report['sentimentLabel'], 'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S") } response = table.put_item(Item=item) print(response) def get_client_connection(): """Get dynamoDB connection""" dynamodb = boto3.client( service_name='dynamodb', region_name='us-east-1', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY ) return dynamodb def delete_records(item): dynamodb_client = get_client_connection() dynamodb_client.delete_item( TableName="article_test", Key={ 'id': {'S': item['id']}, 'site': {'S': item['site']} } ) def update_content(report): dynamodb = get_client_connection() response = dynamodb.update_item( TableName="article_test", Key={ 'id': {'S': report['id']}, 'site': {'S': report['site']} }, UpdateExpression='SET sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel', ExpressionAttributeValues={ ':sentimentScore': {'N': str(Decimal(str(report['sentimentscore'])).quantize(Decimal('0.01')))}, ':sentimentLabel': {'S': report['sentimentlabel']} } ) print(response)