Muhammad Abdur Rahman Saad commited on
Commit
88de1f4
·
1 Parent(s): cc37e8c

update flow

Browse files
Files changed (3) hide show
  1. .github/workflows/daily.yml +0 -1
  2. daily.py +9 -4
  3. glue.py +20 -9
.github/workflows/daily.yml CHANGED
@@ -43,4 +43,3 @@ jobs:
43
  DELTA: ${{ github.event.inputs.delta}}
44
  run: |
45
  python daily.py
46
- python glue.py
 
43
  DELTA: ${{ github.event.inputs.delta}}
44
  run: |
45
  python daily.py
 
daily.py CHANGED
@@ -13,12 +13,12 @@ from urllib.parse import urlparse
13
  from prefect import flow, task
14
 
15
  from lxml import etree
16
-
17
  from utils import (crawl, datemodifier, encode, encode_content,
18
  extract_from_pdf, extract_reference, fetch_url,
19
  sentiment_computation, translate, update_content)
20
 
21
-
22
  def crawl_eastmoney(url, article):
23
  """
24
  Crawls the given URL and extracts information from the webpage.
@@ -66,8 +66,8 @@ def crawl_eastmoney(url, article):
66
  extract_reference(article)
67
  update_content(article)
68
 
69
- @flow(name = "Data Collection China - Daily", log_prints = True)
70
- def main():
71
  with open('xpath.json', 'r', encoding='UTF-8') as f:
72
  xpath_dict = json.load(f)
73
 
@@ -525,5 +525,10 @@ def main():
525
  except Exception as error:
526
  print(error)
527
 
 
 
 
 
 
528
  if __name__ == '__main__':
529
  main()
 
13
  from prefect import flow, task
14
 
15
  from lxml import etree
16
+ from glue import glue_job_run
17
  from utils import (crawl, datemodifier, encode, encode_content,
18
  extract_from_pdf, extract_reference, fetch_url,
19
  sentiment_computation, translate, update_content)
20
 
21
+ @task(name = "crawl_eastmoney")
22
  def crawl_eastmoney(url, article):
23
  """
24
  Crawls the given URL and extracts information from the webpage.
 
66
  extract_reference(article)
67
  update_content(article)
68
 
69
+ @task(name = "data collection")
70
+ def daily():
71
  with open('xpath.json', 'r', encoding='UTF-8') as f:
72
  xpath_dict = json.load(f)
73
 
 
525
  except Exception as error:
526
  print(error)
527
 
528
+ @flow(name = "Data Collection China - Daily", log_prints=True)
529
+ def main():
530
+ daily()
531
+ glue_job_run()
532
+
533
  if __name__ == '__main__':
534
  main()
glue.py CHANGED
@@ -1,6 +1,7 @@
1
  """Trigger Parquet Snapshot Glue job"""
2
  import os
3
  import boto3
 
4
  from dotenv import load_dotenv
5
 
6
  load_dotenv()
@@ -23,12 +24,22 @@ def get_client_connection():
23
  aws_secret_access_key=AWS_SECRET_ACCESS_KEY
24
  )
25
 
26
- glue = get_client_connection()
27
- response = glue.start_job_run(
28
- JobName='Article Snapshot China'
29
- )
30
- print(response)
31
- response = glue.start_job_run(
32
- JobName='Reference China'
33
- )
34
- print(response)
 
 
 
 
 
 
 
 
 
 
 
1
  """Trigger Parquet Snapshot Glue job"""
2
  import os
3
  import boto3
4
+ from prefect import task
5
  from dotenv import load_dotenv
6
 
7
  load_dotenv()
 
24
  aws_secret_access_key=AWS_SECRET_ACCESS_KEY
25
  )
26
 
27
+ @task(name = "glue_job_run")
28
+ def glue_job_run():
29
+ """
30
+ Triggers the Glue job run for the Parquet Snapshot.
31
+
32
+ :return: None
33
+ """
34
+ glue = get_client_connection()
35
+ response = glue.start_job_run(
36
+ JobName='Article Snapshot China'
37
+ )
38
+ print(response)
39
+ response = glue.start_job_run(
40
+ JobName='Reference China'
41
+ )
42
+ print(response)
43
+
44
+ if __name__ == "__main__":
45
+ glue_job_run()