DAG: MA_scraper

schedule: 5 0 * * 2-6


MA_scraper

Toggle wrap
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import datetime as dt

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator

from airflow.operators.docker_operator import DockerOperator
from airflow.operators.bash_operator import BashOperator

from airflow.utils.dates import days_ago
from pprint import pprint

# ---------------------------------------------------------------------------
# DAG-level settings
# ---------------------------------------------------------------------------
DAG_ID = "MA_scraper"  # rename before uploading a copy of this file to Airflow
START_DATE = days_ago(30)  # start of the day 30 days ago (airflow.utils.dates.days_ago)
# Cron expression: 00:05 every Tuesday through Saturday (day-of-week 2-6).
SCHEDULE_INTERVAL = "5 0 * * 2-6"

MAX_ACTIVE_RUN = 1  # passed to the DAG as max_active_runs
MAX_CONCURRENT_TASKS = 3  # passed to the DAG as its concurrency limit
RETRY_COUNT = 3
RETRY_DELAY = dt.timedelta(minutes=5)

OWNER = "datascope"
EMAIL = "mobinalhassan1@gmail.com"

# Defaults inherited by every task of the DAG unless a task overrides them.
default_args = dict(
    owner=OWNER,
    depends_on_past=False,
    start_date=START_DATE,
    email=[EMAIL],
    email_on_failure=True,
    # Sends a mail on every retry as well, i.e. up to RETRY_COUNT extra mails
    # per failing task.
    email_on_retry=True,
    retries=RETRY_COUNT,
    retry_delay=RETRY_DELAY,
    # 'all_done': a task starts once all of its upstream tasks have finished,
    # whether they succeeded or failed.
    trigger_rule='all_done',
)

# The DAG every task below attaches itself to via ``dag=dag``.
# catchup=False: the scheduler does not backfill missed intervals between
# START_DATE and now; only the latest interval gets a run.
dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    start_date=START_DATE,
    schedule_interval=SCHEDULE_INTERVAL,
    catchup=False,
    max_active_runs=MAX_ACTIVE_RUN,
    concurrency=MAX_CONCURRENT_TASKS,
)

# NOTE(review): this task is not referenced by the dependency wiring below, so
# it has no downstream tasks and currently gates nothing. If skipping
# non-latest runs is intended, wire it upstream of prerequisite_scheduler.
latest_only = LatestOnlyOperator(dag=dag, task_id='my_latest_only')
# Pre-requisite container. The image is pulled on every run (force_pull) and
# the container is removed when it exits (auto_remove).
# NOTE(review): unlike the other containers in this DAG it receives no
# AWS/S3 environment variables -- confirm that is intended.
prerequisite_scheduler = DockerOperator(
    dag=dag,
    task_id='Prerequisite-scheduler',
    image='registry.gitlab.com/aintelproject/prerequisite:latest',
    force_pull=True,
    auto_remove=True,
)

# Environment shared by every container below that writes to the Boston S3
# bucket. The values are Jinja templates over Airflow Variables (var.value.*),
# presumably rendered when each task runs -- confirm ``environment`` is a
# templated field of the installed DockerOperator. Defined once so the four
# tasks cannot drift apart.
_BOSTON_S3_ENV = {
    "AWS_ACCESS_KEY_ID": "{{var.value.AWS_ACCESS_KEY_ID_BOSTON}}",
    "AWS_SECRET_ACCESS_KEY": "{{var.value.AWS_SECRET_ACCESS_KEY_BOSTON}}",
    "BUCKET_NAME": "{{var.value.BUCKET_NAME_BOSTON}}",
    "REGION": "{{var.value.REGION_BOSTON}}",
}


def _boston_s3_docker_task(task_id, image):
    """Return a DockerOperator on ``dag`` that runs *image* with the Boston S3 env.

    The image is pulled on every run (``force_pull``) and the container is
    removed when it exits (``auto_remove``). Each operator receives its own
    copy of ``_BOSTON_S3_ENV`` so no two tasks share one mutable dict.

    Args:
        task_id: Airflow task id of the new operator.
        image: Fully qualified Docker image reference to run.

    Returns:
        The configured DockerOperator, already attached to ``dag``.
    """
    return DockerOperator(
        task_id=task_id,
        image=image,
        auto_remove=True,
        force_pull=True,
        environment=dict(_BOSTON_S3_ENV),
        dag=dag,
    )


# N3
north_worcester_scheduler = _boston_s3_docker_task(
    task_id='N3-T1-North-Worcester-County-scheduler',
    image='registry.gitlab.com/aintelproject/n3-t1-north-worcester-county:latest',
)
# N4
north_essex_scheduler = _boston_s3_docker_task(
    task_id='N4-T1-North-Essex-County-scheduler',
    image='registry.gitlab.com/aintelproject/n4-t1-north-essex-county:latest',
)
# N19
north_bristol_county_scheduler = _boston_s3_docker_task(
    task_id='N19-T4-North-Bristol-county-scheduler',
    image='registry.gitlab.com/aintelproject/n19-t4-north-bristol-county:latest',
)
# Post-requirement
post_requisite_scheduler = _boston_s3_docker_task(
    task_id='Post-Requirement-scheduler',
    image='registry.gitlab.com/aintelproject/postrequisite:latest',
)

# Module-level, so this prints every time the file is imported/parsed.
print('We are in N3 N4 N19 Dag')

# Intended ordering:
#   prerequisite -> N3 (Worcester) -> N4 (Essex) --\
#   prerequisite -> N19 (Bristol) -----------------+-> post-requisite
#
# Spelled out as separate statements on purpose: ``a >> b`` evaluates to
# ``b``, so nesting ``worcester >> essex`` inside a list would keep only Essex
# in that list. Worcester would then have no upstream edge from the
# prerequisite task and would start on its own.
prerequisite_scheduler >> [north_worcester_scheduler, north_bristol_county_scheduler]
north_worcester_scheduler >> north_essex_scheduler
[north_essex_scheduler, north_bristol_county_scheduler] >> post_requisite_scheduler