Home
Camel-Kafka is an Apache Camel component that lets you work with the Apache Kafka message-oriented middleware.
Available as of Camel 2.13
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
Apache Kafka is a distributed messaging system whose architecture, performance, and scalability have made it highly influential in modern messaging technology. It has been used successfully in several domains and projects, most notably LinkedIn's real-time activity data pipeline.
kafka:topicName[?options]
Below are examples with the mandatory options:
XML
<to uri="kafka:SampleTopic?zkConnect=localhost:2181&amp;metadataBrokerList=localhost:9092"/>
Java DSL
to("kafka:SampleTopic?zkConnect=localhost:2181&metadataBrokerList=localhost:9092");
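The `kafka:topicName[?options]` format can also be assembled programmatically. The following is a minimal sketch using only the JDK; the class `KafkaUriSketch` and the method `buildUri` are hypothetical helpers introduced for illustration and are not part of camel-kafka. It does no URL-encoding, so it assumes option values contain no reserved characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class KafkaUriSketch {

    // Hypothetical helper: joins option pairs into kafka:topicName[?options].
    // Options are appended in the iteration order of the given map.
    public static String buildUri(String topic, Map<String, String> options) {
        if (options.isEmpty()) {
            return "kafka:" + topic;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("zkConnect", "localhost:2181");
        options.put("metadataBrokerList", "localhost:9092");
        System.out.println(buildUri("SampleTopic", options));
    }
}
```

The resulting string can be passed to `to(...)` in the Java DSL. In an XML route, each `&` in the URI has to be written as `&amp;`.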
Property | Value |
---|---|
TOPIC | _kafka.TOPIC_ |
KEY | _kafka.CONTENT_TYPE_ |
OFFSET | _kafka.OFFSET_ |
PARTITION | _kafka.EXCHANGE_NAME_ |
PARTITION KEY | _kafka.PARTITION_KEY_ |
Property | Default | Description |
---|---|---|
`topicName` | kafka.DEFAULT_TOPIC | Topic Name |
`transferExchange` | false | Transfer Exchange |
`concurrentConsumers` | 10 | Concurrent consumers (i.e., streams in this context) |
`partitionKey` | DEFAULT_PARTITION | Partition Key |
`groupId` | kafka.DEFAULT_GROUP | Group Id |
`socketTimeoutMs` | 30000 | Socket Timeout (ms) |
`socketReceiveBufferBytes` | 65536 | Socket Receive Buffer Bytes |
`fetchMessageMaxBytes` | 1048576 | Fetch Message Max Bytes |
`autoCommitEnable` | true | Auto Commit Enable |
`autoCommitIntervalMs` | 60000 | Auto Commit Interval (ms) |
`queuedMaxMessageChunks` | 10 | Queued Max Message Chunks |
`rebalanceMaxRetries` | 4 | Rebalance Max Retries |
`fetchMinBytes` | 1 | Fetch Min Bytes |
`fetchWaitMaxMs` | 100 | Fetch Wait Max (ms) |
`rebalanceBackoffMs` | 2000 | Rebalance Backoff (ms) |
`refreshLeaderBackoffMs` | 200 | Refresh Leader Backoff (ms) |
`autoOffsetReset` | largest | Auto Offset Reset |
`consumerTimeoutMs` | -1 | Consumer Timeout (ms) |
`zookeeperSessionTimeoutMs` | 6000 | Zookeeper Session Timeout (ms) |
`zookeeperConnectionTimeoutMs` | 60000 | Zookeeper Connection Timeout (ms) |
`zookeeperSyncTimeMs` | 2000 | Zookeeper Sync Time (ms) |
`requestRequiredAcks` | 0 | Request Required Acks |
`requestTimeoutMs` | 10000 | Request Timeout (ms) |
`producerType` | sync | Producer Type |
`serializerClass` | kafka.serializer.DefaultEncoder | Serializer Class |
`partitionerClass` | kafka.producer.DefaultPartitioner | Partitioner Class |
`compressionCodec` | none | Compression Codec |
`compressedTopics` | null | Compressed Topics |
`messageSendMaxRetries` | 3 | Message Send Max Retries |
`retryBackoffMs` | 100 | Retry Backoff (ms) |
`topicMetadataRefreshIntervalMs` | 600000 | Topic Metadata Refresh Interval (ms) |
`queueBufferingMaxMs` | 5000 | Queue Buffering Max Time (ms) |
`queueBufferingMaxMessages` | 10000 | Queue Buffering Max Messages |
`queueEnqueueTimeoutMs` | -1 | Queue Enqueue Timeout (ms) |
`batchNumMessages` | 200 | Batch Number of Messages |
`sendBufferBytes` | 102400 | Send Buffer Bytes |
`clientId` | kafka.DEFAULT_CLIENT_ID | Client Id |
`keySerializerClass` | kafka.serializer.StringEncoder | Key Serializer Class |
NOTE: More information and a detailed description of each configuration option can be found in the Kafka documentation pages.
The usage examples are taken from the integration tests, which can be found alongside the source.
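To illustrate how the defaults in the table relate to options given on the endpoint URI, here is a minimal JDK-only sketch. The class `EffectiveOptionsSketch` and its methods are hypothetical and are not part of camel-kafka. It copies only three defaults from the table and overlays whatever the URI specifies; how the component resolves options internally is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EffectiveOptionsSketch {

    // A few defaults copied from the table above; deliberately not the full list.
    static Map<String, String> defaults() {
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("producerType", "sync");
        defaults.put("requestRequiredAcks", "0");
        defaults.put("autoOffsetReset", "largest");
        return defaults;
    }

    // Hypothetical helper: starts from the defaults and overrides them with
    // every name=value pair found after the '?' in the endpoint URI.
    public static Map<String, String> effectiveOptions(String uri) {
        Map<String, String> result = defaults();
        int queryStart = uri.indexOf('?');
        if (queryStart >= 0 && queryStart < uri.length() - 1) {
            for (String pair : uri.substring(queryStart + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    result.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(
            effectiveOptions("kafka:kaiot?zkConnect=localhost:2181&producerType=async"));
    }
}
```

In this sketch `producerType` becomes `async` because the URI sets it, while `requestRequiredAcks` and `autoOffsetReset` keep their table defaults.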
Simple Usage
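// Producer: forwards messages from direct:kaiotep to the kaiot topic asynchronously
from("direct:kaiotep").to("kafka:kaiot?zkConnect=localhost:2181&metadataBrokerList=localhost:9092&producerType=async&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value);
// Consumer: reads from the kaiot topic and delivers to mock:result
from("kafka:kaiot?zkConnect=localhost:2181&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value).to("mock:result");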
Transfer Exchange
// Producer: transferExchange=true is set on the producer endpoint
from("direct:ateioep").to("kafka:ateio?zkConnect=localhost:2181&metadataBrokerList=localhost:9092&transferExchange=true&producerType=async&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value);
// Consumer: transferExchange=true is set on the consumer endpoint as well
from("kafka:ateio?zkConnect=localhost:2181&transferExchange=true&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value).to("mock:result");
Custom Partitioner
// Producer: partitionerClass selects the partitioner implementation
from("direct:spuiaioutep").to("kafka:spuiaiout?zkConnect=localhost:2181&partitionerClass=org.apache.camel.component.kafka.partitioner.SimplePartitioner&producerType=async&metadataBrokerList=localhost:9092&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value);
// Consumer: reads from the spuiaiout topic and delivers to mock:result
from("kafka:spuiaiout?zkConnect=localhost:2181&groupId=" + uid + KafkaConstants.DEFAULT_GROUP.value).to("mock:result");
For more, refer to the full usage examples, which exist as integration tests in the camel-kafka repository.
For more information and in-depth configuration, refer to the Apache Kafka documentation.