Analytics, Big Data, Cloud Computing, Hadoop, Learning

Creating Custom Origin for Streamsets

Streamsets Data Collector:

StreamSets Data Collector is a lightweight and powerful engine that streams data in real time. It allows you to build continuous data pipelines, each of which consumes record-oriented data from a single origin, optionally operates on those records in one or more processors and writes data to one or more destinations.

Streamsets Origin Stage:

To define the flow of data for Data Collector, you configure a pipeline. A pipeline consists of stages that represents the origin and destination of the pipeline and any additional processing that you want to perform.

An origin stage represents the source for the pipeline.

For example, this pipeline, based on the SDC taxi data tutorial https://streamsets.com/documentation/datacollector/latest/help/#Tutorial/Overview.html which uses the Directory origin, four processors and the Hadoop File System destination:

 

pipeline

 

Stremsets comes bundled with many origin stage components to connect with almost all commonly used data sources and if you don’t find one for your source system, don’t worry  Streamsets APIs are there to help you in creating a customized origin stage for your system.

This blog explains how to get started writing your own custom Streamsets Origin stage to stream records from Amazon SQS(Simple Queue Service).

 Requirements: 

  • Java Installed
  • IDE(Eclipse/Intellij) setup
  • Streamset data collector

Creating and building the origin template

Follow the Streamset Datacollector documentation to download, install and run StreamSets Data Collector.

You will also need to download source for the Data Collector and its API. Just make sure that you have matching versions for the runtime and source, so you might find it easier to download tarballs from the relevant GitHub release pages rather than using git clone:

Build both the Data Collector and its API:

$ cd datacollector-api
$ mvn clean install -DskipTests ...output omitted...
$ cd ../datacollector
$ mvn clean install -DskipTests ...output omitted...

Maven puts the library JARs in its repository, so they’re available when we build our custom origin:

Create Skeleton Project:

Now create a new custom stage project using the Maven archetype:

$ mvn archetype:generate -DarchetypeGroupId=com.streamsets -DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial -DarchetypeVersion={version} -DinteractiveMode=true

The above command uses streamsets-datacollector-stage-lib-tutorial maven archetype to create the skeleton project and this is the easiest way to get started developing your own stages.

Provide values for property groupId, artifactId, version and package

Maven generates a template project from the archetype in a directory with the artifactId you provided as its name. As you can see, there is template code for an origin, a processor and a destination:

 

structure

 

Origin template classes: 

In the above figure following are the important classes under Origin stage:

  • Groups.java: Responsible to hold the labels for the configuration tabs in datacollector UI
  • SampleDsource.java: Contains stage and its configurations definitions and assigns those configurations to respective groups
  • SampleSource.java: This is the place where the actual logic to read data from the source is written

Basic custom origin stage

Now you can build the template:

$ cd example_stage
$ mvn clean package -DskipTests

Extract the tarball to SDC’s user-libs directory, restart SDC, and you should see the sample stages in the stage library

$ cd ~/streamsets-datacollector-{version}/user-libs/ 
$ tar xvfz {new project root dir}/target/example_stage-1.0-SNAPSHOT.tar.gz x example_stage/lib/example_stage-1.0-SNAPSHOT.jar  

Restart the data collector and you will be able to see sample origin in the stage library panel

 

stage_panel 

Understanding the Origin Template Code
Let’s walk through the template code, starting with Groups.java.

Groups.java

The Groups enumeration holds the label for the configuration tab. Replace the label to have the label for AWS SQS

@GenerateResourceBundle
public enum Groups implements Label {
  SQS("AWS SQS"),
  ;
  private final String label;

SampleDSource.java

Stage and Its configurations definitions

Inside SampleDSource.java define the stage and its configurations and assign those configurations to respective groups. In our case we require AWS credentials, SQS endpoint and queue name to in order to retrieve messages from SQS.

@StageDef(
    version = 1,
    label = "SQS Origin",
    description = "",
    icon = "default.png",
    execution = ExecutionMode.STANDALONE,
    recordsByRef = true,
    onlineHelpRefUrl = ""
)
@ConfigGroups(value = Groups.class)
@GenerateResourceBundle
public class SampleDSource extends SampleSource {

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Access Key",
          displayPosition = 10,
          group = "SQS"
  )
  public String access_key;

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Secrete Key",
          displayPosition = 10,
          group = "SQS"
  )
  public String secrete_key;

  @ConfigDef(
      required = true,
      type = ConfigDef.Type.STRING,
      defaultValue = "",
      label = "Name",
      displayPosition = 10,
      group = "SQS"
  )
  public String queue_name;

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "End Point",
          displayPosition = 10,
          group = "SQS"
  )
  public String end_point;

  /** Delete message once read from Queue */
  @ConfigDef(
          required = true,
          type = ConfigDef.Type.BOOLEAN,
          defaultValue = "",
          label = "Delete Message",
          displayPosition = 10,
          group = "SQS"
  )
  public Boolean delete_flag;


  /** {@inheritDoc} */
  @Override
  public String getEndPoint() {
    return end_point;
  }

  /** {@inheritDoc} */
  @Override
  public String getQueueName() {
    return queue_name;
  }


  /** {@inheritDoc} */
  @Override
  public String getAccessKey() {
    return access_key;
  }

  /** {@inheritDoc} */
  @Override
  public String getSecreteKey() {
    return secrete_key;
  }

  /** {@inheritDoc} */
  @Override
  public Boolean getDeleteFlag() {
    return delete_flag;
  }
}

