Skip to content

Commit

Permalink
Reproduce issue #26
Browse files Browse the repository at this point in the history
[issue #26](#26)
  • Loading branch information
didier-wenzek committed Aug 25, 2020
1 parent 70e1a87 commit 28df922
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions test/integration/integration.ml
Original file line number Diff line number Diff line change
Expand Up @@ -147,31 +147,32 @@ let main =
| Kafka.PartitionEnd(_,partition,_) -> Format.printf "Consume_batch_queue eof: %d\n%!" partition; acc
) [] messages) = ["message 0 ter"; "message 1 ter"; "message 2 ter"]);

(* Using a partitioner to produce messages on a partition computed after the messages' keys. *)
let partitioner_callback partition_cnt key = Some ((Hashtbl.hash key) mod partition_cnt) in
let partitioner_topic = Kafka.new_topic ~partitioner_callback producer "test" [
"message.timeout.ms","1000";
"partitioner", "murmur2"
] in

(* Produce some keyed messages *)
let partitioner_callback partition_cnt key =
Printf.printf "xoxox %s \n%!" key;
Some ((Hashtbl.hash key) mod partition_cnt)
in
let keyed_topic = Kafka.new_topic ~partitioner_callback producer "test" ["message.timeout.ms","1000"] in
Kafka.produce keyed_topic ~key:"key 0" "key-message 0";
Kafka.produce keyed_topic ~key:"key 1" "key-message 1";
Kafka.produce keyed_topic ~key:"key 2" "key-message 2";
let key_msg_pairs = List.map (fun k -> (k,"message "^k)) [""; "0"; "11"; "222"; "a"; "bb"; "ccc" ] in
key_msg_pairs |> List.iter (fun (key,msg) -> Kafka.produce partitioner_topic ~key msg);

(* Consume keyed messages *)
(* Consume the keyed messages, checking they have been received on the right partition *)
let rec consume_k t = match Kafka.consume_queue t with
| Kafka.Message(_,_,_,msg,key) -> key,msg
| Kafka.PartitionEnd(_,_,_) -> (
(* Printf.fprintf stderr "No keyed message for now\n%!"; *)
| Kafka.Message(_,_partition,_,msg,Some key) ->
(* assert (partition = (partitioner_callback 2 key |> Option.get)); -- issue #26 *)
key,msg
| Kafka.Message(_,_,_,_,None) | Kafka.PartitionEnd(_,_,_) -> (
consume_k t
)
| exception Kafka.Error(Kafka.TIMED_OUT,_) -> (
Printf.fprintf stderr "Timeout after: %d ms\n%!" timeout_ms;
consume_k t
)
in
let key_msg_pairs = [ consume_k queue; consume_k queue; consume_k queue ] in
let key_msg_pairs = List.sort (fun p1 p2 -> compare (snd p1) (snd p2)) key_msg_pairs in
assert (key_msg_pairs = [Some "key 0","key-message 0"; Some "key 1","key-message 1"; Some "key 2","key-message 2"]);
let received_key_msg_pairs = key_msg_pairs |> List.map (fun _ -> consume_k queue) |> List.sort (fun p1 p2 -> compare (snd p1) (snd p2)) in
assert (key_msg_pairs = received_key_msg_pairs);

Kafka.consume_stop consumer_topic 0;
Kafka.consume_stop consumer_topic 1;
Expand All @@ -194,7 +195,7 @@ let main =
(* Consumers, producers, topics and queues, all handles must be released. *)
Kafka.destroy_queue queue;
Kafka.destroy_topic producer_topic;
Kafka.destroy_topic keyed_topic;
Kafka.destroy_topic partitioner_topic;
Kafka.destroy_topic consumer_topic;
Kafka.destroy_handler producer;
Kafka.destroy_handler consumer;
Expand Down

0 comments on commit 28df922

Please sign in to comment.