Big Data

Kafka – A great choice for large scale event processing

Kafka is a highly scalable, highly available queuing system, which is built to handle huge message throughput at lightning-fast speeds. Clairvoyant team has used Kafka as a core part of architecture in a production environment and overall, we were quite satisfied with the results, but there are still a few caveats to bear in mind.

The backdrop: For one of the largest health insurance companies in America, we had to build an archival system to house electronic copies of all documents produced for their members, and provide on demand search and retrieval of those documents based metadata and text content.

The solution: We built a system where events (which correspond to new documents produced) move through various queues. So, that the system can process them without getting overwhelmed and is scalable to handle huge number of documents that gets produced daily for all their members. Our messaging system of choice was…drum roll please…Kafka!

The diagram below gives a high level overview of the architecture. An external system makes a Rest call to the Queue API with the File Information and its metadata. The Queue API acts as a publisher and publishes the metadata to different Kafka topics. The consumers for these topics process the messages further and save the File (electronic copy) and metadata to object storage and MySQL/ES respectively.

ingestion-6-768x430

The good: Kafka is designed as a distributed system, and it is fast, scalable, partitioned and replicated. It can handle messages at insane scale. http://thenewstack.io/streaming-data-at-linkedin-apache-kafka-reaches-1-1-trillion-messages-per-day/ ‘Nuff said. And it’s open source. And it’s fairly easy to get up and running in a production environment. And the documentation is great. It is built around the concepts of consumer groups and replicated topics, which make the system highly available and provide easy horizontal scalability.

The bad: Kafka is a little weak operationally. Which is to say that it doesn’t have robust native mechanisms for things like message monitoring. Though a lot of new open sourced monitoring tools are coming up as Kafka is becoming popular; we’ll talk about these later. Kafka is a commit log and it is expected that it is job of the consumer to handle the failed messages by putting the messages in a dead letter queue or publishing the messages back to the queue for retrying.

