diff --git a/test/integration/integration.ml b/test/integration/integration.ml index 385ed6d..87b0e28 100644 --- a/test/integration/integration.ml +++ b/test/integration/integration.ml @@ -147,21 +147,23 @@ let main = | Kafka.PartitionEnd(_,partition,_) -> Format.printf "Consume_batch_queue eof: %d\n%!" partition; acc ) [] messages) = ["message 0 ter"; "message 1 ter"; "message 2 ter"]); + (* Using a partitioner to produce messages on a partition computed after the messages' keys. *) + let partitioner_callback partition_cnt key = Some ((Hashtbl.hash key) mod partition_cnt) in + let partitioner_topic = Kafka.new_topic ~partitioner_callback producer "test" [ + "message.timeout.ms","1000"; + "partitioner", "murmur2" + ] in + (* Produce some keyed messages *) - let partitioner_callback partition_cnt key = - Printf.printf "xoxox %s \n%!" key; - Some ((Hashtbl.hash key) mod partition_cnt) - in - let keyed_topic = Kafka.new_topic ~partitioner_callback producer "test" ["message.timeout.ms","1000"] in - Kafka.produce keyed_topic ~key:"key 0" "key-message 0"; - Kafka.produce keyed_topic ~key:"key 1" "key-message 1"; - Kafka.produce keyed_topic ~key:"key 2" "key-message 2"; + let key_msg_pairs = List.map (fun k -> (k,"message "^k)) [""; "0"; "11"; "222"; "a"; "bb"; "ccc" ] in + key_msg_pairs |> List.iter (fun (key,msg) -> Kafka.produce partitioner_topic ~key msg); - (* Consume keyed messages *) + (* Consume the keyed messages, checking they have been received on the right partition *) let rec consume_k t = match Kafka.consume_queue t with - | Kafka.Message(_,_,_,msg,key) -> key,msg - | Kafka.PartitionEnd(_,_,_) -> ( - (* Printf.fprintf stderr "No keyed message for now\n%!"; *) + | Kafka.Message(_,_partition,_,msg,Some key) -> + (* assert (partition = (partitioner_callback 2 key |> Option.get)); -- issue #26 *) + key,msg + | Kafka.Message(_,_,_,_,None) | Kafka.PartitionEnd(_,_,_) -> ( consume_k t ) | exception Kafka.Error(Kafka.TIMED_OUT,_) -> ( @@ -169,9 +171,8 @@ let main = consume_k t ) in - let key_msg_pairs = [ consume_k queue; consume_k queue; consume_k queue ] in - let key_msg_pairs = List.sort (fun p1 p2 -> compare (snd p1) (snd p2)) key_msg_pairs in - assert (key_msg_pairs = [Some "key 0","key-message 0"; Some "key 1","key-message 1"; Some "key 2","key-message 2"]); + let received_key_msg_pairs = key_msg_pairs |> List.map (fun _ -> consume_k queue) |> List.sort (fun p1 p2 -> compare (snd p1) (snd p2)) in + assert (key_msg_pairs = received_key_msg_pairs); Kafka.consume_stop consumer_topic 0; Kafka.consume_stop consumer_topic 1; @@ -194,7 +195,7 @@ let main = (* Consumers, producers, topics and queues, all handles must be released. *) Kafka.destroy_queue queue; Kafka.destroy_topic producer_topic; - Kafka.destroy_topic keyed_topic; + Kafka.destroy_topic partitioner_topic; Kafka.destroy_topic consumer_topic; Kafka.destroy_handler producer; Kafka.destroy_handler consumer;