Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instantiate plugin threads after deserialiser #11

Merged
merged 2 commits into from
Dec 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ The following properties can be stored in the ``opentsdb.conf`` file:
|KafkaRpcPlugin.groups|String|Required|A comma separated list of one or more consumer group names.||TsdbConsumer,TsdbRequeueConsumer|
|KafkaRpcPlugin.\<GROUPNAME\>.topics|String|Required|A comma separated list of one or more topics for the ``<GROUPNAME>`` to consume from.||TSDB_1,TSDB_2|
|KafkaRpcPlugin.\<GROUPNAME\>.consumerType|String|Required|The type of messages written to the queue. TODO. For now, leave it as ``raw``||raw|
|KafkaRpcPlugin.\<GROUPNAME\>.Deserializer|String|Required|The deserialization class to use for parsing messages from the Kafka topic.||net.opentsdb.data.deserializers.JSONDeserializer|
|KafkaRpcPlugin.\<GROUPNAME\>.deserializer|String|Required|The deserialization class to use for parsing messages from the Kafka topic.||net.opentsdb.data.deserializers.JSONDeserializer|
|KafkaRpcPlugin.\<GROUPNAME\>.rate|Integer|Required|How many messages per second to throttle the total of consumer threads at for the consumer group||250000|
|KafkaRpcPlugin.\<GROUPNAME\>.threads|Integer|Required|The number of consumer threads to create per group||4|
|tsd.http.rpc.plugins|String|Optional|A comma separated list of HTTP RPC plugins to load. Included with this package is a plugin that allows for fetching stats from the Kafka plugin as well as viewing or modifying the write rate during runtime.||net.opentsdb.tsd.KafkaHttpRpcPlugin|
Expand All @@ -85,4 +85,4 @@ The following properties can be stored in the ``opentsdb.conf`` file:
|KafkaRpcPlugin.kafka.metadata.broker.list|String|Optional|The comma separated list of Kafka brokers and ports used to write messages to for the storage exception handler plugin||localhost:9092|
|KafkaRpcPlugin.seh.topic.default|String|Optional|The topic used to write messages to for the storage exception handler.||TSDB_Requeue|

Note the ``KafkaRpcPlugin.groups`` and ``<GROUP_NAME>`` entries above. Kafka consumers belong to a particular group. The Kafka RPC plugin can launch multiple groups consuming from multiple topics so that OpenTSDB messages can be organized by type or source for more efficient control over rate limits and priorities. When setting the ``KafkaRpcPlugin.groups`` value, make sure you have a complete set of ``KafkaRpcPlugin.<GROUP_NAME>.*`` parameters per group or initialization will fail.
Note the ``KafkaRpcPlugin.groups`` and ``<GROUP_NAME>`` entries above. Kafka consumers belong to a particular group. The Kafka RPC plugin can launch multiple groups consuming from multiple topics so that OpenTSDB messages can be organized by type or source for more efficient control over rate limits and priorities. When setting the ``KafkaRpcPlugin.groups`` value, make sure you have a complete set of ``KafkaRpcPlugin.<GROUP_NAME>.*`` parameters per group or initialization will fail.
12 changes: 6 additions & 6 deletions src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,6 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) {
: KafkaRpcPluginConfig.DEFAULT_CONSUMER_THREADS;
kafka_consumers = new ArrayList<KafkaRpcPluginThread>(num_threads);

for (int i = 0; i < num_threads; i++) {
kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics));
}

timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS);

final String deser_class = config.getString(
KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + groupID + ".deserializer");
if (Strings.isNullOrEmpty(deser_class)) {
Expand Down Expand Up @@ -179,6 +173,12 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) {
throw new IllegalArgumentException("Unable to find a deserializer "
+ "for class [" + deser_class + "]");
}

for (int i = 0; i < num_threads; i++) {
kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics));
}

timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS);
}

@Override
Expand Down