Creating a Custom Origin for StreamSets

StreamSets Data Collector:

StreamSets Data Collector is a lightweight and powerful engine that streams data in real time. It allows you to build continuous data pipelines, each of which consumes record-oriented data from a single origin, optionally operates on those records in one or more processors and writes data to one or more destinations.

StreamSets Origin Stage:

To define the flow of data for Data Collector, you configure a pipeline. A pipeline consists of stages that represent the origin and destination of the pipeline and any additional processing that you want to perform.

An origin stage represents the source for the pipeline.

For example, this pipeline, based on the SDC taxi data tutorial, uses the Directory origin, four processors, and the Hadoop File System destination:




StreamSets comes bundled with many origin stage components to connect with almost all commonly used data sources, and if you don’t find one for your source system, don’t worry: the StreamSets APIs are there to help you create a custom origin stage for your system.

This blog explains how to get started writing your own custom Streamsets Origin stage to stream records from Amazon SQS(Simple Queue Service).

Prerequisites:


  • Java installed
  • IDE (Eclipse/IntelliJ) setup
  • StreamSets Data Collector

Creating and building the origin template

Follow the StreamSets Data Collector documentation to download, install, and run StreamSets Data Collector.

You will also need to download the source for the Data Collector and its API. Make sure that you have matching versions for the runtime and source; you might find it easier to download tarballs from the relevant GitHub release pages than to use git clone:

Build both the Data Collector and its API:

$ cd datacollector-api
$ mvn clean install -DskipTests
...output omitted...
$ cd ../datacollector
$ mvn clean install -DskipTests
...output omitted...

Maven puts the library JARs in its repository, so they’re available when we build our custom origin.

Create Skeleton Project:

Now create a new custom stage project using the Maven archetype:

$ mvn archetype:generate -DarchetypeGroupId=com.streamsets -DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial -DarchetypeVersion={version} -DinteractiveMode=true

The above command uses the streamsets-datacollector-stage-lib-tutorial Maven archetype to create the skeleton project; this is the easiest way to get started developing your own stages.

Provide values for the groupId, artifactId, version, and package properties.

Maven generates a template project from the archetype in a directory with the artifactId you provided as its name. As you can see, there is template code for an origin, a processor and a destination:




Origin template classes: 

In the above figure, the following are the important classes under the origin stage:

  • Groups.java – holds the labels for the configuration tabs in the Data Collector UI
  • SampleDSource.java – contains the stage and its configuration definitions, and assigns those configurations to their respective groups
  • SampleSource.java – the place where the actual logic to read data from the source is written

Basic custom origin stage

Now you can build the template:

$ cd example_stage
$ mvn clean package -DskipTests

Extract the tarball to SDC’s user-libs directory and restart SDC; you should then see the sample stages in the stage library:

$ cd ~/streamsets-datacollector-{version}/user-libs/
$ tar xvfz {new project root dir}/target/example_stage-1.0-SNAPSHOT.tar.gz
x example_stage/lib/example_stage-1.0-SNAPSHOT.jar

Restart the Data Collector and you will be able to see the sample origin in the stage library panel.



Understanding the Origin Template Code
Let’s walk through the template code, starting with the Groups enumeration.

The Groups enumeration holds the labels for the configuration tabs in the Data Collector UI. Replace the sample label with one for AWS SQS:

@GenerateResourceBundle
public enum Groups implements Label {
  SQS("AWS SQS"),
  ;

  private final String label;

  // constructor and getLabel() remain as generated by the archetype
}

Stage and its configuration definitions

Inside SampleDSource.java, define the stage and its configurations and assign those configurations to their respective groups. In our case we require the AWS credentials, the SQS endpoint, and the queue name in order to retrieve messages from SQS.

@StageDef(
    version = 1,
    label = "SQS Origin",
    description = "",
    icon = "default.png",
    execution = ExecutionMode.STANDALONE,
    recordsByRef = true,
    onlineHelpRefUrl = ""
)
@ConfigGroups(value = Groups.class)
@GenerateResourceBundle
public class SampleDSource extends SampleSource {

  @ConfigDef(
      required = true,
      type = ConfigDef.Type.STRING,
      defaultValue = "",
      label = "Access Key",
      displayPosition = 10,
      group = "SQS"
  )
  public String access_key;

  @ConfigDef(
      required = true,
      type = ConfigDef.Type.STRING,
      defaultValue = "",
      label = "Secret Key",
      displayPosition = 10,
      group = "SQS"
  )
  public String secrete_key;

  @ConfigDef(
      required = true,
      type = ConfigDef.Type.STRING,
      defaultValue = "",
      label = "Name",
      displayPosition = 10,
      group = "SQS"
  )
  public String queue_name;

  @ConfigDef(
      required = true,
      type = ConfigDef.Type.STRING,
      defaultValue = "",
      label = "End Point",
      displayPosition = 10,
      group = "SQS"
  )
  public String end_point;

  /** Delete message once read from Queue */
  @ConfigDef(
      required = true,
      type = ConfigDef.Type.BOOLEAN,
      defaultValue = "false",
      label = "Delete Message",
      displayPosition = 10,
      group = "SQS"
  )
  public Boolean delete_flag;

  /** {@inheritDoc} */
  @Override
  public String getEndPoint() {
    return end_point;
  }

  /** {@inheritDoc} */
  @Override
  public String getQueueName() {
    return queue_name;
  }

  /** {@inheritDoc} */
  @Override
  public String getAccessKey() {
    return access_key;
  }

  /** {@inheritDoc} */
  @Override
  public String getSecreteKey() {
    return secrete_key;
  }

  /** {@inheritDoc} */
  @Override
  public Boolean getDeleteFlag() {
    return delete_flag;
  }
}

Read configurations and implement the actual logic to read messages from the origin

The source extends the BaseSource interface from the StreamSets API:

public abstract class SampleSource extends BaseSource {

Abstract methods allow the source to get configuration data from its subclass:

The SampleSource class uses the SampleDSource subclass to get access to the UI configurations. Replace the getConfig method with the following methods:

/**
 * Gives access to the UI configuration of the stage provided by the {@link SampleDSource} class.
 */
public abstract String getEndPoint();
public abstract String getQueueName();
public abstract String getAccessKey();
public abstract String getSecreteKey();
public abstract Boolean getDeleteFlag();

Validate Pipeline Configuration

SDC calls the init() method when validating and running a pipeline. The sample shows how to report configuration errors:

@Override
protected List<ConfigIssue> init() {
    // Validate configuration values and open any required resources.
    List<ConfigIssue> issues = super.init();

    if (getEndPoint().isEmpty() || getQueueName().isEmpty() || getAccessKey().isEmpty() || getSecreteKey().isEmpty()) {
        issues.add(getContext().createConfigIssue(Groups.SQS.name(), "config", Errors.SAMPLE_00, "Provide required parameters."));
    }

    // If issues is not empty, the UI will inform the user of each configuration issue in the list.
    return issues;
}

SDC calls destroy() during validation, and when a pipeline is stopped:

/** {@inheritDoc} */
@Override
public void destroy() {
    // Clean up any open resources.
}

Put custom logic to read data from the source system

The produce method is where we write the actual logic to read the data from the source system. Replace the template code with the following logic to read messages from SQS:

@Override
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException {
    // Offsets can vary depending on the data source. Here we use an integer as an example only.
    long nextSourceOffset = 0;
    if (lastSourceOffset != null) {
        nextSourceOffset = Long.parseLong(lastSourceOffset);
    }

    int numRecords = 0;

    // Create records and add to batch. Records must have a string id. This can include the source offset
    // or other metadata to help uniquely identify the record itself.

    AWSSQSUtil awssqsUtil = new AWSSQSUtil(getAccessKey(), getSecreteKey(), getQueueName(), getEndPoint());

    String queueName = awssqsUtil.getQueueName();
    String queueUrl = awssqsUtil.getQueueUrl(queueName);

    // maximum number of messages that can be retrieved in one request
    int maxMessageCount = 10;

    List<Message> messages = awssqsUtil.getMessagesFromQueue(queueUrl, maxMessageCount);
    for (Message message : messages) {
        Record record = getContext().createRecord("messageId::" + message.getMessageId());
        Map<String, Field> map = new HashMap<>();
        map.put("receipt_handle", Field.create(message.getReceiptHandle()));
        map.put("md5_of_body", Field.create(message.getMD5OfBody()));
        map.put("body", Field.create(message.getBody()));

        JSONObject attributeJson = new JSONObject();

        for (Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
            attributeJson.put(entry.getKey(), entry.getValue());
        }

        map.put("attribute_list", Field.create(attributeJson.toString()));

        record.set(Field.create(map));
        batchMaker.addRecord(record);
        ++nextSourceOffset;
        ++numRecords;
    }

    return String.valueOf(nextSourceOffset);
}
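The AWSSQSUtil helper used in produce() is not part of the generated template; you write it yourself. Below is a minimal sketch, not the post's actual implementation: the class name and the three method signatures are taken from the produce() code above, while the body is an assumed wiring of the AWS SDK for Java (v1) SQS client.

```java
import java.util.List;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

// Hypothetical helper wrapping the AWS SDK SQS client; only the calls used by produce() are shown.
public class AWSSQSUtil {

  private final AmazonSQS sqs;
  private final String queueName;

  public AWSSQSUtil(String accessKey, String secretKey, String queueName, String endPoint) {
    this.sqs = new AmazonSQSClient(new BasicAWSCredentials(accessKey, secretKey));
    this.sqs.setEndpoint(endPoint);
    this.queueName = queueName;
  }

  public String getQueueName() {
    return queueName;
  }

  // Resolve the queue URL from its name.
  public String getQueueUrl(String queueName) {
    return sqs.getQueueUrl(queueName).getQueueUrl();
  }

  // Fetch up to maxMessageCount messages in a single request, including their attributes.
  public List<Message> getMessagesFromQueue(String queueUrl, int maxMessageCount) {
    ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
        .withMaxNumberOfMessages(maxMessageCount)
        .withAttributeNames("All");
    return sqs.receiveMessage(request).getMessages();
  }
}
```

If you honor the Delete Message flag, you would also add a method that calls sqs.deleteMessage(queueUrl, receiptHandle) after a record is produced.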

Create custom error messages

To create stage-specific error messages, implement the ErrorCode interface:

@GenerateResourceBundle
public enum Errors implements ErrorCode {

  SAMPLE_00("A configuration is invalid because: {}"),
  SAMPLE_01("Specific reason writing record failed: {}"),
  ;

  private final String msg;

  Errors(String msg) {
    this.msg = msg;
  }

  /** {@inheritDoc} */
  @Override
  public String getCode() {
    return name();
  }

  /** {@inheritDoc} */
  @Override
  public String getMessage() {
    return msg;
  }
}
Create the pipeline with custom origin

Follow the Build, Extract, and Restart phases as before, then create the pipeline using the SQS origin and provide the configuration values. The pipeline will read click logs from SQS, extract the clicks made from a particular browser, and write them to the local file system.




Run the pipeline and you will see the messages streaming from the SQS queue.



Congratulations! You have successfully created your first custom origin stage.

Encrypting Amazon EC2 boot volumes via Packer

In order to layer on some easy data-at-rest security, I want to encrypt the boot volumes of my Amazon EC2 instances.  I also want to use the CentOS images but those are not encrypted.  How can I end up with an encrypted copy of those AMIs in the fewest steps?

In the past, I have used shell scripts and the AWS CLI to perform the boot volume encryption dance. The steps are basically:

  1. Deploy an instance running the source AMI.
  2. Create an image from that instance.
  3. Copy the image and encrypt the copy.
  4. Delete the unencrypted image.
  5. Terminate the instance.
  6. Add tags to new AMI.

The script needs a lot of VPC/subnet/security group preparation (which I guess could have been added to the script), and if there were errors during execution then cleanup was very manual (more possible script work). The script is very flexible and meets my needs, but it is a codebase that needs expertise to maintain. And I have better things to do with my time.
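The steps above can be sketched with the AWS CLI. This is an illustration, not the original script: every ID, name, and subnet below is a placeholder, and the real script would poll for each step to complete before starting the next.

```shell
# 1. Deploy an instance running the source (unencrypted) AMI
aws ec2 run-instances --image-id ami-SOURCE --instance-type t2.nano --subnet-id subnet-PLACEHOLDER

# 2. Create an image from that instance (after waiting for it to boot)
aws ec2 create-image --instance-id i-PLACEHOLDER --name "centos7-unencrypted"

# 3. Copy the image, encrypting the copy
aws ec2 copy-image --source-region us-east-1 --region us-east-1 \
    --source-image-id ami-UNENCRYPTED --name "centos7-encrypted" --encrypted

# 4. Delete the unencrypted image
aws ec2 deregister-image --image-id ami-UNENCRYPTED

# 5. Terminate the instance
aws ec2 terminate-instances --instance-ids i-PLACEHOLDER

# 6. Add tags to the new AMI
aws ec2 create-tags --resources ami-ENCRYPTED --tags Key=Name,Value="CentOS 7"
```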

A simpler solution is Packer.

I had looked at Packer around July of 2016 and it was very promising, but it was missing one key feature: it could not actually encrypt the boot volume. Dave Konopka wrote a post describing the problem and his solution of using Ansible in Encrypted Amazon EC2 boot volumes with Packer and Ansible. Luckily, there was an outstanding pull request and as of version 0.11.0, Packer now has support for boot volume encryption whilst copying Marketplace AMIs.

The nice thing about a Packer template is that it takes care of dynamic generation of most objects. Temporary SSH keys and security groups are created just for the build and are then destroyed. The above steps for the boot volume encryption dance are followed with built-in error checking and recovery in case something goes wrong.

This template assumes automatic lookup of your AWS credentials. Read the docs (Specifying Amazon Credentials section) for more details.
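Environment variables are one of the lookup methods Packer supports; the key values below are AWS's documented example placeholders, not real credentials.

```shell
# Static credentials via environment variables -- a shared credentials
# file or an EC2 instance profile also works.
export AWS_ACCESS_KEY_ID="AKIAIOSFODNN7EXAMPLE"
export AWS_SECRET_ACCESS_KEY="wJalrXUtnFY/K7MDENG/bPxRfiCYEXAMPLEKEY"
```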

Code can be downloaded from GitHub.

$ cat
{
    "description": "Copy the CentOS 7 AMI into our account so that we can add boot volume encryption.",
    "min_packer_version": "0.11.0",
    "variables": {
        "aws_region": "us-east-1",
        "aws_vpc": null,
        "aws_subnet": null,
        "ssh_username": "centos"
    },
    "builders": [
        {
            "type": "amazon-ebs",
            "ami_name": "CentOS Linux 7 x86_64 HVM EBS (encrypted) {{isotime \"20060102\"}}",
            "ami_description": "CentOS Linux 7 x86_64 HVM EBS (encrypted) {{isotime \"20060102\"}}",
            "instance_type": "t2.nano",
            "region": "{{user `aws_region`}}",
            "vpc_id": "{{user `aws_vpc`}}",
            "subnet_id": "{{user `aws_subnet`}}",
            "source_ami_filter": {
                "filters": {
                    "owner-alias": "aws-marketplace",
                    "product-code": "aw0evgkw8e5c1q413zgy5pjce",
                    "virtualization-type": "hvm"
                },
                "most_recent": true
            },
            "ami_virtualization_type": "hvm",
            "ssh_username": "{{user `ssh_username`}}",
            "associate_public_ip_address": true,
            "tags": {
                "Name": "CentOS 7",
                "OS": "CentOS",
                "OSVER": "7"
            },
            "encrypt_boot": true,
            "ami_block_device_mappings": [
                {
                    "device_name": "/dev/sda1",
                    "volume_type": "gp2",
                    "volume_size": 8,
                    "encrypted": true,
                    "delete_on_termination": true
                }
            ],
            "communicator": "ssh",
            "ssh_pty": true
        }
    ],
    "provisioners": [
        {
            "type": "shell",
            "execute_command": "sudo -S sh '{{.Path}}'",
            "inline_shebang": "/bin/sh -e -x",
            "inline": [
                "echo '** Shreding sensitive data ...'",
                "shred -u /etc/ssh/*_key /etc/ssh/*",
                "shred -u /root/.*history /home/{{user `ssh_username`}}/.*history",
                "shred -u /root/.ssh/authorized_keys /home/{{user `ssh_username`}}/.ssh/authorized_keys",
                "sync; sleep 1; sync"
            ]
        }
    ]
}

To copy the CentOS 6 AMI, change any references of CentOS “7” to “6” and the product-code from “aw0evgkw8e5c1q413zgy5pjce” to “6x5jmcajty9edm3f211pqjfn2”.
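That is, the builder's source_ami_filter block would become (a fragment, not a complete template):

```json
"source_ami_filter": {
    "filters": {
        "owner-alias": "aws-marketplace",
        "product-code": "6x5jmcajty9edm3f211pqjfn2",
        "virtualization-type": "hvm"
    },
    "most_recent": true
}
```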

When you build with this Packer template, you will have to pass in the variables aws_vpc and aws_subnet. The AWS region defaults to us-east-1, but can be overridden by setting aws_region. The newest CentOS AMI in that region will be automatically discovered.

$ packer build -var 'aws_vpc=vpc-12345678' -var 'aws_subnet=subnet-23456789' \
amazon-ebs output will be in this color.

==> amazon-ebs: Prevalidating AMI Name...
    amazon-ebs: Found Image ID: ami-6d1c2007
==> amazon-ebs: Creating temporary keypair: packer_583c7438-d1d8-f33d-8517-1bdbbd84d2c9
==> amazon-ebs: Creating temporary security group for this instance...
==> amazon-ebs: Authorizing access to port 22 the temporary security group...
==> amazon-ebs: Launching a source AWS instance...
    amazon-ebs: Instance ID: i-5b68a2c4
==> amazon-ebs: Waiting for instance (i-5b68a2c4) to become ready...
==> amazon-ebs: Waiting for SSH to become available...
==> amazon-ebs: Connected to SSH!
==> amazon-ebs: Provisioning with shell script: /var/folders/42/drnmdknj7zz7bf03d91v8nkr0000gq/T/packer-shell797958164
    amazon-ebs: ** Shreding sensitive data ...
    amazon-ebs: shred: /root/.*history: failed to open for writing: No such file or directory
    amazon-ebs: shred: /home/centos/.*history: failed to open for writing: No such file or directory
==> amazon-ebs: Stopping the source instance...
==> amazon-ebs: Waiting for the instance to stop...
==> amazon-ebs: Creating the AMI: CentOS Linux 7 x86_64 HVM EBS (encrypted) 1480356920
    amazon-ebs: AMI: ami-33506f25
==> amazon-ebs: Waiting for AMI to become ready...
==> amazon-ebs: Creating Encrypted AMI Copy
==> amazon-ebs: Copying AMI: us-east-1(ami-33506f25)
==> amazon-ebs: Waiting for AMI copy to become ready...
==> amazon-ebs: Deregistering unencrypted AMI
==> amazon-ebs: Deleting unencrypted snapshots
    amazon-ebs: Snapshot ID: snap-5c87d7eb
==> amazon-ebs: Modifying attributes on AMI (ami-9d4b748b)...
    amazon-ebs: Modifying: description
==> amazon-ebs: Adding tags to AMI (ami-9d4b748b)...
    amazon-ebs: Adding tag: "OS": "CentOS"
    amazon-ebs: Adding tag: "OSVER": "7"
    amazon-ebs: Adding tag: "Name": "CentOS 7"
==> amazon-ebs: Tagging snapshot: snap-1eb5dc01
==> amazon-ebs: Terminating the source AWS instance...
==> amazon-ebs: Cleaning up any extra volumes...
==> amazon-ebs: Destroying volume (vol-aa727a37)...
==> amazon-ebs: Deleting temporary security group...
==> amazon-ebs: Deleting temporary keypair...
Build 'amazon-ebs' finished.

==> Builds finished. The artifacts of successful builds are:
--> amazon-ebs: AMIs were created:

us-east-1: ami-9d4b748b