Making Apache Airflow Highly Available

In a previous post, we discussed Setting up an Apache Airflow Cluster. In this post we’ll talk about the shortcomings of a typical Apache Airflow Cluster and what can be done to provide a Highly Available Airflow Cluster.

A Typical Apache Airflow Cluster

In a typical multi-node Airflow cluster you can separate out all the major processes onto separate machines. Here are the main processes:

Web Server

A daemon which accepts HTTP requests and allows you to interact with Airflow via a Python Flask web application. It provides the ability to pause and unpause DAGs, manually trigger DAGs, view running DAGs, restart failed DAGs, and much more.

Scheduler

A daemon which periodically polls to determine if any registered DAGs and/or Task Instances need to be triggered based on their schedules.

Executors/Workers

A daemon that handles starting up and managing one to many celeryd processes, which execute the desired tasks of a particular DAG.
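
For reference, on the Airflow 1.x-era releases this post is based on, these three daemons are typically started with the Airflow CLI (the port shown is the web server's default; adjust for your environment):

  $ airflow webserver -p 8080
  $ airflow scheduler
  $ airflow worker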

High Availability in a Typical Apache Airflow Cluster

A typical cluster can provide a good amount of High Availability right off the bat. It does this by allowing for redundancy in most of the core processes listed above:

Web Server

You can have multiple Master Nodes, each running a web server, behind a load balancer. This means that if one of the masters goes down, you still have at least one other Master available to accept HTTP requests forwarded from the Load Balancer.

Executors/Workers

You can set up multiple Worker nodes. If one of those nodes goes down, the others will still be active and able to accept and execute tasks.

Diagram

[Diagram: a typical multi-node Apache Airflow cluster]

Problems with the Typical Apache Airflow Cluster

The problem with the traditional Airflow Cluster setup is that there can’t be any redundancy in the Scheduler daemon. If you were to have multiple Scheduler instances running, you could have multiple instances of a single task scheduled for execution. This can be a very bad thing depending on your jobs. For example, if you were running a workflow that performs some type of ETL process, you may end up seeing duplicate data that has been extracted from the original source, incorrect results from duplicate transformation processes, or duplicate data in the final destination where the data is loaded. So, in the case of setting up an Airflow cluster, you can only have a single Scheduler daemon running on the entire cluster. If this single Airflow Scheduler instance were to crash, your Airflow cluster would no longer have any DAGs or tasks being scheduled.

The Solution

A plain distribution of Airflow doesn’t provide a way to enable High Availability for the Scheduler. Instead, what we did at Clairvoyant was create a process that allows for a Highly Available Scheduler instance, which we call the Airflow Scheduler Failover Controller.

This process tries to ensure that there is always one and only one Scheduler instance running at a time. If one Scheduler instance dies, then the failover controller tries to start it back up again. If it still doesn’t start up on the original machine, it tries to start it on another, ensuring that there’s at least one Scheduler running in the cluster.

In addition, to prevent the failover controller itself from becoming the one process that keeps the entire cluster from being highly available (if this process dies, then the Scheduler will no longer be Highly Available), we also allow redundancy in the Scheduler Failover Controller. One Scheduler Failover Controller is selected as the ACTIVE instance and all others remain in a STANDBY state until the active Failover Controller stops reporting in. It’s recommended that you run the Scheduler Failover Controller on the same machines you designate to run the Schedulers.

Diagram

[Diagram: an Airflow cluster with redundant Scheduler Failover Controllers]

How the Scheduler Failover Controller Works

There will ideally be multiple Scheduler Failover Controllers running: one that starts in an ACTIVE state, and at least one other that starts in a STANDBY state.

The ACTIVE Scheduler Failover Controller will regularly push a HEARTBEAT into a metastore (supported metastores: MySQL DB, Zookeeper), which the STANDBY Scheduler Failover Controllers will read from to see if they need to become ACTIVE (if the last heartbeat is too old, then a STANDBY Scheduler Failover Controller knows the ACTIVE instance is no longer running).

The ACTIVE Scheduler Failover Controller will poll every X seconds (the default is 10 seconds but this can be configured) to see if the Airflow Scheduler is running on the desired node. If it is not, the Scheduler Failover Controller will try to restart the daemon. If the Scheduler daemon still doesn’t start up, the Scheduler Failover Controller will attempt to start the Scheduler daemon on another master node in the cluster. As part of this poll, the ACTIVE Scheduler Failover Controller will also check and make sure that Scheduler daemons aren’t running on the other nodes.
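
To make that behavior concrete, the shell loop below is only a rough, local-only sketch of the poll-and-restart cycle, not the failover controller’s actual implementation; it omits the heartbeat, the STANDBY takeover, and the failover to other master nodes:

  $ while true; do
  >   # if no scheduler process is found on this node, try to (re)start one
  >   if ! pgrep -f "airflow scheduler" > /dev/null; then
  >     nohup airflow scheduler > /dev/null 2>&1 &
  >   fi
  >   sleep 10   # the default poll interval
  > done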

Setup Steps

  1. Set up Airflow on all the nodes you want to participate in the cluster
  2. Configure Airflow to use CeleryExecutor (steps 2, 3, and 5 can be quickly verified with the configuration check sketched after this list)
  3. Configure each Airflow instance to point to the same External MySQL instance and DB for sql_alchemy_conn and celery_result_backend properties
    • It’s also recommended to follow steps to make MySQL, or whatever type of database you’re using, Highly Available too.
  4. If you’re using RabbitMQ as your Queueing Service, then set it up to be Highly Available
    1. Setup RabbitMQ Cluster with HA
    2. Setup a Load Balancer for RabbitMQ
  5. Configure each Airflow instance to point to the same Queueing Service (set the broker_url argument)
  6. Deploy the Airflow Scheduler Failover Controller to all the nodes acting as Failover Controllers (the same nodes you designate to run a Scheduler)
  7. Configure the Airflow Scheduler Failover Controller
  8. Set up a Load Balancer to balance requests between the Nodes running the Web Server
    1. Port Forwarding
      1. Port 8080 (HTTP) → Port 8080 (HTTP)
    2. Health Check
      1. Protocol: HTTP
      2. Ping Port: 8080
      3. Ping Path: /
  9. Start up the Airflow services
    1. WebServer and Failover Controller instances to be started on the Master Nodes
    2. Worker instances to be started on the Worker Nodes
  10. Deploy your DAGs to the DAG directory of every Airflow instance acting as a Master Node
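
As a quick sanity check for steps 2, 3, and 5, every node’s airflow.cfg should carry the same executor, metastore, and broker settings. The output below is only illustrative: all the curly-brace values are placeholders for your own hosts and credentials, and the exact connection-string formats will depend on your environment.

  $ grep -E '^(executor|sql_alchemy_conn|celery_result_backend|broker_url)' {AIRFLOW_HOME}/airflow.cfg
  executor = CeleryExecutor
  sql_alchemy_conn = mysql://{AIRFLOW_DB_USERNAME}:{AIRFLOW_DB_PASSWORD}@{AIRFLOW_DB_HOST}/{AIRFLOW_DB_SCHEMA}
  celery_result_backend = db+mysql://{AIRFLOW_DB_USERNAME}:{AIRFLOW_DB_PASSWORD}@{AIRFLOW_DB_HOST}/{AIRFLOW_DB_SCHEMA}
  broker_url = amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_LOAD_BALANCER_HOST}:5672/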


Upgrading Apache Airflow Versions

In a previous post we explained how to Install and Configure Apache Airflow (a platform to programmatically author, schedule and monitor workflows). The technology is actively being worked on and more and more features and bug fixes are being added to the project in the form of new releases. At some point, you will want to upgrade to take advantage of these new features.

In this post we’ll go over the process that you should follow for upgrading Apache Airflow versions.

Note: You will need to separately make sure that your DAGs will be able to work on the new version of Airflow.
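
One hedged way to do that spot check is to load your DAGs on a test instance running the new version and exercise a task or two with the Airflow 1.x CLI (the DAG ID, task ID, and execution date below are placeholders for your own values):

  $ airflow list_dags
  $ airflow test {DAG_ID} {TASK_ID} {EXECUTION_DATE}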

Upgrade Airflow

Note: These steps can also work to downgrade versions of Airflow

Note: Execute all of this on all the instances in your Airflow Cluster (if you have more than one machine)

  1. Gather information about your current environment and your target setup:
    • Get the Airflow Home directory. Placeholder for this value: {AIRFLOW_HOME}
    • Get the current version of Airflow you are running. Placeholder for this value: {OLD_AIRFLOW_VERSION}
      1. To get this value you can run:
        $ airflow version
    • Get the new version of Airflow you want to run. Placeholder for this value: {NEW_AIRFLOW_VERSION}
    • Are you using sqlite? Placeholder for this value: {USING_SQLITE?}
    • If you’re not using SQLite, search the airflow.cfg file (the celery_result_backend and sql_alchemy_conn configurations) for the metastore type {AIRFLOW_DB_TYPE}, host name {AIRFLOW_DB_HOST}, database schema name {AIRFLOW_DB_SCHEMA}, username {AIRFLOW_DB_USERNAME}, and password {AIRFLOW_DB_PASSWORD}
  2. Ensure the new version of Airflow you want to Install is Available
    1. Run the following command (don’t forget to include the ‘==’):
      $ pip install airflow==
      • Note: This will throw an error saying that the version is not provided and then show you all the versions available. This is supposed to happen and is a way that you can find out what versions are available.
    2. View the list of versions available and make sure the version you want to install ‘{NEW_AIRFLOW_VERSION}’ is available
  3. Shutdown all the Airflow Services on the Master and Worker nodes
    1. webserver
      1. gunicorn processes
    2. scheduler
    3. worker – if applicable
      1. celeryd daemons
    4. flower – if applicable
    5. kerberos ticket renewer – if applicable
  4. Take backups of various components to ensure you can Rollback
    1. Optionally, you can create a directory to house all of these backups. The below steps assume you’re going to create this type of folder and push all of your backups to {AIRFLOW_BACKUP_FOLDER}. But you can just as easily rename the files you want to back up if that’s more convenient.
      • Create the backup folder:
        $ mkdir -p {AIRFLOW_BACKUP_FOLDER}
    2. Backup your Configurations
      • Move the airflow.cfg file to the backup folder:
        $ cd {AIRFLOW_HOME}
        $ mv airflow.cfg {AIRFLOW_BACKUP_FOLDER}
    3. Backup your DAGs
      • Zip up the Airflow DAGs folder and move it to the backup folder:
        $ cd {AIRFLOW_HOME}
        $ zip -r airflow_dags.zip dags
        $ mv airflow_dags.zip {AIRFLOW_BACKUP_FOLDER}
      • Note: You may need to install the zip package
    4. Backup your DB/Metastore
      1. If you’re using sqlite ({USING_SQLITE?}):
        • Move the airflow.db sqlite db to the backup folder:
          $ cd {AIRFLOW_HOME}
          $ mv airflow.db {AIRFLOW_BACKUP_FOLDER}
      2. If you’re using a SQL database like MySQL or PostgreSQL, take a dump of the database.
        • If you’re using MySQL you can use the following command:
          $ mysqldump --host={AIRFLOW_DB_HOST} --user={AIRFLOW_DB_USERNAME} --password={AIRFLOW_DB_PASSWORD} {AIRFLOW_DB_SCHEMA} > {AIRFLOW_BACKUP_FOLDER}/airflow_metastore_backup.sql
  5. Upgrade Airflow
    1. Run the following PIP command to install Airflow and the required dependencies:
      $ sudo pip install airflow=={NEW_AIRFLOW_VERSION} --upgrade
      $ sudo pip install airflow[hive]=={NEW_AIRFLOW_VERSION} --upgrade
    2. Note: If you installed additional sub-packages of Airflow you will need to upgrade those too
  6. Regenerate and Update Airflow Configurations
    1. Regenerate a fresh airflow.cfg file (the old one was moved to the backup folder) by running the following command:
      $ airflow initdb
      • Note: The reason you want to regenerate the airflow.cfg file is that, between versions of Airflow, new configurations might have been added or old configuration values (for things that you don’t need to update from the default values) might have changed.
    2. Remove the generated airflow.db file
      $ cd {AIRFLOW_HOME}
      $ rm airflow.db
    3. If you’re using sqlite, copy the old airflow.db file you backed up back to the original place
      $ cd {AIRFLOW_HOME}
      $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.db .
    4. Manually copy all of the individual updated configurations from the old airflow.cfg file that you backed up to the new airflow.cfg file
      • Compare the backed-up airflow.cfg file with the newly generated one to determine which configurations you need to copy over (a diff example is shown after these steps). This may include the following configurations:
        • executor
        • sql_alchemy_conn
        • base_url
        • load_examples
        • broker_url
        • celery_result_backend
    5. Review the airflow.cfg file further to ensure all values are set to the correct value
  7. Upgrade Metastore DB
    • Run the following command:
      $ airflow upgradedb
  8. Restart your Airflow Services
    • The same ones you shut down in step #3
  9. Test the upgraded Airflow Instance
    • High Level Checklist:
      • Services start up without errors?
      • DAGs run as expected?
      • Do the plugins you have installed (if any) load and work as expected?
  10. Once you’re satisfied with the upgrade, you can delete the {AIRFLOW_BACKUP_FOLDER} folder and its contents
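
For the comparison in step 6.4, a plain diff of the backed-up file against the freshly generated one is usually enough to spot the values that need to be carried over (the exact output will depend on your environment):

  $ diff {AIRFLOW_BACKUP_FOLDER}/airflow.cfg {AIRFLOW_HOME}/airflow.cfg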

Rollback Airflow

In the event you encountered a problem during the upgrade process and would like to roll back to the version you had before, follow these instructions:

  1. Take note of what step you stopped at in the upgrade process
  2. Stop all the Airflow Services
  3. If you reached step #7 in the upgrade steps above (Step: Upgrade Metastore DB)
    1. Restore the database to the original state
      1. If you’re using sqlite ({USING_SQLITE?})
        1. Delete the airflow.db file that’s there and copy the old airflow.db file from your backup folder to its original place:
          $ cd {AIRFLOW_HOME}
          $ rm airflow.db
          $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.db .
      2. If you’re using a SQL database like MySQL or PostgreSQL, restore the dump of the database
        • If you’re using MySQL you can use the following command:
          $ mysql --host={AIRFLOW_DB_HOST} --user={AIRFLOW_DB_USERNAME} --password={AIRFLOW_DB_PASSWORD} {AIRFLOW_DB_SCHEMA} < {AIRFLOW_BACKUP_FOLDER}/airflow_metastore_backup.sql
  4. If you reached step #6 in the upgrade steps above (Step: Regenerate and Update Airflow Configurations)
    • Copy the airflow.cfg file that you backed up back to its original place:
      $ cd {AIRFLOW_HOME}
      $ rm airflow.cfg
      $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.cfg .
  5. If you reached step #5 in the upgrade steps above (Step: Upgrade Airflow)
    • Downgrade Airflow back to the original version:
      $ sudo pip install airflow=={OLD_AIRFLOW_VERSION} --upgrade
      $ sudo pip install airflow[hive]=={OLD_AIRFLOW_VERSION} --upgrade
    • Note: If you installed additional sub-packages of Airflow you will need to downgrade those too
  6. If you reached step #4 in the upgrade steps above (Step: Take backups)
    1. Restore the airflow.cfg file (if you haven’t already done so)
      $ cd {AIRFLOW_HOME}
      $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.cfg .
    2. If you’re using sqlite ({USING_SQLITE?}), restore the airflow.db file (if you haven’t already done so)
      $ cd {AIRFLOW_HOME}
      $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.db .
  7. Restart all the Airflow Services
  8. Test the restored Airflow Instance
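
As a quick check that the rollback took effect, the CLI should once again report {OLD_AIRFLOW_VERSION}:

  $ airflow version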