In the previous article we saw how to write a Kafka producer in Java. In this article we will see how to write a Kafka consumer in Java, with automatic offset committing, to read data from a Kafka cluster.

Maven dependencies required for Kafka Java consumer

In order to read data from Kafka using a Java consumer, we need to add the following Maven dependency (kafka-clients) to our pom.xml.

<dependency>
	<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-clients</artifactId>
	<version>1.0.0</version>
</dependency>
Kafka consumers work in such a way that only one consumer of the same consumer group gets a message from a topic's partition; in other words, different consumers within the same consumer group consume messages from different partitions of a topic.

For example, if a topic has 3 partitions and we have 4 consumers, then ideally 3 consumers will consume from the three partitions and one consumer will be left idle. If there are 3 partitions and 2 consumers, then one consumer will consume from one partition and the other will consume from the remaining two partitions. In any case, each partition is consumed by at most one consumer of the same group.
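The assignment rule above can be pictured with a small sketch in plain Java. This is not the Kafka client's actual assignor (Kafka's strategies such as range and round-robin are more involved, and the class and method names below are purely illustrative); it only demonstrates the invariant that each partition goes to exactly one consumer of the group, while surplus consumers stay idle:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative round-robin assignment: each partition is owned by exactly
// one consumer of the group; extra consumers end up with no partitions.
public class AssignmentSketch {

	static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
		Map<String, List<Integer>> assignment = new LinkedHashMap<>();
		for (String c : consumers) {
			assignment.put(c, new ArrayList<>());
		}
		for (int p = 0; p < partitions; p++) {
			// partition p goes to exactly one consumer of the group
			String owner = consumers.get(p % consumers.size());
			assignment.get(owner).add(p);
		}
		return assignment;
	}

	public static void main(String[] args) {
		// 3 partitions, 4 consumers: one consumer is left idle
		System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
		// 3 partitions, 2 consumers: one consumer gets two partitions
		System.out.println(assign(List.of("c1", "c2"), 3));
	}
}
```

Running it shows one idle consumer in the first case and one doubly-loaded consumer in the second, matching the scenarios described above.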

The Kafka Java client provides three main consumer styles:

1) Automatic offset committing consumer
2) Manual offset committing consumer
3) Manually assigning a consumer to a partition

In this article we will see how to implement an "Automatic offset committing consumer". With this type of consumer, the offset up to which data has been consumed from a partition is committed to Kafka automatically, without programmer intervention.
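The auto-commit behaviour can be modelled with a short sketch in plain Java. This is not Kafka's internal implementation (the class and method names are purely illustrative); it only shows the idea that the client tracks the position per partition as records are consumed, and a periodic check flushes that position to the committed-offset store once the commit interval has elapsed:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model of automatic offset committing: consumed() advances a
// position, and commitIfDue() flushes it once the interval has elapsed.
public class AutoCommitSketch {
	private final long intervalMs;
	private long lastCommitMs;
	private final Map<Integer, Long> position = new ConcurrentHashMap<>();  // next offset to consume
	private final Map<Integer, Long> committed = new ConcurrentHashMap<>(); // offsets "stored in Kafka"

	public AutoCommitSketch(long intervalMs, long nowMs) {
		this.intervalMs = intervalMs;
		this.lastCommitMs = nowMs;
	}

	// Called for every record the application consumes.
	public void consumed(int partition, long offset) {
		position.put(partition, offset + 1);
	}

	// Called by the client on each poll; commits only when the interval is due.
	public boolean commitIfDue(long nowMs) {
		if (nowMs - lastCommitMs < intervalMs) {
			return false;
		}
		committed.putAll(position);
		lastCommitMs = nowMs;
		return true;
	}

	public Long committedOffset(int partition) {
		return committed.get(partition);
	}
}
```

With an interval of 1000 ms, a record consumed at offset 41 is not committed at 500 ms, but the position (42, the next offset to read) is committed once a check happens at or after 1000 ms.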

Automatic offset committing Kafka Java consumer

The following class can be used as an "Automatic offset committing Kafka Java consumer"; please read the code and the explanation written below:
package com.til.kafka.consumer;

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.tb.constants.KafkaConstants;

//Automatic Offset Committing
public class AOCKafkaConsumer {
	Properties props;
	KafkaConsumer<String, String> consumer;

	public AOCKafkaConsumer(String brokerString) {
		props = new Properties();
		props.put("bootstrap.servers", brokerString);
		props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP);
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		// consumers need deserializers (not the serializers used by producers)
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		consumer = new KafkaConsumer<>(props);
	}

	public void subscribe(List<String> topics) {
		consumer.subscribe(topics);
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records)
				System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
		}
	}
}

Utility class to hold constants

package com.tb.constants;

public class KafkaConstants {
	public static String KAFKA_BROKER_STRING = 
			"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
	public static String KAFKA_KEY_SERIALIZER = 
			"org.apache.kafka.common.serialization.StringSerializer";
	public static String KAFKA_VALUE_SERIALIZER = 
			"org.apache.kafka.common.serialization.StringSerializer";
	public static String KAFKA_TOPIC = "TEST-1";
	public static String KAFKA_CONSUMER_GROUP = "TEST";
}

"bootstrap.servers", this property specifies the Kafka cluster brokers used for the initial connection.

"group.id", this specifies the consumer group; only one consumer of a group gets a given message from a topic's partition.

"enable.auto.commit", this property specifies that the offset up to which data has been consumed from a partition is committed to Kafka automatically, without programmer intervention.

"auto.commit.interval.ms", this property specifies, in milliseconds, how often the last consumed offset should be committed to Kafka.

"key.deserializer" and "value.deserializer", these specify the classes used to deserialize the message key and value from bytes. Note that a consumer uses deserializers (e.g. StringDeserializer), the inverse of the serializers a producer uses.
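Conceptually, the String serializer/deserializer pair just converts between a String and its UTF-8 bytes. A minimal stand-in in plain Java (not the actual org.apache.kafka.common.serialization classes; the class and method names here are illustrative) looks like this:

```java
import java.nio.charset.StandardCharsets;

// Minimal stand-ins for a String serializer/deserializer pair: the producer
// encodes a String to UTF-8 bytes, and the consumer decodes them back.
public class StringCodecSketch {

	// Producer side: String -> bytes
	static byte[] serialize(String data) {
		return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
	}

	// Consumer side: bytes -> String
	static String deserialize(byte[] data) {
		return data == null ? null : new String(data, StandardCharsets.UTF_8);
	}
}
```

Deserializing what was serialized gives back the original string, which is why the producer's serializer and the consumer's deserializer must agree on the format.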

Testing the Kafka consumer

Let's run our Kafka consumer from a main() method and try to read data:
package com.tb.manager;

import java.util.Arrays;

import com.tb.constants.KafkaConstants;
import com.til.kafka.consumer.AOCKafkaConsumer;

public class App {
	public static void main(String[] args) {

		// This will start a consumer in new thread
		new Thread(new Runnable() {
			@Override
			public void run() {
				AOCKafkaConsumer kafkaConsumer = new AOCKafkaConsumer(KafkaConstants.KAFKA_BROKER_STRING);
				kafkaConsumer.subscribe(Arrays.asList(KafkaConstants.KAFKA_TOPIC));

			}
		}).start();
	}
}

In this article we have seen how to write an "Automatic offset committing consumer" in Java. In coming articles we will see more about Kafka producers and consumers in Java.
  • By Techburps.com
  • Jan 22, 2018
  • Big Data