Creating Custom Origin for Streamsets

Streamsets Data Collector:

StreamSets Data Collector is a lightweight and powerful engine that streams data in real time. It allows you to build continuous data pipelines, each of which consumes record-oriented data from a single origin, optionally operates on those records in one or more processors and writes data to one or more destinations.

Streamsets Origin Stage:

To define the flow of data for Data Collector, you configure a pipeline. A pipeline consists of stages that represents the origin and destination of the pipeline and any additional processing that you want to perform.

An origin stage represents the source for the pipeline.

For example, this pipeline, based on the SDC taxi data tutorial which uses the Directory origin, four processors and the Hadoop File System destination:




Stremsets comes bundled with many origin stage components to connect with almost all commonly used data sources and if you don’t find one for your source system, don’t worry  Streamsets APIs are there to help you in creating a customized origin stage for your system.

This blog explains how to get started writing your own custom Streamsets Origin stage to stream records from Amazon SQS(Simple Queue Service).


  • Java Installed
  • IDE(Eclipse/Intellij) setup
  • Streamset data collector

Creating and building the origin template

Follow the Streamset Datacollector documentation to download, install and run StreamSets Data Collector.

You will also need to download source for the Data Collector and its API. Just make sure that you have matching versions for the runtime and source, so you might find it easier to download tarballs from the relevant GitHub release pages rather than using git clone:

Build both the Data Collector and its API:

$ cd datacollector-api
$ mvn clean install -DskipTests ...output omitted...
$ cd ../datacollector
$ mvn clean install -DskipTests ...output omitted...

Maven puts the library JARs in its repository, so they’re available when we build our custom origin:

Create Skeleton Project:

Now create a new custom stage project using the Maven archetype:

$ mvn archetype:generate -DarchetypeGroupId=com.streamsets -DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial -DarchetypeVersion={version} -DinteractiveMode=true

The above command uses streamsets-datacollector-stage-lib-tutorial maven archetype to create the skeleton project and this is the easiest way to get started developing your own stages.

Provide values for property groupId, artifactId, version and package

Maven generates a template project from the archetype in a directory with the artifactId you provided as its name. As you can see, there is template code for an origin, a processor and a destination:




Origin template classes: 

In the above figure following are the important classes under Origin stage:

  • Responsible to hold the labels for the configuration tabs in datacollector UI
  • Contains stage and its configurations definitions and assigns those configurations to respective groups
  • This is the place where the actual logic to read data from the source is written

Basic custom origin stage

Now you can build the template:

$ cd example_stage
$ mvn clean package -DskipTests

Extract the tarball to SDC’s user-libs directory, restart SDC, and you should see the sample stages in the stage library

$ cd ~/streamsets-datacollector-{version}/user-libs/ 
$ tar xvfz {new project root dir}/target/example_stage-1.0-SNAPSHOT.tar.gz x example_stage/lib/example_stage-1.0-SNAPSHOT.jar  

Restart the data collector and you will be able to see sample origin in the stage library panel



Understanding the Origin Template Code
Let’s walk through the template code, starting with

The Groups enumeration holds the label for the configuration tab. Replace the label to have the label for AWS SQS

public enum Groups implements Label {
  private final String label;

Stage and Its configurations definitions

Inside define the stage and its configurations and assign those configurations to respective groups. In our case we require AWS credentials, SQS endpoint and queue name to in order to retrieve messages from SQS.

    version = 1,
    label = "SQS Origin",
    description = "",
    icon = "default.png",
    execution = ExecutionMode.STANDALONE,
    recordsByRef = true,
    onlineHelpRefUrl = ""
@ConfigGroups(value = Groups.class)
public class SampleDSource extends SampleSource {

          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Access Key",
          displayPosition = 10,
          group = "SQS"
  public String access_key;

          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Secrete Key",
          displayPosition = 10,
          group = "SQS"
  public String secrete_key;