The meat: There’s nothing we can say here about the internals of Kafka which isn’t already explained clearly in its excellent documentation (http://kafka.apache.org/documentation.html). What’s typically more interesting is someone’s experience of deploying a technology into a production environment and gotchas that were encountered along the way.

Hardware and scale

 We were running a 3-node cluster where each server had 4 x 2.7GHz cores, 16 GB of memory and 1 TB of hard disk space. Kafka persist messages on disk, so hard disk space is crucial.The number of messages we processed was on the order of about 3 million per day (yup…pretty low compared to the big boys). Pretty much all of this volume was funneled through 3 topics (a million a piece).

The Zookeeper/Kafka broker config

Each node was running an instance of Zookeeper (3.4.6) and Kafka (started with 0.8.2.1 and later updated to 0.10.0.0). Here’s how we set it up.

Zookeeper Setup

Version 3.4.6: http://apache.org/dist/zookeeper/zookeeper-3.4.6/

Unpack it: tar xvf zookeeper-3.4.6.tar.gz

Server 1 (ports: 2181, 2888, 3888)
cp conf/zoo_sample.cfg conf/zoo.cfg
Update conf/zoo.cfg as such:(Update existing property) dataDir=/opt/kafkafs/zookeeperdata/1
(Create new property) server.1=<host1 name>:2888:3888
(Create new property) server.2=<host2 name>:2889:3889
(Create new property) server.3=<host3 name>:2890:3890
Create "myid" file mkdir /opt/kafkafs/zookeeperdata
mkdir /opt/kafkafs/zookeeperdata/1
Create a file in here called "myid" with simply one character in it: 1

Server 2 (ports: 2182, 2889, 3889)
cp conf/zoo_sample.cfg conf/zoo.cfg
Update conf/zoo.cfg as such:(Update existing property) dataDir=/opt/kafkafs/zookeeperdata/2
(Update existing property) clientPort=2182
(Create new property) server.1=<host1 name>:2888:3888
(Create new property) server.2=<host2 name>:2889:3889
(Create new property) server.3=<host3 name>:2890:3890
Create "myid" file mkdir /opt/kafkafs/zookeeperdata
mkdir /opt/kafkafs/zookeeperdata/2
Create a file in here called "myid" with simply one character in it: 2

Server 3 (ports: 2183, 2890, 3890)
cp conf/zoo_sample.cfg conf/zoo.cfg
Update conf/zoo.cfg as such:(Update existing property) dataDir=/opt/kafkafs/zookeeperdata/3
(Update existing property) clientPort=2183
(Create new property) server.1=<host1 name>:2888:3888
(Create new property) server.2=<host2 name>:2889:3889
(Create new property) server.3=<host3 name>:2890:3890
Create "myid" file mkdir /opt/kafkafs/zookeeperdata
mkdir /opt/kafkafs/zookeeperdata/3
Create a file in here called "myid" with simply one character in it: 3

Start server: bin/zkServer.sh start zoo.cfg
Stop server: bin/zkServer.sh stop zoo.cfg

Kafka setup

 Version kafka_2.10-0.10.0.0: http://download.nextag.com/apache/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz

Unpack it: tar xvf kafka_2.10-0.10.0.0.tgz

Server 1 (ports: 9092)
Update existing config/server.properties as such:
 listeners=PLAINTEXT://<host1 name>:9092
 log.dir=/opt/kafkafs/kafka-logs // location where message will stored
 zookeeper.connect=<host1 name>:2181,<host2 name>:2182,<host3 name>:2183
Create new Properties: 
 delete.topic.enable=true // Enable delete topic command
 auto.create.topics.enable=false // Disable auto topics creation
 max.session.timeout.ms= 600000 // No consumer can have more session.timeout.ms than this
Create data directory: mkdir /opt/kafkafs/kafka-logs

Server 2 (ports: 9093)
Update existing config/server.properties as such:
 id=1
 listeners=PLAINTEXT://<host2 name>:9093
 log.dir=/opt/kafkafs/kafka-logs
 zookeeper.connect=<host1 name>:2181,<host2 name>:2182,<host3 name>:2183
Create new Properties: 
 delete.topic.enable=true
 auto.create.topics.enable=false
 max.session.timeout.ms= 600000
Create data directory: mkdir /opt/kafkafs/kafka-logs

Server 3 (ports: 9094)
Update existing config/server.properties as such:
 id=2
 listeners=PLAINTEXT://<host3 name>:9094
 log.dir=/opt/kafkafs/kafka-logs
 zookeeper.connect=<host1 name>:2181,<host2 name>:2182,<host3 name>:2183
Create new Properties: 
 delete.topic.enable=true
 auto.create.topics.enable=false
 max.session.timeout.ms= 600000
Create data directory: mkdir /opt/kafkafs/kafka-logs

Start server: bin/kafka-server-start.sh config/server.properties &
Stop server: bin/kafka-server-stop.sh config/server.properties

Topics and partitions

We had about 15 queues, each with 6 partitions and a replication factor of 2. Here is an example of how to create such a topic:

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 2 –partitions 6 –topic new.topic

Based on our fault-tolerance needs, we decided that each topic would be replicated once (this is specified as a replication factor of 2, which means 1 master and 1 replica). The number of partitions is important as this decides the max number of active consumers in a consumer group and the max capacity of parallel consumption you will be able to have for a particular topic.

Producers and consumers

 Our Java application was built using micro-services architecture. Out of a total of 11 micro-services, 9 produced events, and 5 of the services were consumers. We started out using Kafka 0.8.2.1 with the Java producer and high-level consumer with spring integration. We subsequently upgraded to Kafka 0.10.0.0, in which a unified consumer was released (the high-level and simple consumer were merged into a unified consumer).

Java producer

 Below are the Kafka producer configs we used:

//Create the properties for Kafka consumer
Properties properties = new Properties();

//A list of host/port pairs to use for establishing the initial connection to the Kafka cluster
properties.put("bootstrap.servers", bootstrapServer);

//Serializer class for key that implements the Serializer interface
properties.put("key.serializer", StringSerializer.class.getCanonicalName());

//Serializer class for key that implements the Serializer interface
properties.put("value.serializer", EventSerializer.class.getCanonicalName());
properties.put(“retries”, 5);

//The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.

//properties.put("batch.size", 12345);
KafkaProducer producer = new KafkaProducer(properties);

while (true) {
 try {
 ProducerRecord producerRecord = new ProducerRecord < > (topicName, partitionnumber, key, event);
 //Publish the message
 Future < RecordMetadata > recordMetadata = producer.send(producerRecord);
 //Below statement will hold up until it gets response for record send
 RecordMetadata r = recordMetadata.get();
 } catch (Exception exception) {
 System.out.println("Exception.. Retrying to publish to the queue directly for topic ");
 }
}

Java consumer

 A lot of properties need to be configured for the Kafka consumer. Before deciding their values, it is advisable to review the Kafka documentation on each property. A note on how Kafka handles consumers: It uses the concept of “consumer groups”. A consumer group consists of multiple consumers amongst which messages are distributed. For example, say you have a topic with 6 partitions, and 3 consumers all consuming messages from that topic. Each consumer will get assigned 2 partitions to consume from, and messages are distributed accordingly. This gives Kafka the power of parallel consumption and fault tolerance (as partitions can be replicated across consumers). Also, another option is to set up multiple consumer groups which are subscribed to the same topic, in which case all messages are sent to both groups.

Below is how we configured the consumers:

 Properties props = new Properties();

//A list of host/port pairs to use for establishing the initial connection to the Kafka cluster
props.put("bootstrap.servers", “localhost:9092”);

//A unique string that identifies the consumer group this consumer belongs to
props.put("group.id", “test-groupid”);

//Deserializer class for key that implements the Deserializer interface
props.put("key.deserializer", StringDeserializer.class.getName());

//Deserializer class for value that implements the Deserializer interface
props.put("value.deserializer", ObjectSerializer.class.getName());

//What to do when there is no initial offset in Kafka. Setting it to earliest will ensure message consumption from last not committed offset.
props.put("auto.offset.reset", “earliest”);

//This property is a very useful one and can really help us make sure we process all the messages without loosing any. 
//If true the consumer's offset will be periodically committed in the background. We handled message commit ourself, because we wanted 
//to be sure that we commit message only after we consumed and processed it successfully.
props.put("enable.auto.commit", false);

//This is one of the most important properties of consumer. 
//Consumer has to poll at least once in “session.timeout.ms” time otherwise, consumer group coordinator will mark it as dead. 
//So basically we kept the timeout, a bit high then time needed for consumption and process of consumed event in poll.
props.put("session.timeout.ms", sessionTimeout);

//This has to be more the session.timeout.ms
props.put("request.timeout.ms", requestTimeout);

//This has be less then “session.timeout.ms”, it is advised to keep it 1/3rd of value of “session.timeout.ms”.
props.put("heartbeat.interval.ms", heartBeatInterval);

//Max number of messages to be polled from topic partitions in each poll
props.put(“max.poll.records",maxMessages);

try {
// Create consumer with properties and subscribe to topic

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(“testtopic”));

while (true) {

//The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
//immediately with any records that are available now

ConsumerRecords<Object, Object> records=consumer.poll(polltime);

//Process messages it should be able complete processing before “session.timeout.ms"
//or control the “max.poll.records” to poll few events in each poll.

processmessage(records);

//commit the messages if we are done to successful processing
consumer.commitSync();
}
} catch (Exception e) {

logger.error("Closing Consumer : Unhandled exception in while consuming messages ",e);

} finally {
consumer.close()
}

Spring Integration

A special note here: Initially, we didn’t use the Java Kafka APIs as is; we used the Spring-Kafka adapter. This integration had pros and cons; on balance, we found it to be less intuitive and less flexible than using the Kafka APIs directly. On the upside, it provides a simple xml structure to configure a number of the consumer/producer properties. On the flip-side, however, it doesn’t support all features that the actual Kafka API provides. So we later moved to the Kafka APIs for consumers and producers.

Monitoring

There are some good open-source Kafka console monitors that are available; to name a few:

Developed by Yahoo: https://github.com/yahoo/kafka-manager

Developed by Linkedin: https://github.com/linkedin/kafka-monitor

However, we had a few specific requirements. We wanted to be able to send emails when a node is down, and we also wanted to run a job that constantly checks queue lag (the number of messages remaining to be consumed). If messages were being processed at a suboptimal rate, we wanted an email notification for that too. We had to build our own tool for this, and it ended being very helpful in understanding the status of the consumers in production. These are the APIs we used to get the information we needed:

  • admin.TopicCommand.listTopics : to list all the topics
  • admin.TopicCommand.describeTopic : gives us information about partitions in a topic and their corresponding leaders and followers
  • admin.ConsumerGroupCommand.main : gives us information about a particular consumer, the offset that is committed, and the lag

The verdict

 Kafka is a great choice for handling messages at huge scale. Obviously it is; if it’s the choice for a host of companies, and if it’s only gaining a wider and wider foothold, it must be doing things right. We saw the strengths of this system play out in real-time (even though we did wrestle with one or two issues along the way). Kafka should be considered an essential part of any company’s technology toolkit.

Leave a Reply

Your email address will not be published. Required fields are marked *