What’s Airflow?
Apache Airflow is an open source scheduler built on Python. It organizes work into a DAG (Directed Acyclic Graph), which it topologically sorts to generate dynamic tasks for execution according to dependencies, schedules, upstream task completion, data partitions and/or many other possible criteria. This essentially means that the tasks Airflow generates in a DAG have execution dependencies that define their ordering, and that the workflow contains no cycles… there’s a sequential progression of tasks that results in the ultimate completion of a complex workflow. Note that tasks can also execute in parallel as long as they share the same prerequisite dependencies, which could be ‘no dependencies’.
The product consists of a web server with UI, CLI and configuration database (an API is coming), a pool of Celery workers to process tasks in parallel, and a scheduler service that processes DAGs and feeds the workers task instances to run via a task queue (usually backed by a message broker such as RabbitMQ… many options exist).
What’s all that mean? It means we can write Python code that generates and schedules dynamic tasks in the environment. A task is simply a chunk of code for Airflow to run, and it can be created from a static file, by passing parameters to a template command, or by dynamically generating the code in Python. In our example below, we will demonstrate the latter two options, since writing static code is kind of boring.
Why is this interesting?
Creating pipeline tasks dynamically allows us to build automation at large scale, and also to manage dynamically-provisioned environments. For instance, we could have a DAG that queries a configuration API and creates dynamic daily backup tasks for each pipeline in our data services environment. This is what we’re going to do below. Since we have Python at our fingertips, many other API interactions are possible: for instance, we could have Airflow manage scaling and synthetic service checks, create dynamic data processing pipelines from templates, or execute snapshot backups of our cloud instances.
Let’s not get ahead of ourselves.
Airflow installation and configuration
Go here… https://airflow.apache.org/installation.html
…we installed Airflow with Celery, using a RabbitMQ task queue and MySQL DB backend. This blog isn’t about installation, but let’s understand what we have for our example setup:
- A web server with the Airflow UI and MySQL backend… DAGs are displayed here, along with task execution records and Connections, Variables and XComs.
- A RabbitMQ message queue, with the Airflow configuration pointed at a dedicated vhost and the Celery executor enabled.
- A scheduler service that polls the DAGs directory, processes the code and manages resulting task schedules.
- A worker service consisting of a configurable pool of Celery task executor processes. Workers are assigned DAG processing and task execution jobs by the scheduler, which enqueues tasks for worker instances to pick up for processing.
Python3 and Virtualenv
I would recommend that you use a Python Virtualenv for Airflow and for your Python working directory. Most systems still use Python 2.7, which is olde.
Basically, install Python3 and Virtualenv for your platform. Create a Virtualenv using Python3, with:
~$ virtualenv -p python3 myenv
~$ cd myenv && source bin/activate
(myenv) ~$ python -V
Python 3.5.4
(myenv) ~$
The Airflow service environment should be configured in the same way; run your pip installation from inside the virtualenv. If you wish to make the environment reproducible, run ‘pip freeze’ inside the environment and save the output to a requirements file. You can then run ‘pip install -r requirements.txt’ inside a fresh virtualenv to recreate the same environment.
We also need to install the ‘requests’ package in the Airflow environment to support our DAG… ‘pip install -U requests’. The requests package is a friendly wrapper for fetching HTTP URLs, including headers and content. It’s often used with RESTful APIs (as it is here).
Building the DAG
DAGs are picked up from the ${AIRFLOW_HOME}/dags directory by default. We can pretty much just drop our code into this directory, and the scheduler service will process it. If you drop in a bad DAG, it will show up as an error in the logs and also display a broken DAG warning in the interface. If you drop in a healthy DAG, the scheduler will execute the code and the resulting (dynamic, in our case) task instances will be created and scheduled. At this point we can manage the DAG via the Airflow CLI: enable or disable it, trigger a backfill of tasks over a time range, or configure Connection and Variable objects for the DAG to use when generating task parameters.
There are several basic components to a DAG, and they are simply outlined here… https://airflow.apache.org/tutorial.html (note the templating with jinja section, as we’ll use this approach in our DAG).
So, here’s code, in chunks:
Imports
# Airflow imports
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook

# Metadata API imports
import requests, json, string
These declarations import all the libs we need to process our DAG.
Inputs and arguments
connection = BaseHook.get_connection("source_bucket")
source_bucket = connection.extra
connection = BaseHook.get_connection("target_bucket")
target_bucket = connection.extra

metadataurl = 'https://myweb.ka.com/api/v1/metadata/'
First, this collects our bucket names from our Airflow Connections. We have two Connections, named ‘source_bucket’ and ‘target_bucket’, with the bucket names saved in the ‘extra’ field of each connection object. We’re using the BaseHook library to fetch them from Airflow. This is the equivalent of setting source_bucket = ‘some_bucket_name’ directly.
Second, we set a simple target for our metadata service, which is our pipeline configuration store. This could also be loaded from a Connection object, as the bucket names were. The metadata API is a simple datastore that keeps metadata about the namespaces, pipelines and data sources configured in our environment. When we query the service, it returns a JSON list of these objects and their attributes. We will use the ‘name’ key of each record in the data set in our task construction.
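For context, here’s a purely illustrative sketch of what the parsed namespaces response could look like once loaded into Python (any field beyond ‘name’ is hypothetical; only the ‘name’ key is used in the task construction below):

# Illustrative sketch of a parsed namespaces response… only 'name' is used below
ns_json = [
    {'name': 'com.ka.weatherdata', 'description': 'weather data feeds'},
    {'name': 'com.ka.newsfeeds', 'description': 'news feeds'}
]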
# DAG arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 12),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=10)
}
These parameters are defined in a handy object that will be included in the DAG specification. Normally we also include an ‘on_failure_callback’ param, pointing at a custom Python function, which is configured to page on a failed task execution. This is an excellent pipeline monitoring strategy, but it’s not needed for our POC and is out of scope. Airflow will record task execution failures in the database and display them in the UI.
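As a minimal sketch (not part of the DAG below), such a callback is just a Python function that receives the task context; ‘page_oncall()’ here is a hypothetical stand-in for whatever paging integration you use:

# Minimal sketch of a failure callback… page_oncall() is a hypothetical
# stand-in for your paging/alerting integration.
def notify_on_failure(context):
    failed_task = context['task_instance'].task_id
    page_oncall("Airflow task failed: " + failed_task)

# default_args['on_failure_callback'] = notify_on_failure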
DAG definition
# DAG definition
daily_incremental = DAG(
    'daily_incremental',
    catchup=False,
    concurrency=16,
    default_args=default_args,
    schedule_interval=timedelta(1))
The code here creates our DAG object, assigns our arguments from the previous code block, and establishes a task concurrency limit and schedule interval (timedelta(1) defaults to one day, and this is a daily backup). Our DAG is called ‘daily_incremental’. Note that Airflow is smart enough to execute daily tasks after midnight using the previous day’s date; you don’t have to handle any offset logic. All of the parameters are documented DAG arguments, except the name ‘daily_incremental’, which is a static string. The ‘timedelta()’ function comes from Python’s datetime module, imported above.
Command template
# Command template
gsutil_cp_command = "gsutil -m cp -r gs://{{ params.source_path }} gs://{{ params.dest_path }}/{{ ds }}"
With this line we create our template command, which invokes gsutil to copy the data we wish to back up between buckets. Notice that our bucket path parameters are rendered via Jinja; we define these parameters below. Note that Airflow Variables can be accessed in the same manner… use ‘Variable.get()’ instead of ‘BaseHook.get_connection()’ and assign the result directly. Also, we are using the Airflow builtin {{ ds }} macro to add our date partition, which defaults to the format ‘YYYY-MM-DD’. This becomes the target directory for each daily task execution, so each dated directory is backed up recursively.
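For example, if the metadata URL were stored in an Airflow Variable instead of being hard-coded (a hypothetical Variable named ‘metadata_url’), the lookup would be a one-liner:

# Hypothetical: read the metadata service URL from an Airflow Variable
# named 'metadata_url' instead of a hard-coded string.
metadataurl = Variable.get("metadata_url")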
Data fetch and iteration
# Fetch the namespaces list from the Metadata API
# constructing the URI for our REST API call
ns_resp = requests.get(metadataurl + 'namespaces')
ns_json = json.loads(ns_resp.text)
Here we use the Python ‘requests’ library to query our API for configured namespaces and load the resulting JSON into a Python object that we can query. Of course, not everyone has a RESTful service to query for data, so we could replace our ‘metadataurl’ variable with a reference to a file that contains the relevant JSON records, and query the file instead. We could also query the cloud storage API itself for available paths. It’s Python, so the sky’s the limit here. The main point is that we grab our dynamic data from somewhere and use it to generate tasks.
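As a sketch of that file-based alternative (the path here is hypothetical), the same object could just as easily be loaded from disk:

# Hypothetical alternative: load the namespace records from a local JSON file
# that contains the same list structure the API returns.
with open('/path/to/namespaces.json') as f:
    ns_json = json.load(f)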
# Iterate over the namespaces
for i in range(0, len(ns_json)):

    # Fetch the pipelines configured for each namespace
    # constructing the URI again
    pl_resp = requests.get(metadataurl + 'ns/' + ns_json[i]['name'] + '/pipelines')
    pl_json = json.loads(pl_resp.text)

    # Iterate over the pipelines
    for j in range(0, len(pl_json)):

        # constructs a task name, 'namespace_pipeline'
        task_name = str(ns_json[i]['name']).replace(".", "_") + '_' + str(pl_json[j]['name'])

        # constructs a bucket path, 'namespace/archive/pipeline/'
        task_path = '/' + str(ns_json[i]['name']) + '/archive/' + str(pl_json[j]['name'])
In this nested iteration we’re finally rendering all the inputs for our tasks. The first loop iterates through each namespace, and the second collects the list of pipelines that our application has configured within each namespace. On each iteration we generate a unique name and path and assign them to the variables task_name and task_path, respectively. For example, a namespace ‘com.ka.weatherdata’ with a pipeline ‘chicago_weather’ yields the task_name ‘com_ka_weatherdata_chicago_weather’ and the task_path ‘/com.ka.weatherdata/archive/chicago_weather’.
Dynamic task definition
        # Create our dynamic tasks
        gsutil_cp_pipeline = BashOperator(
            task_id=task_name,
            bash_command=gsutil_cp_command,
            params={'source_path': source_bucket + task_path,
                    'dest_path': target_bucket},
            dag=daily_incremental)
Note above that the bash_command parameter in our BashOperator uses the command template from above and passes the ‘source_path’ and ‘dest_path’ parameters to it. This block constructs a task for each unique namespace and pipeline pairing, using the ‘source_bucket’, ‘target_bucket’, ‘task_name’ and ‘task_path’ variables as input. With hypothetical bucket names, the rendered command for one pipeline would look something like ‘gsutil -m cp -r gs://my-source-bucket/com.ka.weatherdata/archive/chicago_weather gs://my-target-bucket/2018-04-12’.
Verification
When the loop finishes processing (the Airflow scheduler picks up the file, evaluates it and renders the resulting tasks), you can query the DAG using the Airflow CLI to see the list of tasks it generated, and then enable the DAG (it deploys paused by default).
(myenv) ~$ airflow list_tasks daily_incremental
com_ka_weatherdata_chicago_weather
com_ka_weatherdata_losangeles_weather
com_ka_weatherdata_newyork_weather
com_ka_weatherdata_washington_weather
com_ka_newsfeeds_chicago_news
com_ka_newsfeeds_losangeles_news
com_ka_newsfeeds_newyorkcity_news
com_ka_newsfeeds_winnipeg_news
(myenv) ~$ airflow unpause daily_incremental
Dag: <DAG: daily_incremental>, paused: 0
(myenv) ~$
Conclusions
Keep in mind that we have the power of Python at our fingertips here. I used JSON and a REST API with an Airflow Connection and a static variable. Your data inputs can come from any source, however, including your cloud provider’s API, a metrics database for scaling tasks, an RDBMS, YAML or XML files… anything. A great learning exercise would be to deploy a DAG that works in your own environment, using your own sources.
Commands and outputs can change too… and this is the power of Airflow: creating dynamic tasks. I construct a file path, but yours might create a database connection string and execute an update to a materialized view, invoke a mysqldump command to back up an RDBMS data source, or run a Spark job in a data pipeline. Airflow can do it all, and it also allows you to build dependencies, instrumentation and workflow processing rules right into your DAGs (for instance, don’t run task_2 until all instances of task_1 have executed successfully).
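For example, with two task objects in hand (the names here are illustrative, not from the DAG above), a dependency is a one-liner:

# Illustrative only: task_2 will not run until task_1 has succeeded
task_1 >> task_2    # equivalent to task_2.set_upstream(task_1)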
Leveraging a little knowledge of Python to interact with your application environment allows you to be dynamic with your scheduled tasks, and also to update your configuration centrally. If done well, a DAG can be fully dynamic and execute tasks based on the target environment’s configuration details without having to write new code for every instance of the same task. Cool.
Comments
Great article, thanks. These tasks would all execute in parallel; do you know how you could make task2 dependent on task1, task3 dependent on task2, etc. (TaskN >> TaskN+1, down the line)?
Indeed…
https://airflow.apache.org/tutorial.html#setting-up-dependencies
I am trying to build a dynamic task that uses my custom-built operator. I am unable to get the template fields replaced when it is called dynamically, but normally my operator works. Is there any way to resolve this?
you can use
from airflow.utils.helpers import chain
https://stackoverflow.com/questions/52558018/airflow-generate-dynamic-tasks-in-single-dag-task-n1-is-dependent-on-taskn
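For example (a sketch, assuming you append each operator to a list as the loop in the post creates it), chain() wires them up sequentially:

# Sketch: collect the dynamically created operators, then chain them N >> N+1
from airflow.utils.helpers import chain

task_list = []
# …inside the loop: task_list.append(gsutil_cp_pipeline)
chain(*task_list)    # task_list[0] >> task_list[1] >> task_list[2] …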
I tried this; it lists all the tasks created dynamically, but when I try to run one, it throws a task-not-found error:
airflow.exceptions.AirflowException: Task task_my-data2 not found
Hi.. Nice blog.
I am facing an issue with this piece of code: ‘# Iterate over the namespaces’,
where we iterate over a set of values.
I have my DAG scheduled to run @daily, and it triggers the Facebook API to fetch data for several accounts (the number is dynamic in nature), so I am performing a similar iteration. Now the iteration runs every time (approximately every 10 seconds) and prints out the loop run. Can I prevent the loop from running and only trigger the DAG when it is scheduled to?
The DAG file is evaluated by the Airflow scheduler on a loop that repeats continuously. On every loop, the scheduler re-parses the DAG and renders the associated tasks. If you update the DAG’s iteration inputs to include more tasks, they get evaluated and rendered in that (~10 second) loop.
If the DAG is properly scheduled to run daily, the re-rendering of tasks in the DAG should not affect the DAG state in the database, and the actual pipeline will only execute once daily (the ‘@daily’ schedule executes the next day, with the previous day’s date… date -1 processing) and when manually triggered.
Hi,
I am very new to Airflow.
I wanted to know if there is an option to schedule queries (Data Profiling -> Ad Hoc Query -> presto_default).
I want to schedule a Presto query in Airflow. Is that option available, or is there an indirect way to do it?
Hi,
Here’s a Presto hook: https://airflow.readthedocs.io/en/stable/_modules/airflow/hooks/presto_hook.html
There are a couple of examples of operators here (I’ve not used or tested them): https://airflow.readthedocs.io/en/stable/_modules/index.html
Keep in mind that if you bring large / big data results into Airflow from Presto, you may run out of memory.
Hi,
I have my Spark job written in Scala. I am trying to schedule it using the spark-submit operator provided by Airflow.
I have a few parameters that my job expects as key-value pairs. Below is the spark-submit command that I use:
spark-submit --files --driver-class-path --master yarn-client --principal --keytab --conf "spark.driver.extraJavaOptions=-Dlogfile.name=myJob" --class com.citiustech.hscale.job.executor.JobExecutor spark-driver/lib/spark-driver-19.6.0.0.jar jobType=dt orgCode=citiustech jobName=NI msgType=db > divya_dt.log
My purpose is to provide these parameters (jobType=dt orgCode=citiustech jobName=NI msgType=db) as-is in the DAG file.
Is there a way out?
I would also need to dynamically change the values of these parameters and send them via a REST API.
Please help me with this.