diff --git a/build.gradle b/build.gradle
index 224f550403cae..b25aa953a5af6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -259,6 +259,7 @@ project(':core') {
println "Building project 'core' with Scala version $resolvedScalaVersion"
apply plugin: 'scala'
+ apply plugin: 'checkstyle'
archivesBaseName = "kafka_${baseScalaVersion}"
dependencies {
@@ -334,8 +335,6 @@ project(':core') {
into 'site-docs'
tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
into "kafka_${baseScalaVersion}-${version}"
compression = Compression.GZIP
@@ -390,15 +389,27 @@ project(':core') {
into "$buildDir/dependant-testlibs"
+ checkstyle {
+ configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ configProperties = [importControlFile: "$rootDir/checkstyle/import-control-core.xml"]
+ }
+ test.dependsOn('checkstyleMain', 'checkstyleTest')
project(':examples') {
+ apply plugin: 'checkstyle'
archivesBaseName = "kafka-examples"
dependencies {
compile project(':core')
+ checkstyle {
+ configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ configProperties = [importControlFile: "$rootDir/checkstyle/import-control-core.xml"]
+ }
+ test.dependsOn('checkstyleMain', 'checkstyleTest')
project(':clients') {
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
new file mode 100644
index 0000000000000..d53e9e8734444
--- /dev/null
+++ b/checkstyle/import-control-core.xml
@@ -0,0 +1,69 @@
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index 444cd1d4b34e6..4de2a4c768afd 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -17,72 +17,72 @@
package kafka.javaapi.consumer;
-import java.util.List;
-import java.util.Map;
import kafka.common.OffsetAndMetadata;
import kafka.common.TopicAndPartition;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.serializer.Decoder;
+import java.util.List;
+import java.util.Map;
public interface ConsumerConnector {
- /**
- * Create a list of MessageStreams of type T for each topic.
- *
- * @param topicCountMap a map of (topic, #streams) pair
- * @param keyDecoder a decoder that decodes the message key
- * @param valueDecoder a decoder that decodes the message itself
- * @return a map of (topic, list of KafkaStream) pairs.
- * The number of items in the list is #streams. Each stream supports
- * an iterator over message/metadata pairs.
- */
- public Map>>
- createMessageStreams(Map topicCountMap, Decoder keyDecoder, Decoder valueDecoder);
- public Map>> createMessageStreams(Map topicCountMap);
+ /**
+ * Create a list of MessageStreams of type T for each topic.
+ *
+ * @param topicCountMap a map of (topic, #streams) pair
+ * @param keyDecoder a decoder that decodes the message key
+ * @param valueDecoder a decoder that decodes the message itself
+ * @return a map of (topic, list of KafkaStream) pairs.
+ * The number of items in the list is #streams. Each stream supports
+ * an iterator over message/metadata pairs.
+ */
+ public Map>>
+ createMessageStreams(Map topicCountMap, Decoder keyDecoder, Decoder valueDecoder);
+ public Map>> createMessageStreams(Map topicCountMap);
+ /**
+ * Create a list of MessageAndTopicStreams containing messages of type T.
+ *
+ * @param topicFilter a TopicFilter that specifies which topics to
+ * subscribe to (encapsulates a whitelist or a blacklist).
+ * @param numStreams the number of message streams to return.
+ * @param keyDecoder a decoder that decodes the message key
+ * @param valueDecoder a decoder that decodes the message itself
+ * @return a list of KafkaStream. Each stream supports an
+ * iterator over its MessageAndMetadata elements.
+ */
+ public List>
+ createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder keyDecoder, Decoder valueDecoder);
+ public List> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
+ public List> createMessageStreamsByFilter(TopicFilter topicFilter);
- /**
- * Create a list of MessageAndTopicStreams containing messages of type T.
- *
- * @param topicFilter a TopicFilter that specifies which topics to
- * subscribe to (encapsulates a whitelist or a blacklist).
- * @param numStreams the number of message streams to return.
- * @param keyDecoder a decoder that decodes the message key
- * @param valueDecoder a decoder that decodes the message itself
- * @return a list of KafkaStream. Each stream supports an
- * iterator over its MessageAndMetadata elements.
- */
- public List>
- createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder keyDecoder, Decoder valueDecoder);
- public List> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
- public List> createMessageStreamsByFilter(TopicFilter topicFilter);
+ /**
+ * Commit the offsets of all broker partitions connected by this connector.
+ */
+ public void commitOffsets();
- /**
- * Commit the offsets of all broker partitions connected by this connector.
- */
- public void commitOffsets();
- public void commitOffsets(boolean retryOnFailure);
+ public void commitOffsets(boolean retryOnFailure);
- /**
- * Commit offsets using the provided offsets map
- *
- * @param offsetsToCommit a map containing the offset to commit for each partition.
- * @param retryOnFailure enable retries on the offset commit if it fails.
- */
- public void commitOffsets(Map offsetsToCommit, boolean retryOnFailure);
+ /**
+ * Commit offsets using the provided offsets map
+ *
+ * @param offsetsToCommit a map containing the offset to commit for each partition.
+ * @param retryOnFailure enable retries on the offset commit if it fails.
+ */
+ public void commitOffsets(Map offsetsToCommit, boolean retryOnFailure);
- /**
- * Wire in a consumer rebalance listener to be executed when consumer rebalance occurs.
- * @param listener The consumer rebalance listener to wire in
- */
- public void setConsumerRebalanceListener(ConsumerRebalanceListener listener);
+ /**
+ * Wire in a consumer rebalance listener to be executed when consumer rebalance occurs.
+ * @param listener The consumer rebalance listener to wire in
+ */
+ public void setConsumerRebalanceListener(ConsumerRebalanceListener listener);
- /**
- * Shut down the connector
- */
- public void shutdown();
+ /**
+ * Shut down the connector
+ */
+ public void shutdown();
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index f19df0cf5edd3..b1ab64974128f 100755
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -17,10 +17,15 @@
package kafka.tools;
-import joptsimple.*;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
+import org.apache.kafka.common.utils.Utils;
import java.io.File;
import java.io.FileInputStream;
@@ -38,8 +43,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.common.utils.Utils;
* This is a kafka 0.7 to 0.8 online migration tool used for migrating data from 0.7 to 0.8 cluster. Internally,
@@ -58,429 +61,427 @@
* the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
@SuppressWarnings({"unchecked", "rawtypes"})
-public class KafkaMigrationTool
- private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
- private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
- private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
- private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
- private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
- private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
- private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
- private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
- private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
- private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
- private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
- private static Class> KafkaStaticConsumer_07 = null;
- private static Class> ConsumerConfig_07 = null;
- private static Class> ConsumerConnector_07 = null;
- private static Class> KafkaStream_07 = null;
- private static Class> TopicFilter_07 = null;
- private static Class> WhiteList_07 = null;
- private static Class> BlackList_07 = null;
- private static Class> KafkaConsumerIteratorClass_07 = null;
- private static Class> KafkaMessageAndMetatDataClass_07 = null;
- private static Class> KafkaMessageClass_07 = null;
- public static void main(String[] args) throws InterruptedException, IOException {
- OptionParser parser = new OptionParser();
- ArgumentAcceptingOptionSpec consumerConfigOpt
- = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
- .withRequiredArg()
- .describedAs("config file")
- .ofType(String.class);
- ArgumentAcceptingOptionSpec producerConfigOpt
- = parser.accepts("producer.config", "Producer config.")
- .withRequiredArg()
- .describedAs("config file")
- .ofType(String.class);
- ArgumentAcceptingOptionSpec numProducersOpt
- = parser.accepts("num.producers", "Number of producer instances")
- .withRequiredArg()
- .describedAs("Number of producers")
- .ofType(Integer.class)
- .defaultsTo(1);
- ArgumentAcceptingOptionSpec zkClient01JarOpt
- = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
- .withRequiredArg()
- .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
- .ofType(String.class);
- ArgumentAcceptingOptionSpec kafka07JarOpt
- = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
- .withRequiredArg()
- .describedAs("kafka 0.7 jar")
- .ofType(String.class);
- ArgumentAcceptingOptionSpec numStreamsOpt
- = parser.accepts("num.streams", "Number of consumer streams")
- .withRequiredArg()
- .describedAs("Number of consumer threads")
- .ofType(Integer.class)
- .defaultsTo(1);
- ArgumentAcceptingOptionSpec whitelistOpt
- = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
- .withRequiredArg()
- .describedAs("Java regex (String)")
- .ofType(String.class);
- ArgumentAcceptingOptionSpec blacklistOpt
- = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
- .withRequiredArg()
- .describedAs("Java regex (String)")
- .ofType(String.class);
- ArgumentAcceptingOptionSpec queueSizeOpt
- = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
- .withRequiredArg()
- .describedAs("Queue size in terms of number of messages")
- .ofType(Integer.class)
- .defaultsTo(10000);
- OptionSpecBuilder helpOpt
- = parser.accepts("help", "Print this message.");
- OptionSet options = parser.parse(args);
- if (options.has(helpOpt)) {
- parser.printHelpOn(System.out);
- System.exit(0);
- }
+public class KafkaMigrationTool {
+ private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
+ private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
+ private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
+ private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
+ private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
+ private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
+ private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
+ private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
+ private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
+ private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
+ private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
+ private static Class> kafkaStaticConsumer07 = null;
+ private static Class> consumerConfig07 = null;
+ private static Class> consumerConnector07 = null;
+ private static Class> kafkaStream07 = null;
+ private static Class> topicFilter07 = null;
+ private static Class> whiteList07 = null;
+ private static Class> blackList07 = null;
+ private static Class> kafkaConsumerIteratorClass07 = null;
+ private static Class> kafkaMessageAndMetaDataClass07 = null;
+ private static Class> kafkaMessageClass07 = null;
+ public static void main(String[] args) throws InterruptedException, IOException {
+ OptionParser parser = new OptionParser();
+ ArgumentAcceptingOptionSpec consumerConfigOpt
+ = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
+ ArgumentAcceptingOptionSpec producerConfigOpt
+ = parser.accepts("producer.config", "Producer config.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
+ ArgumentAcceptingOptionSpec numProducersOpt
+ = parser.accepts("num.producers", "Number of producer instances")
+ .withRequiredArg()
+ .describedAs("Number of producers")
+ .ofType(Integer.class)
+ .defaultsTo(1);
+ ArgumentAcceptingOptionSpec zkClient01JarOpt
+ = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
+ .withRequiredArg()
+ .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
+ .ofType(String.class);
+ ArgumentAcceptingOptionSpec kafka07JarOpt
+ = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
+ .withRequiredArg()
+ .describedAs("kafka 0.7 jar")
+ .ofType(String.class);
+ ArgumentAcceptingOptionSpec numStreamsOpt
+ = parser.accepts("num.streams", "Number of consumer streams")
+ .withRequiredArg()
+ .describedAs("Number of consumer threads")
+ .ofType(Integer.class)
+ .defaultsTo(1);
+ ArgumentAcceptingOptionSpec whitelistOpt
+ = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class);
+ ArgumentAcceptingOptionSpec blacklistOpt
+ = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class);
+ ArgumentAcceptingOptionSpec queueSizeOpt
+ = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
+ .withRequiredArg()
+ .describedAs("Queue size in terms of number of messages")
+ .ofType(Integer.class)
+ .defaultsTo(10000);
+ OptionSpecBuilder helpOpt
+ = parser.accepts("help", "Print this message.");
+ OptionSet options = parser.parse(args);
+ if (options.has(helpOpt)) {
+ parser.printHelpOn(System.out);
+ System.exit(0);
+ }
- checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
- int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
- int blackListCount = options.has(blacklistOpt) ? 1 : 0;
- if(whiteListCount + blackListCount != 1) {
- System.err.println("Exactly one of whitelist or blacklist is required.");
- System.exit(1);
- }
+ checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
+ int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
+ int blackListCount = options.has(blacklistOpt) ? 1 : 0;
+ if (whiteListCount + blackListCount != 1) {
+ System.err.println("Exactly one of whitelist or blacklist is required.");
+ System.exit(1);
+ }
- String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
- String zkClientJarFile = options.valueOf(zkClient01JarOpt);
- String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
- int numConsumers = options.valueOf(numStreamsOpt);
- String producerConfigFile_08 = options.valueOf(producerConfigOpt);
- int numProducers = options.valueOf(numProducersOpt);
- final List migrationThreads = new ArrayList(numConsumers);
- final List producerThreads = new ArrayList(numProducers);
- try {
- File kafkaJar_07 = new File(kafkaJarFile_07);
- File zkClientJar = new File(zkClientJarFile);
- ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] {
- kafkaJar_07.toURI().toURL(),
- zkClientJar.toURI().toURL()
- });
- /** Construct the 07 consumer config **/
- ConsumerConfig_07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
- KafkaStaticConsumer_07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
- ConsumerConnector_07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
- KafkaStream_07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
- TopicFilter_07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
- WhiteList_07 = c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
- BlackList_07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
- KafkaMessageClass_07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
- KafkaConsumerIteratorClass_07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
- KafkaMessageAndMetatDataClass_07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
- Constructor ConsumerConfigConstructor_07 = ConsumerConfig_07.getConstructor(Properties.class);
- Properties kafkaConsumerProperties_07 = new Properties();
- kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
- /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
- if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
- log.warn("Shallow iterator should not be used in the migration tool");
- kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
- }
- Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
- /** Construct the 07 consumer connector **/
- Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
- final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
- Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod(
- "createMessageStreamsByFilter",
- TopicFilter_07, int.class);
- final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown");
- Constructor WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class);
- Constructor BlackListConstructor_07 = BlackList_07.getConstructor(String.class);
- Object filterSpec = null;
- if(options.has(whitelistOpt))
- filterSpec = WhiteListConstructor_07.newInstance(options.valueOf(whitelistOpt));
- else
- filterSpec = BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));
- Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);
- Properties kafkaProducerProperties_08 = new Properties();
- kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
- kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
- // create a producer channel instead
- int queueSize = options.valueOf(queueSizeOpt);
- ProducerDataChannel> producerDataChannel = new ProducerDataChannel>(queueSize);
- int threadId = 0;
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
- } catch(Exception e) {
- log.error("Error while shutting down Kafka consumer", e);
- }
- for(MigrationThread migrationThread : migrationThreads) {
- migrationThread.shutdown();
- }
- for(ProducerThread producerThread : producerThreads) {
- producerThread.shutdown();
- }
- for(ProducerThread producerThread : producerThreads) {
- producerThread.awaitShutdown();
- }
- log.info("Kafka migration tool shutdown successfully");
+ String kafkaJarFile07 = options.valueOf(kafka07JarOpt);
+ String zkClientJarFile = options.valueOf(zkClient01JarOpt);
+ String consumerConfigFile07 = options.valueOf(consumerConfigOpt);
+ int numConsumers = options.valueOf(numStreamsOpt);
+ String producerConfigFile08 = options.valueOf(producerConfigOpt);
+ int numProducers = options.valueOf(numProducersOpt);
+ final List migrationThreads = new ArrayList(numConsumers);
+ final List producerThreads = new ArrayList(numProducers);
+ try {
+ File kafkaJar07 = new File(kafkaJarFile07);
+ File zkClientJar = new File(zkClientJarFile);
+ ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{
+ kafkaJar07.toURI().toURL(),
+ zkClientJar.toURI().toURL()
+ });
+ /** Construct the 07 consumer config **/
+ consumerConfig07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
+ kafkaStaticConsumer07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
+ consumerConnector07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
+ kafkaStream07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
+ topicFilter07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
+ whiteList07 = c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
+ blackList07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
+ kafkaMessageClass07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
+ kafkaConsumerIteratorClass07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
+ kafkaMessageAndMetaDataClass07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
+ Constructor consumerConfigConstructor07 = consumerConfig07.getConstructor(Properties.class);
+ Properties kafkaConsumerProperties07 = new Properties();
+ kafkaConsumerProperties07.load(new FileInputStream(consumerConfigFile07));
+ /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
+ if (kafkaConsumerProperties07.getProperty("shallow.iterator.enable", "").equals("true")) {
+ log.warn("Shallow iterator should not be used in the migration tool");
+ kafkaConsumerProperties07.setProperty("shallow.iterator.enable", "false");
+ }
+ Object consumerConfig07 = consumerConfigConstructor07.newInstance(kafkaConsumerProperties07);
+ /** Construct the 07 consumer connector **/
+ Method consumerConnectorCreationMethod07 = kafkaStaticConsumer07.getMethod("createJavaConsumerConnector", KafkaMigrationTool.consumerConfig07);
+ final Object consumerConnector07 = consumerConnectorCreationMethod07.invoke(null, consumerConfig07);
+ Method consumerConnectorCreateMessageStreamsMethod07 = KafkaMigrationTool.consumerConnector07.getMethod(
+ "createMessageStreamsByFilter",
+ topicFilter07, int.class);
+ final Method consumerConnectorShutdownMethod07 = KafkaMigrationTool.consumerConnector07.getMethod("shutdown");
+ Constructor whiteListConstructor07 = whiteList07.getConstructor(String.class);
+ Constructor blackListConstructor07 = blackList07.getConstructor(String.class);
+ Object filterSpec = null;
+ if (options.has(whitelistOpt))
+ filterSpec = whiteListConstructor07.newInstance(options.valueOf(whitelistOpt));
+ else
+ filterSpec = blackListConstructor07.newInstance(options.valueOf(blacklistOpt));
+ Object retKafkaStreams = consumerConnectorCreateMessageStreamsMethod07.invoke(consumerConnector07, filterSpec, numConsumers);
+ Properties kafkaProducerProperties08 = new Properties();
+ kafkaProducerProperties08.load(new FileInputStream(producerConfigFile08));
+ kafkaProducerProperties08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
+ // create a producer channel instead
+ int queueSize = options.valueOf(queueSizeOpt);
+ ProducerDataChannel> producerDataChannel = new ProducerDataChannel>(queueSize);
+ int threadId = 0;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ consumerConnectorShutdownMethod07.invoke(consumerConnector07);
+ } catch (Exception e) {
+ log.error("Error while shutting down Kafka consumer", e);
+ }
+ for (MigrationThread migrationThread : migrationThreads) {
+ migrationThread.shutdown();
+ }
+ for (ProducerThread producerThread : producerThreads) {
+ producerThread.shutdown();
+ }
+ for (ProducerThread producerThread : producerThreads) {
+ producerThread.awaitShutdown();
+ }
+ log.info("Kafka migration tool shutdown successfully");
+ }
+ });
+ // start consumer threads
+ for (Object stream : (List) retKafkaStreams) {
+ MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
+ threadId++;
+ thread.start();
+ migrationThreads.add(thread);
+ }
+ String clientId = kafkaProducerProperties08.getProperty("client.id");
+ // start producer threads
+ for (int i = 0; i < numProducers; i++) {
+ kafkaProducerProperties08.put("client.id", clientId + "-" + i);
+ ProducerConfig producerConfig08 = new ProducerConfig(kafkaProducerProperties08);
+ Producer producer = new Producer(producerConfig08);
+ ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
+ producerThread.start();
+ producerThreads.add(producerThread);
+ }
+ } catch (Throwable e) {
+ System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
+ log.error("Kafka migration tool failed: ", e);
- });
- // start consumer threads
- for(Object stream: (List)retKafkaStreams) {
- MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
- threadId ++;
- thread.start();
- migrationThreads.add(thread);
- }
- String clientId = kafkaProducerProperties_08.getProperty("client.id");
- // start producer threads
- for (int i = 0; i < numProducers; i++) {
- kafkaProducerProperties_08.put("client.id", clientId + "-" + i);
- ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
- Producer producer = new Producer(producerConfig_08);
- ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
- producerThread.start();
- producerThreads.add(producerThread);
- }
- }
- catch (Throwable e){
- System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
- log.error("Kafka migration tool failed: ", e);
- }
- private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
- for(OptionSpec arg : required) {
- if(!options.has(arg)) {
- System.err.println("Missing required argument \"" + arg + "\"");
- parser.printHelpOn(System.err);
- System.exit(1);
- }
+ private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
+ for (OptionSpec arg : required) {
+ if (!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"");
+ parser.printHelpOn(System.err);
+ System.exit(1);
+ }
+ }
- }
- static class ProducerDataChannel {
- private final int producerQueueSize;
- private final BlockingQueue producerRequestQueue;
+ static class ProducerDataChannel {
+ private final int producerQueueSize;
+ private final BlockingQueue producerRequestQueue;
- public ProducerDataChannel(int queueSize) {
- producerQueueSize = queueSize;
- producerRequestQueue = new ArrayBlockingQueue(producerQueueSize);
- }
+ public ProducerDataChannel(int queueSize) {
+ producerQueueSize = queueSize;
+ producerRequestQueue = new ArrayBlockingQueue(producerQueueSize);
+ }
- public void sendRequest(T data) throws InterruptedException {
- producerRequestQueue.put(data);
- }
+ public void sendRequest(T data) throws InterruptedException {
+ producerRequestQueue.put(data);
+ }
- public T receiveRequest() throws InterruptedException {
- return producerRequestQueue.take();
- }
- }
- private static class MigrationThread extends Thread {
- private final Object stream;
- private final ProducerDataChannel> producerDataChannel;
- private final int threadId;
- private final String threadName;
- private final org.apache.log4j.Logger logger;
- private CountDownLatch shutdownComplete = new CountDownLatch(1);
- private final AtomicBoolean isRunning = new AtomicBoolean(true);
- MigrationThread(Object _stream, ProducerDataChannel> _producerDataChannel, int _threadId) {
- stream = _stream;
- producerDataChannel = _producerDataChannel;
- threadId = _threadId;
- threadName = "MigrationThread-" + threadId;
- logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
- this.setName(threadName);
+ public T receiveRequest() throws InterruptedException {
+ return producerRequestQueue.take();
+ }
- public void run() {
- try {
- Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload");
- Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message");
- Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic");
- Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator");
- Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext");
- Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next");
- Object iterator = ConsumerIteratorMethod.invoke(stream);
- while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) {
- Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
- Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
- Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
- Object payload_07 = MessageGetPayloadMethod_07.invoke(message_07);
- int size = ((ByteBuffer)payload_07).remaining();
- byte[] bytes = new byte[size];
- ((ByteBuffer)payload_07).get(bytes);
- if(logger.isDebugEnabled())
- logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
- KeyedMessage producerData = new KeyedMessage((String)topic, null, bytes);
- producerDataChannel.sendRequest(producerData);
+ private static class MigrationThread extends Thread {
+ private final Object stream;
+ private final ProducerDataChannel> producerDataChannel;
+ private final int threadId;
+ private final String threadName;
+ private final org.apache.log4j.Logger logger;
+ private CountDownLatch shutdownComplete = new CountDownLatch(1);
+ private final AtomicBoolean isRunning = new AtomicBoolean(true);
+ MigrationThread(Object stream, ProducerDataChannel> producerDataChannel, int threadId) {
+ this.stream = stream;
+ this.producerDataChannel = producerDataChannel;
+ this.threadId = threadId;
+ threadName = "MigrationThread-" + threadId;
+ logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
+ this.setName(threadName);
- logger.info("Migration thread " + threadName + " finished running");
- } catch (InvocationTargetException t){
- logger.fatal("Migration thread failure due to root cause ", t.getCause());
- } catch (Throwable t){
- logger.fatal("Migration thread failure due to ", t);
- } finally {
- shutdownComplete.countDown();
- }
- }
- public void shutdown() {
- logger.info("Migration thread " + threadName + " shutting down");
- isRunning.set(false);
- interrupt();
- try {
- shutdownComplete.await();
- } catch(InterruptedException ie) {
- logger.warn("Interrupt during shutdown of MigrationThread", ie);
- }
- logger.info("Migration thread " + threadName + " shutdown complete");
- }
- }
- static class ProducerThread extends Thread {
- private final ProducerDataChannel> producerDataChannel;
- private final Producer producer;
- private final int threadId;
- private String threadName;
- private org.apache.log4j.Logger logger;
- private CountDownLatch shutdownComplete = new CountDownLatch(1);
- private KeyedMessage shutdownMessage = new KeyedMessage("shutdown", null, null);
- public ProducerThread(ProducerDataChannel> _producerDataChannel,
- Producer _producer,
- int _threadId) {
- producerDataChannel = _producerDataChannel;
- producer = _producer;
- threadId = _threadId;
- threadName = "ProducerThread-" + threadId;
- logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
- this.setName(threadName);
- }
+ public void run() {
+ try {
+ Method messageGetPayloadMethod07 = kafkaMessageClass07.getMethod("payload");
+ Method kafkaGetMessageMethod07 = kafkaMessageAndMetaDataClass07.getMethod("message");
+ Method kafkaGetTopicMethod07 = kafkaMessageAndMetaDataClass07.getMethod("topic");
+ Method consumerIteratorMethod = kafkaStream07.getMethod("iterator");
+ Method kafkaStreamHasNextMethod07 = kafkaConsumerIteratorClass07.getMethod("hasNext");
+ Method kafkaStreamNextMethod07 = kafkaConsumerIteratorClass07.getMethod("next");
+ Object iterator = consumerIteratorMethod.invoke(stream);
+ while (((Boolean) kafkaStreamHasNextMethod07.invoke(iterator)).booleanValue()) {
+ Object messageAndMetaData07 = kafkaStreamNextMethod07.invoke(iterator);
+ Object message07 = kafkaGetMessageMethod07.invoke(messageAndMetaData07);
+ Object topic = kafkaGetTopicMethod07.invoke(messageAndMetaData07);
+ Object payload07 = messageGetPayloadMethod07.invoke(message07);
+ int size = ((ByteBuffer) payload07).remaining();
+ byte[] bytes = new byte[size];
+ ((ByteBuffer) payload07).get(bytes);
+ if (logger.isDebugEnabled())
+ logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic " + topic);
+ KeyedMessage producerData = new KeyedMessage((String) topic, null, bytes);
+ producerDataChannel.sendRequest(producerData);
+ }
+ logger.info("Migration thread " + threadName + " finished running");
+ } catch (InvocationTargetException t) {
+ logger.fatal("Migration thread failure due to root cause ", t.getCause());
+ } catch (Throwable t) {
+ logger.fatal("Migration thread failure due to ", t);
+ } finally {
+ shutdownComplete.countDown();
+ }
+ }
- public void run() {
- try{
- while(true) {
- KeyedMessage data = producerDataChannel.receiveRequest();
- if(!data.equals(shutdownMessage)) {
- producer.send(data);
- if(logger.isDebugEnabled()) logger.debug(String.format("Sending message %s", new String(data.message())));
- }
- else
- break;
+ public void shutdown() {
+ logger.info("Migration thread " + threadName + " shutting down");
+ isRunning.set(false);
+ interrupt();
+ try {
+ shutdownComplete.await();
+ } catch (InterruptedException ie) {
+ logger.warn("Interrupt during shutdown of MigrationThread", ie);
+ }
+ logger.info("Migration thread " + threadName + " shutdown complete");
- logger.info("Producer thread " + threadName + " finished running");
- } catch (Throwable t){
- logger.fatal("Producer thread failure due to ", t);
- } finally {
- shutdownComplete.countDown();
- }
- public void shutdown() {
- try {
- logger.info("Producer thread " + threadName + " shutting down");
- producerDataChannel.sendRequest(shutdownMessage);
- } catch(InterruptedException ie) {
- logger.warn("Interrupt during shutdown of ProducerThread", ie);
- }
- }
+ static class ProducerThread extends Thread {
+ private final ProducerDataChannel> producerDataChannel;
+ private final Producer producer;
+ private final int threadId;
+ private String threadName;
+ private org.apache.log4j.Logger logger;
+ private CountDownLatch shutdownComplete = new CountDownLatch(1);
+ private KeyedMessage shutdownMessage = new KeyedMessage("shutdown", null, null);
+ public ProducerThread(ProducerDataChannel> producerDataChannel,
+ Producer producer,
+ int threadId) {
+ this.producerDataChannel = producerDataChannel;
+ this.producer = producer;
+ this.threadId = threadId;
+ threadName = "ProducerThread-" + threadId;
+ logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
+ this.setName(threadName);
+ }
- public void awaitShutdown() {
- try {
- shutdownComplete.await();
- producer.close();
- logger.info("Producer thread " + threadName + " shutdown complete");
- } catch(InterruptedException ie) {
- logger.warn("Interrupt during shutdown of ProducerThread", ie);
- }
- }
- }
+ public void run() {
+ try {
+ while (true) {
+ KeyedMessage data = producerDataChannel.receiveRequest();
+ if (!data.equals(shutdownMessage)) {
+ producer.send(data);
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Sending message %s", new String(data.message())));
+ } else
+ break;
+ }
+ logger.info("Producer thread " + threadName + " finished running");
+ } catch (Throwable t) {
+ logger.fatal("Producer thread failure due to ", t);
+ } finally {
+ shutdownComplete.countDown();
+ }
+ }
- /**
- * A parent-last class loader that will try the child class loader first and then the parent.
- * This takes a fair bit of doing because java really prefers parent-first.
- */
- private static class ParentLastURLClassLoader extends ClassLoader {
- private ChildURLClassLoader childClassLoader;
+ public void shutdown() {
+ try {
+ logger.info("Producer thread " + threadName + " shutting down");
+ producerDataChannel.sendRequest(shutdownMessage);
+ } catch (InterruptedException ie) {
+ logger.warn("Interrupt during shutdown of ProducerThread", ie);
+ }
+ }
- /**
- * This class allows me to call findClass on a class loader
- */
- private static class FindClassClassLoader extends ClassLoader {
- public FindClassClassLoader(ClassLoader parent) {
- super(parent);
- }
- @Override
- public Class> findClass(String name) throws ClassNotFoundException {
- return super.findClass(name);
- }
+ public void awaitShutdown() {
+ try {
+ shutdownComplete.await();
+ producer.close();
+ logger.info("Producer thread " + threadName + " shutdown complete");
+ } catch (InterruptedException ie) {
+ logger.warn("Interrupt during shutdown of ProducerThread", ie);
+ }
+ }
- * This class delegates (child then parent) for the findClass method for a URLClassLoader.
- * We need this because findClass is protected in URLClassLoader
+ * A parent-last class loader that will try the child class loader first and then the parent.
+ * This takes a fair bit of doing because java really prefers parent-first.
- private static class ChildURLClassLoader extends URLClassLoader {
- private FindClassClassLoader realParent;
- public ChildURLClassLoader( URL[] urls, FindClassClassLoader realParent) {
- super(urls, null);
- this.realParent = realParent;
- }
- @Override
- public Class> findClass(String name) throws ClassNotFoundException {
- try{
- // first try to use the URLClassLoader findClass
- return super.findClass(name);
+ private static class ParentLastURLClassLoader extends ClassLoader {
+ private ChildURLClassLoader childClassLoader;
+ /**
+ * This class allows me to call findClass on a class loader
+ */
+ private static class FindClassClassLoader extends ClassLoader {
+ public FindClassClassLoader(ClassLoader parent) {
+ super(parent);
+ }
+ @Override
+ public Class> findClass(String name) throws ClassNotFoundException {
+ return super.findClass(name);
+ }
- catch( ClassNotFoundException e ) {
- // if that fails, we ask our real parent class loader to load the class (we give up)
- return realParent.loadClass(name);
+ /**
+ * This class delegates (child then parent) for the findClass method for a URLClassLoader.
+ * We need this because findClass is protected in URLClassLoader
+ */
+ private static class ChildURLClassLoader extends URLClassLoader {
+ private FindClassClassLoader realParent;
+ public ChildURLClassLoader(URL[] urls, FindClassClassLoader realParent) {
+ super(urls, null);
+ this.realParent = realParent;
+ }
+ @Override
+ public Class> findClass(String name) throws ClassNotFoundException {
+ try {
+ // first try to use the URLClassLoader findClass
+ return super.findClass(name);
+ } catch (ClassNotFoundException e) {
+ // if that fails, we ask our real parent class loader to load the class (we give up)
+ return realParent.loadClass(name);
+ }
+ }
- }
- }
- public ParentLastURLClassLoader(URL[] urls) {
- super(Thread.currentThread().getContextClassLoader());
- childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
- }
+ public ParentLastURLClassLoader(URL[] urls) {
+ super(Thread.currentThread().getContextClassLoader());
+ childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
+ }
- @Override
- protected synchronized Class> loadClass(String name, boolean resolve) throws ClassNotFoundException {
- try {
- // first we try to find a class inside the child class loader
- return childClassLoader.findClass(name);
- }
- catch( ClassNotFoundException e ) {
- // didn't find it, try the parent
- return super.loadClass(name, resolve);
- }
+ @Override
+ protected synchronized Class> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ try {
+ // first we try to find a class inside the child class loader
+ return childClassLoader.findClass(name);
+ } catch (ClassNotFoundException e) {
+ // didn't find it, try the parent
+ return super.loadClass(name, resolve);
+ }
+ }
- }
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 3bb93ee299236..2a0c59a9c877c 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
@@ -16,52 +16,50 @@
package kafka.examples;
-import java.util.Collections;
-import java.util.Properties;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-public class Consumer extends ShutdownableThread
- private final KafkaConsumer consumer;
- private final String topic;
+import java.util.Collections;
+import java.util.Properties;
+public class Consumer extends ShutdownableThread {
+ private final KafkaConsumer consumer;
+ private final String topic;
- public Consumer(String topic)
- {
- super("KafkaConsumerExample", false);
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ public Consumer(String topic) {
+ super("KafkaConsumerExample", false);
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- consumer = new KafkaConsumer<>(props);
- this.topic = topic;
- }
+ consumer = new KafkaConsumer<>(props);
+ this.topic = topic;
+ }
- @Override
- public void doWork() {
- consumer.subscribe(Collections.singletonList(this.topic));
- ConsumerRecords records = consumer.poll(1000);
- for (ConsumerRecord record : records) {
- System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+ @Override
+ public void doWork() {
+ consumer.subscribe(Collections.singletonList(this.topic));
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records) {
+ System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+ }
- }
- @Override
- public String name() {
- return null;
- }
+ @Override
+ public String name() {
+ return null;
+ }
- @Override
- public boolean isInterruptible() {
- return false;
- }
\ No newline at end of file
+ @Override
+ public boolean isInterruptible() {
+ return false;
+ }
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index e96991a29032e..e732d5c1d776c 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
@@ -16,16 +16,14 @@
package kafka.examples;
-public class KafkaConsumerProducerDemo implements KafkaProperties
- public static void main(String[] args)
- {
- final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
- Producer producerThread = new Producer(KafkaProperties.topic, isAsync);
- producerThread.start();
+public class KafkaConsumerProducerDemo implements KafkaProperties {
+ public static void main(String[] args) {
+ final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
+ Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
+ producerThread.start();
+ Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
+ consumerThread.start();
- Consumer consumerThread = new Consumer(KafkaProperties.topic);
- consumerThread.start();
- }
+ }
diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java
index 9d1cd3120b3da..b57e1bdc4d671 100644
--- a/examples/src/main/java/kafka/examples/KafkaProperties.java
+++ b/examples/src/main/java/kafka/examples/KafkaProperties.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,17 +16,16 @@
package kafka.examples;
-public interface KafkaProperties
- final static String zkConnect = "";
- final static String groupId = "group1";
- final static String topic = "topic1";
- final static String kafkaServerURL = "localhost";
- final static int kafkaServerPort = 9092;
- final static int kafkaProducerBufferSize = 64*1024;
- final static int connectionTimeOut = 100000;
- final static int reconnectInterval = 10000;
- final static String topic2 = "topic2";
- final static String topic3 = "topic3";
- final static String clientId = "SimpleConsumerDemoClient";
+public interface KafkaProperties {
+ String ZK_CONNECT = "";
+ String GROUP_ID = "group1";
+ String TOPIC = "topic1";
+ String KAFKA_SERVER_URL = "localhost";
+ int KAFKA_SERVER_PORT = 9092;
+ int CONNECTION_TIMEOUT = 100000;
+ String TOPIC2 = "topic2";
+ String TOPIC3 = "topic3";
+ String CLIENT_ID = "SimpleConsumerDemoClient";
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index ccc9925caecd9..393bf1e6ec13e 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
@@ -16,90 +16,86 @@
package kafka.examples;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-public class Producer extends Thread
- private final KafkaProducer producer;
- private final String topic;
- private final Boolean isAsync;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+public class Producer extends Thread {
+ private final KafkaProducer producer;
+ private final String topic;
+ private final Boolean isAsync;
- public Producer(String topic, Boolean isAsync)
- {
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("client.id", "DemoProducer");
- props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- producer = new KafkaProducer(props);
- this.topic = topic;
- this.isAsync = isAsync;
- }
+ public Producer(String topic, Boolean isAsync) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("client.id", "DemoProducer");
+ props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ producer = new KafkaProducer(props);
+ this.topic = topic;
+ this.isAsync = isAsync;
+ }
- public void run() {
- int messageNo = 1;
- while(true)
- {
- String messageStr = "Message_" + messageNo;
- long startTime = System.currentTimeMillis();
- if (isAsync) { // Send asynchronously
- producer.send(new ProducerRecord(topic,
- messageNo,
- messageStr), new DemoCallBack(startTime, messageNo, messageStr));
- } else { // Send synchronously
- try {
- producer.send(new ProducerRecord(topic,
- messageNo,
- messageStr)).get();
- System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
+ public void run() {
+ int messageNo = 1;
+ while (true) {
+ String messageStr = "Message_" + messageNo;
+ long startTime = System.currentTimeMillis();
+ if (isAsync) { // Send asynchronously
+ producer.send(new ProducerRecord(topic,
+ messageNo,
+ messageStr), new DemoCallBack(startTime, messageNo, messageStr));
+ } else { // Send synchronously
+ try {
+ producer.send(new ProducerRecord(topic,
+ messageNo,
+ messageStr)).get();
+ System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ ++messageNo;
- }
- ++messageNo;
- }
class DemoCallBack implements Callback {
- private long startTime;
- private int key;
- private String message;
+ private long startTime;
+ private int key;
+ private String message;
- public DemoCallBack(long startTime, int key, String message) {
- this.startTime = startTime;
- this.key = key;
- this.message = message;
- }
+ public DemoCallBack(long startTime, int key, String message) {
+ this.startTime = startTime;
+ this.key = key;
+ this.message = message;
+ }
- /**
- * A callback method the user can implement to provide asynchronous handling of request completion. This method will
- * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
- * non-null.
- *
- * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
- * occurred.
- * @param exception The exception thrown during processing of this record. Null if no error occurred.
- */
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (metadata != null) {
- System.out.println(
- "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
- "), " +
- "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
- } else {
- exception.printStackTrace();
+ /**
+ * A callback method the user can implement to provide asynchronous handling of request completion. This method will
+ * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
+ * non-null.
+ *
+ * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
+ * occurred.
+ * @param exception The exception thrown during processing of this record. Null if no error occurred.
+ */
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (metadata != null) {
+ System.out.println(
+ "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
+ "), " +
+ "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
+ } else {
+ exception.printStackTrace();
+ }
- }
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index c43b46144e6d1..1c568674526ee 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
@@ -19,74 +19,74 @@
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.List;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class SimpleConsumerDemo {
- private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
- for(MessageAndOffset messageAndOffset: messageSet) {
- ByteBuffer payload = messageAndOffset.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- System.out.println(new String(bytes, "UTF-8"));
+ private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
+ for (MessageAndOffset messageAndOffset : messageSet) {
+ ByteBuffer payload = messageAndOffset.message().payload();
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ System.out.println(new String(bytes, "UTF-8"));
+ }
- }
- private static void generateData() {
- Producer producer2 = new Producer(KafkaProperties.topic2, false);
- producer2.start();
- Producer producer3 = new Producer(KafkaProperties.topic3, false);
- producer3.start();
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
+ private static void generateData() {
+ Producer producer2 = new Producer(KafkaProperties.TOPIC2, false);
+ producer2.start();
+ Producer producer3 = new Producer(KafkaProperties.TOPIC3, false);
+ producer3.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
- }
- public static void main(String[] args) throws Exception {
- generateData();
- SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
- KafkaProperties.kafkaServerPort,
- KafkaProperties.connectionTimeOut,
- KafkaProperties.kafkaProducerBufferSize,
- KafkaProperties.clientId);
- System.out.println("Testing single fetch");
- FetchRequest req = new FetchRequestBuilder()
- .clientId(KafkaProperties.clientId)
- .addFetch(KafkaProperties.topic2, 0, 0L, 100)
+ public static void main(String[] args) throws Exception {
+ generateData();
+ SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.KAFKA_SERVER_URL,
+ KafkaProperties.KAFKA_SERVER_PORT,
+ KafkaProperties.CLIENT_ID);
+ System.out.println("Testing single fetch");
+ FetchRequest req = new FetchRequestBuilder()
+ .clientId(KafkaProperties.CLIENT_ID)
+ .addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
- FetchResponse fetchResponse = simpleConsumer.fetch(req);
- printMessages(fetchResponse.messageSet(KafkaProperties.topic2, 0));
+ FetchResponse fetchResponse = simpleConsumer.fetch(req);
+ printMessages(fetchResponse.messageSet(KafkaProperties.TOPIC2, 0));
- System.out.println("Testing single multi-fetch");
- Map> topicMap = new HashMap>();
- topicMap.put(KafkaProperties.topic2, Collections.singletonList(0));
- topicMap.put(KafkaProperties.topic3, Collections.singletonList(0));
- req = new FetchRequestBuilder()
- .clientId(KafkaProperties.clientId)
- .addFetch(KafkaProperties.topic2, 0, 0L, 100)
- .addFetch(KafkaProperties.topic3, 0, 0L, 100)
+ System.out.println("Testing single multi-fetch");
+ Map> topicMap = new HashMap>();
+ topicMap.put(KafkaProperties.TOPIC2, Collections.singletonList(0));
+ topicMap.put(KafkaProperties.TOPIC3, Collections.singletonList(0));
+ req = new FetchRequestBuilder()
+ .clientId(KafkaProperties.CLIENT_ID)
+ .addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
+ .addFetch(KafkaProperties.TOPIC3, 0, 0L, 100)
- fetchResponse = simpleConsumer.fetch(req);
- int fetchReq = 0;
- for ( Map.Entry> entry : topicMap.entrySet() ) {
- String topic = entry.getKey();
- for ( Integer offset : entry.getValue()) {
- System.out.println("Response from fetch request no: " + ++fetchReq);
- printMessages(fetchResponse.messageSet(topic, offset));
- }
+ fetchResponse = simpleConsumer.fetch(req);
+ int fetchReq = 0;
+ for (Map.Entry> entry : topicMap.entrySet()) {
+ String topic = entry.getKey();
+ for (Integer offset : entry.getValue()) {
+ System.out.println("Response from fetch request no: " + ++fetchReq);
+ printMessages(fetchResponse.messageSet(topic, offset));
+ }
+ }
- }