Skip to content

Commit

Permalink
Port to jbuilder.
Browse files Browse the repository at this point in the history
  • Loading branch information
didier-wenzek committed Sep 12, 2017
1 parent 229229c commit 21839c9
Show file tree
Hide file tree
Showing 20 changed files with 71 additions and 7 deletions.
12 changes: 12 additions & 0 deletions bin/jbuild
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(jbuild_version 1)

(executables
((names (sendto_kafka_topic tail_kafka_topic))
(libraries (okafka.lwt cmdliner))))

(install
((section bin)
(files (
(sendto_kafka_topic.exe as sendto_kafka_topic)
(tail_kafka_topic.exe as tail_kafka_topic)
))))
File renamed without changes.
File renamed without changes.
7 changes: 7 additions & 0 deletions lib/jbuild
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(jbuild_version 1)

(library
((name kafka)
(public_name okafka)
(c_names (ocaml_kafka))
(c_library_flags (-lrdkafka -lpthread -lz))))
File renamed without changes.
File renamed without changes.
File renamed without changes.
6 changes: 6 additions & 0 deletions lib_helpers/jbuild
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
(jbuild_version 1)

(library
((name kafka_helpers)
(public_name okafka.helpers)
(libraries (okafka))))
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
8 changes: 8 additions & 0 deletions lib_lwt/jbuild
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(jbuild_version 1)

(library
((name kafka_lwt)
(public_name okafka.lwt)
(libraries (okafka lwt.unix))
(c_names (ocaml_lwt_kafka))
(c_library_flags (-lrdkafka -lpthread -lz))))
File renamed without changes.
File renamed without changes.
File renamed without changes.
16 changes: 16 additions & 0 deletions okafka.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
opam-version: "1.2"
version: "0.3.0"
name: "okafka"
synopsis: "OCaml bindings for Kafka"
maintainer: "Didier Wenzek <[email protected]>"
authors: ["Didier Wenzek <[email protected]>"]
homepage: "https://github.com/didier-wenzek/ocaml-kafka"
bug-reports: "https://github.com/didier-wenzek/ocaml-kafka/issues"
dev-repo: "https://github.com/didier-wenzek/ocaml-kafka"
license: "GPL"
build: [
["jbuilder" "build" "--only" "okafka" "--root" "." "-j" jobs "@install"]
]
install: [make "install"]
remove: [make "uninstall"]
depends: ["jbuilder"]
File renamed without changes.
10 changes: 10 additions & 0 deletions test/jbuild
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
(jbuild_version 1)

(executables
((names (tests issue3))
(libraries (okafka.lwt okafka.helpers))))

(alias
((name runtest)
(deps (tests.exe))
(action (run ${<}))))
19 changes: 12 additions & 7 deletions tests.ml → test/tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ let main =
let rec consume t p = match Kafka.consume ~timeout_ms t p with
| Kafka.Message(_,_,_,msg,_) -> msg
| Kafka.PartitionEnd(_,_,_) -> (
Printf.fprintf stderr "No message for now\n%!";
(* Printf.fprintf stderr "No message for now\n%!"; *)
consume t p
)
| exception Kafka.Error(Kafka.TIMED_OUT,_) -> (
Expand Down Expand Up @@ -112,7 +112,7 @@ let main =
else (n,m+1)
)
| Kafka.PartitionEnd(_,_,_) -> (
Printf.fprintf stderr "No queued message for now\n%!";
(* Printf.fprintf stderr "No queued message for now\n%!"; *)
consume_queue (n,m)
)
| exception Kafka.Error(Kafka.TIMED_OUT,_) -> (
Expand Down Expand Up @@ -147,7 +147,7 @@ let main =
let rec consume_k t = match Kafka.consume_queue t with
| Kafka.Message(_,_,_,msg,key) -> key,msg
| Kafka.PartitionEnd(_,_,_) -> (
Printf.fprintf stderr "No keyed message for now\n%!";
(* Printf.fprintf stderr "No keyed message for now\n%!"; *)
consume_k t
)
| exception Kafka.Error(Kafka.TIMED_OUT,_) -> (
Expand Down Expand Up @@ -183,15 +183,20 @@ let main =
Kafka.destroy_handler consumer;

(* KafkaConsumer / KafkaProducer API *)
let open Kafka_helpers in
let stop_at_end = true in
let iterable_of_list xs f = List.iter f xs in
let sink = KafkaProducer.partition_sink "test" Kafka.partition_unassigned in
let src = KafkaConsumer.fold_topic ~stop_at_end "test" [] in
let sink = Kafka_producer.partition_sink "test" Kafka.partition_unassigned in
let src = Kafka_consumer.fold_topic ~stop_at_end "test" [] in
let offsets = [0,Kafka.offset_tail 3; 1,Kafka.offset_tail 3] in

["message 123"; "message 124"; "message 125"] |> iterable_of_list |> KafkaProducer.stream_to sink;
["message 123"; "message 124"; "message 125"] |> iterable_of_list |> Kafka_producer.stream_to sink;
let messages = src (fun acc -> function Kafka.Message(_,_,_,msg,_) -> msg::acc | _ -> acc) offsets [] in
assert (List.length messages = 6);
assert (List.exists (fun msg -> msg = "message 123") messages);
assert (List.exists (fun msg -> msg = "message 124") messages);
assert (List.exists (fun msg -> msg = "message 125") messages)
assert (List.exists (fun msg -> msg = "message 125") messages);


"Tests successful\n%!"

0 comments on commit 21839c9

Please sign in to comment.