In previous articles we saw how to set up a multi-broker Apache Kafka cluster and ZooKeeper. In this article we will see how to write a Kafka producer in Java that writes data to a Kafka cluster.

Maven dependencies required for Kafka Java producer

To write a Kafka producer in Java, we need to add the following Maven dependency (kafka-clients) to our pom.xml.

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.0.0</version>
</dependency>

Kafka Producer using kafka java client

The following class can be used as a Kafka producer to send messages from a Java application to a Kafka cluster. Read the code first; the properties it sets are explained after the listings:
package com.tb.kafka;

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import com.tb.constants.KafkaConstants;

public class KafkaProducer {
	Properties props;
	Producer<String, String> producer;

	public KafkaProducer(String brokerString) {
		props = new Properties();
		props.put("bootstrap.servers", brokerString);
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", KafkaConstants.KAFKA_KEY_SERIALIZER);
		props.put("value.serializer", KafkaConstants.KAFKA_VALUE_SERIALIZER);
		producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
	}

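	/* send() is asynchronous: it returns a Future without waiting for the
	 * broker's acknowledgement. The Future completes once the send has been
	 * acknowledged or has failed. */
	public Future<RecordMetadata> sendMessage(String topic, String message) {
		return producer.send(new ProducerRecord<>(topic, message));
	}

	/* Keyed variant: the Future returned by send() is discarded here, so the
	 * caller is not told whether the send succeeded. */
	public void sendMessage(String topic, String key, String value) {
		producer.send(new ProducerRecord<>(topic, key, value));
	}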

	public void closeProducer() {
		producer.close();
	}

}
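
Because sendMessage(topic, message) returns a java.util.concurrent.Future, a caller that needs to know whether a send succeeded can block on that Future. The following is only a minimal sketch of that idea, not part of the class above: the class name SendResultSketch and the helper awaitResult are hypothetical, and the demo uses a CompletableFuture as a stand-in for the Future that the producer would return, so it runs without a broker.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SendResultSketch {

    /*
     * Hypothetical helper: waits up to timeoutMs for the future to complete.
     * Returns the result, or an empty Optional if the send failed, timed out,
     * or the waiting thread was interrupted.
     */
    static <T> Optional<T> awaitResult(Future<T> future, long timeoutMs) {
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed; the cause holds the real error.
            System.err.println("Send failed: " + e.getCause());
        } catch (TimeoutException e) {
            System.err.println("No acknowledgement within " + timeoutMs + " ms");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for the send");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for: kafkaProducer.sendMessage(topic, message)
        Future<String> acknowledged = CompletableFuture.completedFuture("acknowledged");
        System.out.println(awaitResult(acknowledged, 1000).isPresent());

        // Stand-in for a send that failed.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(awaitResult(failed, 1000).isPresent());
    }
}
```

With the real producer the type parameter would be RecordMetadata. Blocking on every send gives up the benefit of batching, so this style fits best where each message must be confirmed before the program continues.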

Utility class to hold constants

package com.tb.constants;

public class KafkaConstants {
	public static final String KAFKA_BROKER_STRING = 
			"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
	public static final String KAFKA_KEY_SERIALIZER = 
			"org.apache.kafka.common.serialization.StringSerializer";
	public static final String KAFKA_VALUE_SERIALIZER = 
			"org.apache.kafka.common.serialization.StringSerializer";
	public static final String KAFKA_TOPIC = "TEST-1";
	public static final String KAFKA_CONSUMER_GROUP = "TEST";
}

"bootstrap.servers", this property is a comma-separated list of host:port pairs that the producer uses to make its initial connection to the Kafka cluster.

"acks", this property specifies the acknowledgement the producer expects from Kafka before a write is considered complete. "all" means the partition leader waits for the full set of in-sync replicas to acknowledge the record, which is the slowest but most durable setting.

"retries", this property specifies how many times the Kafka producer retries a send that has failed. With the value 0 used here a failed send is not retried; a value above 0 can lead to duplicate records.

"batch.size", this property specifies the maximum size, in bytes, of a batch of records. The producer keeps a buffer of unsent records for each partition, and each of these buffers is of this size.

"linger.ms", this property tells the producer to wait for the specified number of milliseconds before sending data to Kafka, so that more records can be collected into one batch. linger.ms=0 sends data to Kafka immediately, even if the batch is not full.

"buffer.memory", this property specifies the total memory, in bytes, that the producer may use to buffer records waiting to be sent.

"key.serializer" and "value.serializer", these are the classes used to serialize the key and the value of each message into bytes.

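The same settings can also be collected in a small factory method, which makes them easy to inspect without starting a producer. This is a rough sketch only: the class name ProducerConfigSketch and the method build are hypothetical, and it assumes the numeric settings can be written as strings, which the Kafka client should parse when the producer is created. The arithmetic in main shows how the two buffer settings relate.

```java
import java.util.Properties;

final class ProducerConfigSketch {

    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    /* Hypothetical factory: returns the producer settings used in this article. */
    static Properties build(String brokerString) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerString);
        props.setProperty("acks", "all");               // wait for all in-sync replicas
        props.setProperty("retries", "0");              // do not retry failed sends
        props.setProperty("batch.size", "16384");       // 16 KB per batch
        props.setProperty("linger.ms", "1");            // wait up to 1 ms to fill a batch
        props.setProperty("buffer.memory", "33554432"); // 32 MB in total
        props.setProperty("key.serializer", STRING_SERIALIZER);
        props.setProperty("value.serializer", STRING_SERIALIZER);
        return props;
    }

    /* Number of full batches that fit into the total buffer memory. */
    static long fullBatchesInBuffer(Properties props) {
        long bufferMemory = Long.parseLong(props.getProperty("buffer.memory"));
        long batchSize = Long.parseLong(props.getProperty("batch.size"));
        return bufferMemory / batchSize;
    }

    public static void main(String[] args) {
        Properties props = build("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        System.out.println("Full batches that fit in buffer.memory: "
                + fullBatchesInBuffer(props));
    }
}
```

With the values used in this article, buffer.memory holds 33554432 / 16384 = 2048 full batches.
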
Testing kafka producer

Let's run our Kafka producer from a main() method and write a few sample string messages to Kafka, as shown below:
package com.tb.manager;

import com.tb.constants.KafkaConstants;
import com.tb.kafka.KafkaProducer;

public class App {
	public static void main(String[] args) {
		KafkaProducer kafkaProducer = new KafkaProducer(KafkaConstants.KAFKA_BROKER_STRING);

		/* Sending messages without a key */
		for (int i = 0; i < 10; i++)
			kafkaProducer.sendMessage(KafkaConstants.KAFKA_TOPIC, "Message : " + i);

		/* Sending messages with a key */
		for (int i = 0; i < 10; i++)
			kafkaProducer.sendMessage(KafkaConstants.KAFKA_TOPIC, "Key : " + i, "Message : " + i);

		kafkaProducer.closeProducer();
	}
}

In this article we have seen how to write a Kafka producer in Java that writes data to a Kafka cluster. In upcoming articles we will see more about Kafka producers and consumers in Java.
  • By Techburps.com
  • Jan 22, 2018
  • Big Data