Upgrading Apache Airflow Versions

In a previous post we explained how to Install and Configure Apache Airflow (a platform to programmatically author, schedule and monitor workflows). The project is actively developed, with new features and bug fixes added in each release. At some point, you will want to upgrade to take advantage of these new features.

In this post we’ll go over the process that you should follow when upgrading Apache Airflow versions.

Note: You will separately need to make sure that your DAGs will work on the new version of Airflow.

Upgrade Airflow

Note: These steps can also work to downgrade versions of Airflow

Note: Execute all of these steps on every instance in your Airflow Cluster (if you have more than one machine)

  1. Gather information about your current environment and your target setup:
    • Get the Airflow Home directory. Placeholder for this value: {AIRFLOW_HOME}
    • Get the current version of Airflow you are running. Placeholder for this value: {OLD_AIRFLOW_VERSION}
      1. To get this value you can run:
        $ airflow version
    • Get the new version of Airflow you want to run. Placeholder for this value: {NEW_AIRFLOW_VERSION}
    • Are you using SQLite? Placeholder for this value: {USING_SQLITE?}
    • If you’re not using SQLite, search the airflow.cfg file for the metastore settings (the sql_alchemy_conn and celery_result_backend configurations) and note the database type {AIRFLOW_DB_TYPE}, host name {AIRFLOW_DB_HOST}, database schema name {AIRFLOW_DB_SCHEMA}, username {AIRFLOW_DB_USERNAME}, and password {AIRFLOW_DB_PASSWORD}
  2. Ensure the new version of Airflow you want to Install is Available
    1. Run the following command (don’t forget to include the ‘==’):
      $ pip install airflow==
      • Note: This will throw an error saying that no version was provided and then show you all the versions available. This is expected and is a way to find out which versions are available.
    2. View the list of versions available and make sure the version you want to install ‘{NEW_AIRFLOW_VERSION}’ is available
  3. Shut down all the Airflow Services on the Master and Worker nodes
    1. webserver
      1. gunicorn processes
    2. scheduler
    3. worker – if applicable
      1. celeryd daemons
    4. flower – if applicable
    5. kerberos ticket renewer – if applicable
  4. Take backups of various components to ensure you can roll back
    1. Optionally, you can create a directory to house all of these backups. The steps below assume you’re going to create this folder and put everything you back up into {AIRFLOW_BACKUP_FOLDER}. But you can just as easily rename the files you want to back up if that’s more convenient.
      • Create the backup folder:
        $ mkdir -p {AIRFLOW_BACKUP_FOLDER}
    2. Backup your Configurations
      • Move the airflow.cfg file to the backup folder:
        $ cd {AIRFLOW_HOME}
        $ mv airflow.cfg {AIRFLOW_BACKUP_FOLDER}
    3. Backup your DAGs
      • Zip up the Airflow DAGs folder and move it to the backup folder:
        $ cd {AIRFLOW_HOME}
        $ zip -r airflow_dags.zip dags
        $ mv airflow_dags.zip {AIRFLOW_BACKUP_FOLDER}
      • Note: You may need to install the zip package
    4. Backup your DB/Metastore
      1. If you’re using sqlite ({USING_SQLITE?}):
        • Move the airflow.db sqlite db to the backup folder:
          $ cd {AIRFLOW_HOME}
          $ mv airflow.db {AIRFLOW_BACKUP_FOLDER}
      2. If you’re using a SQL database like MySQL or PostgreSQL, take a dump of the database.
        • If you’re using MySQL you can use the following command:
          $ mysqldump --host={AIRFLOW_DB_HOST} --user={AIRFLOW_DB_USERNAME} --password={AIRFLOW_DB_PASSWORD} {AIRFLOW_DB_SCHEMA} > {AIRFLOW_BACKUP_FOLDER}/airflow_metastore_backup.sql
  5. Upgrade Airflow
    1. Run the following PIP command to install Airflow and the required dependencies:
      $ sudo pip install airflow=={NEW_AIRFLOW_VERSION} --upgrade
      $ sudo pip install airflow[hive]=={NEW_AIRFLOW_VERSION} --upgrade
    2. Note: If you installed additional sub-packages of Airflow you will need to upgrade those too
  6. Regenerate and Update Airflow Configurations
    1. Regenerate the airflow.cfg that was backed up using the following command:
      $ airflow initdb
      • Note: The reason you want to regenerate the airflow.cfg file is that, between versions of Airflow, new configurations may have been added and the default values of existing configurations (ones you don’t override) may have changed.
    2. Remove the generated airflow.db file
      $ cd {AIRFLOW_HOME}
      $ rm airflow.db
    3. If you’re using sqlite, copy the old airflow.db file you backed up back to the original place
      $ cd {AIRFLOW_HOME}
      $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.db .
    4. Manually copy all of the individual updated configurations from the old airflow.cfg file that you backed up to the new airflow.cfg file
      • Compare the airflow.cfg files (backed up and new one) to determine which configurations you need to copy over. This may include the following configurations:
        • executor
        • sql_alchemy_conn
        • base_url
        • load_examples
        • broker_url
        • celery_result_backend
    5. Review the airflow.cfg file further to ensure all values are set to the correct value
  7. Upgrade Metastore DB
    • Run the following command:
      $ airflow upgradedb
  8. Restart your Airflow Services
    • The same ones you shut down in step #3
  9. Test the upgraded Airflow Instance
    • High Level Checklist:
      • Do the services start up without errors?
      • Do DAGs run as expected?
      • Do the plugins you have installed (if any) load and work as expected?
  10. Once you’re confident in the upgrade, you can delete the {AIRFLOW_BACKUP_FOLDER} folder and its contents
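
For reference, here is the core upgrade sequence condensed into a single hedged shell sketch (SQLite case, placeholders as defined in step 1). It simply strings together the commands from the steps above, so adapt it to your environment rather than running it verbatim:

# Stop the webserver, scheduler, worker, flower and kerberos renewer first (step 3)
$ mkdir -p {AIRFLOW_BACKUP_FOLDER}
$ cd {AIRFLOW_HOME}
$ mv airflow.cfg {AIRFLOW_BACKUP_FOLDER}                 # back up the configuration
$ zip -r airflow_dags.zip dags && mv airflow_dags.zip {AIRFLOW_BACKUP_FOLDER}
$ mv airflow.db {AIRFLOW_BACKUP_FOLDER}                  # back up the SQLite metastore
$ sudo pip install airflow=={NEW_AIRFLOW_VERSION} --upgrade
$ airflow initdb                                         # regenerates a default airflow.cfg
$ rm airflow.db                                          # discard the freshly created SQLite db
$ cp {AIRFLOW_BACKUP_FOLDER}/airflow.db .                # restore your metastore
# Merge your settings from {AIRFLOW_BACKUP_FOLDER}/airflow.cfg into the new airflow.cfg, then:
$ airflow upgradedb
# Restart the services you stopped in step 3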

Rollback Airflow

In the event you encountered a problem during the upgrade process and would like to roll back to the version you had before, follow these instructions:

  1. Take note of what step you stopped at in the upgrade process
  2. Stop all the Airflow Services
  3. If you reached step #7 in the upgrade steps above (Step: Upgrade Metastore DB)
    1. Restore the database to the original state
      1. If you’re using sqlite ({USING_SQLITE?})
        1. Delete the airflow.db file that’s there and copy the old airflow.db file from your backup folder to its original place:
          $ cd {AIRFLOW_HOME}
          $ rm airflow.db
          $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.db .
      2. If you’re using a SQL database like MySQL or PostgreSQL, restore the dump of the database
        • If you’re using MySQL you can use the following command:
          $ mysql --host={AIRFLOW_DB_HOST} --user={AIRFLOW_DB_USERNAME} --password={AIRFLOW_DB_PASSWORD} {AIRFLOW_DB_SCHEMA} < {AIRFLOW_BACKUP_FOLDER}/airflow_metastore_backup.sql
  4. If you reached step #6 in the upgrade steps above (Step: Regenerate and Update Airflow Configurations)
    • Copy the airflow.cfg file that you backed up back to its original place:
      $ cd {AIRFLOW_HOME}
      $ rm airflow.cfg
      $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.cfg .
  5. If you reached step #5 in the upgrade steps above (Step: Upgrade Airflow)
    • Downgrade Airflow back to the original version:
      $ sudo pip install airflow=={OLD_AIRFLOW_VERSION} --upgrade
      $ sudo pip install airflow[hive]=={OLD_AIRFLOW_VERSION} --upgrade
    • Note: If you installed additional sub-packages of Airflow you will need to downgrade those too
  6. If you reached step #4 in the upgrade steps above (Step: Take backups)
    1. Restore the airflow.cfg file (if you haven’t already done so)
      $ cd {AIRFLOW_HOME}
      $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.cfg .
    2. If you’re using sqlite ({USING_SQLITE?}), restore the airflow.db file (if you haven’t already done so)
      $ cd {AIRFLOW_HOME}
      $ cp {AIRFLOW_BACKUP_FOLDER}/airflow.db .
  7. Restart all the Airflow Services
  8. Test the restored Airflow Instance
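
Similarly, here is a hedged shell sketch of a full rollback for the MySQL case, strung together from the commands in the steps above (substitute the placeholders before using it):

# Stop all Airflow services first
$ mysql --host={AIRFLOW_DB_HOST} --user={AIRFLOW_DB_USERNAME} --password={AIRFLOW_DB_PASSWORD} {AIRFLOW_DB_SCHEMA} < {AIRFLOW_BACKUP_FOLDER}/airflow_metastore_backup.sql
$ cd {AIRFLOW_HOME}
$ rm airflow.cfg && cp {AIRFLOW_BACKUP_FOLDER}/airflow.cfg .   # restore the old configuration
$ sudo pip install airflow=={OLD_AIRFLOW_VERSION} --upgrade    # downgrade the package (and any sub-packages)
# Restart all Airflow services and re-test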

Creating a Custom Origin for StreamSets

StreamSets Data Collector:

StreamSets Data Collector is a lightweight and powerful engine that streams data in real time. It allows you to build continuous data pipelines, each of which consumes record-oriented data from a single origin, optionally operates on those records in one or more processors and writes data to one or more destinations.

StreamSets Origin Stage:

To define the flow of data for Data Collector, you configure a pipeline. A pipeline consists of stages that represent the origin and destination of the pipeline and any additional processing that you want to perform.

An origin stage represents the source for the pipeline.

For example, the pipeline below, based on the SDC taxi data tutorial (https://streamsets.com/documentation/datacollector/latest/help/#Tutorial/Overview.html), uses the Directory origin, four processors and the Hadoop File System destination:

 

[Figure: the taxi tutorial pipeline]

 

StreamSets comes bundled with origin stage components for almost all commonly used data sources. If you don’t find one for your source system, don’t worry: the StreamSets APIs are there to help you create a custom origin stage for it.

This post explains how to get started writing your own custom StreamSets origin stage to stream records from Amazon SQS (Simple Queue Service).

Requirements:

  • Java installed
  • IDE (Eclipse/IntelliJ) set up
  • StreamSets Data Collector

Creating and building the origin template

Follow the StreamSets Data Collector documentation to download, install and run StreamSets Data Collector.

You will also need to download the source for the Data Collector and its API. Make sure that you have matching versions for the runtime and the source; you might find it easier to download tarballs from the relevant GitHub release pages rather than using git clone, for example as sketched below.
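
A hedged illustration of fetching the source as tarballs (the URLs and tag names below are assumptions; check the GitHub releases pages for the exact tag matching your runtime, and note that the extracted directories will carry a version suffix):

$ wget https://github.com/streamsets/datacollector-api/archive/{version}.tar.gz -O datacollector-api-{version}.tar.gz
$ wget https://github.com/streamsets/datacollector/archive/{version}.tar.gz -O datacollector-{version}.tar.gz
$ tar xvfz datacollector-api-{version}.tar.gz
$ tar xvfz datacollector-{version}.tar.gz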

Build both the Data Collector and its API:

$ cd datacollector-api
$ mvn clean install -DskipTests
...output omitted...
$ cd ../datacollector
$ mvn clean install -DskipTests
...output omitted...

Maven puts the library JARs in its repository, so they’re available when we build our custom origin.

Create Skeleton Project:

Now create a new custom stage project using the Maven archetype:

$ mvn archetype:generate -DarchetypeGroupId=com.streamsets -DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial -DarchetypeVersion={version} -DinteractiveMode=true

The above command uses the streamsets-datacollector-stage-lib-tutorial Maven archetype to create the skeleton project; this is the easiest way to get started developing your own stages.

Provide values for the groupId, artifactId, version and package properties when prompted, as illustrated below.
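
For example, the archetype can also be run non-interactively by passing the properties on the command line (the groupId, package and version values here are illustrative; the artifactId example_stage matches the project name used later in this post):

$ mvn archetype:generate \
    -DarchetypeGroupId=com.streamsets \
    -DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial \
    -DarchetypeVersion={version} \
    -DgroupId=com.example \
    -DartifactId=example_stage \
    -Dversion=1.0-SNAPSHOT \
    -Dpackage=com.example.stage \
    -DinteractiveMode=false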

Maven generates a template project from the archetype in a directory with the artifactId you provided as its name. As you can see, there is template code for an origin, a processor and a destination:

 

[Figure: the generated project structure]

 

Origin template classes:

In the above figure, the following are the important classes for the origin stage:

  • Groups.java: Holds the labels for the configuration tabs in the Data Collector UI
  • SampleDSource.java: Contains the stage and its configuration definitions, and assigns those configurations to their respective groups
  • SampleSource.java: This is where the actual logic to read data from the source is written

Basic custom origin stage

Now you can build the template:

$ cd example_stage
$ mvn clean package -DskipTests

Extract the tarball into SDC’s user-libs directory, restart SDC, and you should see the sample stages in the stage library:

$ cd ~/streamsets-datacollector-{version}/user-libs/
$ tar xvfz {new project root dir}/target/example_stage-1.0-SNAPSHOT.tar.gz
x example_stage/lib/example_stage-1.0-SNAPSHOT.jar

Restart the Data Collector and you will be able to see the sample origin in the stage library panel:

 

[Screenshot: the sample stages in the stage library panel]

Understanding the Origin Template Code

Let’s walk through the template code, starting with Groups.java.

Groups.java

The Groups enumeration holds the labels for the configuration tabs. Replace the default label with one for AWS SQS:

@GenerateResourceBundle
public enum Groups implements Label {
  SQS("AWS SQS"),
  ;
  private final String label;
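
  // Remainder unchanged from the generated template: the constructor stores the label,
  // and getLabel() (required by the Label interface) returns it.
  Groups(String label) { this.label = label; }

  @Override
  public String getLabel() { return this.label; }
}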

SampleDSource.java

Stage and its configuration definitions

Inside SampleDSource.java, define the stage and its configurations and assign those configurations to their respective groups. In our case we require AWS credentials, the SQS endpoint and the queue name in order to retrieve messages from SQS:

@StageDef(
    version = 1,
    label = "SQS Origin",
    description = "",
    icon = "default.png",
    execution = ExecutionMode.STANDALONE,
    recordsByRef = true,
    onlineHelpRefUrl = ""
)
@ConfigGroups(value = Groups.class)
@GenerateResourceBundle
public class SampleDSource extends SampleSource {

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Access Key",
          displayPosition = 10,
          group = "SQS"
  )
  public String access_key;

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Secrete Key",
          displayPosition = 10,
          group = "SQS"
  )
  public String secrete_key;

  @ConfigDef(
      required = true,
      type = ConfigDef.Type.STRING,
      defaultValue = "",
      label = "Name",
      displayPosition = 10,
      group = "SQS"
  )
  public String queue_name;

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "End Point",
          displayPosition = 10,
          group = "SQS"
  )
  public String end_point;

  /** Delete message once read from Queue */
  @ConfigDef(
          required = true,
          type = ConfigDef.Type.BOOLEAN,
          defaultValue = "",
          label = "Delete Message",
          displayPosition = 10,
          group = "SQS"
  )
  public Boolean delete_flag;


  /** {@inheritDoc} */
  @Override
  public String getEndPoint() {
    return end_point;
  }

  /** {@inheritDoc} */
  @Override
  public String getQueueName() {
    return queue_name;
  }


  /** {@inheritDoc} */
  @Override
  public String getAccessKey() {
    return access_key;
  }

  /** {@inheritDoc} */
  @Override
  public String getSecreteKey() {
    return secrete_key;
  }

  /** {@inheritDoc} */
  @Override
  public Boolean getDeleteFlag() {
    return delete_flag;
  }
}

SampleSource.java

Read the configurations and implement the actual logic to read messages from the origin.

The source extends the BaseSource class from the StreamSets API:

public abstract class SampleSource extends BaseSource {

Abstract methods allow the source to get configuration data from its subclass: the SampleSource class uses the SampleDSource subclass to get access to the UI configurations. Replace the getConfig() method with the following methods:

/**
 * Gives access to the UI configuration of the stage provided by the {@link SampleDSource} class.
 */
public abstract String getEndPoint();
public abstract String getQueueName();
public abstract String getAccessKey();
public abstract String getSecreteKey();
public abstract Boolean getDeleteFlag();

Validate Pipeline Configuration

SDC calls the init() method when validating and running a pipeline. The sample shows how to report configuration errors:

@Override
protected List<ConfigIssue> init() {
    // Validate configuration values and open any required resources.
    List<ConfigIssue> issues = super.init();

    if (getEndPoint().isEmpty() || getQueueName().isEmpty() || getAccessKey().isEmpty() || getSecreteKey().isEmpty()) {
        issues.add(
                getContext().createConfigIssue(
                        Groups.SQS.name(), "config", Errors.SAMPLE_00, "Provide required parameters."
                )
        );
    }

    // If issues is not empty, the UI will inform the user of each configuration issue in the list.
    return issues;
}

SDC calls destroy() during validation and when a pipeline is stopped:

/**
 * {@inheritDoc}
 */
@Override
public void destroy() {
    // Clean up any open resources.
    super.destroy();
}

Add custom logic to read data from the source system

The produce() method is where we write the actual logic to read data from the source system. Replace the template code with the following logic to read messages from SQS:

@Override
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException {
    // Offsets can vary depending on the data source. Here we use an integer as an example only.
    long nextSourceOffset = 0;
    if (lastSourceOffset != null) {
        nextSourceOffset = Long.parseLong(lastSourceOffset);
    }

    int numRecords = 0;

    // Create records and add them to the batch. Records must have a string id. This can include the
    // source offset or other metadata to help uniquely identify the record itself.

    AWSSQSUtil awssqsUtil = new AWSSQSUtil(getAccessKey(), getSecreteKey(), getQueueName(), getEndPoint());

    String queueName = awssqsUtil.getQueueName();
    String queueUrl = awssqsUtil.getQueueUrl(queueName);

    // maximum number of messages that can be retrieved in one request
    int maxMessageCount = 10;

    List<Message> messages = awssqsUtil.getMessagesFromQueue(queueUrl, maxMessageCount);
    for (Message message : messages) {
        Record record = getContext().createRecord("messageId::" + message.getMessageId());
        Map<String, Field> map = new HashMap<>();
        map.put("receipt_handle", Field.create(message.getReceiptHandle()));
        map.put("md5_of_body", Field.create(message.getMD5OfBody()));
        map.put("body", Field.create(message.getBody()));

        JSONObject attributeJson = new JSONObject();

        for (Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
            attributeJson.put(entry.getKey(), entry.getValue());
        }

        map.put("attribute_list", Field.create(attributeJson.toString()));

        record.set(Field.create(map));
        batchMaker.addRecord(record);
        ++nextSourceOffset;
        ++numRecords;
        if (getDeleteFlag()) {
            awssqsUtil.deleteMessageFromQueue(queueUrl, message);
        }
    }
    return String.valueOf(nextSourceOffset);
}
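
The AWSSQSUtil helper used in produce() above is not part of the generated template and its implementation is not shown in this post. Below is a minimal sketch of what such a wrapper could look like, built on the AWS SDK for Java v1 SQS client; the class itself is an assumption for illustration, and only the constructor and method signatures called from produce() are dictated by the code above:

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

import java.util.List;

/** Hypothetical thin wrapper around the AWS SDK SQS client, matching the calls made in produce(). */
public class AWSSQSUtil {

  private final AmazonSQS sqsClient;
  private final String queueName;

  public AWSSQSUtil(String accessKey, String secretKey, String queueName, String endPoint) {
    this.sqsClient = new AmazonSQSClient(new BasicAWSCredentials(accessKey, secretKey));
    this.sqsClient.setEndpoint(endPoint);   // e.g. https://sqs.us-east-1.amazonaws.com
    this.queueName = queueName;
  }

  public String getQueueName() {
    return queueName;
  }

  /** Resolves the queue URL for the given queue name. */
  public String getQueueUrl(String queueName) {
    return sqsClient.getQueueUrl(queueName).getQueueUrl();
  }

  /** Fetches up to maxMessageCount messages in one request (SQS allows at most 10). */
  public List<Message> getMessagesFromQueue(String queueUrl, int maxMessageCount) {
    ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
        .withMaxNumberOfMessages(maxMessageCount)
        .withWaitTimeSeconds(10)
        .withAttributeNames("All");
    return sqsClient.receiveMessage(request).getMessages();
  }

  /** Deletes a message once it has been turned into a record (used when Delete Message is enabled). */
  public void deleteMessageFromQueue(String queueUrl, Message message) {
    sqsClient.deleteMessage(queueUrl, message.getReceiptHandle());
  }
}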

Errors.java

Create custom error messages

To create stage-specific error messages, implement the ErrorCode interface:

@GenerateResourceBundle
public enum Errors implements ErrorCode {

  SAMPLE_00("A configuration is invalid because: {}"),
  SAMPLE_01("Specific reason writing record failed: {}"),
  ;
  private final String msg;

  Errors(String msg) {
    this.msg = msg;
  }

  /** {@inheritDoc} */
  @Override
  public String getCode() {
    return name();
  }

  /** {@inheritDoc} */
  @Override
  public String getMessage() {
    return msg;
  }
}

Create the pipeline with custom origin

Follow the build, extract and restart steps as before, create the pipeline using the SQS origin, and provide the configuration values. The pipeline will read click logs from SQS, extract the clicks made from a particular browser, and write them to the local file system.

[Screenshots: SQS origin configuration and the resulting pipeline]

Run the pipeline and you will see the messages streaming from the SQS queue.

[Screenshot: the running pipeline streaming messages from SQS]

Congratulations! You have successfully created your first custom origin stage.