"""Module to define utility function"""
import glob
import json
import logging
import os
import re
import time
import urllib.request
import uuid
from datetime import datetime, timedelta
from decimal import Decimal
from urllib.parse import urlparse
import boto3
import pandas as pd
import requests
from dotenv import load_dotenv
from deep_translator import GoogleTranslator, exceptions
from langdetect import detect, lang_detect_exception
from lxml import etree
import PyPDF2
from transformers import pipeline
from controllers.summarizer import summarize
from controllers.vectorizer import vectorize
load_dotenv()
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
analyzer = pipeline("sentiment-analysis", model="ProsusAI/finbert")
with open('xpath.json', 'r', encoding='UTF-8') as f:
xpath_dict = json.load(f)
with open('patterns.json', 'r', encoding='UTF-8') as f:
patterns = json.load(f)
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO
)
def datemodifier(date_string, date_format):
"""Date Modifier Function
This function takes a date string and a date format as input and modifies the date string
according to the specified format. It returns modified date string in the format 'YYYY-MM-DD'.
Args:
date_string (str): The date string to be modified.
date_format (str): The format of the date string.
Returns:
str: The modified date string in the format 'YYYY-MM-DD'.
False: If an error occurs during the modification process.
"""
try:
to_date = time.strptime(date_string, date_format)
return time.strftime("%Y-%m-%d", to_date)
    except (ValueError, KeyError, TypeError) as error:
        logging.error("Date parsing error: %s", error)
        return False
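# Illustrative usage (hypothetical inputs): a Chinese-style date string is
# normalized to ISO format, and unparseable input yields False.
#   datemodifier('2024年01月15日', '%Y年%m月%d日')  # -> '2024-01-15'
#   datemodifier('not a date', '%Y-%m-%d')          # -> False (error logged)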
def encode(content):
"""
Encodes the given content into a single string.
Args:
content (list): A list of elements to be encoded.
Each element can be either a string or an `etree._Element` object.
Returns:
str: The encoded content as a single string.
"""
text = ''
for element in content:
if isinstance(element, etree._Element):
subelement = etree.tostring(element).decode()
subpage = etree.HTML(subelement)
tree = subpage.xpath('//text()')
line = ''.join(translist(tree)).\
replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip()
else:
line = element
text += line
return text
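# Illustrative usage (hypothetical markup): element and string content is
# flattened into one whitespace-free string.
#   encode([etree.fromstring('<p> hello \n world </p>'), 'tail'])
#   # -> 'helloworldtail'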
def encode_content(content):
"""
Encodes the content by removing unnecessary characters and extracting a summary.
Args:
content (list): A list of elements representing the content.
Returns:
tuple: A tuple containing the encoded text and the summary.
"""
    text = ''
for element in content:
if isinstance(element, etree._Element):
subelement = etree.tostring(element).decode()
subpage = etree.HTML(subelement)
tree = subpage.xpath('//text()')
line = ''.join(translist(tree)).\
replace('\n','').replace('\t','').replace('\r','').replace(' ','').strip()
else:
line = element
if line != '':
line = line + '\n'
text += line
    # Truncate at the "打印本页" ("Print this page") footer marker, if present.
    index = text.find('打印本页')
    if index != -1:
        text = text[:index]
    # The first two lines of the cleaned text serve as the summary.
    summary = '\n'.join(text.split('\n')[:2])
return text, summary
def fetch_url(url):
"""
Fetches the content of a given URL.
Args:
url (str): The URL to fetch.
    Returns:
        str or None: The content of the URL if the request succeeds with
        status code 200, otherwise None. Request errors are logged rather
        than raised.
    """
    try:
        response = requests.get(url, timeout=60)
        if response.status_code == 200:
            return response.text
        return None
    except requests.exceptions.RequestException as e:
        logging.error(e)
        return None
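# Illustrative usage (hypothetical URL): the body is returned on HTTP 200,
# None on any other status or on a request error.
#   html = fetch_url('https://example.com/news/article.html')
#   if html is not None:
#       page = etree.HTML(html)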
def translate(text, max_length=4950):
"""
Translates the given text to English.
Args:
text (str): The text to be translated.
Returns:
str: The translated text in English.
"""
if not text:
return ""
if len(text) <= max_length:
for _ in range(5):
try:
return GoogleTranslator(source='auto', target='en').translate(text) or ""
except exceptions.RequestError:
time.sleep(1)
return ""
# If text is too long, split and translate in chunks
result = []
for i in range(0, len(text), max_length):
chunk = text[i:i+max_length]
for _ in range(5):
try:
result.append(GoogleTranslator(source='auto', target='en').translate(chunk) or "")
break
except exceptions.RequestError:
time.sleep(1)
else:
result.append("") # If all retries fail, append empty string
return " ".join(result)
def sentiment_computation(content):
"""
Compute the sentiment score and label for the given content.
Parameters:
content (str): The content for which sentiment needs to be computed.
Returns:
tuple: A tuple containing the sentiment score and label.
The sentiment score is a float representing the overall sentiment score of the content.
The sentiment label is a string representing the sentiment label ('+', '-', or '0').
"""
label_dict = {
"positive": "+",
"negative": "-",
"neutral": "0",
}
sentiment_score = 0
maximum_value = 0
    raw_sentiment = analyzer(content[:500], top_k=None)
    sentiment_label = "neutral"  # default in case the analyzer returns no scores
    for sentiment_dict in raw_sentiment:
        value = sentiment_dict["score"]
        if value > maximum_value:
            sentiment_label = sentiment_dict["label"]
            maximum_value = value
        if sentiment_dict["label"] == "positive":
            sentiment_score += value
        elif sentiment_dict["label"] == "negative":
            sentiment_score -= value
    sentiment_score = sentiment_score * 100
    return sentiment_score, label_dict[sentiment_label]
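# Illustrative usage: FinBERT scores the first 500 characters; the label with
# the highest score picks the sign symbol (actual numbers will vary).
#   score, label = sentiment_computation('Profits rose sharply this quarter.')
#   # e.g. score ≈ 85.0, label == '+'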
def get_client_connection():
"""
Returns a client connection to DynamoDB.
:return: DynamoDB client connection
"""
dynamodb = boto3.client(service_name='dynamodb',
region_name='us-east-1',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
return dynamodb
def update_content(report):
"""
Updates the content of an article in the 'Article_China' table in DynamoDB.
Args:
report (dict): A dictionary containing the report data.
Returns:
None
"""
logging.info("Updating content for %s", report['id'])
dynamodb = get_client_connection()
dynamodb.update_item(
TableName="Article_China",
        Key={
            'id': {
                'S': str(report['id'])
            }
        },
UpdateExpression=
'SET title = :title, site = :site, titleCN = :titleCN, contentCN = :contentCN, \
category = :category, author = :author, content = :content, subtitle = :subtitle, \
publishDate = :publishDate, link = :link, attachment = :attachment, \
sentimentScore = :sentimentScore, sentimentLabel = :sentimentLabel, \
LastModifiedDate = :LastModifiedDate, entityList = :entityList',
ExpressionAttributeValues={
':title': {
'S': report['title']
},
':site': {
'S': report['site']
},
':titleCN': {
'S': report['titleCN']
},
':contentCN': {
'S': report['contentCN']
},
':category': {
'S': report['category']
},
':author': {
'S': report['author']
},
':content': {
'S': report['content']
},
':subtitle': {
'S': report['subtitle']
},
':publishDate': {
'S': report['publishDate']
},
':link': {
'S': report['link']
},
':attachment': {
'S': report['attachment']
},
':LastModifiedDate': {
'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
},
':sentimentScore': {
'N':
str(
Decimal(str(report['sentimentScore'])).quantize(
Decimal('0.01')))
},
':sentimentLabel': {
'S': report['sentimentLabel']
},
':entityList': {
'L': []
}
})
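# Illustrative call (hypothetical values): every attribute referenced in the
# UpdateExpression above must be present in the report dict.
#   update_content({
#       'id': '...', 'title': '...', 'site': '...', 'titleCN': '...',
#       'contentCN': '...', 'category': '...', 'author': '...',
#       'content': '...', 'subtitle': '...', 'publishDate': '2024-01-15',
#       'link': '...', 'attachment': '', 'sentimentScore': 85.0,
#       'sentimentLabel': '+',
#   })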
def update_reference(report):
"""
Updates the reference in the 'reference_china' table in DynamoDB.
Args:
report (dict): A dictionary containing the report details.
Returns:
None
"""
dynamodb = get_client_connection()
response = dynamodb.update_item(
TableName="reference_china",
Key={
'id': {
'S': str(report['refID'])
},
'sourceID': {
'S': str(report['sourceID'])
}
},
UpdateExpression=
'SET link = :link, referenceID = :referenceID, LastModifiedDate = :LastModifiedDate',
ExpressionAttributeValues={
':link': {
'S': report['link']
},
':referenceID': {
'S': report['referenceID']
},
':LastModifiedDate': {
'S': datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
},
})
logging.info(response)
def download_files_from_s3(folder):
"""
Downloads Parquet files from an S3 bucket and returns a concatenated DataFrame.
Args:
folder (str): The folder in the S3 bucket to download files from.
Returns:
pandas.DataFrame: A concatenated DataFrame containing data from downloaded Parquet files.
"""
    os.makedirs(folder, exist_ok=True)
client = boto3.client(
's3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
response = client.list_objects_v2(Bucket='china-securities-report',
Prefix=f"{folder}/")
    # Use .get(): 'Contents' is absent when the prefix matches no objects.
    for obj in response.get('Contents', []):
key = obj['Key']
if key.endswith('.parquet'):
client.download_file('china-securities-report', key, key)
file_paths = glob.glob(os.path.join(folder, '*.parquet'))
return pd.concat([pd.read_parquet(file_path) for file_path in file_paths],
ignore_index=True)
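# Illustrative usage: pulls every *.parquet object under the 'data/' prefix of
# the china-securities-report bucket and concatenates them into one DataFrame.
#   df = download_files_from_s3('data')
#   # columns are expected to include 'id', 'titleCN', 'site', 'publishdate'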
def extract_from_pdf_by_pattern(url, pattern):
"""
Extracts text from a PDF file based on a given pattern.
Args:
url (str): The URL of the PDF file to extract text from.
pattern (dict): A dictionary containing pattern to match and the pages to extract text from.
    Returns:
        str: The extracted text from the PDF file, or an empty string if the
        file cannot be retrieved or parsed (errors are logged, not raised).
    """
# Send a GET request to the URL and retrieve the PDF content
try:
response = requests.get(url, timeout=60)
pdf_content = response.content
# Save the PDF content to a local file
with open("downloaded_file.pdf", "wb") as file:
file.write(pdf_content)
# Open the downloaded PDF file and extract the text
with open("downloaded_file.pdf", "rb") as file:
pdf_reader = PyPDF2.PdfReader(file)
extracted_text = ""
            if 'pages' in pattern:
                pages = pattern['pages']
            else:
                # Iterate over every page; wrap in range() so the loop below
                # receives page indices rather than a bare int.
                pages = range(len(pdf_reader.pages))
for page in pages:
text = pdf_reader.pages[page].extract_text()
if 'keyword' in pattern and pattern['keyword'] in text:
text = text.split(pattern['keyword'], 1)[1].strip()
else:
text = text.strip()
extracted_text += text
except (requests.exceptions.RequestException, requests.exceptions.ReadTimeout,
PyPDF2.errors.PdfReadError, PyPDF2.errors.DependencyError) as e:
logging.error(e)
extracted_text = ''
    # Tag sentence-final newlines, collapse remaining line breaks to spaces,
    # then restore the tagged newlines so sentence boundaries survive.
    return extracted_text.replace('?\n', '?-\n').replace(
        '!\n', '!-\n').replace('。\n', '。-\n').replace('\n', ' ').replace(
            '?-', '?\n').replace('!-', '!\n').replace('。-', '。\n')
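# Illustrative pattern dict (hypothetical values; real patterns are loaded
# from patterns.json at module import):
#   pattern = {'pages': [0, 1], 'keyword': '目录'}
#   text = extract_from_pdf_by_pattern('https://example.com/report.pdf', pattern)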
def get_reference_by_regex(pattern, text):
"""
Finds all occurrences of a given regex pattern in the provided text.
Args:
pattern (str): The regex pattern to search for.
text (str): The text to search within.
Returns:
list: A list of all matches found in the text.
"""
return re.findall(pattern, text)
def isnot_substring(list_a, string_to_check):
"""
Check if any string in the given list is a substring of the string_to_check.
Args:
list_a (list): A list of strings to check.
string_to_check (str): The string to check for substrings.
Returns:
        bool: True if none of the strings in list_a are substrings of
        string_to_check, False otherwise.
"""
return all(s not in string_to_check for s in list_a)
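# Illustrative usage:
#   isnot_substring(['附件', '通知'], '关于发布新规的公告')  # -> True
#   isnot_substring(['公告'], '关于发布新规的公告')          # -> False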
def extract_reference(row):
"""
Extracts reference information from a given row.
Args:
row (dict): A dictionary representing a row of data.
    Returns:
        list: The matched reference IDs (possibly empty), or None if an
        unexpected error occurs.
    """
try:
print("Extracting reference for %s", row['id'])
# Get the pattern for the given site. If not found, skip extraction.
pattern = next((elem for elem in patterns if elem['site'] == row['site']), None)
        if pattern is None:
            logging.warning(
                "No reference pattern found for site %s. Skipping reference extraction.",
                row['site'])
            return []
# Extract text from PDF. If extraction fails, return an empty list.
extracted_text = extract_from_pdf_by_pattern(row.get('attachment', ''), pattern)
if not extracted_text:
logging.warning("PDF extraction returned empty text for record %s.", row['id'])
return []
        # Extract reference titles and dates. Guard against a missing or empty
        # regex: re.findall('') would match at every position.
        article_regex = pattern.get('article_regex')
        date_regex = pattern.get('date_regex')
        reference_titles = re.findall(article_regex, extracted_text) if article_regex else []
        reference_dates = re.findall(date_regex, extracted_text) if date_regex else []
        # Proceed only if any reference titles were found; an empty date list
        # is handled below by matching on title alone.
        if not reference_titles:
            logging.info("No reference titles found for record %s.", row['id'])
            return []
reference_titles = [s.replace(' ', '') for s in reference_titles]
reference_dates = [s.replace(' ', '') for s in reference_dates]
if 'remove' in pattern:
for remove_string in pattern['remove']:
reference_titles = [
s.replace(remove_string, '') for s in reference_titles
]
if len(reference_dates) > 0:
reference_ids = []
for title, date in zip(reference_titles, reference_dates):
try:
date = datetime.strptime(date, pattern['date_format'])
except ValueError:
date = datetime(2006, 1, 1)
dates = []
if 'date_range' in pattern:
for i in range(pattern['date_range'] + 1):
dates.append(
(date + timedelta(days=i)).strftime('%Y-%m-%d'))
dates.append(
(date - timedelta(days=i)).strftime('%Y-%m-%d'))
dates.append(date.strftime('%Y-%m-%d'))
date = date.strftime('%Y-%m-%d')
if 'split' in pattern:
for split_item in pattern['split']:
if 'exceptional_string' in split_item:
if split_item[
'string'] in title and isnot_substring(
split_item['exceptional_string'],
title):
title = re.split(split_item['string'],
title)[split_item['index']]
else:
if split_item['string'] in title:
title = title.split(
split_item['string'])[split_item['index']]
                # Compute the candidate matches once instead of three times.
                matches = data[(data['titleCN'].str.contains(title))
                               & (data['site'] == row['site']) &
                               (data['publishdate'].isin(dates))]
                if len(matches) == 0:
                    logging.info("------------ = 0 ------------")
                    logging.info("%s - %s", date, repr(title))
                elif len(matches) > 1:
                    logging.info("------------ > 1 ------------")
                    logging.info("%s - %s", date, repr(title))
                else:
                    logging.info("------------ = 1 ------------")
                    reference_df = matches
row['referenceID'] = reference_df.iloc[0]['id']
reference_ids.append(row['referenceID'])
row['link'] = reference_df.iloc[0]['link']
row['sourceID'] = row['id']
row['refID'] = uuid.uuid5(
uuid.NAMESPACE_OID,
str(row['sourceID']) + str(row['referenceID']))
logging.info("%s - %s - %s - %s",
date, repr(title), row['sourceID'], row['referenceID'])
update_reference(row)
return reference_ids
else:
reference_ids = []
for title in reference_titles:
if 'split' in pattern:
for split_item in pattern['split']:
if 'exceptional_string' in split_item:
if split_item[
'string'] in title and isnot_substring(
split_item['exceptional_string'],
title):
title = re.split(split_item['string'],
title)[split_item['index']]
else:
if split_item['string'] in title:
title = title.split(
split_item['string'])[split_item['index']]
            # Compute the candidate matches once instead of three times.
            matches = data[(data['titleCN'].str.contains(title))
                           & (data['site'] == row['site'])]
            if len(matches) == 0:
                logging.info("------------ = 0 ------------")
                logging.info(repr(title))
            elif len(matches) > 1:
                logging.info("------------ > 1 ------------")
                logging.info(repr(title))
            else:
                logging.info("------------ = 1 ------------")
                reference_df = matches
row['referenceID'] = reference_df.iloc[0]['id']
reference_ids.append(row['referenceID'])
row['link'] = reference_df.iloc[0]['link']
row['sourceID'] = row['id']
row['refID'] = uuid.uuid5(
uuid.NAMESPACE_OID,
str(row['sourceID']) + str(row['referenceID']))
logging.info("%s - %s - %s", repr(title), row['sourceID'],
row['referenceID'])
update_reference(row)
return reference_ids
except (ValueError, KeyError, TypeError) as error:
logging.error(error)
return None
def translist(infolist):
"""
Filter and transform a list of strings.
Args:
infolist (list): The input list of strings.
Returns:
list: The filtered and transformed list of strings.
"""
    # After strip(), every element is a string, so truthiness filtering is
    # enough to drop empty entries.
    return [s for s in (i.strip() for i in infolist) if s]
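# Illustrative usage: strips each entry and drops the empty strings that
# lxml's //text() queries often yield.
#   translist(['  foo ', '\n', ' bar', ''])  # -> ['foo', 'bar']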
def extract_from_pdf(url):
"""
Extracts text from a PDF file given its URL.
Args:
url (str): The URL of the PDF file.
    Returns:
        tuple: A tuple containing the extracted text and a summary of the text.
    Raises:
        requests.exceptions.RequestException: If the PDF cannot be downloaded.
            Parsing errors are caught and logged instead of raised.
    """
# Send a GET request to the URL and retrieve the PDF content
response = requests.get(url, timeout=60)
pdf_content = response.content
# Save the PDF content to a local file
with open("downloaded_file.pdf", "wb") as file:
file.write(pdf_content)
# Open the downloaded PDF file and extract the text
extracted_text = ""
try:
with open("downloaded_file.pdf", "rb") as file:
pdf_reader = PyPDF2.PdfReader(file)
num_pages = len(pdf_reader.pages)
for page in range(num_pages):
text = pdf_reader.pages[page].extract_text()
                if text and text[0].isdigit():
                    text = text[1:]  # drop a leading page number
                # Tag sentence-final newlines, collapse the rest, then restore
                # the tags so real sentence breaks survive.
                text = text.replace('?\n', '?-\n').replace('!\n', '!-\n').replace(
                    '。\n', '。-\n').replace('\n', '').replace('?-', '?\n').replace(
                        '!-', '!\n').replace('。-', '。\n')
if text != '':
extracted_text += text
summary = '\n'.join(extracted_text.split('\n')[:2])
except (ValueError, KeyError, TypeError, PyPDF2.errors.PdfReadError) as e:
logging.error(e)
summary = extracted_text
return extracted_text, summary
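# Illustrative usage (hypothetical URL): the summary is the first two lines of
# the reflowed text.
#   text, summary = extract_from_pdf('https://example.com/announcement.pdf')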
def get_db_connection():
"""Get dynamoDB connection.
Returns:
boto3.resource: The DynamoDB resource object representing the connection.
"""
dynamodb = boto3.resource(service_name='dynamodb',
region_name='us-east-1',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
return dynamodb
def crawl_by_url(url, article):
"""
Crawls the given URL and extracts relevant information from the webpage.
Args:
url (str): The URL of the webpage to crawl.
article (dict): A dictionary to store the extracted information.
    Returns:
        None: The article is written to DynamoDB as a side effect; the function
        returns early if the extracted content is shorter than 10 characters.
    """
domain = '.'.join(urlparse(url).netloc.split('.')[1:])
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/91.0.4472.124 Safari/537.36'
    }
    req = urllib.request.Request(url, headers=headers)
    response = urllib.request.urlopen(req, timeout=60)
    text = response.read()
html_text = text.decode("utf-8")
page = etree.HTML(html_text)
contentcn, summary = encode_content(
page.xpath(xpath_dict[domain]['content']))
if contentcn is None or len(contentcn) < 10:
return
article['originSite'] = xpath_dict[domain]['siteCN']
article['site'] = xpath_dict[domain]['site']
article['titleCN'] = encode(page.xpath(xpath_dict[domain]['title']))
article['title'] = translate(article['titleCN'])
if 'author' in xpath_dict[domain]:
article['author'] = translate(
encode(page.xpath(xpath_dict[domain]['author'])))
else:
article['author'] = ""
article['contentCN'] = repr(contentcn)[1:-1].strip()
if len(article['contentCN']) < 10:
return None
contenteng = ''
for element in contentcn.split("\n"):
contenteng += translate(element) + '\n'
    try:
        # If the line-by-line translation did not come out as English, retry
        # sentence by sentence and append the result.
        if detect(contenteng) != 'en':
            for element in contentcn.split("。"):
                contenteng += translate(element) + '. '
    except (requests.exceptions.RequestException,
            lang_detect_exception.LangDetectException) as e:
        logging.error("Language detection or translation failed: %s", e)
article['content'] = repr(contenteng)[1:-1].strip()
try:
article['subtitle'] = summarize(article['content'])
except (ValueError, KeyError, TypeError):
article['subtitle'] = translate(summary)
article['publishDate'] = datemodifier(
encode(page.xpath(xpath_dict[domain]['publishdate'])),
xpath_dict[domain]['datetime_format'])
article['link'] = url
article['attachment'] = ""
    article['sentimentScore'], article['sentimentLabel'] = sentiment_computation(
        contenteng.replace("\n", ""))
article['id'] = uuid.uuid5(uuid.NAMESPACE_OID,
article['titleCN'] + article['publishDate'])
logging.info("%s - %s", article['id'], article['site'])
article['referenceid'] = None
update_content(article)
vectorize(article)
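# Illustrative usage (hypothetical URL; the domain must have an entry in
# xpath.json): fields such as 'category' can be preset on the article dict.
#   article = {'category': 'policy'}
#   crawl_by_url('https://www.example.gov.cn/2024/content_123.html', article)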
# Module-level article corpus used by extract_reference(); the S3 download
# runs once, at import time.
data = download_files_from_s3('data')