      required = true,
      type = ConfigDef.Type.STRING,
      defaultValue = "",
      label = "Name",
      displayPosition = 10,
      group = "SQS"
  public String queue_name;

          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "End Point",
          displayPosition = 10,
          group = "SQS"
  public String end_point;

  /** Delete message once read from Queue */
          required = true,
          type = ConfigDef.Type.BOOLEAN,
          defaultValue = "",
          label = "Delete Message",
          displayPosition = 10,
          group = "SQS"
  public Boolean delete_flag;

  /** {@inheritDoc} */
  public String getEndPoint() {
    return end_point;

  /** {@inheritDoc} */
  public String getQueueName() {
    return queue_name;

  /** {@inheritDoc} */
  public String getAccessKey() {
    return access_key;

  /** {@inheritDoc} */
  public String getSecreteKey() {
    return secrete_key;

  /** {@inheritDoc} */
  public Boolean getDeleteFlag() {
    return delete_flag;

Read configurations and implement actual logic to read messages  from origin

Source extend BaseSource Interface from Streamset API

public abstract class SampleSource extends BaseSource {

An abstract method allows the source to get configuration data from its subclass:

The SampleSource class uses SampleDsource sub class to get access to the UI configurations. Remove the getConfig method with following methods

 * Gives access to the UI configuration of the stage provided by the {@link SampleDSource} class.
public abstract String getEndPoint();
public abstract String getQueueName();
public abstract String getAccessKey();
public abstract String getSecreteKey();
public abstract Boolean getDeleteFlag();

Validate Pipeline Configuration

SDC calls the init() method when validating and running a pipeline. The sample shows how to report configuration errors

protected List<ConfigIssue> init() {
    // Validate configuration values and open any required resources.
    List<ConfigIssue> issues = super.init();

    if (getEndPoint().isEmpty() || getQueueName().isEmpty() || getAccessKey().isEmpty() || getSecreteKey().isEmpty()) {
              , "config", Errors.SAMPLE_00, "Povide required parameters.."

    // If issues is not empty, the UI will inform the user of each configuration issue in the list.
    return issues;

SDC calls destroy() during validation, and when a pipeline is stopped

 * {@inheritDoc}
public void destroy() {
    // Clean up any open resources.

Put custom logic to read data from source system

Produce method is one where we write the actual logic to read the data from source system. Replace the code with following code logic to read messages from SQS

public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException {
    // Offsets can vary depending on the data source. Here we use an integer as an example only.
    long nextSourceOffset = 0;
    if (lastSourceOffset != null) {
        nextSourceOffset = Long.parseLong(lastSourceOffset);

    int numRecords = 0;

    // Create records and add to batch. Records must have a string id. This can include the source offset
    // or other metadata to help uniquely identify the record itself.

    AWSSQSUtil awssqsUtil = new AWSSQSUtil(getAccessKey(),getSecreteKey(),getQueueName(),getEndPoint());

    String queuName = awssqsUtil.getQueueName();
    String queueUrl = awssqsUtil.getQueueUrl(queuName);

    //maximum number of meesage that can be retrieve in one request
    int maxMessagCount = 10;

        List<Message> messages = awssqsUtil.getMessagesFromQueue(queueUrl,maxMessagCount);
        for (Message message : messages) {
            Record record = getContext().createRecord("messageId::" + message.getMessageId());
            Map<String, Field> map = new HashMap<>();
            map.put("receipt_handle", Field.create(message.getReceiptHandle()));
            map.put("md5_of_body", Field.create(message.getMD5OfBody()));
            map.put("body", Field.create(message.getBody()));

            JSONObject attributeJson = new JSONObject();

            for (Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
                attributeJson.put(entry.getKey(), entry.getValue());

            map.put("attribute_list", Field.create(attributeJson.toString()));

    return String.valueOf(nextSourceOffset);

Create custom Errors messages

To create stage specific error messages implement ErrorCode Interface

public enum Errors implements ErrorCode {

  SAMPLE_00("A configuration is invalid because: {}"),
  SAMPLE_01("Specific reason writing record failed: {}"),
  private final String msg;

  Errors(String msg) {
    this.msg = msg;

  /** {@inheritDoc} */
  public String getCode() {
    return name();

  /** {@inheritDoc} */
  public String getMessage() {
    return msg;

Create the pipeline with custom origin

Follow the Build, Extract and Restart phase as done earlier and create the pipeline using the SQS Origin and provide configuration values. The pipeline will read click logs from SQS and extracts out the clicks which have been made from a particular browser and write it to the loca file system.




Run the pipeline and you will see the messages streaming from the SQS queue.



Congratulations!!! You have successfully created your first customized origin stage.

Installing SparkR on a Hadoop Cluster


SparkR is an extension to Apache Spark which allows you to run Spark jobs with the R programming language. This provides the benefit of being able to use R packages and libraries in your Spark jobs. In the case of both Cloudera and MapR, SparkR is not supported and would need to be installed separately.

Installation Steps

Here are the steps you can take to Install SparkR on a Hadoop Cluster:

  1. Execute the following steps on all the Spark Gateways/Edge Nodes
    1. Login to the target machine as root
    2. Install R and other Dependencies
      1. Execute the following to Install
        1. Ubuntu
          sh -c 'echo "deb lenny-cran/" >> /etc/apt/sources.list'
          apt-get install r-base r-base-dev
        2. Centos
          1. Install the repo
            rpm -ivh
          2. Enable the repo
            1. Edit the /etc/yum.repos.d/epel-testing.repo file with your favorite text editing software
            2. Change all the enabled sections to ‘1’
          3. Clean yum cache
            yum clean all
          4. Install R and Dependencies
            yum install R R-devel libcurl-devel openssl-devel
      2. Test R installation
        1. Start up an R Session
        2. Within the R Shell, execute an addition command to ensure things are ran correctly
          1 + 1
        3. Quit when you’re done
      3. Note: R libraries gets installed at “/usr/lib64/R”
    3. Get the version of Spark you currently have installed
      1. Run the following command
        spark-submit --version
      2. Example output: 1.6.0
      3. Replace the Placeholder {SPARK_VERSION} with this value
    4. Install SparkR
      1. Start up the R console
      2. Install the Depending R Packages
      3. Install the SparkR Packages
        devtools::install_github('apache/spark@v{SPARK_VERSION}', subdir='R/pkg')
      4. Close out of the R shell
    5. Find the Spark Home Directory and replace the Placeholder {SPARK_HOME_DIRECTORY} with this value
    6. Install the SparkR OS Dependencies
      cd /tmp/
      unzip v{SPARK_VERSION}.zip
      cd spark-{SPARK_VERSION}
      cd bin
      cp sparkR {SPARK_HOME_DIRECTORY}/bin/
    7. Run Dev Install
    8. Create a new file “/user/bin/sparkR” and set the contents
      1. Copy the contents of the /usr/bin/spark-shell file to /usr/bin/sparkR
        cp /usr/bin/spark-shell /usr/bin/sparkR
      2. Edit the /usr/bin/sparkR file. Replace “spark-shell” with “sparkR” on the bottom exec command.
    9. Finish install
      sudo chmod 755 /usr/bin/sparkR
    10. Verify that the sparkR command is available
      cd ~
      which sparkR
    11. Your done!


Upon completion of the installation steps, here are some ways that you can test the installation to verify everything is running correctly.

  • Test from R Console – Run on a Spark Gateway
    1. Start an R Shell
    2. Execute the following commands in the R Shell
      sc = spark_connect(master = "yarn-client")
    3. If this runs without errors then you know it’s working!
  • Test from SparkR Console – Run on a Spark Gateway
    1. Open the SparkR Console
    2. Verify the Spark Context is available with the following command:
    3. If the sc variable is listed then you know it’s working!
  • Sample code you can run to test more
    rdd = SparkR:::parallelize(sc, 1:5)