Esecuzione del DAG Airflow solo se un altro DAG ha esito positivo

 | Intelligenza-Artificiale

Utilizzo dei sensori del flusso d’aria per controllare l’esecuzione dei DAG secondo una pianificazione diversa

Immagine generata da DALL-E2

Recentemente, ho cercato di coordinare due DAG Airflow in modo tale che uno venga eseguito, secondo la propria pianificazione oraria, solo se l’altro DAG (in esecuzione su base giornaliera) ha avuto successo.

Nel tutorial di oggi ti guiderò attraverso il caso d’uso e dimostrerò come ottenere il comportamento desiderato in tre modi diversi; due utilizzando il ExternalTaskSensor e un altro che utilizza un approccio personalizzato con PythonOperator.

Ora iniziamo con il nostro caso d’uso che coinvolge due DAG Airflow.

Il primo DAG, my_daily_dagviene eseguito tutti i giorni alle 5:00 UTC.

from datetime import datetime, timedelta
from pathlib import Path

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
catchup=False,
dag_id='my_daily_dag'
start_date=datetime(2023, 7, 26),
default_args={
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=2),
},
schedule_interval='0 5 * * *',
max_active_runs=1,
) as dag:
DummyOperator(task_id='dummy_task')

Il secondo DAG, my_hourly_dagviene eseguito su base oraria, tra le 6:00 e le 20:00 UTC.

from datetime import datetime, timedelta
from pathlib import Path

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
catchup=False,
dag_id='my_daily_dag'
start_date=datetime(2023, 7, 26),
default_args={
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=2),
},
schedule_interval='0 6-20 * * *', # At :00 every hour between 6AM-8PM
max_active_runs=1,
) as dag:
DummyOperator(task_id='dummy_task')

Nel nostro caso d’uso, vorremmo my_hourly_dag correre solo se my_daily_dag è stato eseguito correttamente entro la data corrente. Se no, allora my_hourly_dag dovrebbe essere saltato. È importante menzionare qui che non vogliamo innescare my_hourly_dag non appena my_daily_dag riesce. Ciò sarebbe realizzabile con TriggerDagRun

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *