From 6a787deff741b3005d99b0ff8f92c58bc0660ca4 Mon Sep 17 00:00:00 2001
From: Ady Liu
* This operation creates topic in a broker or enlarge the partition number of topic.
- *
* buffer format: size+errorCode(short)+payload+size+errorCode(short)+payload+...
- *
* size = 2(short)+length(payload)
- *
+ *
* format: creatorId:host:port - *
* * @return broker info saved in zookeeper */ diff --git a/src/main/java/com/sohu/jafka/consumer/ConsumerConfig.java b/src/main/java/com/sohu/jafka/consumer/ConsumerConfig.java index 3c09104..2f7e745 100644 --- a/src/main/java/com/sohu/jafka/consumer/ConsumerConfig.java +++ b/src/main/java/com/sohu/jafka/consumer/ConsumerConfig.java @@ -31,7 +31,6 @@ ** 1. Consumer id registry: - *
+ * *- * /consumers/[group_id]/ids[consumer_id] -> topic1,...topicN + * /consumers/[group_id]/ids[consumer_id] -- topic1,...topicN *- * + * * A consumer has a unique consumer id within a consumer group. A consumer registers its id as * an ephemeral znode and puts all topics that it subscribes to as the value of the znode. The * znode is deleted when the client is gone. A consumer subscribes to event changes of the * consumer id registry within its group. - * *
* The consumer id is picked up from configuration, instead of the sequential id assigned by * ZK. Generated sequential ids are hard to recover during temporary connection loss to ZK, * since it's difficult for the client to figure out whether the creation of a sequential znode * has succeeded or not. More details can be found at * (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling) - *
** 2. Broker node registry: - *
*- * /brokers/[0...N] --> { "host" : "host:port", + * /brokers/[0...N] -- { "host" : "host:port", * "topics" : {"topic1": ["partition1" ... "partitionN"], ..., * "topicN": ["partition1" ... "partitionN"] } } *- * * This is a list of all present broker brokers. A unique logical node id is configured on each * broker node. A broker node registers itself on start-up and creates a znode with the logical * node id under /brokers. - * + * * The value of the znode is a JSON String that contains - * + * *
* (1) the host name and the port the broker is listening to, * (2) a list of topics that the broker serves, * (3) a list of logical partitions assigned to each topic on the broker. *- * + * * A consumer subscribes to event changes of the broker node registry. - * - * + * + * *
* 3. Partition owner registry: - *
+ * *- * /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id + * /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] -- consumer_node_id *- * + * * This stores the mapping before broker partitions and consumers. Each partition is owned by a * unique consumer within a consumer group. The mapping is reestablished after each * rebalancing. - * - * + * + * *
* 4. Consumer offset tracking: - *
+ * *- * /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value + * /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] -- offset_counter_value *- * + * * Each consumer tracks the offset of the latest message consumed for each partition. - * + * * * @author adyliu (imxylz@gmail.com) * @since 1.0 @@ -816,9 +812,8 @@ public void handleStateChanged(KeeperState state) throws Exception { /** * register consumer data in zookeeper *
- * register path: /consumers/groupid/ids/groupid-consumerid
+ * register path: /consumers/groupid/ids/groupid-consumerid
* data: {topic:count,topic:count}
- *
* The log directory will be removed also.
* @return segment counts deleted
*/
diff --git a/src/main/java/com/sohu/jafka/message/FileMessageSet.java b/src/main/java/com/sohu/jafka/message/FileMessageSet.java
index 4a6b6f9..7f01529 100644
--- a/src/main/java/com/sohu/jafka/message/FileMessageSet.java
+++ b/src/main/java/com/sohu/jafka/message/FileMessageSet.java
@@ -149,7 +149,7 @@ protected MessageAndOffset makeNext() {
}
/**
- * the max offset(next message id).
+ * the max offset(next message id).
* The #getSizeInBytes()
maybe is larger than {@link #highWaterMark()}
* while some messages were cached in memory(not flush to disk).
*/
@@ -279,7 +279,7 @@ void checkMutable() {
}
/**
- * The max offset(next message id) persisted in the log file.
+ * The max offset(next message id) persisted in the log file.
* Messages with smaller offsets have persisted in file.
*
* @return max offset
diff --git a/src/main/java/com/sohu/jafka/message/Message.java b/src/main/java/com/sohu/jafka/message/Message.java
index 9d59583..0d949e0 100644
--- a/src/main/java/com/sohu/jafka/message/Message.java
+++ b/src/main/java/com/sohu/jafka/message/Message.java
@@ -38,8 +38,7 @@
* 3. 4 byte CRC32 of the payload
* 4. N - 6 byte payload
*
- *
- *
*
* /topics/[topic]/[node_id-partition_num] - * /brokers/[0...N] --> host:port + * /brokers/[0...N] -- host:port ** * @author adyliu (imxylz@gmail.com) @@ -113,9 +113,8 @@ private int getPartitions(String topic) { /** * register broker in the zookeeper *
- * path: /brokers/ids/
+ * path: /brokers/ids/<id>
* data: creator:host:port
- *
* see https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/ * Mx4jTool.java - *
* * @author adyliu (imxylz@gmail.com) * @since 1.0 diff --git a/src/main/java/com/sohu/jafka/utils/zookeeper/ZkUtils.java b/src/main/java/com/sohu/jafka/utils/zookeeper/ZkUtils.java index ce19d79..061cda4 100644 --- a/src/main/java/com/sohu/jafka/utils/zookeeper/ZkUtils.java +++ b/src/main/java/com/sohu/jafka/utils/zookeeper/ZkUtils.java @@ -119,7 +119,7 @@ public static TopicCount getTopicCount(ZkClient zkClient, String group, String c * * @param zkClient the zookeeper client * @param topics topic names - * @return topic->(brokerid-0,brokerid-1...brokerid2-0,brokerid2-1...) + * @return topic->(brokerid-0,brokerid-1...brokerid2-0,brokerid2-1...) */ public static Map