From 1937b6ff76ebd093d3d2e381a4e3d4a7dc11cd6e Mon Sep 17 00:00:00 2001 From: Led Date: Wed, 19 Jun 2019 12:41:53 +0300 Subject: [PATCH 1/2] turtle_publisher: add guards to mk_publish/5 --- src/turtle_publisher.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/turtle_publisher.erl b/src/turtle_publisher.erl index 50aeb7f..c86a3b6 100644 --- a/src/turtle_publisher.erl +++ b/src/turtle_publisher.erl @@ -352,7 +352,8 @@ properties(ContentType, #{}) -> #'P_basic' { content_type = ContentType }. %% Create a new publish package -mk_publish(Exch, Key, ContentType, IODataPayload, Opts) -> +mk_publish(Exch, Key, ContentType, IODataPayload, Opts) when is_binary(Exch), is_binary(Key), + ContentType =:= undefined orelse is_binary(ContentType) -> Pub = #'basic.publish' { exchange = Exch, routing_key = Key From cd187ed95f5cd60e3d063c7d4bcee6df4add4ff5 Mon Sep 17 00:00:00 2001 From: Led Date: Sat, 22 Jun 2019 21:00:54 +0300 Subject: [PATCH 2/2] turtle_publisher: add channel monitoring --- src/turtle_publisher.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/turtle_publisher.erl b/src/turtle_publisher.erl index c86a3b6..030594a 100644 --- a/src/turtle_publisher.erl +++ b/src/turtle_publisher.erl @@ -221,6 +221,7 @@ handle_info({gproc, Ref, registered, {_, Pid, _}}, {initializing, N, Ref, CName, ok = handle_confirms(Channel, Options), {ok, ReplyQueue, Tag} = handle_rpc(Channel, Options), ConnMRef = monitor(process, Pid), + _ = monitor(process, Channel), reg(N), {noreply, #state { @@ -240,6 +241,7 @@ handle_info({gproc, Ref, registered, {_, Pid, _}}, {initializing_takeover, N, Re ok = handle_confirms(Channel, Options), {ok, ReplyQueue, Tag} = handle_rpc(Channel, Options), ConnMRef = monitor(process, Pid), + _ = monitor(process, Channel), case where(N) of undefined -> reg(N); @@ -274,6 +276,8 @@ handle_info({channel_closed, Ch, Reason}, #state { channel = Ch } = State) -> {stop, Exit, State}; handle_info({'DOWN', MRef, process, _, Reason}, #state { conn_ref = MRef } = State) -> {stop, {error, {connection_down, Reason}}, State}; +handle_info({'DOWN', _, process, Pid, Reason}, #state { channel = Pid } = State) -> + {stop, {error, {channel_died, Reason}}, State}; handle_info({'DOWN', MRef, process, _, _Reason}, #state { in_flight = IF } = State) -> %% Remove in-flight monitor if the RPC caller goes away {noreply, State#state { in_flight = track_cancel_monitor(MRef, IF) }};