Setting up an Apache Airflow Cluster

In one of our previous blog posts, we described the process you should take when Installing and Configuring Apache Airflow. In this post, we will describe how to set up an Apache Airflow cluster to run across multiple nodes. This will provide you with more computing power and higher availability for your Apache Airflow instance.

Airflow Daemons

A running instance of Airflow has a number of Daemons that work together to provide the full functionality of Airflow. These daemons include the Web Server, Scheduler, Worker, Kerberos Ticket Renewer, Flower and others. Below are the primary ones you will need to have running for a production-quality Apache Airflow cluster.

Web Server

A daemon which accepts HTTP requests and allows you to interact with Airflow via a Python Flask Web Application. It provides the ability to pause and unpause DAGs, manually trigger DAGs, view running DAGs, restart failed DAGs and much more.

The Web Server Daemon starts up gunicorn workers to handle requests in parallel. You can scale up the number of gunicorn workers on a single machine to handle more load by updating the ‘workers’ configuration in the {AIRFLOW_HOME}/airflow.cfg file.

Example

workers = 4

Startup Command:

$ airflow webserver

Scheduler

A daemon which periodically polls to determine if any registered DAGs and/or Task Instances need to be triggered based on their schedules.

Startup Command:

$ airflow scheduler

Executors/Workers

A daemon that handles starting up and managing one to many celeryd processes to execute the desired tasks of a particular DAG.

This daemon only needs to be running when you set the ‘executor’ config in the {AIRFLOW_HOME}/airflow.cfg file to ‘CeleryExecutor’. It is recommended to do so for Production.

Example:

executor = CeleryExecutor

Startup Command:

$ airflow worker

How do the Daemons work together?

One thing to note about the Airflow Daemons is that they don’t register with each other or even need to know about each other. Each of them handles its own assigned tasks, and when all of them are running, everything works as you would expect.

  1. The Scheduler periodically polls to see if any DAGs that are registered in the MetaStore need to be executed. If a particular DAG needs to be triggered (based on the DAG’s schedule), the Scheduler Daemon creates a DagRun instance in the MetaStore and starts to trigger the individual tasks in the DAG. The scheduler does this by pushing messages into the Queueing Service. Each message contains information about the Task to execute, including the DAG Id, the Task Id and what function needs to be performed. In the case where the Task is a BashOperator with some bash code, the message will contain this bash code.
  2. A user might also interact with the Web Server and manually trigger DAGs to be run. When a user does this, a DagRun will be created and the scheduler will start to trigger individual Tasks in the DAG in the same way that was mentioned in #1.
  3. The celeryd processes, controlled by the Worker daemon, pull from the Queueing Service at regular intervals to see if there are any tasks that need to be executed. When one of the celeryd processes pulls a Task message, it updates the Task instance in the MetaStore to the Running state and tries to execute the code provided. If the code succeeds, it updates the Task’s state to Succeeded; if the code fails while being executed, it updates the Task’s state to Failed.

Single Node Airflow Setup

A simple instance of Apache Airflow involves putting all the services on a single node, as the diagram below depicts.

Apache Airflow Single-Node Cluster

Multi-Node (Cluster) Airflow Setup

A more formal setup for Apache Airflow is to distribute the daemons across multiple machines as a cluster.

Apache Airflow Multi-Node Cluster

Benefits

Higher Availability

If one of the worker nodes were to go down or be purposely taken offline, the cluster would still be operational and tasks would still be executed.

Distributed Processing

If you have a workflow with several memory-intensive tasks, the tasks will be better distributed across the cluster, allowing for higher utilization of resources and faster execution of the tasks.

Scaling Workers

Horizontally

You can scale the cluster horizontally and distribute the processing by adding more executor nodes to the cluster and allowing those new nodes to take load off the existing nodes. Since workers don’t need to register with any central authority to start processing tasks, machines can be turned on and off without any downtime to the cluster.
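For example, bringing a new worker node into the cluster is largely a matter of installing Airflow with the same airflow.cfg settings (CeleryExecutor, Broker URL, MetaStore connection and Celery result backend) and starting the Worker daemon. As a sketch, the worker can also be restricted to one or more named queues (the ‘default’ queue name below is just illustrative):

$ airflow worker

# Only pull tasks from a specific queue:
$ airflow worker -q default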

Vertically

You can scale the cluster vertically by increasing the number of celeryd processes running on each node. This can be done by increasing the value of the ‘celeryd_concurrency’ config in the {AIRFLOW_HOME}/airflow.cfg file.

Example:

celeryd_concurrency = 30

You may need to increase the size of the instances in order to support a larger number of celeryd processes. This will depend on the memory and CPU intensity of the tasks you’re running on the cluster.

Scaling Master Nodes

You can also add more Master Nodes to your cluster to scale out the services that are running on the Master Nodes. This will mainly allow you to scale out the Web Server Daemon in case there are too many HTTP requests coming in for one machine to handle, or if you want to provide Higher Availability for that service.

One thing to note is that there can only be one Scheduler instance running at a time. If you have multiple Schedulers running, there is a possibility that multiple instances of a single task will be scheduled. This could cause some major problems with your Workflow and cause duplicate data to show up in the final table if you were running some sort of ETL process.

If you would like, the Scheduler daemon may also be setup to run on its own dedicated Master Node.

Apache Airflow Multi-Master Node Cluster

Apache Airflow Cluster Setup Steps

Pre-Requisites
  • The following nodes are available with the given host names:
    • master1
      • Will have the role(s): Web Server, Scheduler
    • master2
      • Will have the role(s): Web Server
    • worker1
      • Will have the role(s): Worker
    • worker2
      • Will have the role(s): Worker
  • A Queueing Service is running (RabbitMQ, AWS SQS, etc.)
    • You can install RabbitMQ by following these instructions: Installing RabbitMQ
      • If you’re using RabbitMQ, it is recommended that it also be set up as a cluster for High Availability. Set up a Load Balancer to proxy requests to the RabbitMQ instances. (A sketch for creating a dedicated RabbitMQ user for Airflow is shown after this list.)
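Depending on your RabbitMQ version, the default guest account may only be allowed to connect from localhost. If that’s the case, you can create a dedicated user and virtual host for Airflow; below is a minimal sketch (the airflow user name and vhost are illustrative, not required names):

$ rabbitmqctl add_user airflow {RABBITMQ_PASSWORD}
$ rabbitmqctl add_vhost airflow
$ rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"

The Broker URL in airflow.cfg (see step 2 below) would then become:

broker_url = amqp://airflow:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/airflow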
Steps
  1. Install Apache Airflow on ALL machines that will have a role in the Airflow cluster
  2. Apply Airflow configuration changes to ALL machines by editing the {AIRFLOW_HOME}/airflow.cfg file:
    1. Change the Executor to CeleryExecutor
      executor = CeleryExecutor
    2. Point SQL Alchemy to the MetaStore
      sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
    3. Set the Broker URL
      1. If you’re using RabbitMQ:
        broker_url = amqp://guest:guest@{RABBITMQ_HOST}:5672/
      2. If you’re using AWS SQS:
        broker_url = sqs://{ACCESS_KEY_ID}:{SECRET_KEY}@
         
        #Note: You will also need to install boto:
        $ pip install -U boto
    4. Point Celery to the MetaStore
      celery_result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
  3. Deploy your DAGs/Workflows on master1 and master2 (and any future master nodes you might add)
  4. On master1, initialize the Airflow Database (if not already done after updating the sql_alchemy_conn configuration)
    airflow initdb
  5. On master1, startup the required role(s)
    • Startup Web Server
      $ airflow webserver
    • Startup Scheduler
      $ airflow scheduler
  6. On master2, startup the required role(s)
    • Startup Web Server
      $ airflow webserver
  7. On worker1 and worker2, startup the required role(s)
    • Startup Worker
      $ airflow worker
  8. Create a Load Balancer to balance requests going to the Web Servers
    • If you’re on AWS you can do this with an Elastic Load Balancer (ELB)
    • If you’re not on AWS you can use something like haproxy to proxy/balance requests to the Web Servers (a minimal haproxy configuration sketch is shown after these steps)
  9. You’re done!
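If you go the haproxy route, below is a minimal configuration sketch, assuming the Web Servers listen on Airflow’s default port 8080 and using the host names from the pre-requisites above (the timeouts and balancing strategy are illustrative, not required values):

defaults
    mode http
    timeout connect 5s
    timeout client 30s
    timeout server 30s

frontend airflow_web
    # Accept incoming HTTP requests for the Airflow UI
    bind *:8080
    default_backend airflow_webservers

backend airflow_webservers
    # Round-robin requests between the two Web Server nodes
    balance roundrobin
    server master1 master1:8080 check
    server master2 master2:8080 check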

Additional Documentation

Documentation: https://airflow.incubator.apache.org/

Install Documentation: https://airflow.incubator.apache.org/installation.html

GitHub Repo: https://github.com/apache/incubator-airflow

Understanding Resource Allocation configurations for a Spark application

Resource allocation is an important aspect of the execution of any Spark job. If not configured correctly, a Spark job can consume the entire cluster’s resources and leave other applications starved for resources.

This blog helps you understand the basic flow in a Spark application and then how to configure the number of executors, the memory settings for each executor and the number of cores for a Spark job. There are a few factors that we need to consider to decide the optimum numbers for the above three, such as:

  • The amount of data
  • The time in which a job has to complete
  • Static or dynamic allocation of resources
  • Upstream or downstream application

 

Introduction

 

Let’s start with some basic definitions of the terms used in handling Spark applications.

Partitions: A partition is a small chunk of a large distributed data set. Spark manages data using partitions, which helps parallelize data processing with minimal data shuffling across the executors.

Task: A task is a unit of work that can be run on a partition of a distributed dataset and gets executed on a single executor. The unit of parallel execution is at the task level. All the tasks within a single stage can be executed in parallel.

Executor: An executor is a single JVM process which is launched for an application on a worker node. Executors run tasks and keep data in memory or disk storage across them. Each application has its own executors. A single node can run multiple executors, and the executors for an application can span multiple worker nodes. An executor stays up for the duration of the Spark application and runs the tasks in multiple threads. The number of executors for a Spark application can be specified inside the SparkConf or via the --num-executors flag on the command line.

Cluster Manager: An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN). Spark is agnostic to the cluster manager as long as it can acquire executor processes and those can communicate with each other. We are primarily interested in YARN as the cluster manager. A Spark job on YARN can run in either yarn-cluster or yarn-client mode:

yarn-client mode – The driver runs in the client process; the Application Master is only used for requesting resources from YARN.

yarn-cluster mode – The driver runs inside the Application Master process; the client goes away once the application is initialized.
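The deploy mode is chosen when the application is submitted. As a sketch (the --class name and jar file below are placeholders, not real artifacts):

# yarn-client mode: the driver runs on the machine that submits the job
$ spark-submit --master yarn --deploy-mode client --class com.example.MyApp my-app.jar

# yarn-cluster mode: the driver runs inside the YARN Application Master
$ spark-submit --master yarn --deploy-mode cluster --class com.example.MyApp my-app.jar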

Cores: A core is a basic computation unit of the CPU, and a CPU may have one or more cores to perform tasks at a given time. The more cores we have, the more work we can do. In Spark, this controls the number of parallel tasks an executor can run.

 

 

Steps involved in cluster mode for a Spark Job

  1. From the driver code, SparkContext connects to the cluster manager (standalone/Mesos/YARN).
  2. The cluster manager allocates resources across the applications. Any cluster manager can be used as long as the executor processes are running and they communicate with each other.
  3. Spark acquires executors on nodes in the cluster; each application gets its own executor processes.
  4. Application code (jar/Python files/Python egg files) is sent to the executors.
  5. Tasks are sent by SparkContext to the executors.

 

From the above steps, it is clear that the number of executors and their memory settings play a major role in a Spark job. Running executors with too much memory often results in excessive garbage collection delays.

Now let us try to understand how to configure the best set of values to optimize a Spark job.

There are two ways in which we can configure the executor and core details for a Spark job:

  1. Static Allocation – The values are given as part of spark-submit
  2. Dynamic Allocation – The values are picked up based on the requirement (size of data, amount of computations needed) and released after use. This helps the resources to be re-used for other applications.

 

Static Allocation

 

Different cases are discussed below, varying the parameters and arriving at different combinations as per user/data requirements.

 

Case 1 Hardware – 6 nodes, each with 16 cores and 64 GB RAM

First, on each node, 1 core and 1 GB of RAM are needed for the Operating System and Hadoop daemons, leaving 15 cores and 63 GB of RAM on each node.

We start with how to choose the number of cores per executor:

Number of cores = Concurrent tasks an executor can run

So we might think that more concurrent tasks for each executor will give better performance. But research shows that any application with more than 5 concurrent tasks per executor tends to perform poorly, so the optimal value is 5.

This number comes from the ability of an executor to run parallel tasks, not from how many cores a system has. So the number 5 stays the same even if we have double the cores (32) in the CPU.

Number of executors:

Coming to the next step, with 5 cores per executor and 15 total available cores per node, we come to 3 executors per node (15/5). We calculate the number of executors on each node and then the total number for the job.

So with 6 nodes and 3 executors per node, we get a total of 18 executors. Out of those 18, we need 1 executor (Java process) for the Application Master in YARN, so the final number is 17 executors.

This 17 is the number we give to Spark using --num-executors when running from the spark-submit shell command.

Memory for each executor:

From the above step, we have 3 executors per node, and the available RAM on each node is 63 GB.

So the memory for each executor on each node is 63/3 = 21 GB.

However, a small amount of overhead memory also needs to be included to determine the full memory request to YARN for each executor.

The formula for that overhead is max(384 MB, .07 * spark.executor.memory).

Calculating that overhead: .07 * 21 GB (where 21 GB is the 63/3 from above) = 1.47 GB

Since 1.47 GB > 384 MB, the overhead is 1.47 GB.

Subtracting that from each executor’s 21 GB: 21 – 1.47 ≈ 19 GB

So executor memory – 19 GB

Final numbers – Executors – 17, Cores 5, Executor Memory – 19 GB
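As a sketch, these Case 1 numbers would be passed to spark-submit roughly as follows (the --class name and jar file are placeholders):

$ spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 17 \
    --executor-cores 5 \
    --executor-memory 19G \
    --class com.example.MyApp my-app.jar

The same three flags apply to the other cases below; only the numbers change.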

 

Case 2 Hardware – 6 nodes, each with 32 cores and 64 GB RAM

 

The number of cores per executor stays at 5 for good concurrency, as explained above.

Number of executors for each node = 32/5 ~ 6

So total executors = 6 executors * 6 nodes = 36. The final number is then 36 – 1 (for the AM) = 35.

Executor memory:

With 6 executors per node, 63/6 ≈ 10 GB per executor. The overhead is .07 * 10 GB = 700 MB. Rounding the overhead up to 1 GB, we get 10 – 1 = 9 GB.

Final numbers – Executors – 35, Cores 5, Executor Memory – 9 GB

 

Case 3 – When the executors do not need that much memory

 

The above scenarios start by accepting the number of cores per executor as fixed and then deriving the number of executors and the memory.

Now, for the first case, if we think we do not need 19 GB and just 10 GB is sufficient based on the data size and computations involved, then the numbers are as follows:

Cores: 5

Number of executors for each node = 3. Still 15/5 as calculated above.

At this stage, this would lead to 21 GB, and then 19 GB as per our first calculation. But since we decided 10 GB is enough (assuming a little overhead), we cannot simply switch the number of executors per node to 6 (as in 63/10), because with 6 executors per node and 5 cores each, that comes to 30 cores per node when we only have 16 cores. So we also need to change the number of cores for each executor.

So calculating again,

The magic number 5 comes down to 3 (it can be any number less than or equal to 5). So with 3 cores per executor and 15 available cores, we get 5 executors per node, 29 executors in total (5 * 6 – 1), and memory of 63/5 ≈ 12 GB per executor.

The overhead is 12 * .07 = .84 GB. So the executor memory is 12 – 1 = 11 GB.

Final Numbers are 29 executors, 3 cores, executor memory is 11 GB

 

Summary Table

 

Case    Number of Executors    Cores per Executor    Executor Memory
1       17                     5                     19 GB
2       35                     5                     9 GB
3       29                     3                     11 GB

 

Dynamic Allocation

 

Note: the upper bound for the number of executors when dynamic allocation is enabled is infinity by default. This means a Spark application can eat away all of the cluster’s resources if needed. In a cluster where we have other applications running, and they also need cores to run their tasks, we need to make sure that we assign the cores at the cluster level.

 

This means that we can allocate a specific number of cores for YARN-based applications based on user access. For example, we can create a spark_user and then assign cores (min/max) for that user. These limits are for sharing resources between Spark and the other applications that run on YARN.
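For illustration only, if the cluster uses YARN’s Fair Scheduler, such per-user limits could be expressed in its allocation file roughly as below (the queue name and resource numbers are made up, and a cluster running the Capacity Scheduler would be configured differently):

<allocations>
  <!-- Queue for jobs submitted by spark_user; min/max resources cap its share -->
  <queue name="spark_user">
    <minResources>10240 mb,10 vcores</minResources>
    <maxResources>61440 mb,60 vcores</maxResources>
  </queue>
</allocations>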

To understand dynamic allocation, we need to have knowledge of the following properties:

spark.dynamicAllocation.enabled – when this is set to true, we need not specify the number of executors. The reason is explained below:

The static parameter numbers we give at spark-submit apply for the entire job duration. However, if dynamic allocation comes into the picture, there are different stages, like the following:

What number of executors to start with:

Initial number of executors (spark.dynamicAllocation.initialExecutors) to start with

Controlling the number of executors dynamically:

Then, based on the load (tasks pending), Spark decides how many executors to request. This would eventually be the number we would otherwise give at spark-submit in the static way. So once the initial number of executors is set, Spark moves between the min (spark.dynamicAllocation.minExecutors) and max (spark.dynamicAllocation.maxExecutors) numbers.

When to request new executors or give away current executors:

When do we request new executors (spark.dynamicAllocation.schedulerBacklogTimeout) – this means that there have been pending tasks for this much duration. The number of executors requested in each round increases exponentially from the previous round: an application will add 1 executor in the first round, and then 2, 4, 8 and so on in the subsequent rounds. At some point, the max property mentioned above comes into the picture.

When do we give away an executor is set using spark.dynamicAllocation.executorIdleTimeout – an executor that has been idle for this much duration is removed.
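Putting these properties together, a dynamic-allocation submission might look roughly like the sketch below (the initial/min/max/timeout values, --class name and jar file are illustrative; note that dynamic allocation on YARN also requires the external shuffle service to be enabled):

$ spark-submit \
    --master yarn \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.initialExecutors=2 \
    --conf spark.dynamicAllocation.minExecutors=2 \
    --conf spark.dynamicAllocation.maxExecutors=20 \
    --conf spark.dynamicAllocation.schedulerBacklogTimeout=1s \
    --conf spark.dynamicAllocation.executorIdleTimeout=60s \
    --class com.example.MyApp my-app.jar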

To conclude, if we need more control over job execution time or want to monitor the job for unexpected data volumes, the static numbers would help. By moving to dynamic allocation, the resources are managed in the background, and jobs involving unexpected data volumes might affect other applications.

 

References:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

http://spark.apache.org/docs/latest/job-scheduling.html#resource-allocation-policy

https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

http://spark.apache.org/docs/latest/cluster-overview.html