SampleSource.java

Read configurations and implement actual logic to read messages  from origin

Source extend BaseSource Interface from Streamset API

public abstract class SampleSource extends BaseSource {

An abstract method allows the source to get configuration data from its subclass:

The SampleSource class uses SampleDsource sub class to get access to the UI configurations. Remove the getConfig method with following methods

/**
 * Gives access to the UI configuration of the stage provided by the {@link SampleDSource} class.
 */
public abstract String getEndPoint();
public abstract String getQueueName();
public abstract String getAccessKey();
public abstract String getSecreteKey();
public abstract Boolean getDeleteFlag();

Validate Pipeline Configuration

SDC calls the init() method when validating and running a pipeline. The sample shows how to report configuration errors

@Override
protected List<ConfigIssue> init() {
    // Validate configuration values and open any required resources.
    List<ConfigIssue> issues = super.init();

    if (getEndPoint().isEmpty() || getQueueName().isEmpty() || getAccessKey().isEmpty() || getSecreteKey().isEmpty()) {
        issues.add(
                getContext().createConfigIssue(
                        Groups.SQS.name(), "config", Errors.SAMPLE_00, "Povide required parameters.."
                )
        );
    }

    // If issues is not empty, the UI will inform the user of each configuration issue in the list.
    return issues;
}

SDC calls destroy() during validation, and when a pipeline is stopped

/**
 * {@inheritDoc}
 */
@Override
public void destroy() {
    // Clean up any open resources.
    super.destroy();
}

Put custom logic to read data from source system

Produce method is one where we write the actual logic to read the data from source system. Replace the code with following code logic to read messages from SQS

public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException {
    // Offsets can vary depending on the data source. Here we use an integer as an example only.
    long nextSourceOffset = 0;
    if (lastSourceOffset != null) {
        nextSourceOffset = Long.parseLong(lastSourceOffset);
    }

    int numRecords = 0;

    // Create records and add to batch. Records must have a string id. This can include the source offset
    // or other metadata to help uniquely identify the record itself.

    AWSSQSUtil awssqsUtil = new AWSSQSUtil(getAccessKey(),getSecreteKey(),getQueueName(),getEndPoint());

    String queuName = awssqsUtil.getQueueName();
    String queueUrl = awssqsUtil.getQueueUrl(queuName);

    //maximum number of meesage that can be retrieve in one request
    int maxMessagCount = 10;

        List<Message> messages = awssqsUtil.getMessagesFromQueue(queueUrl,maxMessagCount);
        for (Message message : messages) {
            Record record = getContext().createRecord("messageId::" + message.getMessageId());
            Map<String, Field> map = new HashMap<>();
            map.put("receipt_handle", Field.create(message.getReceiptHandle()));
            map.put("md5_of_body", Field.create(message.getMD5OfBody()));
            map.put("body", Field.create(message.getBody()));

            JSONObject attributeJson = new JSONObject();

            for (Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
                attributeJson.put(entry.getKey(), entry.getValue());
            }

            map.put("attribute_list", Field.create(attributeJson.toString()));

            record.set(Field.create(map));
            batchMaker.addRecord(record);
            ++nextSourceOffset;
            ++numRecords;
            if(getDeleteFlag()){
                awssqsUtil.deleteMessageFromQueue(queueUrl,message);
            }
        }
    return String.valueOf(nextSourceOffset);
}

Errors.java

Create custom Errors messages

To create stage specific error messages implement ErrorCode Interface

@GenerateResourceBundle
public enum Errors implements ErrorCode {

  SAMPLE_00("A configuration is invalid because: {}"),
  SAMPLE_01("Specific reason writing record failed: {}"),
  ;
  private final String msg;

  Errors(String msg) {
    this.msg = msg;
  }

  /** {@inheritDoc} */
  @Override
  public String getCode() {
    return name();
  }

  /** {@inheritDoc} */
  @Override
  public String getMessage() {
    return msg;
  }
}

Create the pipeline with custom origin

Follow the Build, Extract and Restart phase as done earlier and create the pipeline using the SQS Origin and provide configuration values. The pipeline will read click logs from SQS and extracts out the clicks which have been made from a particular browser and write it to the loca file system.

screen-shot-2016-11-24-at-3-05-06-pm

screen-shot-2016-11-24-at-3-16-16-pm

 

Run the pipeline and you will see the messages streaming from the SQS queue.

screen-shot-2016-11-24-at-3-26-20-pm

 

Congratulations!!! You have successfully created your first customized origin stage.

One comment

  1. Pingback: Last week in Stream Processing & Analytics 12/19/2016 | Enjoy IT - SOA, Java, Event-Driven Computing and Integration

Leave a Reply

Your email address will not be published. Required fields are marked *