Creating a Custom Origin for StreamSets

StreamSets Data Collector:

StreamSets Data Collector is a lightweight and powerful engine that streams data in real time. It allows you to build continuous data pipelines, each of which consumes record-oriented data from a single origin, optionally operates on those records in one or more processors and writes data to one or more destinations.

StreamSets Origin Stage:

To define the flow of data for Data Collector, you configure a pipeline. A pipeline consists of stages that represent the origin and destination of the pipeline and any additional processing that you want to perform.

An origin stage represents the source for the pipeline.

For example, the pipeline below, based on the SDC taxi data tutorial (https://streamsets.com/documentation/datacollector/latest/help/#Tutorial/Overview.html), uses the Directory origin, four processors, and the Hadoop File System destination:

 

[Image: pipeline]

 

StreamSets comes bundled with origin stages for almost all commonly used data sources, and if you don't find one for your source system, don't worry: the StreamSets APIs are there to help you create a custom origin stage for it.

This blog explains how to get started writing your own custom StreamSets origin stage to stream records from Amazon SQS (Simple Queue Service).

Requirements:

  • Java installed
  • IDE (Eclipse/IntelliJ) set up
  • StreamSets Data Collector

Creating and building the origin template

Follow the StreamSets Data Collector documentation to download, install, and run StreamSets Data Collector.

You will also need to download the source for the Data Collector and its API. Just make sure that you have matching versions for the runtime and source, so you might find it easier to download tarballs from the relevant GitHub release pages rather than using git clone.

Build both the Data Collector and its API:

$ cd datacollector-api
$ mvn clean install -DskipTests ...output omitted...
$ cd ../datacollector
$ mvn clean install -DskipTests ...output omitted...

Maven puts the library JARs in its repository, so they're available when we build our custom origin.

Create Skeleton Project:

Now create a new custom stage project using the Maven archetype:

$ mvn archetype:generate -DarchetypeGroupId=com.streamsets -DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial -DarchetypeVersion={version} -DinteractiveMode=true

The above command uses the streamsets-datacollector-stage-lib-tutorial Maven archetype to create the skeleton project; this is the easiest way to get started developing your own stages.

Provide values for the properties groupId, artifactId, version, and package when prompted.

Maven generates a template project from the archetype in a directory with the artifactId you provided as its name. As you can see, there is template code for an origin, a processor and a destination:

 

[Image: generated project structure]

 

Origin template classes: 

In the above figure, the following are the important classes for the origin stage:

  • Groups.java: Holds the labels for the configuration tabs in the Data Collector UI
  • SampleDSource.java: Contains the stage and configuration definitions and assigns each configuration to its group
  • SampleSource.java: Where the actual logic to read data from the source is written

Basic custom origin stage

Now you can build the template:

$ cd example_stage
$ mvn clean package -DskipTests

Extract the tarball to SDC's user-libs directory:

$ cd ~/streamsets-datacollector-{version}/user-libs/
$ tar xvfz {new project root dir}/target/example_stage-1.0-SNAPSHOT.tar.gz
x example_stage/lib/example_stage-1.0-SNAPSHOT.jar

Restart the Data Collector and you will be able to see the sample origin in the stage library panel.

 

[Image: stage library panel]

Understanding the Origin Template Code
Let’s walk through the template code, starting with Groups.java.

Groups.java

The Groups enumeration holds the labels for the configuration tabs in the UI. Replace the default label with one for AWS SQS:

@GenerateResourceBundle
public enum Groups implements Label {
  SQS("AWS SQS"),
  ;
  private final String label;
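
  // The remainder is the archetype-generated boilerplate: store the label and expose it via getLabel().
  Groups(String label) {
    this.label = label;
  }

  /** {@inheritDoc} */
  @Override
  public String getLabel() {
    return this.label;
  }
}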

SampleDSource.java

Stage and configuration definitions

Inside SampleDSource.java, define the stage and its configurations and assign those configurations to their respective groups. In our case, we require the AWS credentials, SQS endpoint, and queue name in order to retrieve messages from SQS.

@StageDef(
    version = 1,
    label = "SQS Origin",
    description = "",
    icon = "default.png",
    execution = ExecutionMode.STANDALONE,
    recordsByRef = true,
    onlineHelpRefUrl = ""
)
@ConfigGroups(value = Groups.class)
@GenerateResourceBundle
public class SampleDSource extends SampleSource {

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Access Key",
          displayPosition = 10,
          group = "SQS"
  )
  public String access_key;

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Secret Key",
          displayPosition = 20,
          group = "SQS"
  )
  public String secret_key;

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Queue Name",
          displayPosition = 30,
          group = "SQS"
  )
  public String queue_name;

  @ConfigDef(
          required = true,
          type = ConfigDef.Type.STRING,
          defaultValue = "",
          label = "Endpoint",
          displayPosition = 40,
          group = "SQS"
  )
  public String end_point;

  /** Delete the message once it has been read from the queue */
  @ConfigDef(
          required = true,
          type = ConfigDef.Type.BOOLEAN,
          defaultValue = "false",
          label = "Delete Message",
          displayPosition = 50,
          group = "SQS"
  )
  public Boolean delete_flag;


  /** {@inheritDoc} */
  @Override
  public String getEndPoint() {
    return end_point;
  }

  /** {@inheritDoc} */
  @Override
  public String getQueueName() {
    return queue_name;
  }


  /** {@inheritDoc} */
  @Override
  public String getAccessKey() {
    return access_key;
  }

  /** {@inheritDoc} */
  @Override
  public String getSecretKey() {
    return secret_key;
  }

  /** {@inheritDoc} */
  @Override
  public Boolean getDeleteFlag() {
    return delete_flag;
  }
}

SampleSource.java

Read the configurations and implement the actual logic to read messages from the origin.

The source extends the BaseSource abstract class from the StreamSets API:

public abstract class SampleSource extends BaseSource {

An abstract method allows the source to get configuration data from its subclass:

The SampleSource class uses the SampleDSource subclass to get access to the UI configuration values. Replace the getConfig method with the following methods:

/**
 * Gives access to the UI configuration of the stage provided by the {@link SampleDSource} class.
 */
public abstract String getEndPoint();
public abstract String getQueueName();
public abstract String getAccessKey();
public abstract String getSecretKey();
public abstract Boolean getDeleteFlag();

Validate Pipeline Configuration

SDC calls the init() method when validating and running a pipeline. The sample shows how to report configuration errors:

@Override
protected List<ConfigIssue> init() {
    // Validate configuration values and open any required resources.
    List<ConfigIssue> issues = super.init();

    if (getEndPoint().isEmpty() || getQueueName().isEmpty() || getAccessKey().isEmpty() || getSecretKey().isEmpty()) {
        issues.add(
                getContext().createConfigIssue(
                        Groups.SQS.name(), "config", Errors.SAMPLE_00, "Provide the required parameters."
                )
        );
    }

    // If issues is not empty, the UI will inform the user of each configuration issue in the list.
    return issues;
}
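
If you want the UI to point at the exact field that is missing, you can also report one issue per configuration. Below is a minimal sketch; the config names passed as the second argument are the field names defined in SampleDSource, and splitting the checks this way is a variation of mine, not part of the template.

@Override
protected List<ConfigIssue> init() {
    List<ConfigIssue> issues = super.init();

    // Report a separate issue for each missing value so the UI highlights that specific field.
    if (getAccessKey().isEmpty()) {
        issues.add(getContext().createConfigIssue(Groups.SQS.name(), "access_key", Errors.SAMPLE_00, "Access Key is required"));
    }
    if (getSecretKey().isEmpty()) {
        issues.add(getContext().createConfigIssue(Groups.SQS.name(), "secret_key", Errors.SAMPLE_00, "Secret Key is required"));
    }
    if (getQueueName().isEmpty()) {
        issues.add(getContext().createConfigIssue(Groups.SQS.name(), "queue_name", Errors.SAMPLE_00, "Queue Name is required"));
    }
    if (getEndPoint().isEmpty()) {
        issues.add(getContext().createConfigIssue(Groups.SQS.name(), "end_point", Errors.SAMPLE_00, "Endpoint is required"));
    }

    return issues;
}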

SDC calls destroy() during validation, and when a pipeline is stopped:

/**
 * {@inheritDoc}
 */
@Override
public void destroy() {
    // Clean up any open resources.
    super.destroy();
}

Put the custom logic to read data from the source system

The produce method is where we write the actual logic to read data from the source system. Replace the template code with the following logic to read messages from SQS:

@Override
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException {
    // Offsets can vary depending on the data source. Here we use an integer as an example only.
    long nextSourceOffset = 0;
    if (lastSourceOffset != null) {
        nextSourceOffset = Long.parseLong(lastSourceOffset);
    }

    int numRecords = 0;

    // Create records and add them to the batch. Records must have a string id. This can include the source offset
    // or other metadata to help uniquely identify the record itself.

    AWSSQSUtil awssqsUtil = new AWSSQSUtil(getAccessKey(), getSecretKey(), getQueueName(), getEndPoint());

    String queueName = awssqsUtil.getQueueName();
    String queueUrl = awssqsUtil.getQueueUrl(queueName);

    // Maximum number of messages that can be retrieved in one request (SQS caps this at 10)
    int maxMessageCount = 10;

    List<Message> messages = awssqsUtil.getMessagesFromQueue(queueUrl, maxMessageCount);
    for (Message message : messages) {
        Record record = getContext().createRecord("messageId::" + message.getMessageId());
        Map<String, Field> map = new HashMap<>();
        map.put("receipt_handle", Field.create(message.getReceiptHandle()));
        map.put("md5_of_body", Field.create(message.getMD5OfBody()));
        map.put("body", Field.create(message.getBody()));

        JSONObject attributeJson = new JSONObject();

        for (Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
            attributeJson.put(entry.getKey(), entry.getValue());
        }

        map.put("attribute_list", Field.create(attributeJson.toString()));

        record.set(Field.create(map));
        batchMaker.addRecord(record);
        ++nextSourceOffset;
        ++numRecords;
        if (getDeleteFlag()) {
            awssqsUtil.deleteMessageFromQueue(queueUrl, message);
        }
    }
    return String.valueOf(nextSourceOffset);
}
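
The AWSSQSUtil helper used above is not part of the template; it is a thin wrapper around the SQS client from the AWS SDK for Java. Here is a minimal sketch whose class and method names are inferred from the calls in produce(), so treat it as an illustration rather than the exact implementation:

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

import java.util.List;

public class AWSSQSUtil {

  private final AmazonSQSClient sqs;
  private final String queueName;

  public AWSSQSUtil(String accessKey, String secretKey, String queueName, String endpoint) {
    this.sqs = new AmazonSQSClient(new BasicAWSCredentials(accessKey, secretKey));
    this.sqs.setEndpoint(endpoint);
    this.queueName = queueName;
  }

  public String getQueueName() {
    return queueName;
  }

  public String getQueueUrl(String queueName) {
    // Resolve the queue URL from the queue name
    return sqs.getQueueUrl(queueName).getQueueUrl();
  }

  public List<Message> getMessagesFromQueue(String queueUrl, int maxMessages) {
    // SQS returns at most 10 messages per receive call
    ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
        .withMaxNumberOfMessages(maxMessages)
        .withAttributeNames("All");
    return sqs.receiveMessage(request).getMessages();
  }

  public void deleteMessageFromQueue(String queueUrl, Message message) {
    // Messages must be deleted explicitly, or they become visible again after the visibility timeout
    sqs.deleteMessage(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
  }
}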

Errors.java

Create custom error messages

To create stage-specific error messages, implement the ErrorCode interface:

@GenerateResourceBundle
public enum Errors implements ErrorCode {

  SAMPLE_00("A configuration is invalid because: {}"),
  SAMPLE_01("Specific reason writing record failed: {}"),
  ;
  private final String msg;

  Errors(String msg) {
    this.msg = msg;
  }

  /** {@inheritDoc} */
  @Override
  public String getCode() {
    return name();
  }

  /** {@inheritDoc} */
  @Override
  public String getMessage() {
    return msg;
  }
}
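
These error codes can then be used when raising errors from the stage. For example, if the SQS read inside produce() throws, you could wrap the failure in a StageException; this snippet is an illustration, not part of the template:

List<Message> messages;
try {
    messages = awssqsUtil.getMessagesFromQueue(queueUrl, maxMessageCount);
} catch (Exception e) {
    // Surface the failure in the UI and logs using the stage-specific error code
    throw new StageException(Errors.SAMPLE_01, e.toString());
}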

Create the pipeline with the custom origin

Follow the build, extract, and restart steps as done earlier, then create a pipeline using the SQS origin and provide the configuration values. The pipeline will read click logs from SQS, extract the clicks made from a particular browser, and write them to the local file system.

[Screenshots: pipeline with the SQS origin and its configuration]

 

Run the pipeline and you will see the messages streaming from the SQS queue.

[Screenshot: the pipeline running]

 

Congratulations! You have successfully created your first custom origin stage.

Kafka – A great choice for large-scale event processing

Kafka is a highly scalable, highly available queuing system, built to handle huge message throughput at lightning-fast speeds. The Clairvoyant team has used Kafka as a core part of an architecture in a production environment, and overall we were quite satisfied with the results, but there are still a few caveats to bear in mind.

The backdrop: For one of the largest health insurance companies in America, we had to build an archival system to house electronic copies of all documents produced for their members, and provide on-demand search and retrieval of those documents based on metadata and text content.

The solution: We built a system where events (which correspond to new documents produced) move through various queues, so that the system can process them without getting overwhelmed and can scale to handle the huge number of documents produced daily for all their members. Our messaging system of choice was…drum roll please…Kafka!

The diagram below gives a high-level overview of the architecture. An external system makes a REST call to the Queue API with the file information and its metadata. The Queue API acts as a publisher and publishes the metadata to different Kafka topics. The consumers for these topics process the messages further and save the file (electronic copy) and metadata to object storage and MySQL/ES respectively.

[Diagram: high-level ingestion architecture]

The good: Kafka is designed as a distributed system, and it is fast, scalable, partitioned and replicated. It can handle messages at insane scale (http://thenewstack.io/streaming-data-at-linkedin-apache-kafka-reaches-1-1-trillion-messages-per-day/). 'Nuff said. And it's open source. And it's fairly easy to get up and running in a production environment. And the documentation is great. It is built around the concepts of consumer groups and replicated topics, which make the system highly available and provide easy horizontal scalability.

The bad: Kafka is a little weak operationally, which is to say that it doesn't have robust native mechanisms for things like message monitoring, though a lot of new open-source monitoring tools are appearing as Kafka becomes more popular; we'll talk about these later. Kafka is a commit log, and it is the job of the consumer to handle failed messages, by putting them in a dead-letter queue or publishing them back to the queue for retrying.

The meat: There’s nothing we can say here about the internals of Kafka which isn’t already explained clearly in its excellent documentation (http://kafka.apache.org/documentation.html). What’s typically more interesting is someone’s experience of deploying a technology into a production environment and gotchas that were encountered along the way.

Hardware and scale

We were running a 3-node cluster where each server had 4 x 2.7 GHz cores, 16 GB of memory, and 1 TB of hard disk space. Kafka persists messages on disk, so hard disk space is crucial. The number of messages we processed was on the order of about 3 million per day (yup…pretty low compared to the big boys). Pretty much all of this volume was funneled through 3 topics (a million apiece).

The Zookeeper/Kafka broker config

Each node was running an instance of Zookeeper (3.4.6) and Kafka (started with 0.8.2.1 and later updated to 0.10.0.0). Here’s how we set it up.

Zookeeper Setup

Version 3.4.6: http://apache.org/dist/zookeeper/zookeeper-3.4.6/

Unpack it: tar xvf zookeeper-3.4.6.tar.gz

Server 1 (ports: 2181, 2888, 3888)
cp conf/zoo_sample.cfg conf/zoo.cfg
Update conf/zoo.cfg as follows:
(Update existing property) dataDir=/opt/kafkafs/zookeeperdata/1
(Create new property) server.1=<host1 name>:2888:3888
(Create new property) server.2=<host2 name>:2889:3889
(Create new property) server.3=<host3 name>:2890:3890
Create the "myid" file:
mkdir /opt/kafkafs/zookeeperdata
mkdir /opt/kafkafs/zookeeperdata/1
Create a file in this directory called "myid" containing the single character: 1

Server 2 (ports: 2182, 2889, 3889)
cp conf/zoo_sample.cfg conf/zoo.cfg
Update conf/zoo.cfg as follows:
(Update existing property) dataDir=/opt/kafkafs/zookeeperdata/2
(Update existing property) clientPort=2182
(Create new property) server.1=<host1 name>:2888:3888
(Create new property) server.2=<host2 name>:2889:3889
(Create new property) server.3=<host3 name>:2890:3890
Create the "myid" file:
mkdir /opt/kafkafs/zookeeperdata
mkdir /opt/kafkafs/zookeeperdata/2
Create a file in this directory called "myid" containing the single character: 2

Server 3 (ports: 2183, 2890, 3890)
cp conf/zoo_sample.cfg conf/zoo.cfg
Update conf/zoo.cfg as follows:
(Update existing property) dataDir=/opt/kafkafs/zookeeperdata/3
(Update existing property) clientPort=2183
(Create new property) server.1=<host1 name>:2888:3888
(Create new property) server.2=<host2 name>:2889:3889
(Create new property) server.3=<host3 name>:2890:3890
Create the "myid" file:
mkdir /opt/kafkafs/zookeeperdata
mkdir /opt/kafkafs/zookeeperdata/3
Create a file in this directory called "myid" containing the single character: 3

Start server: bin/zkServer.sh start zoo.cfg
Stop server: bin/zkServer.sh stop zoo.cfg

Kafka setup

Version kafka_2.10-0.10.0.0: https://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz

Unpack it: tar xvf kafka_2.10-0.10.0.0.tgz

Server 1 (port: 9092)
Update existing config/server.properties as follows:
 broker.id=0
 listeners=PLAINTEXT://<host1 name>:9092
 log.dir=/opt/kafkafs/kafka-logs // location where messages will be stored
 zookeeper.connect=<host1 name>:2181,<host2 name>:2182,<host3 name>:2183
Create new properties:
 delete.topic.enable=true // Enable the delete topic command
 auto.create.topics.enable=false // Disable automatic topic creation
 group.max.session.timeout.ms=600000 // No consumer can have a session.timeout.ms higher than this
Create data directory: mkdir /opt/kafkafs/kafka-logs

Server 2 (port: 9093)
Update existing config/server.properties as follows:
 broker.id=1
 listeners=PLAINTEXT://<host2 name>:9093
 log.dir=/opt/kafkafs/kafka-logs
 zookeeper.connect=<host1 name>:2181,<host2 name>:2182,<host3 name>:2183
Create new properties:
 delete.topic.enable=true
 auto.create.topics.enable=false
 group.max.session.timeout.ms=600000
Create data directory: mkdir /opt/kafkafs/kafka-logs

Server 3 (port: 9094)
Update existing config/server.properties as follows:
 broker.id=2
 listeners=PLAINTEXT://<host3 name>:9094
 log.dir=/opt/kafkafs/kafka-logs
 zookeeper.connect=<host1 name>:2181,<host2 name>:2182,<host3 name>:2183
Create new properties:
 delete.topic.enable=true
 auto.create.topics.enable=false
 group.max.session.timeout.ms=600000
Create data directory: mkdir /opt/kafkafs/kafka-logs

Start server: bin/kafka-server-start.sh config/server.properties &
Stop server: bin/kafka-server-stop.sh

Topics and partitions

We had about 15 queues, each with 6 partitions and a replication factor of 2. Here is an example of how to create such a topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic new.topic

Based on our fault-tolerance needs, we decided that each topic would be replicated once (this is specified as a replication factor of 2, which means one leader and one replica). The number of partitions is important, as it determines the maximum number of active consumers in a consumer group, and therefore the maximum parallel consumption you can have for a particular topic.

Producers and consumers

Our Java application was built using a micro-services architecture. Out of a total of 11 micro-services, 9 produced events and 5 were consumers. We started out using Kafka 0.8.2.1 with the Java producer and the high-level consumer via Spring Integration. We subsequently upgraded to Kafka 0.10.0.0, which includes the unified consumer (the high-level and simple consumers merged into a single API).

Java producer

 Below are the Kafka producer configs we used:

//Create the properties for the Kafka producer
Properties properties = new Properties();

//A list of host/port pairs to use for establishing the initial connection to the Kafka cluster
properties.put("bootstrap.servers", bootstrapServer);

//Serializer class for keys that implements the Serializer interface
properties.put("key.serializer", StringSerializer.class.getCanonicalName());

//Serializer class for values that implements the Serializer interface
properties.put("value.serializer", EventSerializer.class.getCanonicalName());

//Number of times a failed send is retried before the producer gives up
properties.put("retries", 5);

//The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
//properties.put("batch.size", 12345);

KafkaProducer<String, Event> producer = new KafkaProducer<>(properties);

while (true) {
    try {
        ProducerRecord<String, Event> producerRecord = new ProducerRecord<>(topicName, partitionNumber, key, event);
        //Publish the message
        Future<RecordMetadata> recordMetadata = producer.send(producerRecord);
        //The statement below blocks until a response for the send is received
        RecordMetadata r = recordMetadata.get();
    } catch (Exception exception) {
        System.out.println("Exception.. Retrying to publish to the queue directly for topic " + topicName);
    }
}
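
Blocking on recordMetadata.get() gives per-record confirmation at the cost of throughput. If that trade-off is not acceptable, the standard producer API also supports a non-blocking send with a callback; a minimal sketch of what the send inside the loop could look like instead:

//Non-blocking alternative: the callback fires once the broker acknowledges (or rejects) the record
producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            //Handle the failed send here, e.g. log it or route the event to a retry path
            System.out.println("Publish failed for topic " + topicName + ": " + exception.getMessage());
        }
    }
});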

Java consumer

A lot of properties need to be configured for the Kafka consumer. Before deciding their values, it is advisable to review the Kafka documentation on each property. A note on how Kafka handles consumers: it uses the concept of "consumer groups". A consumer group consists of multiple consumers amongst which messages are distributed. For example, say you have a topic with 6 partitions and 3 consumers in a group all consuming messages from that topic. Each consumer will be assigned 2 partitions to consume from, and messages are distributed accordingly. This gives Kafka the power of parallel consumption and fault tolerance (if a consumer dies, its partitions are reassigned to the remaining consumers in the group). Another option is to set up multiple consumer groups subscribed to the same topic, in which case every message is delivered to each group.

Below is how we configured the consumers:

Properties props = new Properties();

//A list of host/port pairs to use for establishing the initial connection to the Kafka cluster
props.put("bootstrap.servers", "localhost:9092");

//A unique string that identifies the consumer group this consumer belongs to
props.put("group.id", "test-groupid");

//Deserializer class for keys that implements the Deserializer interface
props.put("key.deserializer", StringDeserializer.class.getName());

//Deserializer class for values that implements the Deserializer interface
props.put("value.deserializer", ObjectSerializer.class.getName());

//What to do when there is no initial offset in Kafka. Setting it to "earliest" ensures consumption starts from the last uncommitted offset.
props.put("auto.offset.reset", "earliest");

//This property is a very useful one and can really help us make sure we process all the messages without losing any.
//If true, the consumer's offset is periodically committed in the background. We handled the commit ourselves, because we wanted
//to be sure that we commit a message only after we have consumed and processed it successfully.
props.put("enable.auto.commit", false);

//This is one of the most important consumer properties.
//The consumer has to poll at least once every "session.timeout.ms", otherwise the group coordinator marks it as dead.
//So we kept this timeout a bit higher than the time needed to consume and process the events returned by a poll.
props.put("session.timeout.ms", sessionTimeout);

//This has to be more than session.timeout.ms
props.put("request.timeout.ms", requestTimeout);

//This has to be less than session.timeout.ms; it is advised to keep it at about 1/3rd of session.timeout.ms.
props.put("heartbeat.interval.ms", heartBeatInterval);

//Max number of records returned from the topic partitions in each poll
props.put("max.poll.records", maxMessages);

// Create the consumer with the properties and subscribe to the topic
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
try {
    consumer.subscribe(Arrays.asList("testtopic"));

    while (true) {

        //poll blocks for up to the given number of milliseconds if no data is available;
        //with 0 it returns immediately with whatever records are available now
        ConsumerRecords<Object, Object> records = consumer.poll(polltime);

        //Processing should complete before "session.timeout.ms" expires;
        //otherwise lower "max.poll.records" to fetch fewer events in each poll
        processmessage(records);

        //Commit the offsets once processing has completed successfully
        consumer.commitSync();
    }
} catch (Exception e) {

    logger.error("Closing Consumer : Unhandled exception while consuming messages ", e);

} finally {
    consumer.close();
}

Spring Integration

A special note here: Initially, we didn't use the Java Kafka APIs as-is; we used the Spring-Kafka adapter. This integration had pros and cons; on balance, we found it to be less intuitive and less flexible than using the Kafka APIs directly. On the upside, it provides a simple XML structure for configuring a number of the consumer/producer properties. On the flip side, however, it doesn't support all the features that the actual Kafka API provides, so we later moved to the Kafka APIs for consumers and producers.

Monitoring

There are some good open-source Kafka console monitors available; to name a few:

Developed by Yahoo: https://github.com/yahoo/kafka-manager

Developed by LinkedIn: https://github.com/linkedin/kafka-monitor

However, we had a few specific requirements. We wanted to be able to send emails when a node is down, and we also wanted to run a job that constantly checks queue lag (the number of messages remaining to be consumed). If messages were being processed at a suboptimal rate, we wanted an email notification for that too. We had to build our own tool for this, and it ended up being very helpful in understanding the status of the consumers in production. These are the APIs we used to get the information we needed (a sketch of how to invoke them follows the list):

  • admin.TopicCommand.listTopics: lists all the topics
  • admin.TopicCommand.describeTopic: gives us information about the partitions in a topic and their corresponding leaders and followers
  • admin.ConsumerGroupCommand.main: gives us information about a particular consumer group, the offsets that are committed, and the lag
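
These admin classes are Scala objects that back the standard CLI scripts, so one rough way to drive them from Java is to call them with the same arguments the scripts take. The sketch below is a one-shot lag check using ConsumerGroupCommand; the flags shown are for the 0.10.x line and may differ in other versions, the output goes to standard output (so a monitoring job has to capture and parse it, or call the underlying Scala methods directly), and admin.TopicCommand can be driven the same way with the arguments kafka-topics.sh accepts.

public class QueueLagCheck {
    public static void main(String[] args) {
        //Prints the committed offset, log end offset and lag for each partition the group consumes
        kafka.admin.ConsumerGroupCommand.main(new String[]{
                "--new-consumer", "--bootstrap-server", "localhost:9092",
                "--describe", "--group", "test-groupid"});
    }
}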

The verdict

Kafka is a great choice for handling messages at huge scale. Obviously it is; if it's the choice for a host of companies, and if it's only gaining a wider and wider foothold, it must be doing things right. We saw the strengths of this system play out in real time (even though we did wrestle with one or two issues along the way). Kafka should be considered an essential part of any company's technology toolkit.