"""Module to define utility function""" import glob import json import logging import os import re import time import urllib.request import uuid from datetime import datetime, timedelta from decimal import Decimal from urllib.parse import urlparse import boto3 import pandas as pd import requests from dotenv import load_dotenv from deep_translator import GoogleTranslator, exceptions from langdetect import detect, lang_detect_exception from lxml import etree import PyPDF2 from transformers import pipeline from controllers.summarizer import summarize from controllers.vectorizer import vectorize load_dotenv() AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert") with open('xpath.json', 'r', encoding='UTF-8') as f: xpath_dict = json.load(f) with open('patterns.json', 'r', encoding='UTF-8') as f: patterns = json.load(f) logging.basicConfig( format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s', datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO ) def datemodifier(date_string, date_format): """Date Modifier Function This function takes a date string and a date format as input and modifies the date string according to the specified format. It returns modified date string in the format 'YYYY-MM-DD'. Args: date_string (str): The date string to be modified. date_format (str): The format of the date string. Returns: str: The modified date string in the format 'YYYY-MM-DD'. False: If an error occurs during the modification process. """ try: to_date = time.strptime(date_string, date_format) return time.strftime("%Y-%m-%d", to_date) except (ValueError, KeyError, TypeError) as error: logging.error("ValueError: %s", error) return False def encode(content): """ Encodes the given content into a single string. Args: content (list): A list of elements to be encoded. Each element can be either a string or an `etree._Element` object. Returns: str: The encoded content as a single string. """ text = '' for element in content: if isinstance(element, etree._Element): subelement = etree.tostring(element).decode() subpage = etree.HTML(subelement) tree = subpage.xpath('//text()') line = ''.join(translist(tree)).\ replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip() else: line = element text += line return text def encode_content(content): """ Encodes the content by removing unnecessary characters and extracting a summary. Args: content (list): A list of elements representing the content. Returns: tuple: A tuple containing the encoded text and the summary. """ text = '' index = -1 for element in content: if isinstance(element, etree._Element): subelement = etree.tostring(element).decode() subpage = etree.HTML(subelement) tree = subpage.xpath('//text()') line = ''.join(translist(tree)).\ replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip() else: line = element if line != '': line = line + '\n' text += line index = text.find('打印本页') if index != -1: text = text[:index] try: summary = '\n'.join(text.split('\n')[:2]) except (IndexError, AttributeError) as e: logging.error(e) summary = text return text, summary def fetch_url(url): """ Fetches the content of a given URL. Args: url (str): The URL to fetch. Returns: str or None: The content of the URL if the request is successful (status code 200), otherwise None. Raises: requests.exceptions.RequestException: If there is an error while making the request or if the response status code is not 200. """ try: response = requests.get(url, timeout=60) if response.status_code == 200: return response.text else: return None except (requests.exceptions.RequestException, requests.exceptions.ReadTimeout) as e: logging.error(e) # Optional: handle or log the error in some way return None def translate(text, max_length=4950): """ Translates the given text to English. Args: text (str): The text to be translated. Returns: str: The translated text in English. """ if not text: return "" if len(text) <= max_length: for _ in range(5): try: return GoogleTranslator(source='auto', target='en').translate(text) or "" except exceptions.RequestError: time.sleep(1) return "" # If text is too long, split and translate in chunks result = [] for i in range(0, len(text), max_length): chunk = text[i:i+max_length] for _ in range(5): try: result.append(GoogleTranslator(source='auto', target='en').translate(chunk) or "") break except exceptions.RequestError: time.sleep(1) else: result.append("") # If all retries fail, append empty string return " ".join(result) def sentiment_computation(content): """ Compute the sentiment score and label for the given content. Parameters: content (str): The content for which sentiment needs to be computed. Returns: tuple: A tuple containing the sentiment score and label. The sentiment score is a float representing the overall sentiment score of the content. The sentiment label is a string representing the sentiment label ('+', '-', or '0'). """ label_dict = { "positive": "+", "negative": "-", "neutral": "0", } sentiment_score = 0 maximum_value = 0 raw_sentiment = analyzer(content[:500], top_k=None) sentiment_label = None for sentiment_dict in raw_sentiment: value = sentiment_dict["score"] if value > maximum_value: sentiment_label = sentiment_dict["label"] maximum_value = value if sentiment_dict["label"] == "positive": sentiment_score = sentiment_score + value if sentiment_dict["label"] == "negative": sentiment_score = sentiment_score - value else: sentiment_score = sentiment_score + 0 sentiment_score = sentiment_score * 100 return sentiment_score, label_dict[sentiment_label] def get_client_connection(): """ Returns a client connection to DynamoDB. :return: DynamoDB client connection """ dynamodb = boto3.client(service_name='dynamodb', region_name='us-east-1', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY) return dynamodb def update_content(report): """ Updates the content of an article in the 'Article_China' table in DynamoDB. Args: report (dict): A dictionary containing the report data. Returns: None """ logging.info("Updating content for %s", report['id']) dynamodb = get_client_connection() dynamodb.update_item( TableName="Article_China", Key={ 'id': { 'S': str(report['id']) } # 'site': { # 'S': report['site'] # } }, UpdateExpression= 'SET title = :title, site = :site, titleCN = :titleCN, contentCN = :contentCN, \ category = :category, author = :author, content = :content, subtitle = :subtitle, \ publishDate = :publishDate, link = :link, attachment = :attachment, \ sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel, \ LastModifiedDate = :LastModifiedDate, entityList = :entityList', ExpressionAttributeValues={ ':title': { 'S': report['title'] }, ':site': { 'S': report['site'] }, ':titleCN': { 'S': report['titleCN'] }, ':contentCN': { 'S': report['contentCN'] }, ':category': { 'S': report['category'] }, ':author': { 'S': report['author'] }, ':content': { 'S': report['content'] }, ':subtitle': { 'S': report['subtitle'] }, ':publishDate': { 'S': report['publishDate'] }, ':link': { 'S': report['link'] }, ':attachment': { 'S': report['attachment'] }, ':LastModifiedDate': { 'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S") }, ':sentimentScore': { 'N': str( Decimal(str(report['sentimentScore'])).quantize( Decimal('0.01'))) }, ':sentimentLabel': { 'S': report['sentimentLabel'] }, ':entityList': { 'L': [] } }) def update_reference(report): """ Updates the reference in the 'reference_china' table in DynamoDB. Args: report (dict): A dictionary containing the report details. Returns: None """ dynamodb = get_client_connection() response = dynamodb.update_item( TableName="reference_china", Key={ 'id': { 'S': str(report['refID']) }, 'sourceID': { 'S': str(report['sourceID']) } }, UpdateExpression= 'SET link = :link, referenceID = :referenceID, LastModifiedDate = :LastModifiedDate', ExpressionAttributeValues={ ':link': { 'S': report['link'] }, ':referenceID': { 'S': report['referenceID'] }, ':LastModifiedDate': { 'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S") }, }) logging.info(response) def download_files_from_s3(folder): """ Downloads Parquet files from an S3 bucket and returns a concatenated DataFrame. Args: folder (str): The folder in the S3 bucket to download files from. Returns: pandas.DataFrame: A concatenated DataFrame containing data from downloaded Parquet files. """ if not os.path.exists(folder): os.makedirs(folder) client = boto3.client( 's3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) response = client.list_objects_v2(Bucket='china-securities-report', Prefix=f"{folder}/") for obj in response['Contents']: key = obj['Key'] if key.endswith('.parquet'): client.download_file('china-securities-report', key, key) file_paths = glob.glob(os.path.join(folder, '*.parquet')) return pd.concat([pd.read_parquet(file_path) for file_path in file_paths], ignore_index=True) def extract_from_pdf_by_pattern(url, pattern): """ Extracts text from a PDF file based on a given pattern. Args: url (str): The URL of the PDF file to extract text from. pattern (dict): A dictionary containing pattern to match and the pages to extract text from. Returns: str: The extracted text from the PDF file. Raises: Exception: If there is an error while retrieving or processing the PDF file. """ # Send a GET request to the URL and retrieve the PDF content try: response = requests.get(url, timeout=60) pdf_content = response.content # Save the PDF content to a local file with open("downloaded_file.pdf", "wb") as file: file.write(pdf_content) # Open the downloaded PDF file and extract the text with open("downloaded_file.pdf", "rb") as file: pdf_reader = PyPDF2.PdfReader(file) extracted_text = "" if 'pages' in pattern: pages = pattern['pages'] else: pages = len(pdf_reader.pages) for page in pages: text = pdf_reader.pages[page].extract_text() if 'keyword' in pattern and pattern['keyword'] in text: text = text.split(pattern['keyword'], 1)[1].strip() else: text = text.strip() extracted_text += text except (requests.exceptions.RequestException, requests.exceptions.ReadTimeout, PyPDF2.errors.PdfReadError, PyPDF2.errors.DependencyError) as e: logging.error(e) extracted_text = '' return extracted_text.replace('?\n', '?-\n').replace( '!\n', '!-\n').replace('。\n', '。-\n').replace('\n', ' ').replace( '?-', '?\n').replace('!-', '!\n').replace('。-', '。\n') def get_reference_by_regex(pattern, text): """ Finds all occurrences of a given regex pattern in the provided text. Args: pattern (str): The regex pattern to search for. text (str): The text to search within. Returns: list: A list of all matches found in the text. """ return re.findall(pattern, text) def isnot_substring(list_a, string_to_check): """ Check if any string in the given list is a substring of the string_to_check. Args: list_a (list): A list of strings to check. string_to_check (str): The string to check for substrings. Returns: bool: True if none of strings in list_a are substrings of string_to_check, False otherwise. """ return all(s not in string_to_check for s in list_a) def extract_reference(row): """ Extracts reference information from a given row. Args: row (dict): A dictionary representing a row of data. Returns: None """ try: print("Extracting reference for %s", row['id']) # Get the pattern for the given site. If not found, skip extraction. pattern = next((elem for elem in patterns if elem['site'] == row['site']), None) if pattern is None: logging.warning("No reference pattern found for site %s. Skipping reference extraction.", row['site']) return [] # Extract text from PDF. If extraction fails, return an empty list. extracted_text = extract_from_pdf_by_pattern(row.get('attachment', ''), pattern) if not extracted_text: logging.warning("PDF extraction returned empty text for record %s.", row['id']) return [] # Now safely attempt to extract reference titles and dates. reference_titles = re.findall(pattern.get('article_regex', ''), extracted_text) or [] reference_dates = re.findall(pattern.get('date_regex', ''), extracted_text) or [] # Proceed only if reference_titles and reference_dates are non-empty. if not reference_titles or not reference_dates: logging.info("No reference titles or dates found for record %s.", row['id']) return [] reference_titles = [s.replace(' ', '') for s in reference_titles] reference_dates = [s.replace(' ', '') for s in reference_dates] if 'remove' in pattern: for remove_string in pattern['remove']: reference_titles = [ s.replace(remove_string, '') for s in reference_titles ] if len(reference_dates) > 0: reference_ids = [] for title, date in zip(reference_titles, reference_dates): try: date = datetime.strptime(date, pattern['date_format']) except ValueError: date = datetime(2006, 1, 1) dates = [] if 'date_range' in pattern: for i in range(pattern['date_range'] + 1): dates.append( (date + timedelta(days=i)).strftime('%Y-%m-%d')) dates.append( (date - timedelta(days=i)).strftime('%Y-%m-%d')) dates.append(date.strftime('%Y-%m-%d')) date = date.strftime('%Y-%m-%d') if 'split' in pattern: for split_item in pattern['split']: if 'exceptional_string' in split_item: if split_item[ 'string'] in title and isnot_substring( split_item['exceptional_string'], title): title = re.split(split_item['string'], title)[split_item['index']] else: if split_item['string'] in title: title = title.split( split_item['string'])[split_item['index']] if len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]) == 0: logging.info("------------ = 0 ------------") logging.info("%s - %s", date, repr(title)) elif len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))]) > 1: logging.info("------------ > 1 ------------") logging.info("%s - %s", date, repr(title)) else: logging.info("------------ = 1 ------------") reference_df = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site']) & (data['publishdate'].isin(dates))] row['referenceID'] = reference_df.iloc[0]['id'] reference_ids.append(row['referenceID']) row['link'] = reference_df.iloc[0]['link'] row['sourceID'] = row['id'] row['refID'] = uuid.uuid5( uuid.NAMESPACE_OID, str(row['sourceID']) + str(row['referenceID'])) logging.info("%s - %s - %s - %s", date, repr(title), row['sourceID'], row['referenceID']) update_reference(row) return reference_ids else: reference_ids = [] for title in reference_titles: if 'split' in pattern: for split_item in pattern['split']: if 'exceptional_string' in split_item: if split_item[ 'string'] in title and isnot_substring( split_item['exceptional_string'], title): title = re.split(split_item['string'], title)[split_item['index']] else: if split_item['string'] in title: title = title.split( split_item['string'])[split_item['index']] if len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])]) == 0: logging.info("------------ = 0 ------------") logging.info(repr(title)) elif len(data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])]) > 1: logging.info("------------ > 1 ------------") logging.info(repr(title)) else: logging.info("------------ = 1 ------------") reference_df = data[(data['titleCN'].str.contains(title)) & (data['site'] == row['site'])] row['referenceID'] = reference_df.iloc[0]['id'] reference_ids.append(row['referenceID']) row['link'] = reference_df.iloc[0]['link'] row['sourceID'] = row['id'] row['refID'] = uuid.uuid5( uuid.NAMESPACE_OID, str(row['sourceID']) + str(row['referenceID'])) logging.info("%s - %s - %s", repr(title), row['sourceID'], row['referenceID']) update_reference(row) return reference_ids except (ValueError, KeyError, TypeError) as error: logging.error(error) return None def translist(infolist): """ Filter and transform a list of strings. Args: infolist (list): The input list of strings. Returns: list: The filtered and transformed list of strings. """ out = list( filter(lambda s: s and (isinstance(s, str) or len(s.strip()) > 0), [i.strip() for i in infolist])) return out def extract_from_pdf(url): """ Extracts text from a PDF file given its URL. Args: url (str): The URL of the PDF file. Returns: tuple: A tuple containing the extracted text and a summary of the text. Raises: Exception: If there is an error during the extraction process. """ # Send a GET request to the URL and retrieve the PDF content response = requests.get(url, timeout=60) pdf_content = response.content # Save the PDF content to a local file with open("downloaded_file.pdf", "wb") as file: file.write(pdf_content) # Open the downloaded PDF file and extract the text extracted_text = "" try: with open("downloaded_file.pdf", "rb") as file: pdf_reader = PyPDF2.PdfReader(file) num_pages = len(pdf_reader.pages) for page in range(num_pages): text = pdf_reader.pages[page].extract_text() if text and text[0].isdigit(): text = text[1:] # first_newline_index = text.find('。\n') text = text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace( '。\n', '。-\n').replace('\n', '').replace('?-', '?\n').replace( '!-', '!\n').replace('。-', '。\n') if text != '': extracted_text += text summary = '\n'.join(extracted_text.split('\n')[:2]) except (ValueError, KeyError, TypeError, PyPDF2.errors.PdfReadError) as e: logging.error(e) summary = extracted_text return extracted_text, summary def get_db_connection(): """Get dynamoDB connection. Returns: boto3.resource: The DynamoDB resource object representing the connection. """ dynamodb = boto3.resource(service_name='dynamodb', region_name='us-east-1', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY) return dynamodb def crawl_by_url(url, article): """ Crawls the given URL and extracts relevant information from the webpage. Args: url (str): The URL of the webpage to crawl. article (dict): A dictionary to store the extracted information. Returns: None: If the length of the extracted content is less than 10 characters. Raises: None """ domain = '.'.join(urlparse(url).netloc.split('.')[1:]) headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} req = urllib.request.Request(url, headers=headers) req = urllib.request.urlopen(req, timeout=60) text = req.read() html_text = text.decode("utf-8") page = etree.HTML(html_text) contentcn, summary = encode_content( page.xpath(xpath_dict[domain]['content'])) if contentcn is None or len(contentcn) < 10: return article['originSite'] = xpath_dict[domain]['siteCN'] article['site'] = xpath_dict[domain]['site'] article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title'])) article['title'] = translate(article['titleCN']) if 'author' in xpath_dict[domain]: article['author'] = translate( encode(page.xpath(xpath_dict[domain]['author']))) else: article['author'] = "" article['contentCN'] = repr(contentcn)[1:-1].strip() if len(article['contentCN']) < 10: return None contenteng = '' for element in contentcn.split("\n"): contenteng += translate(element) + '\n' try: if detect(contenteng) != 'en': for element in contentcn.split("。"): contenteng += translate(element) + '. ' except (requests.exceptions.RequestException, requests.exceptions.ReadTimeout, PyPDF2.errors.PdfReadError, PyPDF2.errors.DependencyError, lang_detect_exception.LangDetectException) as e: print(f"An unexpected error occurred: {e}") article['content'] = repr(contenteng)[1:-1].strip() try: article['subtitle'] = summarize(article['content']) except (ValueError, KeyError, TypeError): article['subtitle'] = translate(summary) article['publishDate'] = datemodifier( encode(page.xpath(xpath_dict[domain]['publishdate'])), xpath_dict[domain]['datetime_format']) article['link'] = url article['attachment'] = "" article['sentimentScore'], article[ 'sentimentLabel'] = sentiment_computation(contenteng.replace( "\n", "")) article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['titleCN'] + article['publishDate']) logging.info("%s - %s", article['id'], article['site']) article['referenceid'] = None update_content(article) vectorize(article) data = download_files_from_s3('data')