1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | from airflow.operators.bash_operator import BashOperator import datetime as dt from airflow.models import DAG from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.utils.dates import days_ago DAG_ID = "dags_update" # must change before uploading to airflow !!! START_DATE = days_ago(2) SCHEDULE_INTERVAL = dt.timedelta(minutes=2) MAX_ACTIVE_RUN = 1 MAX_CONCURRENT_TASKS = 3 RETRY_COUNT = 0 RETRY_DELAY = dt.timedelta(minutes=1) EMAIL = "mobinalhassan1@gmail.com" OWNER = "datascope_jobs" default_args = { 'owner': OWNER, 'depends_on_past': False, 'start_date': START_DATE, 'email': [EMAIL], 'email_on_failure': False, 'email_on_retry': False, 'retries': RETRY_COUNT, 'retry_delay': RETRY_DELAY, # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, # 'trigger_rule': u'all_success' } dag = DAG( dag_id=DAG_ID, schedule_interval=SCHEDULE_INTERVAL, start_date=START_DATE, max_active_runs=MAX_ACTIVE_RUN, catchup=False, default_args=default_args, concurrency=MAX_CONCURRENT_TASKS ) latest_only = LatestOnlyOperator(task_id='my_latest_only', dag=dag) run_this = BashOperator( task_id='git_update', bash_command='cd {{var.value.dag_directory}} && ' 'if [ -d "dags" ]; then ' ' cd dags && git remote set-url origin https://mobinalhassan:hjk%2567leyd%2891uygfx@gitlab.com/aintelproject/dags.git && ' ' git fetch --all && git reset --hard origin/master && git pull; ' 'else ' ' git clone https://mobinalhassan:hjk%2567leyd%2891uygfx@gitlab.com/aintelproject/dags.git && ' ' cd dags && git fetch --all && git reset --hard origin/master && git pull; ' 'fi', dag=dag, ) latest_only >> run_this |