1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125 | import datetime as dt
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from pprint import pprint
# --- DAG identity and scheduling knobs -------------------------------------
DAG_ID = "MA_scraper" # must change before uploading to airflow !!!
START_DATE = days_ago(30)
# SCHEDULE_INTERVAL = dt.timedelta(hours=12)
# SCHEDULE_INTERVAL = "@daily"
SCHEDULE_INTERVAL = "5 0 * * 2-6"  # cron: 00:05, Tuesday through Saturday
MAX_ACTIVE_RUN = 1          # at most one DAG run in flight at a time
MAX_CONCURRENT_TASKS = 3    # cap on simultaneously running tasks in this DAG
RETRY_COUNT = 3
RETRY_DELAY = dt.timedelta(minutes=5)
EMAIL = "mobinalhassan1@gmail.com"
OWNER = "datascope"

# Defaults applied to every task in this DAG unless overridden per task.
default_args = dict(
    owner=OWNER,
    depends_on_past=False,
    start_date=START_DATE,
    email=[EMAIL],
    email_on_failure=True,
    email_on_retry=True,
    retries=RETRY_COUNT,
    retry_delay=RETRY_DELAY,
    # execution_timeout=timedelta(seconds=300),
    # on_failure_callback=some_function,
    # on_success_callback=some_other_function,
    # on_retry_callback=another_function,
    trigger_rule='all_done',
)
# The DAG object; every operator below attaches itself via its `dag=` kwarg.
dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    start_date=START_DATE,
    schedule_interval=SCHEDULE_INTERVAL,
    catchup=False,  # do not backfill missed schedules
    max_active_runs=MAX_ACTIVE_RUN,
    concurrency=MAX_CONCURRENT_TASKS,
)
# Skip-gate for non-latest runs. NOTE(review): this task is never wired into
# the dependency chain below, so it currently gates nothing — confirm intent.
latest_only = LatestOnlyOperator(dag=dag, task_id='my_latest_only')
# Template-rendered AWS credentials shared by the scraper tasks; each
# "{{var.value.*}}" string is resolved from Airflow Variables at runtime.
BOSTON_AWS_ENV = {
    "AWS_ACCESS_KEY_ID": "{{var.value.AWS_ACCESS_KEY_ID_BOSTON}}",
    "AWS_SECRET_ACCESS_KEY": "{{var.value.AWS_SECRET_ACCESS_KEY_BOSTON}}",
    "BUCKET_NAME": "{{var.value.BUCKET_NAME_BOSTON}}",
    "REGION": "{{var.value.REGION_BOSTON}}",
}


def _docker_task(task_id, image, environment=None):
    """Build a DockerOperator with the settings shared by all scraper tasks.

    Every task force-pulls the latest image before running and removes its
    container afterwards. ``environment`` is copied per task so operators do
    not share one mutable dict; ``None`` matches DockerOperator's default.
    """
    return DockerOperator(
        task_id=task_id,
        image=image,
        auto_remove=True,
        force_pull=True,
        environment=dict(environment) if environment else None,
        dag=dag,
    )


# Pre-requisite
prerequisite_scheduler = _docker_task(
    'Prerequisite-scheduler',
    'registry.gitlab.com/aintelproject/prerequisite:latest',
)
# N3
north_worcester_scheduler = _docker_task(
    'N3-T1-North-Worcester-County-scheduler',
    'registry.gitlab.com/aintelproject/n3-t1-north-worcester-county:latest',
    environment=BOSTON_AWS_ENV,
)
# N4
north_essex_scheduler = _docker_task(
    'N4-T1-North-Essex-County-scheduler',
    'registry.gitlab.com/aintelproject/n4-t1-north-essex-county:latest',
    environment=BOSTON_AWS_ENV,
)
# N19
north_bristol_county_scheduler = _docker_task(
    'N19-T4-North-Bristol-county-scheduler',
    'registry.gitlab.com/aintelproject/n19-t4-north-bristol-county:latest',
    environment=BOSTON_AWS_ENV,
)
# Post-requirement
post_requisite_scheduler = _docker_task(
    'Post-Requirement-scheduler',
    'registry.gitlab.com/aintelproject/postrequisite:latest',
    environment=BOSTON_AWS_ENV,
)
print('We are in N3 N4 N19 Dag')
# BUG FIX: the original wiring was
#   prerequisite >> [worcester >> essex, bristol] >> post
# but `worcester >> essex` evaluates first and `>>` returns its RIGHT operand,
# so the list became [essex, bristol]: the prerequisite was wired upstream of
# essex instead of worcester, letting worcester start before the prerequisite
# task had run. Wire the intended graph explicitly:
#   prerequisite -> worcester -> essex -> post
#   prerequisite -> bristol -> post
prerequisite_scheduler >> [north_worcester_scheduler, north_bristol_county_scheduler]
north_worcester_scheduler >> north_essex_scheduler
[north_essex_scheduler, north_bristol_county_scheduler] >> post_requisite_scheduler
|