refactor logging and streamline content update process
93058c6
"""Module to crawl the website 'eastmoney.com' to fetch and process articles."""
import logging
import time
import json
import urllib.error
import urllib.request
import uuid
from datetime import datetime, timedelta
from urllib.parse import urlparse
from http.client import IncompleteRead
from prefect import task, get_run_logger
from lxml import etree
from controllers.summarizer import summarize
from controllers.utils import (
datemodifier,
encode,
encode_content,
extract_reference,
fetch_url,
sentiment_computation,
translate,
update_content
)
from controllers.vectorizer import vectorize
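# xpath.json maps each domain to the XPath expressions ('content', 'attachment',
# 'datetime_format') used to scrape that site.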
with open('xpath.json', 'r', encoding='UTF-8') as f:
xpath_dict = json.load(f)
def _crawl(url, article, retries=3):
"""
Crawls the given URL and extracts information from the webpage.
Args:
url (str): The URL of the webpage to crawl.
article (dict): A dictionary to store the extracted information.
Returns:
None: If the length of the extracted content is less than 10 characters.
Raises:
None.
"""
domain = urlparse(url).netloc
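    # Fetch the page, retrying transient network errors a limited number of times.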
    for attempt in range(retries):
        try:
            req = urllib.request.urlopen(url, timeout=60)
            text = req.read()
            break
        except (IncompleteRead, TimeoutError) as e:
            if attempt < retries - 1:
                time.sleep(1)  # Wait before retrying
                continue
            logging.error(e)
            return None
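    # Parse the fetched HTML and pull the article body via the domain-specific XPath.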
html_text = text.decode("utf-8")
page = etree.HTML(html_text)
contentcn, summary = encode_content(
page.xpath(xpath_dict[domain]['content']))
article['attachment'] = encode(page.xpath(
xpath_dict[domain]['attachment']))
article['link'] = url
    if article['orgSName'] == "''":
        article['site'] = translate(article['orgName'])
    else:
        article['site'] = translate(article['orgSName'])
article['titleCN'] = article['title']
article['title'] = translate(article['title'])
article['author'] = translate(article['researcher'])
article['originAuthor'] = article['researcher']
article['contentCN'] = repr(contentcn)[1:-1].strip()
article['category'] = "Macroeconomic Research"
if len(article['contentCN']) < 10:
return None
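    # Translate the Chinese content into English, one line at a time.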
contenteng = ''
for element in contentcn.split("\n"):
contenteng += translate(element) + '\n'
logging.info(contenteng)
article['content'] = repr(contenteng)[1:-1].strip()
try:
article['subtitle'] = summarize(article['content'])
except (RuntimeError, ValueError):
article['subtitle'] = translate(summary)
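    # uuid5 is deterministic: the same author and the same title + publish date
    # always produce the same IDs.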
article['authorid'] = uuid.uuid5(uuid.NAMESPACE_OID, article['author'])
article['publishDate'] = datemodifier(
article['publishDate'], xpath_dict[domain]['datetime_format'])
article['id'] = uuid.uuid5(uuid.NAMESPACE_OID,
article['titleCN'] + article['publishDate'])
article['sentimentScore'], article[
'sentimentLabel'] = sentiment_computation(contentcn.replace("\n", ""))
article['referenceid'] = None
reference_id = extract_reference(article)
if reference_id:
article['referenceid'] = reference_id
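    # Persist the processed article and store its vector representation.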
update_content(article)
vectorize(article)
# openai_vectorize(article)
@task(name="Data Collection - eastmoney", log_prints=True)
def crawl(delta):
"""
Crawls the website data.eastmoney.com and retrieves reports within a specified time range.
Args:
delta (int): The number of days to go back from the current date.
Returns:
None
Raises:
None
"""
logger = get_run_logger()
logger.info("data.eastmoney.com")
today = datetime.today().strftime('%Y-%m-%d')
i = 0
while i > -1:
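        # Page through the report listing API until an empty page is returned.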
category_url = "https://reportapi.eastmoney.com/report/jg"
params = {
"cb": "datatable8544623",
"pageSize": "100",
"beginTime": (datetime.today() - timedelta(days=delta)).strftime('%Y-%m-%d'),
"endTime": today,
"pageNo": i,
"qType": "3",
}
category_url = category_url + "?" + "&".join(f"{key}={value}"
for key, value in params.items())
content = fetch_url(category_url)
        logger.info(category_url)
        logger.info(content)
if content:
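            # The API returns JSONP wrapped in the `cb` callback; strip the wrapper
            # to obtain plain JSON.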
start_index = content.find("(")
result = content[start_index +
1:-1] if start_index != -1 else content
reportinfo = json.loads(result)
if reportinfo["size"] > 0:
i = i + 1
for article in reportinfo['data']:
try:
link = "https://data.eastmoney.com/report/zw_macresearch.jshtml"
url = f"{link}?encodeUrl={article['encodeUrl']}"
_crawl(url, article)
except (urllib.error.URLError, json.JSONDecodeError, KeyError) as error:
logger.error(error)
else:
i = -1
else:
logger.error("Failed to fetch URL: %s", category_url)