How To Set SLA in Apache Airflow
24 July 2022 - 5 MINS READ
A quick overview of Airflow
Apache Airflow is, at its core, an open-source batch task scheduler. It’s a platform for programmatically authoring (creating), scheduling, and monitoring workflows via Directed Acyclic Graphs (DAGs). Airflow is distinguished from other schedulers primarily by its ability to schedule tasks as code. It’s gaining popularity, especially among developers, because it’s written in Python and it’s easy to create code-based orchestration with it. To put it all together, Airflow is a batch-oriented open-source framework that enables us to build data pipelines and monitor data workflows efficiently.
How to set SLA in Airflow
SLA stands for service-level agreement. It’s a contract between a service provider and its customer that outlines the level of service expected by the customer and guaranteed by the vendor. In the context of Airflow, an SLA is the maximum amount of time a task or a DAG should take to complete. It serves to notify us if our DAG or tasks are taking longer than expected to complete.
Note that an SLA is measured relative to the DAG run (its execution date), not from the moment the task itself starts.
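To make the difference concrete, here is a plain-Python sketch of the arithmetic. It assumes the SLA clock starts when the scheduled DAG run is due to start; all times and variable names are made up for illustration, and Airflow's own bookkeeping is more involved than this.

```python
from datetime import datetime, timedelta

sla = timedelta(hours=1)

# Hypothetical times for one scheduled run of a daily DAG.
dag_run_start = datetime(2022, 7, 24, 0, 0)  # when the scheduled run is due to start
task_start = datetime(2022, 7, 24, 0, 50)    # upstream work delayed this task
task_end = datetime(2022, 7, 24, 1, 20)      # the task itself ran for 30 minutes

# The SLA clock is tied to the DAG run, not to the task's own start time.
deadline = dag_run_start + sla
missed_sla = task_end > deadline
task_runtime = task_end - task_start

print(f"deadline={deadline}, runtime={task_runtime}, missed={missed_sla}")
```

In this sketch the task runs for only 30 minutes, well under the one-hour SLA, yet it still counts as a miss because it finishes after the deadline derived from the DAG run.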
An SLA miss occurs when a task or DAG does not finish within the time allotted by its SLA. When that happens, an email alert is sent listing the tasks that missed their SLA. The event is also logged in the database and shown in the web UI, which gives broad details about which task missed its SLA and when, and whether an email was sent for the miss.
When an SLA is breached, Airflow can do the following:
- Send an email alert notification
- Log the event in the database
- Invoke the callback function passed via sla_miss_callback
It should be noted that only scheduled tasks will be checked against the SLA. An SLA miss won’t be triggered by manually initiated tasks!
An SLA can be defined in the following ways:
- Defining SLA on the task itself
- Defining SLA on DAG level
Defining SLA on the task itself
Note that defining SLAs for a task is optional. As a result, in a DAG with several tasks, some tasks may have SLAs and others may not. To set an SLA for a task, pass a datetime.timedelta object to the task/operator's sla parameter as shown below:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2 import path

with DAG(
    'setting_sla',
    start_date=datetime(2002, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='sleep 10',
        sla=timedelta(seconds=5),  # SLA for the task
    )
```
When a task exceeds its SLA, it is not cancelled. It’s free to continue running till the end.
The timedelta object comes from Python's datetime library. It accepts the following arguments:
- weeks
- days
- hours
- minutes
- seconds
- milliseconds (not applicable to Airflow)
- microseconds (not applicable to Airflow)
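As a quick illustration of how these arguments combine, the sketch below builds the same duration in two ways. The variable names are only for illustration.

```python
from datetime import timedelta

# Keyword arguments can be combined; Python normalises them into one duration.
sla = timedelta(hours=1, minutes=30)
same_sla = timedelta(minutes=90)

print(sla == same_sla)      # True
print(sla.total_seconds())  # 5400.0
```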
Cancelling or failing the task
Timeouts let us cancel or fail a task that runs longer than its allotted time. To do this, set the task's execution_timeout attribute to a datetime.timedelta value representing the longest permitted runtime. If that limit is breached, the task times out and AirflowTaskTimeout is raised.
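As a rough sketch of how the two settings can sit side by side, the keyword arguments below give a task both an SLA (which only raises an alert) and an execution_timeout (which stops the task). The dictionary name task_kwargs and the durations are illustrative assumptions, not values from this article.

```python
from datetime import timedelta

# Hypothetical settings for one task.
task_kwargs = {
    "task_id": "bash_task",
    "bash_command": "sleep 10",
    "sla": timedelta(minutes=5),                 # a miss is reported, the task keeps running
    "execution_timeout": timedelta(minutes=15),  # exceeding this fails the task
}

# In a DAG file these would be passed to an operator, e.g. BashOperator(**task_kwargs).
print(task_kwargs["execution_timeout"])
```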
Implementing our own logic
We can use callbacks to run our own logic when an SLA is missed. To do so, pass a function as sla_miss_callback; it is called whenever the SLA is not met.
Only one callback function is permitted for tasks or a DAG.
The function signature of an sla_miss_callback requires the following five parameters:
- dag
- task_list
- blocking_task_list
- slas
- blocking_tis
Let’s define a callback method:
```python
import logging

def _sla_missied_take_action(*args, **kwargs):
    logging.getLogger(__name__).info(args)  # log the SLA-miss arguments Airflow passes in
```
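The catch-all signature above works, but naming the arguments can make the callback easier to read. The sketch below is one possible variant, assuming the five parameter names from Airflow's documentation (dag, task_list, blocking_task_list, slas, blocking_tis). The function name log_sla_miss and the stand-in values in the dry run are hypothetical.

```python
import logging

logger = logging.getLogger(__name__)


def log_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Summarise an SLA miss; intended to be passed as sla_miss_callback."""
    message = (
        f"SLA missed in DAG {dag}: "
        f"tasks={task_list!r}, blocked by={blocking_task_list!r}"
    )
    logger.warning(message)
    return message


# Dry run outside Airflow with stand-in values; the scheduler would pass real objects.
summary = log_sla_miss("setting_sla", "bash_task", "", slas=[], blocking_tis=[])
print(summary)
```

It would be passed to the DAG in the same way as the catch-all function.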
Pass the callback method to DAG as shown below:
```python
with DAG(
    'setting_sla',
    start_date=datetime(2002, 1, 1),
    default_args=default_args,
    schedule_interval='@daily',
    sla_miss_callback=_sla_missied_take_action,  # callback function
    catchup=False,
) as dag:
    ...  # task definitions go here
```
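The default_args dictionary passed to the DAG above is not shown in the snippet. A minimal sketch of what it might contain, assuming we want the same SLA on every task (the DAG-level approach) and a recipient for SLA-miss emails. The address and the duration are placeholders, and the dictionary would need to be defined before the with DAG(...) statement.

```python
from datetime import timedelta

# Hypothetical defaults applied to every task in the DAG unless a task overrides them.
default_args = {
    "email": ["alerts@example.com"],  # placeholder recipient for SLA-miss emails
    "sla": timedelta(minutes=30),     # DAG-wide SLA inherited by each task
}
```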
If we want to disable SLA checking entirely, we can set check_slas = False in the [core] section of Airflow's configuration file (airflow.cfg):

```ini
[core]
check_slas = False
```