Utilizzo dei sensori del flusso d’aria per controllare l’esecuzione dei DAG secondo una pianificazione diversa
Recentemente, ho cercato di coordinare due DAG Airflow in modo tale che uno venga eseguito, secondo la propria pianificazione oraria, solo se l’altro DAG (in esecuzione su base giornaliera) ha avuto successo.
Nel tutorial di oggi ti guiderò attraverso il caso d’uso e dimostrerò come ottenere il comportamento desiderato in tre modi diversi; due utilizzando il ExternalTaskSensor
e un altro che utilizza un approccio personalizzato con PythonOperator
.
Ora iniziamo con il nostro caso d’uso che coinvolge due DAG Airflow.
Il primo DAG, my_daily_dag
viene eseguito tutti i giorni alle 5:00 UTC.
from datetime import datetime, timedelta
from pathlib import Pathfrom airflow.models import DAG
from airflow.operators.dummy import DummyOperator
with DAG(
catchup=False,
dag_id='my_daily_dag'
start_date=datetime(2023, 7, 26),
default_args={
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=2),
},
schedule_interval='0 5 * * *',
max_active_runs=1,
) as dag:
DummyOperator(task_id='dummy_task')
Il secondo DAG, my_hourly_dag
viene eseguito su base oraria, tra le 6:00 e le 20:00 UTC.
from datetime import datetime, timedelta
from pathlib import Pathfrom airflow.models import DAG
from airflow.operators.dummy import DummyOperator
with DAG(
catchup=False,
dag_id='my_daily_dag'
start_date=datetime(2023, 7, 26),
default_args={
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=2),
},
schedule_interval='0 6-20 * * *', # At :00 every hour between 6AM-8PM
max_active_runs=1,
) as dag:
DummyOperator(task_id='dummy_task')
Nel nostro caso d’uso, vorremmo my_hourly_dag
correre solo se my_daily_dag
è stato eseguito correttamente entro la data corrente. Se no, allora my_hourly_dag
dovrebbe essere saltato. È importante menzionare qui che non vogliamo innescare my_hourly_dag
non appena my_daily_dag
riesce. Ciò sarebbe realizzabile con TriggerDagRun
…
Fonte: towardsdatascience.com