Workflow Management for Big Data: Guide to Airflow (part 1)

Data analytics plays a key role in decision making at various stages of the business in many industries, and in this era of Big Data its adoption will only increase. The number of Big Data technologies that appear every week to serve the various stages of a Big Data solution can be overwhelming. With data being generated at a very fast pace by many sources (the applications that automate business processes), implementing solutions for use cases like “real time data ingestion from various sources”, “processing the data at different levels of the data ingestion” and “preparing the final data for analysis” becomes challenging. In particular, orchestrating, scheduling, managing and monitoring the pipelines is critical for the data platform to be stable and reliable. The dynamic nature of the data sources, data inflow rate, data schema, processing needs, etc. makes workflow management (pipeline generation, maintenance and monitoring) even more challenging.

This is a three-part series. This first part gives an overview of Airflow along with a few architectural details.

Part 1: Overview along with a few architectural details of Airflow

Typical stages of Workflow Management

The typical stages of the life cycle for Workflow Management of Big Data would be as follows:

  • Create Jobs to interact with systems that operate on Data
    • Hive/Presto/HDFS/Postgres/S3 etc
  • (Dynamic) Workflow creation
    • Based on the number of sources, size of data, business logic, variety of data, changes in the schema, and the list goes on.
  • Manage Dependencies between Operations
    • Upstream, Downstream, Cross Pipeline dependencies, Previous Job state, etc.
  • Schedule the Jobs/Operations
    • Calendar schedule, Event Driven, Cron Expression etc.
  • Keep track of the Operations and the metrics of the workflow
    • Monitor the current/historic state of the jobs, the results of the jobs etc.
  • Ensure fault tolerance of the pipelines and the capability to backfill any missing data, etc.

And the list grows as the complexity increases.

Where to start?

Various tools have been developed to solve this problem, but each has its own strengths and limitations. Below are a few tools that are used in the big data context.

A few (open source) tools that solve the Workflow Management problem in the Big Data space

There are a bunch of Workflow Management tools out there in the market. A few support Big Data operations out of the box, and a few need extensive customization or extensions to support them. Please refer to this link to see the list of tools available for exploration.

  • Oozie → Oozie is a workflow scheduler system to manage Apache Hadoop jobs
  • BigDataScript → BigDataScript is intended as a scripting language for big data pipeline
  • Makeflow → Makeflow is a workflow engine for executing large complex workflows on clusters, clouds, and grids
  • Luigi → Luigi is a Python module that helps you build complex pipelines of batch jobs. (This is a strong competitor to Airflow)
  • Airflow → Airflow is a platform to programmatically author, schedule and monitor workflows
  • Azkaban → Azkaban is a batch workflow job scheduler created at LinkedIn to run Hadoop jobs
  • Pinball → Pinball is a scalable workflow manager developed at Pinterest

Though most of the above tools meet the basic needs of workflow management, only a few of them shine when it comes to complex workflows. Luigi, Airflow, Oozie and Pinball are the tools preferred (and used in production) by many teams across the industry. A quick comparison of these tools is available at this link (by Marton, Data Engineer at Facebook). We observed that none of the existing resources on the web describe the architecture graphically, cover the setup of Airflow (the workflow management tool open sourced by Airbnb) in production with CeleryExecutor or, more importantly, explain how Airflow needs to be configured to be highly available. Hence, here is an attempt to share that missing information.

Introduction to Airflow

Airflow is a platform to programmatically author, schedule and monitor data pipelines, and it meets the needs of almost all the stages of the Workflow Management life cycle. The system has been built (by Airbnb) on the four principles below (copied as is from the Airflow docs):

  • Dynamic: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.
  • Extensible: Easily define your own operators, executors and extend the library so that it fits the level of abstraction that suits your environment.
  • Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.
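
To make the “Dynamic” and “Elegant” principles a little more concrete, here is a minimal sketch in plain Python (standard library only, not the Airflow API). It generates one pipeline description per data source in a loop and parameterizes a command by run date. The names SOURCES, build_pipeline and render_load_command, as well as the load command itself, are hypothetical, and string.Template merely stands in for Jinja templating.

```python
from string import Template

# Hypothetical list of data sources; it could equally come from a config file or a database.
SOURCES = ["orders", "customers", "payments"]

# string.Template is a stand-in for Jinja here: the command is parameterized by run date ($ds).
LOAD_COMMAND = Template("load --table $table --date $ds")


def build_pipeline(source):
    """Return a pipeline description: task names mapped to their upstream tasks."""
    return {
        "name": "ingest_" + source,
        "tasks": {
            "extract_" + source: [],
            "load_" + source: ["extract_" + source],
        },
        # Fill in the table now; leave $ds to be filled in for each run.
        "load_command": LOAD_COMMAND.safe_substitute(table=source),
    }


# Adding a source to SOURCES adds a pipeline; no pipeline is written by hand.
PIPELINES = {p["name"]: p for p in map(build_pipeline, SOURCES)}


def render_load_command(pipeline_name, ds):
    """Fill in the run date for one run of the given pipeline."""
    return Template(PIPELINES[pipeline_name]["load_command"]).substitute(ds=ds)
```

In Airflow itself the same idea applies to real DAG and operator objects: because pipelines are Python code, a loop over sources can create them instead of a hand-maintained file per pipeline.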

Basic concepts of Airflow

  • DAGs – A DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
    • DAGs are defined as Python scripts and are placed in the DAGs folder (this can be any location, but it needs to be configured in the Airflow config file).
    • Once a new DAG is placed in the DAGs folder, Airflow picks it up automatically within a minute.
  • Operators – An operator describes a single task in a workflow. While DAGs describe how to run a workflow, Operators determine what actually gets done.
    • Task: Once an operator is instantiated using some parameters, it is referred to as a “task”
    • Task Instance: A task executed at a particular time is called a TaskInstance.
  • Scheduling the DAGs/Tasks – The DAGs and Tasks can be scheduled to run at a certain frequency using the parameter below.
    • schedule_interval → Determines when the DAG has to be triggered. This can be a cron expression or a Python datetime.timedelta object.
  • Executors – Once the DAGs, Tasks and scheduling definitions are in place, something needs to execute the jobs/tasks. This is where Executors come into the picture.
    • There are three types of executors provided by Airflow out of the box.
      • Sequential → The Sequential executor is meant for test drives. It executes the tasks one by one (sequentially), so tasks cannot be parallelized.
      • Local → The Local executor is similar to the Sequential executor, but it can parallelize task instances locally.
      • Celery → The Celery executor is built on Celery, an open source distributed task execution engine based on message queues, which makes it more scalable and fault tolerant. Message queues like RabbitMQ or Redis can be used along with Celery.
        • This is typically used for production purposes.
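
The concepts above can be sketched in a few lines of plain Python (standard library only, not the Airflow API). The sketch models a DAG as a mapping from each task to its upstream tasks, and contrasts a sequential run with a locally parallel one. The task names and the functions execution_waves, run_sequential and run_local are hypothetical, and grouping tasks into “waves” is a simplification of what a real scheduler does.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical workflow: each task maps to the list of tasks it depends on (its upstream tasks).
DAG = {
    "extract": [],
    "clean": ["extract"],
    "aggregate": ["clean"],
    "report": ["clean"],
    "notify": ["aggregate", "report"],
}


def execution_waves(dag):
    """Group tasks into waves; every task's upstream tasks sit in earlier waves."""
    done, waves = set(), []
    while len(done) < len(dag):
        ready = sorted(
            task for task, upstream in dag.items()
            if task not in done and all(u in done for u in upstream)
        )
        if not ready:
            raise ValueError("cycle detected: the graph is not a DAG")
        waves.append(ready)
        done.update(ready)
    return waves


def run_task(name):
    """Stand-in for real work (a Hive query, an S3 copy, ...)."""
    return name + ":success"


def run_sequential(dag):
    """Like a sequential executor: one task at a time, in dependency order."""
    return [run_task(task) for wave in execution_waves(dag) for task in wave]


def run_local(dag, parallelism=2):
    """Like a local executor: independent tasks within a wave run in parallel."""
    results = []
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        for wave in execution_waves(dag):
            results.extend(pool.map(run_task, wave))
    return results
```

Here “aggregate” and “report” both depend only on “clean”, so they land in the same wave and can run side by side in run_local, while run_sequential works through them one after the other.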

***  For advanced concepts of Airflow like Hooks, Pools, Connections, Queues, SLAs etc, please take a look at the Airflow documentation.

Airflow has an edge over other tools in the space

Below are some key features where Airflow has an upper hand over other tools like Luigi and Oozie:

  • Pipelines are configured via code, making the pipelines dynamic
  • A graphical representation of the DAG instances and Task Instances along with the metrics
  • Scalability: Distribution of Workers and Queues for Task execution
  • Hot deployment of DAGs/Tasks
  • Support for Celery, SLAs, and a great UI for monitoring metrics
  • Support for calendar schedules and crontab scheduling
  • Backfill: Ability to rerun a particular DAG instance in case of a failure
  • Variables for making changes to the DAGs/Tasks quick and easy
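
As an illustration of the scheduling and backfill ideas, here is a minimal sketch in plain Python (not Airflow code). It lists the runs a fixed interval would produce and finds those with no successful run recorded. The function names, the dates and the set of completed runs are all made up for the example.

```python
from datetime import datetime, timedelta


def scheduled_runs(start, end, interval):
    """List the run dates from start (inclusive) to end (exclusive), one per interval."""
    runs, current = [], start
    while current < end:
        runs.append(current)
        current += interval
    return runs


def missing_runs(start, end, interval, completed):
    """Return the scheduled run dates that have no successful run recorded."""
    return [run for run in scheduled_runs(start, end, interval) if run not in completed]


# Hypothetical history: a daily pipeline whose runs for Jan 3 and Jan 4 failed or never ran.
start, end = datetime(2016, 1, 1), datetime(2016, 1, 6)
completed = {datetime(2016, 1, 1), datetime(2016, 1, 2), datetime(2016, 1, 5)}
to_backfill = missing_runs(start, end, timedelta(days=1), completed)
```

With the history above, to_backfill holds the two dates that need to be rerun. A workflow manager with backfill support does this bookkeeping for you from the state it keeps about past runs.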

Architecture of Airflow

Airflow typically consists of the components below.

  • Configuration file → Where all the configuration points are set, such as “which port to run the web server on”, “which executor to use”, “config related to RabbitMQ/Redis”, workers, DAGs location, repository, etc.
  • Metadata database (MySQL or Postgres) → The database where all the metadata related to the DAGs, DAG runs, tasks and variables is stored.
  • DAGs (Directed Acyclic Graphs) → These are the workflow definitions (logical units) that contain the task definitions along with the dependency information. These are the actual jobs that the user would like to execute.
  • Scheduler → The component responsible for triggering the DAG instances and job instances for each DAG. The scheduler is also responsible for invoking the Executor (be it Local, Celery or Sequential).
  • Broker (Redis or RabbitMQ) → In the case of the Celery executor, the broker is required to hold the messages and act as a communicator between the executor and the workers.
  • Worker nodes → The actual workers that execute the tasks and return the result of each task.
  • Web server → A web server that renders the Airflow UI, through which one can view the DAGs and their status, rerun them, and create variables, connections, etc.
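
How these components interact can be modelled with a toy sketch in plain Python. An in-process queue stands in for the broker, a dictionary for the metadata database and threads for the worker nodes. None of this is Airflow or Celery code, and names such as run_cluster are hypothetical.

```python
import queue
import threading

broker = queue.Queue()       # stands in for the message broker (Redis or RabbitMQ)
metadata_db = {}             # stands in for the metadata database
db_lock = threading.Lock()   # protects metadata_db across threads
STOP = None                  # sentinel telling a worker to shut down


def scheduler(task_names):
    """Record each task as queued, then hand it to the broker."""
    for name in task_names:
        with db_lock:
            metadata_db[name] = "queued"
        broker.put(name)


def worker():
    """Pull tasks from the broker, 'run' them, and record the result."""
    while True:
        name = broker.get()
        if name is STOP:
            break
        with db_lock:
            metadata_db[name] = "success"   # a real worker would execute the task here


def run_cluster(task_names, n_workers=3):
    """Start the workers, schedule the tasks, then shut the workers down."""
    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for thread in threads:
        thread.start()
    scheduler(task_names)
    for _ in threads:
        broker.put(STOP)     # one sentinel per worker, queued after all tasks
    for thread in threads:
        thread.join()
    return dict(metadata_db)
```

In this model the web server would simply read metadata_db to display task states. The scheduler and the workers never talk to each other directly, only through the broker and the database, which is what lets the workers be spread over several machines.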

So far so good. However, when it comes to deploying Airflow on production systems, one may have to go with a simple or a complex setup, depending on the needs. There are different ways Airflow can be deployed (especially from an Executor point of view). Please check out part 2 of this series to get a better picture of how to deploy Airflow.
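
To show how the executor choice surfaces in the configuration file, here is a small sketch that parses an illustrative config fragment with Python's configparser. The section and key names follow the airflow.cfg layout as we understand it for Airflow releases of this era and may differ in your version. The hosts, paths and credentials are made up, and describe_deployment is a hypothetical helper.

```python
import configparser

# Illustrative fragment only: hosts, ports, paths and credentials are made up.
SAMPLE_CFG = """
[core]
dags_folder = /home/airflow/dags
executor = CeleryExecutor
sql_alchemy_conn = postgresql://airflow:secret@db-host:5432/airflow

[webserver]
web_server_port = 8080

[celery]
broker_url = redis://broker-host:6379/0
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)


def describe_deployment(cfg):
    """Summarize the settings that determine the shape of the deployment."""
    executor = cfg.get("core", "executor")
    return {
        "executor": executor,
        # Only the Celery executor needs a broker between scheduler and workers.
        "needs_broker": executor == "CeleryExecutor",
        "web_port": cfg.getint("webserver", "web_server_port"),
    }
```

Switching the executor value to LocalExecutor or SequentialExecutor in such a file is what moves a deployment between the simple and the complex setups mentioned above.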

How To Rebuild Cloudera’s Spark

As a follow-up to the post How to upgrade Spark on CDH5.5, I will show you how to get a build environment up and running with a CentOS 7 virtual machine running via Vagrant and VirtualBox. This will allow for the quick build or rebuild of Cloudera’s version of Apache Spark from https://github.com/cloudera/spark.

Why?

You may want to rebuild Cloudera’s Spark in the event that you want to add functionality that was not compiled in by default. The Thriftserver and SparkR are two things that Cloudera does not ship (nor support), so if you are looking for these things, these instructions will help.

Using a disposable virtual machine will allow for a repeatable build and will keep your workstation computing environment clean of all the bits that may get installed.

Requirements

You will need Internet access during the installation and compilation of the Spark software.

Make sure that you have the following software installed on your local workstation:

Installation of these components is documented at their respective links.

Get Started

Clone the vagrant-sparkbuilder git repository to your local workstation:

git clone https://github.com/teamclairvoyant/vagrant-sparkbuilder.git
cd vagrant-sparkbuilder

Start the Vagrant instance that comes with the vagrant-sparkbuilder repository. This will boot a CentOS 7 virtual machine, install the Puppet agent, and instruct Puppet to configure the virtual machine with Oracle Java and the Cloudera Spark git repository. Then it will log you in to the virtual machine.

vagrant up
vagrant ssh

Inside the virtual machine, change to the spark directory:

cd spark

The automation has already checked out the branch/tag that corresponds to the target CDH version (presently defaulting to cdh5.7.0-release). Now you just need to build Spark with the Hive Thriftserver while excluding dependencies that are shipped as part of CDH. The key options in this example are -Phive and -Phive-thriftserver. Expect the compilation to take 20-30 minutes, depending upon your Internet speed and your workstation's CPU and disk speed.

patch -p0 </vagrant/undelete.patch
./make-distribution.sh -DskipTests \
  -Dhadoop.version=2.6.0-cdh5.7.0 \
  -Phadoop-2.6 \
  -Pyarn \
  -Phive -Phive-thriftserver \
  -Pflume-provided \
  -Phadoop-provided \
  -Phbase-provided \
  -Phive-provided \
  -Pparquet-provided

If the above command fails with a ‘Cannot allocate memory’ error, either run it again or increase the amount of memory in the Vagrantfile.

Copy the resulting distribution back to your local workstation:

rsync -a dist/ /vagrant/dist-cdh5.7.0-nodeps

If you want to build against a different CDH release, use git to check out the corresponding release branch/tag and then reapply the patch:

git checkout -- make-distribution.sh
git checkout cdh5.5.2-release
patch -p0 </vagrant/undelete.patch
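
After switching branches, the build command needs a matching -Dhadoop.version. As a convenience, the argument list could be assembled by a small helper like the sketch below. The function name make_distribution_command is hypothetical. The assumption that the Hadoop base version stays at 2.6.0 (and the profile at hadoop-2.6) is only confirmed above for cdh5.7.0, so check it for the release you target.

```python
def make_distribution_command(cdh_version, hadoop_base="2.6.0"):
    """Build the make-distribution.sh argument list for a CDH release such as '5.5.2'.

    Assumption: the target release ships Hadoop `hadoop_base`; verify before use.
    """
    # Same profiles as the command shown earlier in this post.
    profiles = [
        "hadoop-2.6", "yarn", "hive", "hive-thriftserver",
        "flume-provided", "hadoop-provided", "hbase-provided",
        "hive-provided", "parquet-provided",
    ]
    command = [
        "./make-distribution.sh",
        "-DskipTests",
        "-Dhadoop.version={}-cdh{}".format(hadoop_base, cdh_version),
    ]
    return command + ["-P" + profile for profile in profiles]


# Print the command for the cdh5.5.2-release branch as one shell-ready line.
print(" ".join(make_distribution_command("5.5.2")))
```

The returned list can also be passed to subprocess.run from inside the spark directory instead of being printed.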

Log out of the virtual machine with the exit command, then stop and/or destroy the virtual machine:

vagrant halt
vagrant destroy

More examples can be found at the vagrant-sparkbuilder GitHub site.

What Now?

From here, you should be able to make use of the newly built software.

If you are recompiling Spark in order to get the Hive integration along with the JDBC Thriftserver, copy over and then install the newly built jars to the correct locations on the node which will run the Spark Thriftserver.

install -o root -g root -m 0644 dist-cdh5.7.0-nodeps/lib/spark*.jar \
  /opt/cloudera/parcels/CDH/jars/
install -o root -g root -m 0755 dist-cdh5.7.0-nodeps/sbin/start-thriftserver.sh \
  /opt/cloudera/parcels/CDH/lib/spark/sbin/
install -o root -g root -m 0755 dist-cdh5.7.0-nodeps/sbin/stop-thriftserver.sh \
  /opt/cloudera/parcels/CDH/lib/spark/sbin/

You should only need to install on the one node, not on all the cluster members.