Consider the following situation: you have a data ingestion pipeline where the data comes in real time on weekdays and is stored in a dated folder. The day’s data needs to be ingested within four hours. An instant response may be: oh, that’s easy! Just set schedule_interval='0 0 * * 1-5'. Unfortunately, this would break the ‘within four hours’ condition, because the run for Friday’s data wouldn’t be scheduled by the Airflow Scheduler until Monday at 12:00 AM. This situation is a common pitfall for new Airflow users.
Before I talk about different ways to tackle this situation, if you are new to Airflow, I recommend reading a blog that I wrote in the past (shameless plug), which highlights the basics of the Airflow Scheduler.
Firstly, why is it that Friday’s data wouldn’t be ingested until Monday at 12:00 AM if schedule_interval='0 0 * * 1-5'? This has to do with two of the conditions that need to be satisfied before the Airflow Scheduler triggers the DAG.
- execution_date <= timezone.utcnow() # the execution date itself is not in the future
- period_end <= timezone.utcnow() # period_end = follow_schedule(execution_date)
Notice that period_end, which is defined to be the next execution date as determined by the cron expression, needs to be less than or equal to the current time. The next date after Friday at 12:00 AM is Monday at 12:00 AM. This is why the Friday at 12:00 AM execution won’t run until Monday at 12:00 AM.
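The two conditions can be replayed outside Airflow with a small simulation. This is only a sketch: follow_weekday_schedule is a hypothetical, hand-rolled stand-in for follow_schedule under the cron expression '0 0 * * 1-5', can_trigger is likewise a made-up name, and naive datetimes are used in place of timezone.utcnow().

```python
from datetime import datetime, timedelta


def follow_weekday_schedule(execution_date):
    """Return the first Monday-Friday midnight strictly after execution_date.

    Hypothetical stand-in for the cron expression '0 0 * * 1-5'.
    """
    candidate = (execution_date + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    while candidate.weekday() > 4:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate


def can_trigger(execution_date, now):
    """Apply the two scheduler conditions listed above."""
    period_end = follow_weekday_schedule(execution_date)
    return execution_date <= now and period_end <= now


friday = datetime(2021, 1, 1)  # a Friday at 12:00 AM

# Saturday 4:00 AM: the four-hour window is over, yet the run cannot start.
blocked_on_saturday = can_trigger(friday, datetime(2021, 1, 2, 4))
# Monday 12:00 AM: period_end has finally been reached.
allowed_on_monday = can_trigger(friday, datetime(2021, 1, 4))
```

Here blocked_on_saturday is False and allowed_on_monday is True, which matches the reasoning above: the period for the Friday execution date only closes at the next cron point, Monday at 12:00 AM.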
Given these two conditions, how can we still process the data in a timely fashion without running the job on the weekends?
1. Run the jobs every day
Schedule the job every day but have ‘expected failures’ on the weekends. While this will solve the issue, it will be annoying to get failure alerts over the weekend. You may become jaded to actual errors.
2. Calculate the real execution date using PythonOperator
Offset the schedule and have a custom python operator at the beginning of the DAG which sets the ‘real’ execution date. The downstream tasks will have to ignore the built-in execution_date and use the date established by this task.
For our case: we change the schedule to 0 0 * * 2-6 and create a PythonOperator at the beginning which returns execution_date - 1 day. On Sunday and on Monday at midnight, nothing will run because there is no weekend data to ingest. From Tuesday to Saturday at midnight, the previous day’s data will be ingested. This is exactly what we are looking for!
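As a rough sketch, the callable behind that first task could be as small as the function below. The name compute_real_execution_date and the YYYY-MM-DD folder format are assumptions made for illustration. In a DAG, it would be passed as the python_callable of a PythonOperator, and downstream tasks would read its return value (for example through XCom) instead of the built-in execution_date.

```python
from datetime import datetime, timedelta


def compute_real_execution_date(**context):
    """Return the date whose folder should be ingested.

    Follows the rule described above: execution_date minus one day.
    The '%Y-%m-%d' folder naming is an assumption, not from the original setup.
    """
    real_date = context["execution_date"] - timedelta(days=1)
    return real_date.strftime("%Y-%m-%d")


# Called directly with a Tuesday at midnight, it points at Monday's folder.
monday_folder = compute_real_execution_date(execution_date=datetime(2021, 1, 5))
```

Every downstream task has to remember to use this value, which is where the maintenance burden of this approach comes from.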
While this works, it may be confusing for developers who are adding new tasks or attempting to trigger the DAG manually. Due to the date logic, the person who triggers the DAG manually needs to be aware of the custom date that we are using. Fairly dangerous, right?
3. Use a ShortCircuitOperator
We want to have the advantage of Solution 1, where we don’t introduce our own execution_date, but we want the schedule of Solution 2 so that we don’t have ‘expected failures’. Such a solution does exist. We can achieve the best of both by leveraging the ShortCircuitOperator.
To apply this technique to your situation, you would schedule the job every day: schedule_interval='0 0 * * *'. At the beginning of your DAG, you create a ShortCircuitOperator which takes in a Python callable that may look like the following:
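    from datetime import datetime

    # Airflow 1.x import path; in Airflow 2.x use airflow.operators.python
    from airflow.operators.python_operator import ShortCircuitOperator
    from croniter import croniter

    def filter_execution_date(*args, **kwargs):
        execution_date = kwargs['execution_date']
        cron = croniter('0 0 * * 1-5', execution_date)
        # Step forward to the next cron point, then back one: we land on
        # execution_date again only if it is itself a cron point.
        cron.get_next(datetime)
        return execution_date == cron.get_prev(datetime)

    weekdays_only = ShortCircuitOperator(
        dag=dag,
        task_id='weekdays_only',
        python_callable=filter_execution_date,
        ...
    )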
The weekdays_only task will filter for execution_dates that fall within '0 0 * * 1-5', so effectively Monday to Friday. For the execution dates of Saturday and Sunday, the downstream tasks will be skipped. This technique prevents unnecessary alarms from going off. Also, since we don’t have any custom date arithmetic, when a DAG is manually triggered with an execution date, that DAG will be run (assuming that the execution date falls within the cron expression specified in filter_execution_date).
There are ways to augment this technique, like creating a new subclass of the ShortCircuitOperator or creating a function that returns a Python callable so that you don’t need to declare a separate filter function each time. However, I will leave that up to your imagination and creativity.
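As one possible starting point, here is a minimal sketch of the "function that returns a Python callable" idea. To stay free of dependencies it swaps the croniter check for a plain weekday test, so it only covers midnight schedules restricted by day of week. The name make_weekday_filter is hypothetical. Note that Python numbers Monday as 0, whereas cron numbers it as 1.

```python
from datetime import datetime


def make_weekday_filter(allowed_weekdays):
    """Build a callable suitable for a ShortCircuitOperator's python_callable.

    allowed_weekdays uses Python's numbering (Monday = 0 ... Sunday = 6).
    """
    allowed = frozenset(allowed_weekdays)

    def filter_execution_date(*args, **kwargs):
        execution_date = kwargs["execution_date"]
        # Mirror the '0 0' part of the cron expression: midnight only.
        at_midnight = (execution_date.hour, execution_date.minute) == (0, 0)
        return at_midnight and execution_date.weekday() in allowed

    return filter_execution_date


# Equivalent in spirit to filtering on '0 0 * * 1-5'.
weekdays_only_filter = make_weekday_filter(range(0, 5))

friday_passes = weekdays_only_filter(execution_date=datetime(2021, 1, 1))
saturday_passes = weekdays_only_filter(execution_date=datetime(2021, 1, 2))
```

With this in place, friday_passes is True and saturday_passes is False, and each DAG can build its own filter in one line instead of redefining the function.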