"""Utilis Functions""" import os import re import json import uuid import time import glob import urllib.request from urllib.parse import urlparse from datetime import datetime, timedelta from decimal import Decimal import pandas as pd import requests import boto3 from lxml import etree from googletrans import Translator from transformers import pipeline from PyPDF2 import PdfReader AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] # AWS_ACCESS_KEY_ID="AKIAQFXZMGHQYXKWUDWR" # AWS_SECRET_ACCESS_KEY="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" analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert") translator = Translator() with open('xpath.json', 'r', encoding='UTF-8') as f: xpath_dict = json.load(f) with open('xpath.json', 'r', encoding='UTF-8') as f: patterns = json.load(f) def get_client_connection(): """Get dynamoDB connection""" dynamodb = boto3.client( service_name='dynamodb', region_name='us-east-1', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY ) return dynamodb def update_reference(report): dynamodb = get_client_connection() response = dynamodb.update_item( TableName="reference_china", Key={ 'id': {'S': str(report['refID'])}, 'sourceID': {'S': report['sourceID']} }, UpdateExpression='SET link = :link, referenceID = :referenceID, LastModifiedDate = :LastModifiedDate', ExpressionAttributeValues={ ':link': {'S': report['link']}, ':referenceID': {'S': report['referenceID']}, ':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}, } ) print(response) def download_files_from_s3(folder): """Download Data Files""" if not os.path.exists(folder): os.makedirs(folder) client = boto3.client( 's3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) response = client.list_objects_v2(Bucket='china-securities-report', Prefix=f"{folder}/") for obj in response['Contents']: key = obj['Key'] if key.endswith('.parquet'): client.download_file('china-securities-report', key, key) file_paths = glob.glob(os.path.join(folder, '*.parquet')) return pd.concat([pd.read_parquet(file_path) for file_path in file_paths], ignore_index=True) def extract_from_pdf_by_pattern(url, pattern): # Send a GET request to the URL and retrieve the PDF content try: response = requests.get(url, timeout=60) pdf_content = response.content # Save the PDF content to a local file with open("downloaded_file.pdf", "wb") as file: file.write(pdf_content) # Open the downloaded PDF file and extract the text with open("downloaded_file.pdf", "rb") as file: pdf_reader = PdfReader(file) extracted_text = "" if 'pages' in pattern: pages = pattern['pages'] else: pages = len(pdf_reader.pages) for page in pages: text = pdf_reader.pages[page].extract_text() if 'keyword' in pattern and pattern['keyword'] in text: text = text.split(pattern['keyword'], 1)[1].strip() else: text = text.strip() extracted_text += text except: extracted_text = '' return extracted_text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n').replace('\n',' ').replace('?-','?\n').replace('!-','!\n').replace('。-','。\n') def get_reference_by_regex(pattern, text): return re.findall(pattern, text) def isnot_substring(list_a, string_to_check): for s in list_a: if s in string_to_check: return False return True def extract_reference(row): try: pattern = next((elem for elem in patterns if elem['site'] == row['site']), None) extracted_text = extract_from_pdf_by_pattern(row['attachment'],pattern) reference_titles = 

def extract_reference(row):
    """Match reference titles/dates extracted from a PDF against the data frame."""
    try:
        pattern = next((elem for elem in patterns if elem['site'] == row['site']), None)
        extracted_text = extract_from_pdf_by_pattern(row['attachment'], pattern)
        reference_titles = re.findall(pattern['article_regex'], extracted_text)
        reference_dates = re.findall(pattern['date_regex'], extracted_text)
        reference_titles = [s.replace(' ', '') for s in reference_titles]
        reference_dates = [s.replace(' ', '') for s in reference_dates]
        print(reference_dates, reference_titles)
        if 'remove' in pattern:
            for remove_string in pattern['remove']:
                reference_titles = [s.replace(remove_string, '') for s in reference_titles]
        for title, date in zip(reference_titles, reference_dates):
            print(title, date)
            try:
                date = datetime.strptime(date, pattern['date_format'])
            except Exception:
                date = datetime(2006, 1, 1)
            dates = []
            if 'date_range' in pattern:
                for i in range(pattern['date_range'] + 1):
                    dates.append((date + timedelta(days=i)).strftime('%Y-%m-%d'))
                    dates.append((date - timedelta(days=i)).strftime('%Y-%m-%d'))
            dates.append(date.strftime('%Y-%m-%d'))
            date = date.strftime('%Y-%m-%d')
            if 'split' in pattern:
                for split_item in pattern['split']:
                    if 'exceptional_string' in split_item:
                        if split_item['string'] in title and isnot_substring(split_item['exceptional_string'], title):
                            title = re.split(split_item['string'], title)[split_item['index']]
                    else:
                        if split_item['string'] in title:
                            title = title.split(split_item['string'])[split_item['index']]
            matches = data[(data['titleCN'].str.contains(title)) &
                           (data['site'] == row['site']) &
                           (data['publishdate'].isin(dates))]
            if len(matches) == 0:
                print("------------ = 0 ------------")
                print(date, repr(title))
            elif len(matches) > 1:
                print("------------ > 1 ------------")
                print(date, repr(title))
            else:
                print("------------ = 1 ------------")
                row['referenceID'] = matches.iloc[0]['id']
                row['link'] = matches.iloc[0]['link']
                row['sourceID'] = row['id_x']
                row['refID'] = uuid.uuid5(uuid.NAMESPACE_OID, str(row['sourceID']) + str(row['referenceID']))
                print(date, repr(title), row['sourceID'], row['referenceID'])
    except Exception as error:
        print(error)
    # update_reference(row)


def translate(text):
    """Translate text to English with googletrans."""
    return translator.translate(text, dest='en').text


def datemodifier(date_string, date_format):
    """Normalize a date string to YYYY-MM-DD; return False if parsing fails."""
    try:
        to_date = time.strptime(date_string, date_format)
        return time.strftime("%Y-%m-%d", to_date)
    except Exception:
        return False


def fetch_url(url):
    """Fetch a URL and return the response body, or None on a non-200 status."""
    response = requests.get(url, timeout=60)
    if response.status_code == 200:
        return response.text
    return None


def translist(infolist):
    """Strip whitespace and drop empty strings from a list of text nodes."""
    return [i.strip() for i in infolist if i and i.strip()]


def encode(content):
    """Flatten a mixed list of lxml elements and strings into one text string."""
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)) \
                .replace('\n', '').replace('\t', '').replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        text += line
    return text


def encode_content(content):
    """Flatten mixed content into newline-separated text plus a two-line summary."""
    text = ''
    for element in content:
        if isinstance(element, etree._Element):
            subelement = etree.tostring(element).decode()
            subpage = etree.HTML(subelement)
            tree = subpage.xpath('//text()')
            line = ''.join(translist(tree)) \
                .replace('\n', '').replace('\t', '').replace('\r', '').replace(' ', '').strip()
        else:
            line = element
        if line != '':
            line = line + '\n'
        text += line
    # Drop everything from the '打印本页' (print-this-page) widget onward.
    index = text.find('打印本页')
    if index != -1:
        text = text[:index]
    try:
        summary = '\n'.join(text.split('\n')[:2])
    except Exception:
        summary = text
    return text, summary
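
# Hedged usage sketch for encode(): it accepts the mixed node list that an
# XPath like '//div[@class="t"]/node()' returns (elements interleaved with
# text nodes) and flattens it to plain text. The HTML snippet is made up.
def _demo_encode():
    """Illustrative only; not called by the pipeline."""
    page = etree.HTML('<div class="t"><span>央行</span>发布<b>公告</b></div>')
    return encode(page.xpath('//div[@class="t"]/node()'))  # -> '央行发布公告'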

def extract_from_pdf(url):
    """Download a PDF and extract its full text plus a two-line summary."""
    # Send a GET request to the URL and retrieve the PDF content
    response = requests.get(url, timeout=60)
    pdf_content = response.content
    # Save the PDF content to a local file
    with open("downloaded_file.pdf", "wb") as file:
        file.write(pdf_content)
    # Open the downloaded PDF file and extract the text
    with open("downloaded_file.pdf", "rb") as file:
        pdf_reader = PdfReader(file)
        num_pages = len(pdf_reader.pages)
        extracted_text = ""
        for page in range(num_pages):
            text = pdf_reader.pages[page].extract_text()
            # Drop a leading page number if present.
            if text and text[0].isdigit():
                text = text[1:]
            # first_newline_index = text.find('。\n')
            # text = text[:first_newline_index+1].replace('\n', '') + text[first_newline_index+1:]
            # Rejoin wrapped lines while preserving sentence-final newlines.
            text = text.replace('?\n', '?-\n').replace('!\n', '!-\n') \
                .replace('。\n', '。-\n').replace('\n', '') \
                .replace('?-', '?\n').replace('!-', '!\n').replace('。-', '。\n')
            print(text)
            if text != '':
                extracted_text += text
    try:
        summary = '\n'.join(extracted_text.split('\n')[:2])
    except Exception:
        summary = extracted_text
    return extracted_text, summary


def get_db_connection():
    """Get DynamoDB resource connection."""
    dynamodb = boto3.resource(
        service_name='dynamodb',
        region_name='us-east-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    return dynamodb


def sentiment_computation(content):
    """Score content with FinBERT; return (signed score, +/-/0 label)."""
    label_dict = {
        "positive": "+",
        "negative": "-",
        "neutral": "0",
    }
    sentiment_score = 0
    maximum_value = 0
    raw_sentiment = analyzer(content[:511], top_k=None)
    sentiment_label = None
    for sentiment_dict in raw_sentiment:
        value = sentiment_dict["score"]
        if value > maximum_value:
            sentiment_label = sentiment_dict["label"]
            maximum_value = value
        if sentiment_dict["label"] == "positive":
            sentiment_score += value
        elif sentiment_dict["label"] == "negative":
            sentiment_score -= value
        # neutral contributes nothing to the signed score
    return sentiment_score, label_dict[sentiment_label]


def crawl(url, article):
    """Crawl one article page, populate the article dict, and upsert it."""
    domain = '.'.join(urlparse(url).netloc.split('.')[1:])
    req = urllib.request.urlopen(url)
    text = req.read()
    html_text = text.decode("utf-8")
    page = etree.HTML(html_text)
    contentCN, summary = encode_content(page.xpath(xpath_dict[domain]['content']))
    article['originSite'] = xpath_dict[domain]['siteCN']
    article['site'] = xpath_dict[domain]['site']
    article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title']))
    article['title'] = translate(article['titleCN'])
    if 'author' in xpath_dict[domain]:
        article['author'] = translate(encode(page.xpath(xpath_dict[domain]['author'])))
    else:
        article['author'] = ""
    article['contentCN'] = repr(contentCN)[1:-1].strip()
    if len(article['contentCN']) < 10:
        return None
    CONTENT_ENG = ''
    for element in contentCN.split("\n"):
        CONTENT_ENG += translate(element) + '\n'
    article['content'] = repr(CONTENT_ENG)[1:-1].strip()
    if 'subtitle' in xpath_dict[domain]:
        article['subtitle'] = translate(encode(page.xpath(xpath_dict[domain]['subtitle'])))
    else:
        article['subtitle'] = translate(summary)
    article['publishDate'] = datemodifier(
        encode(page.xpath(xpath_dict[domain]['publishdate'])),
        xpath_dict[domain]['datetime_format'])
    article['link'] = url
    article['attachment'] = ""
    article['sentimentScore'], article['sentimentLabel'] = \
        sentiment_computation(CONTENT_ENG.replace("\n", ""))
    article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['titleCN'] + article['publishDate'])
    print(article['id'], article['site'])
    update_content(article)
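
# Hedged note on the ID scheme used by crawl(): uuid5 over NAMESPACE_OID with
# titleCN + publishDate yields a deterministic ID, so re-crawling the same
# article updates the existing record rather than inserting a duplicate.
# The title and date below are made up.
def _demo_article_id():
    """Illustrative only; not called by the pipeline."""
    return uuid.uuid5(uuid.NAMESPACE_OID, '央行公告' + '2024-01-01')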

def upsert_content(report):
    """Upsert an article record into the article_china table."""
    dynamodb = get_db_connection()
    table = dynamodb.Table('article_china')
    # Define the item data
    item = {
        'id': str(report['id']),
        'site': report['site'],
        'title': report['title'],
        'titleCN': report['titleCN'],
        'contentCN': report['contentCN'],
        'category': report['category'],
        'author': report['author'],
        'content': report['content'],
        'subtitle': report['subtitle'],
        'publishDate': report['publishDate'],
        'link': report['link'],
        'attachment': report['attachment'],
        # 'authorID': str(report['authorid']),
        # 'entityList': report['entitylist'],
        'sentimentScore': Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')),
        'sentimentLabel': report['sentimentLabel'],
        'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    }
    response = table.put_item(Item=item)
    print(response)


def delete_records(item):
    """Delete one article record from the article_test table."""
    dynamodb_client = get_client_connection()
    dynamodb_client.delete_item(
        TableName="article_test",
        Key={
            'id': {'S': item['id']},
            'site': {'S': item['site']}
        }
    )


def update_content(report):
    """Update (or create) an article record in the article_china table."""
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="article_china",
        Key={
            'id': {'S': str(report['id'])},
            'site': {'S': report['site']}
        },
        UpdateExpression=('SET title = :title, titleCN = :titleCN, contentCN = :contentCN, '
                          'category = :category, author = :author, content = :content, '
                          'subtitle = :subtitle, publishDate = :publishDate, link = :link, '
                          'attachment = :attachment, sentimentScore = :sentimentScore, '
                          'sentimentLabel = :sentimentLabel, LastModifiedDate = :LastModifiedDate'),
        ExpressionAttributeValues={
            ':title': {'S': report['title']},
            ':titleCN': {'S': report['titleCN']},
            ':contentCN': {'S': report['contentCN']},
            ':category': {'S': report['category']},
            ':author': {'S': report['author']},
            ':content': {'S': report['content']},
            ':subtitle': {'S': report['subtitle']},
            ':publishDate': {'S': report['publishDate']},
            ':link': {'S': report['link']},
            ':attachment': {'S': report['attachment']},
            ':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")},
            ':sentimentScore': {'N': str(Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')))},
            ':sentimentLabel': {'S': report['sentimentLabel']}
        }
    )
    print(response)


def update_content_sentiment(report):
    """Update only the sentiment fields of a record in the article_test table."""
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName="article_test",
        Key={
            'id': {'S': report['id']},
            'site': {'S': report['site']}
        },
        UpdateExpression='SET sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel',
        ExpressionAttributeValues={
            ':sentimentScore': {'N': str(Decimal(str(report['sentimentscore'])).quantize(Decimal('0.01')))},
            ':sentimentLabel': {'S': report['sentimentlabel']}
        }
    )
    print(response)


data = download_files_from_s3('data')
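
# Hedged example of the score serialization used above: DynamoDB's low-level
# 'N' type takes a string, and scores are rounded to two decimals with
# Decimal.quantize before being sent. The input value is made up.
def _demo_score_serialization():
    """Illustrative only; not called by the pipeline."""
    score = 0.73456
    return {'N': str(Decimal(str(score)).quantize(Decimal('0.01')))}  # {'N': '0.73'}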