Streamsets Data Collector:
StreamSets Data Collector is a lightweight and powerful engine that streams data in real time. It allows you to build continuous data pipelines, each of which consumes record-oriented data from a single origin, optionally operates on those records in one or more processors and writes data to one or more destinations.
Streamsets Origin Stage:
To define the flow of data for Data Collector, you configure a pipeline. A pipeline consists of stages that represents the origin and destination of the pipeline and any additional processing that you want to perform.
An origin stage represents the source for the pipeline.
For example, this pipeline, based on the SDC taxi data tutorial https://streamsets.com/documentation/datacollector/latest/help/#Tutorial/Overview.html which uses the Directory origin, four processors and the Hadoop File System destination:
Stremsets comes bundled with many origin stage components to connect with almost all commonly used data sources and if you don’t find one for your source system, don’t worry Streamsets APIs are there to help you in creating a customized origin stage for your system.
This blog explains how to get started writing your own custom Streamsets Origin stage to stream records from Amazon SQS(Simple Queue Service).
Requirements:
- Java Installed
- IDE(Eclipse/Intellij) setup
- Streamset data collector
Creating and building the origin template
Follow the Streamset Datacollector documentation to download, install and run StreamSets Data Collector.
You will also need to download source for the Data Collector and its API. Just make sure that you have matching versions for the runtime and source, so you might find it easier to download tarballs from the relevant GitHub release pages rather than using git clone:
- https://github.com/streamsets/datacollector/releases
- https://github.com/streamsets/datacollector-api/releases
Build both the Data Collector and its API:
$ cd datacollector-api
$ mvn clean install -DskipTests ...output omitted...
$ cd ../datacollector
$ mvn clean install -DskipTests ...output omitted...
Maven puts the library JARs in its repository, so they’re available when we build our custom origin:
Create Skeleton Project:
Now create a new custom stage project using the Maven archetype:
$ mvn archetype:generate -DarchetypeGroupId=com.streamsets -DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial -DarchetypeVersion={version} -DinteractiveMode=true
The above command uses streamsets-datacollector-stage-lib-tutorial maven archetype to create the skeleton project and this is the easiest way to get started developing your own stages.
Provide values for property groupId, artifactId, version and package
Maven generates a template project from the archetype in a directory with the artifactId you provided as its name. As you can see, there is template code for an origin, a processor and a destination:
Origin template classes:
In the above figure following are the important classes under Origin stage:
- Groups.java: Responsible to hold the labels for the configuration tabs in datacollector UI
- SampleDsource.java: Contains stage and its configurations definitions and assigns those configurations to respective groups
- SampleSource.java: This is the place where the actual logic to read data from the source is written
Basic custom origin stage
Now you can build the template:
$ cd example_stage
$ mvn clean package -DskipTests
Extract the tarball to SDC’s user-libs directory, restart SDC, and you should see the sample stages in the stage library
$ cd ~/streamsets-datacollector-{version}/user-libs/
$ tar xvfz {new project root dir}/target/example_stage-1.0-SNAPSHOT.tar.gz x example_stage/lib/example_stage-1.0-SNAPSHOT.jar
Restart the data collector and you will be able to see sample origin in the stage library panel
Understanding the Origin Template Code
Let’s walk through the template code, starting with Groups.java.
Groups.java
The Groups enumeration holds the label for the configuration tab. Replace the label to have the label for AWS SQS
@GenerateResourceBundle public enum Groups implements Label { SQS("AWS SQS"), ; private final String label;
SampleDSource.java
Stage and Its configurations definitions
Inside SampleDSource.java define the stage and its configurations and assign those configurations to respective groups. In our case we require AWS credentials, SQS endpoint and queue name to in order to retrieve messages from SQS.
@StageDef( version = 1, label = "SQS Origin", description = "", icon = "default.png", execution = ExecutionMode.STANDALONE, recordsByRef = true, onlineHelpRefUrl = "" ) @ConfigGroups(value = Groups.class) @GenerateResourceBundle public class SampleDSource extends SampleSource { @ConfigDef( required = true, type = ConfigDef.Type.STRING, defaultValue = "", label = "Access Key", displayPosition = 10, group = "SQS" ) public String access_key; @ConfigDef( required = true, type = ConfigDef.Type.STRING, defaultValue = "", label = "Secrete Key", displayPosition = 10, group = "SQS" ) public String secrete_key; @ConfigDef( required = true, type = ConfigDef.Type.STRING, defaultValue = "", label = "Name", displayPosition = 10, group = "SQS" ) public String queue_name; @ConfigDef( required = true, type = ConfigDef.Type.STRING, defaultValue = "", label = "End Point", displayPosition = 10, group = "SQS" ) public String end_point; /** Delete message once read from Queue */ @ConfigDef( required = true, type = ConfigDef.Type.BOOLEAN, defaultValue = "", label = "Delete Message", displayPosition = 10, group = "SQS" ) public Boolean delete_flag; /** {@inheritDoc} */ @Override public String getEndPoint() { return end_point; } /** {@inheritDoc} */ @Override public String getQueueName() { return queue_name; } /** {@inheritDoc} */ @Override public String getAccessKey() { return access_key; } /** {@inheritDoc} */ @Override public String getSecreteKey() { return secrete_key; } /** {@inheritDoc} */ @Override public Boolean getDeleteFlag() { return delete_flag; } }
SampleSource.java
Read configurations and implement actual logic to read messages from origin
Source extend BaseSource Interface from Streamset API
public abstract class SampleSource extends BaseSource {
An abstract method allows the source to get configuration data from its subclass:
The SampleSource class uses SampleDsource sub class to get access to the UI configurations. Remove the getConfig method with following methods
/** * Gives access to the UI configuration of the stage provided by the {@link SampleDSource} class. */ public abstract String getEndPoint(); public abstract String getQueueName(); public abstract String getAccessKey(); public abstract String getSecreteKey(); public abstract Boolean getDeleteFlag();
Validate Pipeline Configuration
SDC calls the init() method when validating and running a pipeline. The sample shows how to report configuration errors
@Override protected List<ConfigIssue> init() { // Validate configuration values and open any required resources. List<ConfigIssue> issues = super.init(); if (getEndPoint().isEmpty() || getQueueName().isEmpty() || getAccessKey().isEmpty() || getSecreteKey().isEmpty()) { issues.add( getContext().createConfigIssue( Groups.SQS.name(), "config", Errors.SAMPLE_00, "Povide required parameters.." ) ); } // If issues is not empty, the UI will inform the user of each configuration issue in the list. return issues; }
SDC calls destroy() during validation, and when a pipeline is stopped
/** * {@inheritDoc} */ @Override public void destroy() { // Clean up any open resources. super.destroy(); }
Put custom logic to read data from source system
Produce method is one where we write the actual logic to read the data from source system. Replace the code with following code logic to read messages from SQS
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException { // Offsets can vary depending on the data source. Here we use an integer as an example only. long nextSourceOffset = 0; if (lastSourceOffset != null) { nextSourceOffset = Long.parseLong(lastSourceOffset); } int numRecords = 0; // Create records and add to batch. Records must have a string id. This can include the source offset // or other metadata to help uniquely identify the record itself. AWSSQSUtil awssqsUtil = new AWSSQSUtil(getAccessKey(),getSecreteKey(),getQueueName(),getEndPoint()); String queuName = awssqsUtil.getQueueName(); String queueUrl = awssqsUtil.getQueueUrl(queuName); //maximum number of meesage that can be retrieve in one request int maxMessagCount = 10; List<Message> messages = awssqsUtil.getMessagesFromQueue(queueUrl,maxMessagCount); for (Message message : messages) { Record record = getContext().createRecord("messageId::" + message.getMessageId()); Map<String, Field> map = new HashMap<>(); map.put("receipt_handle", Field.create(message.getReceiptHandle())); map.put("md5_of_body", Field.create(message.getMD5OfBody())); map.put("body", Field.create(message.getBody())); JSONObject attributeJson = new JSONObject(); for (Map.Entry<String, String> entry : message.getAttributes().entrySet()) { attributeJson.put(entry.getKey(), entry.getValue()); } map.put("attribute_list", Field.create(attributeJson.toString())); record.set(Field.create(map)); batchMaker.addRecord(record); ++nextSourceOffset; ++numRecords; if(getDeleteFlag()){ awssqsUtil.deleteMessageFromQueue(queueUrl,message); } } return String.valueOf(nextSourceOffset); }
Errors.java
Create custom Errors messages
To create stage specific error messages implement ErrorCode Interface
@GenerateResourceBundle public enum Errors implements ErrorCode { SAMPLE_00("A configuration is invalid because: {}"), SAMPLE_01("Specific reason writing record failed: {}"), ; private final String msg; Errors(String msg) { this.msg = msg; } /** {@inheritDoc} */ @Override public String getCode() { return name(); } /** {@inheritDoc} */ @Override public String getMessage() { return msg; } }
Create the pipeline with custom origin
Follow the Build, Extract and Restart phase as done earlier and create the pipeline using the SQS Origin and provide configuration values. The pipeline will read click logs from SQS and extracts out the clicks which have been made from a particular browser and write it to the loca file system.
Run the pipeline and you will see the messages streaming from the SQS queue.
Congratulations!!! You have successfully created your first customized origin stage.
Pingback: Last week in Stream Processing & Analytics 12/19/2016 | Enjoy IT - SOA, Java, Event-Driven Computing and Integration