"""Module to define utility function""" import os import re import json import uuid import time import glob import urllib.request from urllib.parse import urlparse from datetime import datetime, timedelta from decimal import Decimal import pandas as pd import requests import boto3 from lxml import etree from dotenv import load_dotenv from googletrans import Translator from transformers import pipeline from PyPDF2 import PdfReader from langdetect import detect load_dotenv() AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] # AWS_ACCESS_KEY_ID = "AKIAQFXZMGHQYXKWUDWR" # AWS_SECRET_ACCESS_KEY = "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" analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert") translator = Translator() with open('xpath.json', 'r', encoding='UTF-8') as f: xpath_dict = json.load(f) with open('patterns.json', 'r', encoding='UTF-8') as f: patterns = json.load(f) def get_client_connection(): """ Returns a client connection to DynamoDB. :return: DynamoDB client connection """ dynamodb = boto3.client( service_name='dynamodb', region_name='us-east-1', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY ) return dynamodb def update_reference(report): """ Updates the reference in the 'reference_china' table in DynamoDB. Args: report (dict): A dictionary containing the report details. Returns: None """ dynamodb = get_client_connection() response = dynamodb.update_item( TableName="reference_china", Key={ 'id': {'S': str(report['refID'])}, 'sourceID': {'S': str(report['sourceID'])} }, UpdateExpression='SET link = :link, referenceID = :referenceID, LastModifiedDate = :LastModifiedDate', ExpressionAttributeValues={ ':link': {'S': report['link']}, ':referenceID': {'S': report['referenceID']}, ':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}, } ) print(response) def download_files_from_s3(folder): """ Downloads Parquet files from an S3 bucket and returns a concatenated DataFrame. Args: folder (str): The folder in the S3 bucket to download files from. Returns: pandas.DataFrame: A concatenated DataFrame containing the data from the downloaded Parquet files. """ if not os.path.exists(folder): os.makedirs(folder) client = boto3.client( 's3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) response = client.list_objects_v2(Bucket='china-securities-report', Prefix=f"{folder}/") for obj in response['Contents']: key = obj['Key'] if key.endswith('.parquet'): client.download_file('china-securities-report', key, key) file_paths = glob.glob(os.path.join(folder, '*.parquet')) return pd.concat([pd.read_parquet(file_path) for file_path in file_paths], ignore_index=True) def extract_from_pdf_by_pattern(url, pattern): """ Extracts text from a PDF file based on a given pattern. Args: url (str): The URL of the PDF file to extract text from. pattern (dict): A dictionary containing the pattern to match and the pages to extract text from. Returns: str: The extracted text from the PDF file. Raises: Exception: If there is an error while retrieving or processing the PDF file. """ # Send a GET request to the URL and retrieve the PDF content try: response = requests.get(url, timeout=60) pdf_content = response.content # Save the PDF content to a local file with open("downloaded_file.pdf", "wb") as file: file.write(pdf_content) # Open the downloaded PDF file and extract the text with open("downloaded_file.pdf", "rb") as file: pdf_reader = PdfReader(file) extracted_text = "" if 'pages' in pattern: pages = pattern['pages'] else: pages = len(pdf_reader.pages) for page in pages: text = pdf_reader.pages[page].extract_text() if 'keyword' in pattern and pattern['keyword'] in text: text = text.split(pattern['keyword'], 1)[1].strip() else: text = text.strip() extracted_text += text except: extracted_text = '' return extracted_text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n').replace('\n',' ').replace('?-','?\n').replace('!-','!\n').replace('。-','。\n') def get_reference_by_regex(pattern, text): """ Finds all occurrences of a given regex pattern in the provided text. Args: pattern (str): The regex pattern to search for. text (str): The text to search within. Returns: list: A list of all matches found in the text. """ return re.findall(pattern, text) def isnot_substring(list_a, string_to_check): """ Check if any string in the given list is a substring of the string_to_check. Args: list_a (list): A list of strings to check. string_to_check (str): The string to check for substrings. Returns: bool: True if none of the strings in list_a are substrings of string_to_check, False otherwise. """ for s in list_a: if s in string_to_check: return False return True def extract_reference(row): """ Extracts reference information from a given row. Args: row (dict): A dictionary representing a row of data. Returns: None """ try: pattern = next((elem for elem in patterns if elem['site'] == row['site']), None) extracted_text = extract_from_pdf_by_pattern(row['attachment'],pattern) reference_titles = re.findall(pattern['article_regex'], extracted_text) reference_dates = re.findall(pattern['date_regex'], extracted_text) reference_titles = [s.replace(' ', '') for s in reference_titles] reference_dates = [s.replace(' ', '') for s in reference_dates] print(reference_dates, reference_titles) if 'remove' in pattern: for remove_string in pattern['remove']: reference_titles = [s.replace(remove_string, '') for s in reference_titles] if len(reference_dates) > 0: for title, date in zip(reference_titles, reference_dates): try: date = datetime.strptime(date, pattern['date_format']) except: date = datetime(2006, 1, 1) dates = [] if 'date_range' in pattern: for i in range(pattern['date_range'] + 1): dates.append((date + timedelta(days=i)).strftime('%Y-%m-%d')) dates.append((date - timedelta(days=i)).strftime('%Y-%m-%d')) dates.append(date.strftime('%Y-%m-%d')) date = date.strftime('%Y-%m-%d') if 'split' in pattern: for split_item in pattern['split']: if 'exceptional_string' in split_item: if split_item['string'] in title and isnot_substring(split_item['exceptional_string'], title): title = re.split(split_item['string'], title)[split_item['index']] else: if split_item['string'] in title: title = title.split(split_item['string'])[split_item['index']] if len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]) == 0: print("------------ = 0 ------------") print(date, repr(title)) elif len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]) > 1: print("------------ > 1 ------------") print(date, repr(title)) else: print("------------ = 1 ------------") reference_df = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))] row['referenceID'] = reference_df.iloc[0]['id'] row['link'] = reference_df.iloc[0]['link'] row['sourceID'] = row['id'] row['refID'] = uuid.uuid5(uuid.NAMESPACE_OID, str(row['sourceID'])+str(row['referenceID'])) print(date, repr(title), row['sourceID'],row['referenceID']) update_reference(row) else: for title in reference_titles: if 'split' in pattern: for split_item in pattern['split']: if 'exceptional_string' in split_item: if split_item['string'] in title and isnot_substring(split_item['exceptional_string'], title): title = re.split(split_item['string'], title)[split_item['index']] else: if split_item['string'] in title: title = title.split(split_item['string'])[split_item['index']] if len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])]) == 0: print("------------ = 0 ------------") print(repr(title)) elif len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])]) > 1: print("------------ > 1 ------------") print(repr(title)) else: print("------------ = 1 ------------") reference_df = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])] row['referenceID'] = reference_df.iloc[0]['id'] row['link'] = reference_df.iloc[0]['link'] row['sourceID'] = row['id'] row['refID'] = uuid.uuid5(uuid.NAMESPACE_OID, str(row['sourceID'])+str(row['referenceID'])) print(repr(title), row['sourceID'],row['referenceID']) update_reference(row) except Exception as error: print(error) def translate(text): """ Translates the given text to English. Args: text (str): The text to be translated. Returns: str: The translated text in English. """ return translator.translate(text, dest='en').text def datemodifier(date_string, date_format): """Date Modifier Function This function takes a date string and a date format as input and modifies the date string according to the specified format. It returns the modified date string in the format 'YYYY-MM-DD'. Args: date_string (str): The date string to be modified. date_format (str): The format of the date string. Returns: str: The modified date string in the format 'YYYY-MM-DD'. False: If an error occurs during the modification process. """ try: to_date = time.strptime(date_string,date_format) return time.strftime("%Y-%m-%d",to_date) except: return False def fetch_url(url): """ Fetches the content of a given URL. Args: url (str): The URL to fetch. Returns: str or None: The content of the URL if the request is successful (status code 200), otherwise None. Raises: requests.exceptions.RequestException: If there is an error while making the request or if the response status code is not 200. """ try: response = requests.get(url, timeout=60) if response.status_code == 200: return response.text else: return None except requests.exceptions.RequestException or requests.exceptions.ReadTimeout as e: print(f"An error occurred: {e}") # Optional: handle or log the error in some way return None def translist(infolist): """ Filter and transform a list of strings. Args: infolist (list): The input list of strings. Returns: list: The filtered and transformed list of strings. """ out = list(filter(lambda s: s and (isinstance(s, str) or len(s.strip()) > 0), [i.strip() for i in infolist])) return out def encode(content): """ Encodes the given content into a single string. Args: content (list): A list of elements to be encoded. Each element can be either a string or an `etree._Element` object. Returns: str: The encoded content as a single string. """ text = '' for element in content: if isinstance(element, etree._Element): subelement = etree.tostring(element).decode() subpage = etree.HTML(subelement) tree = subpage.xpath('//text()') line = ''.join(translist(tree)).\ replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip() else: line = element text += line return text def encode_content(content): """ Encodes the content by removing unnecessary characters and extracting a summary. Args: content (list): A list of elements representing the content. Returns: tuple: A tuple containing the encoded text and the summary. """ text = '' for element in content: if isinstance(element, etree._Element): subelement = etree.tostring(element).decode() subpage = etree.HTML(subelement) tree = subpage.xpath('//text()') line = ''.join(translist(tree)).\ replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip() else: line = element if line != '': line = line + '\n' text += line index = text.find('打印本页') if index != -1: text = text[:index] try: summary = '\n'.join(text.split('\n')[:2]) except: summary = text return text, summary def extract_from_pdf(url): """ Extracts text from a PDF file given its URL. Args: url (str): The URL of the PDF file. Returns: tuple: A tuple containing the extracted text and a summary of the text. Raises: Exception: If there is an error during the extraction process. """ # Send a GET request to the URL and retrieve the PDF content response = requests.get(url, timeout=60) pdf_content = response.content # Save the PDF content to a local file with open("downloaded_file.pdf", "wb") as file: file.write(pdf_content) # Open the downloaded PDF file and extract the text with open("downloaded_file.pdf", "rb") as file: pdf_reader = PdfReader(file) num_pages = len(pdf_reader.pages) extracted_text = "" for page in range(num_pages): text = pdf_reader.pages[page].extract_text() if text and text[0].isdigit(): text = text[1:] # first_newline_index = text.find('。\n') # text = text[:first_newline_index+1].replace('\n', '') + text[first_newline_index+1:] text = text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace('。\n', '。-\n').replace('\n','').replace('?-','?\n').replace('!-','!\n').replace('。-','。\n') if text != '': extracted_text += text try: summary = '\n'.join(extracted_text.split('\n')[:2]) except: summary = text return extracted_text, summary def get_db_connection(): """Get dynamoDB connection. Returns: boto3.resource: The DynamoDB resource object representing the connection. """ dynamodb = boto3.resource( service_name='dynamodb', region_name='us-east-1', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY ) return dynamodb def sentiment_computation(content): """ Compute the sentiment score and label for the given content. Parameters: content (str): The content for which sentiment needs to be computed. Returns: tuple: A tuple containing the sentiment score and label. The sentiment score is a float representing the overall sentiment score of the content. The sentiment label is a string representing the sentiment label ('+', '-', or '0'). """ label_dict = { "positive": "+", "negative": "-", "neutral": "0", } sentiment_score = 0 maximum_value = 0 raw_sentiment = analyzer(content[:511], top_k=None) sentiment_label = None for sentiment_dict in raw_sentiment: value = sentiment_dict["score"] if value > maximum_value: sentiment_label = sentiment_dict["label"] maximum_value = value if sentiment_dict["label"] == "positive": sentiment_score = sentiment_score + value if sentiment_dict["label"] == "negative": sentiment_score = sentiment_score - value else: sentiment_score = sentiment_score + 0 sentiment_score = sentiment_score * 100 return sentiment_score, label_dict[sentiment_label] def crawl(url, article): """ Crawls the given URL and extracts relevant information from the webpage. Args: url (str): The URL of the webpage to crawl. article (dict): A dictionary to store the extracted information. Returns: None: If the length of the extracted content is less than 10 characters. Raises: None """ domain = '.'.join(urlparse(url).netloc.split('.')[1:]) req = urllib.request.urlopen(url) text = req.read() html_text = text.decode("utf-8") page = etree.HTML(html_text) contentCN, summary = encode_content(page.xpath(xpath_dict[domain]['content'])) article['originSite'] = xpath_dict[domain]['siteCN'] article['site'] = xpath_dict[domain]['site'] article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title'])) article['title'] = translate(article['titleCN']) if 'author' in xpath_dict[domain]: article['author'] = translate(encode(page.xpath(xpath_dict[domain]['author']))) else: article['author'] = "" article['contentCN'] = repr(contentCN)[1:-1].strip() if len(article['contentCN']) < 10: return None CONTENT_ENG = '' for element in contentCN.split("\n"): CONTENT_ENG += translate(element) + '\n' if detect(CONTENT_ENG) != 'en': for element in contentCN.split("。"): CONTENT_ENG += translate(element) + '. ' article['content'] = repr(CONTENT_ENG)[1:-1].strip() if 'subtitle' in xpath_dict[domain]: article['subtitle'] = translate(encode(page.xpath(xpath_dict[domain]['subtitle']))) else: article['subtitle'] = translate(summary) article['publishDate'] = datemodifier(encode(page.xpath(xpath_dict[domain]['publishdate'])), xpath_dict[domain]['datetime_format']) article['link'] = url article['attachment'] = "" article['sentimentScore'], article['sentimentLabel'] = sentiment_computation(CONTENT_ENG.replace("\n","")) article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['titleCN']+article['publishDate']) print(article['id'], article['site'] ) update_content(article) def upsert_content(report): """ Upserts the content of a report into the 'article_china' table in DynamoDB. Args: report (dict): A dictionary containing the report data. Returns: dict: The response from the DynamoDB put_item operation. """ dynamodb = get_db_connection() table = dynamodb.Table('article_china') # Define the item data item = { 'id': str(report['id']), 'site': report['site'], 'title': report['title'], 'titleCN': report['titleCN'], 'contentCN': report['contentCN'], 'category': report['category'], 'author': report['author'], 'content': report['content'], 'subtitle': report['subtitle'], 'publishDate': report['publishDate'], 'link': report['link'], 'attachment': report['attachment'], # 'authorID': str(report['authorid']), # 'entityList': report['entitylist'], 'sentimentScore': Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')), 'sentimentLabel': report['sentimentLabel'], 'LastModifiedDate': datetime.now().strftime("%Y-%m-%dT%H:%M:%S") } response = table.put_item(Item=item) print(response) def delete_records(item): """ Deletes a record from the 'article_test' table in DynamoDB. Args: item (dict): The item to be deleted, containing 'id' and 'site' keys. Returns: None """ dynamodb_client = get_client_connection() dynamodb_client.delete_item( TableName="article_test", Key={ 'id': {'S': item['id']}, 'site': {'S': item['site']} } ) def update_content(report): """ Updates the content of an article in the 'article_china' table in DynamoDB. Args: report (dict): A dictionary containing the report data. Returns: None """ dynamodb = get_client_connection() response = dynamodb.update_item( TableName="article_china", Key={ 'id': {'S': str(report['id'])}, 'site': {'S': report['site']} }, UpdateExpression='SET title = :title, titleCN = :titleCN, contentCN = :contentCN, category = :category, author = :author, content = :content, subtitle = :subtitle, publishDate = :publishDate, link = :link, attachment = :attachment, sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel, LastModifiedDate = :LastModifiedDate', ExpressionAttributeValues={ ':title': {'S': report['title']}, ':titleCN': {'S': report['titleCN']}, ':contentCN': {'S': report['contentCN']}, ':category': {'S': report['category']}, ':author': {'S': report['author']}, ':content': {'S': report['content']}, ':subtitle': {'S': report['subtitle']}, ':publishDate': {'S': report['publishDate']}, ':link': {'S': report['link']}, ':attachment': {'S': report['attachment']}, ':LastModifiedDate': {'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}, ':sentimentScore': {'N': str(Decimal(str(report['sentimentScore'])).quantize(Decimal('0.01')))}, ':sentimentLabel': {'S': report['sentimentLabel']} } ) print(response) def update_content_sentiment(report): """ Updates the sentiment score and label of an article in the 'article_test' DynamoDB table. Args: report (dict): A dictionary containing the report information. Returns: None """ dynamodb = get_client_connection() response = dynamodb.update_item( TableName="article_test", Key={ 'id': {'S': report['id']}, 'site': {'S': report['site']} }, UpdateExpression='SET sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel', ExpressionAttributeValues={ ':sentimentScore': {'N': str(Decimal(str(report['sentimentscore'])).quantize(Decimal('0.01')))}, ':sentimentLabel': {'S': report['sentimentlabel']} } ) print(response) data = download_files_from_s3('data')