From 4f3a8a74fbf6ace7d8ad907629877b036f884b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Marschollek?= Date: Mon, 3 Dec 2018 09:35:06 +0000 Subject: [PATCH 1/2] Instantiate plugin threads after deserialiser This moves the instantiation of the plugin thread objects after the creation of the deserialiser. Previously, the threads were trying to access the deserialiser which wasn't initialised yet and were trying to work with a null pointer. Resolves: #10 --- .../java/net/opentsdb/tsd/KafkaRpcPluginGroup.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java index 0240ba8..82c24e9 100644 --- a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java +++ b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java @@ -129,12 +129,6 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) { : KafkaRpcPluginConfig.DEFAULT_CONSUMER_THREADS; kafka_consumers = new ArrayList(num_threads); - for (int i = 0; i < num_threads; i++) { - kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics)); - } - - timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS); - final String deser_class = config.getString( KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + groupID + ".deserializer"); if (Strings.isNullOrEmpty(deser_class)) { @@ -179,6 +173,12 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) { throw new IllegalArgumentException("Unable to find a deserializer " + "for class [" + deser_class + "]"); } + + for (int i = 0; i < num_threads; i++) { + kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics)); + } + + timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS); } @Override From 5db778ed300dd07fb0075630abfbc34a450c0ea6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Marschollek?= Date: Mon, 3 Dec 2018 09:48:17 +0000 Subject: [PATCH 2/2] Fix the name of the deserialiser config setting Config entries are case sensitive and the readme previously incorrectly capitalised "Deserializer". --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c401758..64f7b96 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ The following properties can be stored in the ``opentsdb.conf`` file: |KafkaRpcPlugin.groups|String|Required|A comma separated list of one or more consumer group names.||TsdbConsumer,TsdbRequeueConsumer| |KafkaRpcPlugin.\.topics|String|Required|A comma separated list of one or more topics for the ```` to consume from.||TSDB_1,TSDB_2| |KafkaRpcPlugin.\.consumerType|String|Required|The type of messages written to the queue. TODO. For now, leave it as ``raw``||raw| -|KafkaRpcPlugin.\.Deserializer|String|Required|The deserialization class to use for parsing messages from the Kafka topic.||net.opentsdb.data.deserializers.JSONDeserializer| +|KafkaRpcPlugin.\.deserializer|String|Required|The deserialization class to use for parsing messages from the Kafka topic.||net.opentsdb.data.deserializers.JSONDeserializer| |KafkaRpcPlugin.\.rate|Integer|Required|How many messages per second to throttle the total of consumer threads at for the consumer group||250000| |KafkaRpcPlugin.\.threads|Integer|Required|The number of consumer threads to create per group||4| |tsd.http.rpc.plugins|String|Optional|A comma separated list of HTTP RPC plugins to load. Included with this package is a plugin that allows for fetching stats from the Kafka plugin as well as viewing or modifying the write rate during runtime.||net.opentsdb.tsd.KafkaHttpRpcPlugin| @@ -85,4 +85,4 @@ The following properties can be stored in the ``opentsdb.conf`` file: |KafkaRpcPlugin.kafka.metadata.broker.list|String|Optional|The comma separated list of Kafka brokers and ports used to write messages to for the storage exception handler plugin||localhost:9092| |KafkaRpcPlugin.seh.topic.default|String|Optional|The topic used to write messages to for the storage exception handler.||TSDB_Requeue| -Note the ``KafkaRpcPlugin.groups`` and ```` entries above. Kafka consumers belong to a particular group. The Kafka RPC plugin can launch multiple groups consuming from multiple topics so that OpenTSDB messages can be organized by type or source for more efficient control over rate limits and priorities. When setting the ``KafkaRpcPlugin.groups`` value, make sure you have a complete set of ``KafkaRpcPlugin..*`` parameters per group or initialization will fail. \ No newline at end of file +Note the ``KafkaRpcPlugin.groups`` and ```` entries above. Kafka consumers belong to a particular group. The Kafka RPC plugin can launch multiple groups consuming from multiple topics so that OpenTSDB messages can be organized by type or source for more efficient control over rate limits and priorities. When setting the ``KafkaRpcPlugin.groups`` value, make sure you have a complete set of ``KafkaRpcPlugin..*`` parameters per group or initialization will fail.