DAG: Permits_Boston_Data

schedule: @daily


Permits_Boston_Data

Toggle wrap
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import datetime as dt

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator

from airflow.operators.docker_operator import DockerOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# import config

DAG_ID = "Permits_Boston_Data"  # must change before uploading to airflow !!!
START_DATE = days_ago(30)
# SCHEDULE_INTERVAL = dt.timedelta(hours=12)
SCHEDULE_INTERVAL = "@daily"
#SCHEDULE_INTERVAL = "5 0 * * 2-6"

MAX_ACTIVE_RUN = 1
MAX_CONCURRENT_TASKS = 3
RETRY_COUNT = 3
RETRY_DELAY = dt.timedelta(minutes=5)

OWNER = "datascope"

default_args = {
    'owner': OWNER,
    'depends_on_past': False,
    'start_date': START_DATE,
    # 'email': config.EMAIL_LIST,
    'email': [],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': RETRY_COUNT,
    'retry_delay': RETRY_DELAY,
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    'trigger_rule': 'all_done'
}

dag = DAG(
    dag_id=DAG_ID,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    max_active_runs=MAX_ACTIVE_RUN,
    catchup=False,
    default_args=default_args,
    concurrency=MAX_CONCURRENT_TASKS

)

latest_only = LatestOnlyOperator(task_id='my_latest_only', dag=dag)

barnstable_scheduler = DockerOperator(
    task_id='Permits_Boston_Data-scheduler',
    image='registry.gitlab.com/aintelproject/permit_boston_data:latest',
    auto_remove=True,
    force_pull=True,
    environment={
        "AWS_ACCESS_KEY_ID": "{{var.value.AWS_ACCESS_KEY_ID_BOSTON}}",
        "AWS_SECRET_ACCESS_KEY": "{{var.value.AWS_SECRET_ACCESS_KEY_BOSTON}}",
        "BUCKET_NAME": "{{var.value.BUCKET_NAME_BOSTON}}",
        "REGION": "{{var.value.REGION_BOSTON}}",
    },
    dag=dag
)

latest_only >> barnstable_scheduler