"""Module to crawl the website 'eastmoney.com' to fetch and process articles."""
import logging
import time
import json
import urllib.request
import uuid
from datetime import datetime, timedelta
from urllib.parse import urlparse
from http.client import IncompleteRead
from prefect import task, get_run_logger
from lxml import etree
from controllers.summarizer import summarize
from controllers.utils import (
datemodifier,
encode,
encode_content,
extract_reference,
fetch_url,
sentiment_computation,
translate,
update_content
)
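# Per-domain XPath selectors (content, attachment, datetime_format) consumed by _crawl below.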
with open('xpath.json', 'r', encoding='UTF-8') as f:
    xpath_dict = json.load(f)

def _crawl(url, article, retries=3):
    """
    Crawl the given URL and populate the article dict with the extracted fields.

    Args:
        url (str): The URL of the webpage to crawl.
        article (dict): A dictionary to store the extracted information.
        retries (int): Number of attempts to make before giving up on a truncated response.

    Returns:
        None: If the page could not be fetched or the extracted content is shorter than 10 characters.
    """
    domain = urlparse(url).netloc
    for attempt in range(retries):
        try:
            req = urllib.request.urlopen(url)
            text = req.read()
            break
        except IncompleteRead as e:
            if attempt < retries - 1:
                time.sleep(1)  # Wait before retrying
                continue
            logging.error(e)
            return None
    html_text = text.decode("utf-8")
    page = etree.HTML(html_text)
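    # Pull the article body and attachments with the XPath selectors configured for this domain.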
    contentcn, summary = encode_content(
        page.xpath(xpath_dict[domain]['content']))
    article['attachment'] = encode(page.xpath(
        xpath_dict[domain]['attachment']))
    article['link'] = url
    # Fall back to the full organisation name when the short name is the empty placeholder "''".
    if article['orgSName'] == "''":
        article['site'] = translate(article['orgName'])
    else:
        article['site'] = translate(article['orgSName'])
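    # Populate the bilingual metadata fields and derive deterministic UUIDs for the author and the article.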
    article['titleCN'] = article['title']
    article['title'] = translate(article['title'])
    article['author'] = translate(article['researcher'])
    article['originAuthor'] = article['researcher']
    article['contentCN'] = repr(contentcn)[1:-1].strip()
    article['category'] = "Macroeconomic Research"
    if len(article['contentCN']) < 10:
        return None
    contenteng = ''
    for element in contentcn.split("\n"):
        contenteng += translate(element) + '\n'
    article['content'] = repr(contenteng)[1:-1].strip()
    try:
        article['subtitle'] = summarize(article['content'])
    except (RuntimeError, ValueError):
        article['subtitle'] = translate(summary)
    article['authorid'] = uuid.uuid5(uuid.NAMESPACE_OID, article['author'])
    article['publishDate'] = datemodifier(
        article['publishDate'], xpath_dict[domain]['datetime_format'])
    article['id'] = uuid.uuid5(uuid.NAMESPACE_OID,
                               article['titleCN'] + article['publishDate'])
    article['sentimentScore'], article[
        'sentimentLabel'] = sentiment_computation(contentcn.replace("\n", ""))
    extract_reference(article)
    update_content(article)

@task(name="Data Collection - eastmoney", log_prints=True)
def crawl(delta):
    """
    Crawl data.eastmoney.com and retrieve reports published within the given time range.

    Args:
        delta (int): The number of days to go back from the current date.

    Returns:
        None
    """
    logger = get_run_logger()
    logger.info("data.eastmoney.com")
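    # Page through the report listing API until an empty page is returned.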
    today = datetime.today().strftime('%Y-%m-%d')
    i = 0
    while i > -1:
        category_url = "https://reportapi.eastmoney.com/report/jg"
        params = {
            "cb": "datatable8544623",
            "pageSize": "100",
            "beginTime": (datetime.today() - timedelta(days=delta)).strftime('%Y-%m-%d'),
            "endTime": today,
            "pageNo": i,
            "qType": "3",
        }
        category_url = category_url + "?" + "&".join(f"{key}={value}"
                                                     for key, value in params.items())
        content = fetch_url(category_url)
        logger.info(content)
        logger.info(category_url)
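        # The endpoint returns JSONP ("datatable8544623({...})"); strip the callback wrapper before parsing.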
        if content:
            start_index = content.find("(")
            result = content[start_index +
                             1:-1] if start_index != -1 else content
            reportinfo = json.loads(result)
            if reportinfo["size"] > 0:
                i = i + 1
                for article in reportinfo['data']:
                    try:
                        url = f"https://data.eastmoney.com/report/zw_macresearch.jshtml?encodeUrl={article['encodeUrl']}"
                        _crawl(url, article)
                    except (urllib.error.URLError, json.JSONDecodeError, KeyError) as error:
                        logger.error(error)
            else:
                i = -1
        else:
            logger.error("Failed to fetch URL: %s", category_url)
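

# Minimal usage sketch (an illustrative assumption, not part of the upstream pipeline): Prefect
# tasks must be invoked from within a flow, so a local run could wrap `crawl` like this. The flow
# name and the default delta of 1 day are placeholders.
if __name__ == "__main__":
    from prefect import flow

    @flow(name="eastmoney-crawl-local")
    def _local_run(delta: int = 1):
        crawl(delta)

    _local_run()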