"""Module to crawl the website 'eastmoney.com' to fetch and process articles.""" import logging import time import json import urllib.request import uuid from datetime import datetime, timedelta from urllib.parse import urlparse from http.client import IncompleteRead from prefect import task, get_run_logger from lxml import etree from controllers.summarizer import summarize from controllers.utils import ( datemodifier, encode, encode_content, extract_reference, fetch_url, sentiment_computation, translate, update_content ) from controllers.vectorizer import vectorize with open('xpath.json', 'r', encoding='UTF-8') as f: xpath_dict = json.load(f) def _crawl(url, article, retries=3): """ Crawls the given URL and extracts information from the webpage. Args: url (str): The URL of the webpage to crawl. article (dict): A dictionary to store the extracted information. Returns: None: If the length of the extracted content is less than 10 characters. Raises: None. """ domain = urlparse(url).netloc for attempt in range(retries): try: req = urllib.request.urlopen(url, timeout=60) text = req.read() break except (IncompleteRead, TimeoutError) as e: if attempt == retries - 1: time.sleep(1) # Wait before retrying continue else: logging.error(e) return None html_text = text.decode("utf-8") page = etree.HTML(html_text) contentcn, summary = encode_content( page.xpath(xpath_dict[domain]['content'])) article['attachment'] = encode(page.xpath( xpath_dict[domain]['attachment'])) article['link'] = url if article['orgSName'] == "''": article['site'] = translate(article['orgSName']) else: article['site'] = translate(article['orgName']) article['titleCN'] = article['title'] article['title'] = translate(article['title']) article['author'] = translate(article['researcher']) article['originAuthor'] = article['researcher'] article['contentCN'] = repr(contentcn)[1:-1].strip() article['category'] = "Macroeconomic Research" if len(article['contentCN']) < 10: return None contenteng = '' for element in contentcn.split("\n"): contenteng += translate(element) + '\n' logging.info(contenteng) article['content'] = repr(contenteng)[1:-1].strip() try: article['subtitle'] = summarize(article['content']) except (RuntimeError, ValueError): article['subtitle'] = translate(summary) article['authorid'] = uuid.uuid5(uuid.NAMESPACE_OID, article['author']) article['publishDate'] = datemodifier( article['publishDate'], xpath_dict[domain]['datetime_format']) article['id'] = uuid.uuid5(uuid.NAMESPACE_OID, article['titleCN'] + article['publishDate']) article['sentimentScore'], article[ 'sentimentLabel'] = sentiment_computation(contentcn.replace("\n", "")) article['referenceid'] = None reference_id = extract_reference(article) if reference_id: article['referenceid'] = reference_id update_content(article) vectorize(article) # openai_vectorize(article) @task(name = "Data Collection - eastmoney", log_prints = True) def crawl(delta): """ Crawls the website data.eastmoney.com and retrieves reports within a specified time range. Args: delta (int): The number of days to go back from the current date. Returns: None Raises: None """ logger = get_run_logger() logger.info("data.eastmoney.com") today = datetime.today().strftime('%Y-%m-%d') i = 0 while i > -1: category_url = "https://reportapi.eastmoney.com/report/jg" params = { "cb": "datatable8544623", "pageSize": "100", "beginTime": (datetime.today() - timedelta(days=delta)).strftime('%Y-%m-%d'), "endTime": today, "pageNo": i, "qType": "3", } category_url = category_url + "?" 
+ "&".join(f"{key}={value}" for key, value in params.items()) content = fetch_url(category_url) logging.info(content) logging.info(category_url) if content: start_index = content.find("(") result = content[start_index + 1:-1] if start_index != -1 else content reportinfo = json.loads(result) if reportinfo["size"] > 0: i = i + 1 for article in reportinfo['data']: try: link = "https://data.eastmoney.com/report/zw_macresearch.jshtml" url = f"{link}?encodeUrl={article['encodeUrl']}" _crawl(url, article) except (urllib.error.URLError, json.JSONDecodeError, KeyError) as error: logger.error(error) else: i = -1 else: logger.error("Failed to fetch URL: %s", category_url)