
Installing and Configuring Apache Airflow

Apache Airflow is a platform to programmatically author, schedule and monitor workflows – it supports integration with 3rd party platforms so that you, our developer and user community, can adapt it to your needs and stack.

Additional Documentation:

Documentation: https://airflow.incubator.apache.org/

Install Documentation: https://airflow.incubator.apache.org/installation.html

GitHub Repo: https://github.com/apache/incubator-airflow

Install Airflow

Install Airflow on Ubuntu

  1. SSH onto target machine where you want to install Airflow
  2. Login as Root
  3. Install Required Libraries
    #Refresh the package lists
    apt-get update
     
    #Unzip
    apt-get install unzip
     
    #Build Essentials - GCC Compiler
    apt-get install build-essential
     
    #Python Development
    apt-get install python-dev
     
    #SASL
    apt-get install libsasl2-dev
     
    #Pandas
    apt-get install python-pandas
  4. Check Python Version
    1. Run the command:
      python -V
    2. If the version comes back as “Python 2.7.X” you can skip the rest of this step
    3. Install Python 2.7.X
      cd /opt
      sudo wget --no-check-certificate https://www.python.org/ftp/python/2.7.6/Python-2.7.6.tar.xz
      tar xf Python-2.7.6.tar.xz
      cd Python-2.7.6
      ./configure --prefix=/usr/local
      make && make altinstall
      
      
      ls -ltr /usr/local/bin/python*
      
      
      vi ~/.bashrc
      #add this line to ~/.bashrc:
      alias python='/usr/local/bin/python2.7'
  5. Install PIP
    1. Run Install
      cd /tmp/
       
      wget https://bootstrap.pypa.io/ez_setup.py
       
      python ez_setup.py
      
      unzip setuptools-X.X.zip
      cd setuptools-X.X
      
      easy_install pip
    2. Verify Installation
       which pip
       
      #Should print out "/usr/local/bin/pip"
    3. If pip still refers to Python 2.6 when you use it below, follow these instructions
      1. Replace the binaries in the /usr/bin/ directory with the ones that were just installed
        cd /usr/bin/
        
        #Backup old binaries
        mv pip pip-BACKUP
        mv pip2 pip2-BACKUP
        mv pip2.6 pip2.6-BACKUP
        
        #Setup symlinks to the new version of pip that was installed
        ln -s /usr/local/bin/pip pip
        ln -s /usr/local/bin/pip2 pip2
        ln -s /usr/local/bin/pip2.7 pip2.7
  6. Install Airflow and other dependencies
    pip install airflow==1.7.0
    pip install airflow[hive]
    pip install airflow[celery]
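
The interpreter check from step 4 can also be done from Python itself, which helps when pip and python disagree about which binary is in use. This is a minimal sketch; the helper name is_python27 is hypothetical and is not part of Airflow or pip.

```python
import sys


def is_python27(version_info=sys.version_info):
    """Return True when the given version tuple is a 2.7.x release."""
    return tuple(version_info[:2]) == (2, 7)


if __name__ == "__main__":
    # sys.executable is the path of the interpreter actually running this
    # script, which is useful when pip still resolves to an older Python.
    print("Interpreter: %s" % sys.executable)
    print("Is Python 2.7: %s" % is_python27())
```

Running the script with the same python command that pip uses should report the 2.7 binary installed above.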

Troubleshooting installation on Ubuntu:

  • Installing airflow with pip
    • If you get the error “error trying to exec ‘as’: execvp: No such file or directory”
      • Install the following:
        apt-get install binutils
        apt-get install gcc
        apt-get install build-essential
        pip install pandas
      • Retry installation
      • If the problem persists, uninstall the packages listed above and reinstall. Then rerun.
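
Before retrying, it can help to confirm that the build tools are actually on the PATH. The sketch below assumes Python 3 (shutil.which is not available in 2.7); the helper name missing_tools and the default tool list are illustrative choices, not something pip or Airflow provides.

```python
import shutil


def missing_tools(tools=("as", "gcc", "make")):
    """Return the names in `tools` that cannot be found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    missing = missing_tools()
    if missing:
        print("Missing build tools: " + ", ".join(missing))
    else:
        print("All build tools found")
```

An empty result means the assembler and compiler are installed, so the error is likely coming from somewhere else.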

Install Airflow on CentOS

  1. SSH onto target machine where you want to install Airflow
  2. Login as Root
  3. Install Required Libraries
    yum groupinstall "Development tools"
     
    yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel python-devel wget cyrus-sasl-devel.x86_64
  4. Check Python Version
    1. Run the command:
      python -V
    2. If the version comes back as “Python 2.7.X” you can skip the rest of this step
    3. Install Python 2.7.X
      cd /opt
      sudo wget --no-check-certificate https://www.python.org/ftp/python/2.7.6/Python-2.7.6.tar.xz
      tar xf Python-2.7.6.tar.xz
      cd Python-2.7.6
      ./configure --prefix=/usr/local
      make && make altinstall
      
      ls -ltr /usr/local/bin/python*
      
      vi ~/.bashrc
      #add this line to ~/.bashrc:
      alias python='/usr/local/bin/python2.7'
  5. Install PIP
    1. Run Install
      cd /tmp/
      
      wget https://bootstrap.pypa.io/ez_setup.py
      
      python ez_setup.py
      
      unzip setuptools-X.X.zip
      cd setuptools-X.X
      
      easy_install pip
    2. Verify Installation
       which pip
       
      #Should print out "/usr/local/bin/pip"
  6. Install Airflow and other dependencies
    pip install airflow==1.7.0
    pip install airflow[hive]
    pip install airflow[celery]

Troubleshooting installation on CentOS:

  • Installing PIP with easy_install
    • If you get an error saying ImportError: No module named extern
      1. Reinstall python-setuptools:
        yum reinstall python-setuptools
      2. Retry installation

Update: Common Issue with Celery

Recently there were some updates to the dependencies of Airflow: if you install the airflow[celery] dependency for Airflow 1.7.x, pip now installs celery version 4.0.2, which is incompatible with Airflow 1.7.x. This results in various errors, including messages saying that the CeleryExecutor can’t be loaded, or tasks not getting executed as they should.

To get around this issue, install an older version of celery using pip:

pip install celery==3.1.17
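
To confirm which celery version ended up installed, a small check along these lines may help. It is a sketch only: the helper name celery_is_compatible is hypothetical, and treating every release below 4.0 as compatible is an assumption drawn from the note above rather than a tested compatibility matrix.

```python
def celery_is_compatible(version):
    """Return True when the celery major version is below 4."""
    major = int(version.split(".")[0])
    return major < 4


if __name__ == "__main__":
    try:
        import celery
    except ImportError:
        print("celery is not installed")
    else:
        # celery.__version__ is the installed version string, e.g. "3.1.17"
        print("celery %s compatible with Airflow 1.7.x: %s"
              % (celery.__version__, celery_is_compatible(celery.__version__)))
```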

Install RabbitMQ

If you intend to use RabbitMQ as a message broker you will need to install RabbitMQ. If you don’t intend to, you can skip this step. For production it is recommended that you use the CeleryExecutor, which requires a message broker such as RabbitMQ.

Setup

Follow these steps: Install RabbitMQ

Recovering from a RabbitMQ Node Failure

If you’ve opted to set up RabbitMQ to run as a cluster, and one of those cluster nodes fails, you can follow these steps to recover on Airflow:

  1. Bring the RabbitMQ node and daemon back up
  2. Navigate to the RabbitMQ Management UI
  3. Click on Queues
  4. Delete the “Default” queue
  5. Restart Airflow Scheduler service
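
Steps 2 to 4 can also be scripted against the RabbitMQ management HTTP API instead of the UI. The sketch below is written for Python 3 and assumes the management plugin is enabled on its usual port 15672; the function names, host, credentials, and queue name are placeholders for illustration.

```python
import base64
from urllib.parse import quote
from urllib.request import Request, urlopen


def queue_url(host, vhost, queue, port=15672):
    """Build the management API URL for one queue (vhost "/" becomes %2F)."""
    return "http://%s:%d/api/queues/%s/%s" % (
        host, port, quote(vhost, safe=""), quote(queue, safe=""))


def delete_queue(host, vhost, queue, user, password, port=15672):
    """Send an authenticated DELETE for the queue and return the HTTP status."""
    request = Request(queue_url(host, vhost, queue, port), method="DELETE")
    credentials = ("%s:%s" % (user, password)).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    request.add_header("Authorization", "Basic " + token)
    with urlopen(request) as response:
        return response.status
```

For example, delete_queue("rabbit.example.com", "/", "default", "guest", "guest") would target the queue named default on the root vhost. Use the queue name your own installation shows in the Queues tab.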

Install MySQL

If you intend to use MySQL as the metadata database you will need to install some MySQL dependencies. If you don’t intend to, you can skip this step.

Install MySQL on Ubuntu

  1. Install MySQL Dependencies
     apt-get install python-dev libmysqlclient-dev
     pip install MySQL-python

Install MySQL on CentOS

  1. Install MySQL Dependencies
    yum install -y mysql-devel python-devel python-setuptools
    pip install MySQL-python

Configuring Airflow

It’s recommended to use RabbitMQ as the message broker.

Apache Airflow needs a home directory. ~/airflow is the default, but you can choose a different location if you prefer (OPTIONAL)

export AIRFLOW_HOME=~/airflow

Run the following as the desired user (whoever you want executing the Airflow jobs) to set up the Airflow directories and default configs

airflow initdb
 
#note: When you run this the first time, it will generate a sqlite file (airflow.db) in the AIRFLOW_HOME directory for the Airflow metastore. If you don't intend to use sqlite as the metastore then you can remove this file.

Make the following changes to the {AIRFLOW_HOME}/airflow.cfg file

  1. Change the Executor to CeleryExecutor (Recommended for production)
    executor = CeleryExecutor
  2. Point SQL Alchemy to MySQL (if using MySQL)
    sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
  3. Set DAGs to be paused at creation. This is a good idea to avoid unwanted runs of the workflow. (Recommended)
    # Are DAGs paused by default at creation
    dags_are_paused_at_creation = True
  4. Don’t load examples
    load_examples = False
  5. Set the Broker URL (If you’re using CeleryExecutors)
    1. If you’re using RabbitMQ:
      broker_url = amqp://guest:guest@{RABBITMQ_HOST}:5672/
    2. If you’re using AWS SQS:
      broker_url = sqs://{ACCESS_KEY_ID}:{SECRET_KEY}@
       
      #Note: You will also need to install boto:
      $ pip install -U boto
  6. Point Celery to MySQL (if using MySQL)
    celery_result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
  7. Set Flower Port
    # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
    # it `airflow flower`. This defines the port that Celery Flower runs on
    flower_port = 5556
  8. Set the default_queue name used by CeleryExecutors (Optional: mainly useful if you prefer a particular default queue name or plan to use the same broker for multiple Airflow instances)
    # Default queue that tasks get assigned to and that worker listen on.
    default_queue = {YOUR_QUEUE_NAME_HERE}
  9. Setup MySQL (if using MySQL)
    1. Login to the mysql machine
    2. Create the airflow database if it doesn’t exist
      CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
    3. Grant access
      GRANT ALL ON airflow.* TO '{USERNAME}'@'%' IDENTIFIED BY '{PASSWORD}';
  10. Run initdb to setup the database tables
    airflow initdb
  11. Create needed directories
    cd {AIRFLOW_HOME}
    mkdir dags
    mkdir logs
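
After editing airflow.cfg, a quick script can confirm that the recommended values from steps 1, 3 and 4 were picked up. This is a sketch for Python 3 (on 2.7 the module is named ConfigParser and has no read_string). It assumes these three options live in the [core] section, as in the generated default file; the helper name find_mismatches is hypothetical.

```python
from configparser import RawConfigParser

# (section, option, recommended value) taken from the steps above
RECOMMENDED = [
    ("core", "executor", "CeleryExecutor"),
    ("core", "dags_are_paused_at_creation", "True"),
    ("core", "load_examples", "False"),
]


def find_mismatches(cfg_text):
    """Return (section, option, expected, actual) for each differing option."""
    # RawConfigParser does no % interpolation, so passwords containing "%"
    # in connection strings do not break parsing.
    parser = RawConfigParser()
    parser.read_string(cfg_text)
    mismatches = []
    for section, option, expected in RECOMMENDED:
        actual = None
        if parser.has_option(section, option):
            actual = parser.get(section, option)
        if actual != expected:
            mismatches.append((section, option, expected, actual))
    return mismatches
```

Passing in the contents of {AIRFLOW_HOME}/airflow.cfg should return an empty list once the changes above are in place. An actual value of None means the option was not found.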

Configuring Airflow – Advanced (Optional)

Email Alerting

Enable email alerts for when a task or job fails.

  1. Edit the {AIRFLOW_HOME}/airflow.cfg file
  2. Set the properties
    1. Properties
      • SMTP_HOST - Host of the SMTP Server
      • SMTP_TLS - Whether to use TLS when connecting to the SMTP Server
      • SMTP_USE_SSL - Whether to use SSL when connecting to the SMTP Server
      • SMTP_USER - Username for connecting to SMTP Server
      • SMTP_PORT - Port to use for SMTP Server
      • SMTP_PASSWORD - Password associated with the user that’s used to connect to SMTP Server
      • SMTP_EMAIL_FROM - Email to send Alert Emails as
    2. Example
      [email]
      email_backend = airflow.utils.send_email_smtp
      
      [smtp]
      # If you want airflow to send emails on retries, failure, and you want to
      # use the airflow.utils.send_email function, you have to configure an smtp
      # server here
      smtp_host = {SMTP_HOST}
      smtp_starttls = {SMTP_TLS: True or False}
      smtp_ssl = {SMTP_USE_SSL: True or False}
      smtp_user = {SMTP_USER}
      smtp_port = {SMTP_PORT}
      smtp_password = {SMTP_PASSWORD}
      smtp_mail_from = {SMTP_EMAIL_FROM}
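
It can save time to verify the SMTP values outside Airflow before waiting for a task to fail. The sketch below uses only the standard smtplib and email modules; the function names and the message text are illustrative, and the arguments correspond to the placeholders in the [smtp] example above.

```python
import smtplib
from email.mime.text import MIMEText


def build_test_message(mail_from, mail_to, subject="Airflow SMTP test"):
    """Create a plain-text message for checking the SMTP settings."""
    message = MIMEText("Test message sent to verify the [smtp] settings.")
    message["Subject"] = subject
    message["From"] = mail_from
    message["To"] = mail_to
    return message


def send_test_message(host, port, user, password, mail_from, mail_to,
                      starttls=True, use_ssl=False):
    """Connect with the given settings and send one test message."""
    message = build_test_message(mail_from, mail_to)
    smtp_class = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
    server = smtp_class(host, port)
    try:
        if starttls and not use_ssl:
            server.starttls()  # upgrade the plain connection to TLS
        if user:
            server.login(user, password)
        server.sendmail(mail_from, [mail_to], message.as_string())
    finally:
        server.quit()
```

If send_test_message raises an exception, the same host, port, or credentials are unlikely to work from Airflow either.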

Password Authentication

To enable password authentication for the web app, follow these instructions: http://airflow.incubator.apache.org/security.html

Running Airflow Services

  1. Start Web Server
    nohup airflow webserver $* >> ~/airflow/logs/webserver.logs &
  2. Start Celery Workers
    nohup airflow worker $* >> ~/airflow/logs/celery.logs &
  3. Start Scheduler
    nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &
  4. Navigate to the Airflow UI
  5. Start Flower (Optional)
    • Flower is a web UI built on top of Celery, to monitor your workers.
    nohup airflow flower >> ~/airflow/logs/flower.logs &
  6. Navigate to the Flower UI (Optional)
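
Because the services are started in the background with nohup, it is not obvious whether they came up. One way to check is to probe the ports they listen on. This is a minimal sketch: the helper name port_is_open is hypothetical, port 8080 assumes the web server is on its default port, and 5556 is the Flower port configured earlier.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        connection = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, socket.timeout):
        return False
    connection.close()
    return True


if __name__ == "__main__":
    for name, port in (("webserver", 8080), ("flower", 5556)):
        state = "up" if port_is_open("localhost", port) else "down"
        print("%s (port %d): %s" % (name, port, state))
```

A port reported as down usually means the corresponding log file under ~/airflow/logs is the next place to look.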

Testing Airflow

Example Dags

https://github.com/apache/incubator-airflow/tree/master/airflow/example_dags

High Level Testing

Note: You will need to deploy the tutorial.py dag.

airflow test tutorial print_date 2016-03-30

#[2016-03-30 18:39:46,621] {bash_operator.py:72} INFO - Output:
#[2016-03-30 18:39:46,623] {bash_operator.py:76} INFO - Wed Mar 30 18:39:46 UTC 2016

Running a Sample Airflow DAG

Assume the following code is in the dag at ~/airflow/dags/sample.py

from airflow import DAG
from airflow.operators import DummyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(seconds=10),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('sample', default_args=default_args)

op = DummyOperator(task_id='dummy', dag=dag)

Running a Test

Let’s test by running the actual task instances on a specific date. The date specified in this context is an execution_date, which simulates the scheduler running your task or dag at a specific date + time:

airflow test sample dummy 2016-03-30

Run

Here’s how to run a particular task. Note: It might fail if the tasks it depends on have not run successfully.

airflow run sample dummy 2016-04-22T00:00:00 --local

Backfill

Backfill will respect your dependencies, emit logs into files and talk to the database to record status. If you have a web server running (start one with airflow webserver), you can track the progress visually as your backfill runs.

airflow backfill sample -s 2016-08-21
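
When a backfill is given both a start date (-s) and an end date (-e), it creates one run per schedule interval in that range. The sketch below lists those execution dates. It assumes a daily schedule, which is what the sample DAG gets because it sets no schedule_interval, and it treats both bounds as inclusive. The function name execution_dates is illustrative and is not an Airflow API.

```python
from datetime import datetime, timedelta


def execution_dates(start, end, interval=timedelta(days=1)):
    """List the execution dates from start to end, one per interval."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


if __name__ == "__main__":
    for date in execution_dates(datetime(2016, 8, 21), datetime(2016, 8, 23)):
        print(date.isoformat())
```

Each printed date corresponds to one execution_date that a backfill over that range would be expected to fill in under these assumptions.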

Helpful Operations

Getting Airflow Version

airflow version

Find Airflow Site-Packages Installation Location

Sometimes it might be helpful to find the source code so you can perform some other operations to help customize the experience in Airflow. This is how you can find the location of where the airflow source code is installed:

  1. Start up a Python CLI
    python
  2. Run the following code to find where the airflow source code is installed
    import site
    import os
    SITE_PACKAGES = site.getsitepackages()
    print("All Site Packages: " + str(SITE_PACKAGES))
    # Stays None if no site-packages directory contains airflow
    AIRFLOW_INSTALL_DIR = None
    for site_package in SITE_PACKAGES:
        test_path = os.path.join(site_package, "airflow")
        if os.path.exists(test_path):
            AIRFLOW_INSTALL_DIR = test_path
    
    print("Site Package Containing Airflow: " + str(AIRFLOW_INSTALL_DIR))

Usual Site Package Paths:

  • Centos
    • /usr/lib/python2.7/site-packages
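
An alternative to scanning the site-packages directories is to import the package and ask where it was loaded from. This is a sketch with a hypothetical helper name, package_dir. Note that importing a package runs its initialization code, so for airflow this will also load its configuration.

```python
import importlib
import os


def package_dir(name):
    """Return the directory an importable package was loaded from."""
    module = importlib.import_module(name)
    return os.path.dirname(os.path.abspath(module.__file__))


if __name__ == "__main__":
    # "json" is a standard library package, used here so the example runs
    # anywhere. Pass "airflow" on a machine where Airflow is installed.
    print(package_dir("json"))
```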

Change Alert Email Subject

By default, the Airflow Alert Emails are always sent with a subject like: Airflow alert: <TaskInstance: [DAG_NAME].[TASK_ID] [DATE] [failed]>. If you would like to change this to provide more information as to which Airflow cluster you’re working with, you can follow these steps.

Note: It requires a very small modification of the Airflow Source Code.

  1. Go to the Airflow Site-Packages Installation Location
    1. Example Path: /usr/lib/python2.7/site-packages/airflow
  2. Edit the models.py file
  3. Search for the text “Airflow alert: ”
    1. Using nano
      1. Open the file
      2. Hit CTRL+w
      3. Type in “Airflow alert” and hit enter
  4. Modify this string to whatever you would like.
    1. The original value, title = "Airflow alert: {self}".format(**locals()), will produce ‘Airflow alert: <TaskInstance: [DAG_NAME].[TASK_ID] [DATE] [failed]>’
    2. An updated value like title = "Test Updated Airflow alert: {self}".format(**locals()) will produce ‘Test Updated Airflow alert: <TaskInstance: [DAG_NAME].[TASK_ID] [DATE] [failed]>’
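
The reason only the text before {self} needs to change is that format(**locals()) fills the {self} placeholder with the string form of the task instance. The stand-in class below illustrates the mechanism. FakeTaskInstance and alert_title are made up for this example and are not Airflow code.

```python
class FakeTaskInstance(object):
    """Stand-in for a task instance, with a fixed string form."""

    def __repr__(self):
        return "<TaskInstance: sample.dummy 2016-03-30 00:00:00 [failed]>"

    def alert_title(self, prefix="Airflow alert"):
        # locals() holds both self and prefix; the {self} placeholder is
        # filled with the instance's string form, and unused names are ignored.
        title = (prefix + ": {self}").format(**locals())
        return title


if __name__ == "__main__":
    print(FakeTaskInstance().alert_title())
    print(FakeTaskInstance().alert_title("Test Updated Airflow alert"))
```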

Set Logging Level

If you want to get more information in the logs (debug) or log less information (warn), you can follow these steps to set the logging level.

Note: It requires a very small modification of the Airflow Source Code.

  1. Go to the Airflow Site-Packages Installation Location
  2. Edit the settings.py file
  3. Set the LOGGING_LEVEL variable to your desired value
    1. debug → logging.DEBUG
    2. info → logging.INFO
    3. warn → logging.WARN
  4. Restart the Airflow Services
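
To see what each choice does, the sketch below sets a logger to one of the three levels and reports which kinds of messages would still be emitted. It uses only the standard logging module; the logger name and the helper enabled_levels are made up for the example.

```python
import logging

# Same mapping as in step 3 above
LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warn": logging.WARN,
}


def enabled_levels(level_name):
    """Return which of debug/info/warn are emitted at the given level."""
    logger = logging.getLogger("logging_level_demo")
    logger.setLevel(LEVELS[level_name])
    return [name for name in ("debug", "info", "warn")
            if logger.isEnabledFor(LEVELS[name])]


if __name__ == "__main__":
    for name in ("debug", "info", "warn"):
        print("%s -> %s" % (name, enabled_levels(name)))
```

Setting the level to warn drops debug and info messages, while debug keeps all three.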

4 comments

  2. Dan

    Hey Robert!

    Amazing article I really really appreciate you writing this out. I have an issue that I was hoping you might be able to help me out with though, related to mysql set up. I followed your instructions but I am getting the following error

    sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1071, ‘Specified key was too long; max key length is 1000 bytes’) [SQL: u’\nCREATE TABLE sla_miss (\n\ttask_id VARCHAR(250) NOT NULL, \n\tdag_id VARCHAR(250) NOT NULL, \n\texecution_date DATETIME NOT NULL, \n\temail_sent BOOL, \n\ttimestamp DATETIME, \n\tdescription TEXT, \n\tPRIMARY KEY (task_id, dag_id, execution_date), \n\tCHECK (email_sent IN (0, 1))\n)\n\n’]

    I am hoping that there might be some way I can change the CHARACTER SET and or the sqlalchemy connection to handle this max_key_length issue?

    1. Dan

      Actually I think I was able to get this to work, just changed my create database to be ascii instead of utf-8 and added that to my sqlalchemy engine

      Created this question too http://stackoverflow.com/questions/42260618/apache-airflow-mysql-specified-key-was-too-long-max-key-length-is-1000-bytes

      Thanks again for the post I really enjoyed it!
