diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..21ee3fd --- /dev/null +++ b/.tool-versions @@ -0,0 +1,2 @@ +direnv 2.32.1 +erlang 24.3.4.8 diff --git a/README.md b/README.md index 205e927..f0a73ed 100644 --- a/README.md +++ b/README.md @@ -96,6 +96,17 @@ become worse. - Upgrade dependencies; exometer_core and lager +* *Version 1.9.5* - SSL and Dynamic Creation + + - Can now specify ssl_options in connection_config in config.system + + - Can now dynamically create Connections, Services, and Publishers + during runtime instead of only in configuration or Supervisors: + + - turtle_conn:new(Name,Config) + - turtle_service:new(Supervisor,ServiceSpec) + - turtle_publisher:new(Supervisor,PublisherSpec) + * *Version 1.9.4* - Introduce Connection Name Validation - Connection names are now validated when a publisher or a service diff --git a/rebar.config b/rebar.config index a71362c..d275d4f 100644 --- a/rebar.config +++ b/rebar.config @@ -1,15 +1,14 @@ {erl_opts, [ - debug_info, - {parse_transform, lager_transform} + debug_info ]}. {deps, [ - {lager, "3.6.7"}, - {rabbit_common, "3.7.12"}, - {amqp_client, "3.7.12"}, - {exometer_core, "1.5.7"}, - {gproc, "0.8.0"}, - {uuid, "1.7.1", {pkg, uuid_erl}} + {amqp_client, "3.11.13"}, + {amqp_client, "3.11.13"}, + {exometer_core, "1.6.2"}, + {exometer_core , ".*", {git, "https://github.com/Feuerlabs/exometer_core.git", {branch, "master"}}}, + {gproc, "0.9.0"}, + {uuid, "2.0.5", {pkg, uuid_erl}} ]}. {overrides, diff --git a/rebar.lock b/rebar.lock index cc0038d..7e04580 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,36 +1,34 @@ {"1.1.0", -[{<<"amqp_client">>,{pkg,<<"amqp_client">>,<<"3.7.12">>},0}, - {<<"bear">>,{pkg,<<"bear">>,<<"0.8.7">>},2}, - {<<"exometer_core">>,{pkg,<<"exometer_core">>,<<"1.5.7">>},0}, - {<<"folsom">>,{pkg,<<"folsom">>,<<"0.8.7">>},1}, - {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, - {<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0}, - {<<"hut">>,{pkg,<<"hut">>,<<"1.2.1">>},1}, - {<<"jsx">>,{pkg,<<"jsx">>,<<"2.9.0">>},1}, - {<<"lager">>,{pkg,<<"lager">>,<<"3.6.7">>},0}, - {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},1}, - {<<"quickrand">>,{pkg,<<"quickrand">>,<<"1.7.5">>},1}, - {<<"rabbit_common">>,{pkg,<<"rabbit_common">>,<<"3.7.12">>},0}, - {<<"ranch">>,{pkg,<<"ranch">>,<<"1.7.1">>},1}, - {<<"recon">>,{pkg,<<"recon">>,<<"2.3.6">>},1}, - {<<"setup">>,{pkg,<<"setup">>,<<"2.0.2">>},1}, - {<<"uuid">>,{pkg,<<"uuid_erl">>,<<"1.7.1">>},0}]}. +[{<<"amqp_client">>,{pkg,<<"amqp_client">>,<<"3.11.13">>},0}, + {<<"bear">>,{pkg,<<"bear">>,<<"1.0.0">>},2}, + {<<"credentials_obfuscation">>, + {pkg,<<"credentials_obfuscation">>,<<"3.2.0">>}, + 2}, + {<<"exometer_core">>,{pkg,<<"exometer_core">>,<<"1.6.2">>},0}, + {<<"folsom">>,{pkg,<<"folsom">>,<<"1.0.0">>},1}, + {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, + {<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},1}, + {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.4.1">>},1}, + {<<"quickrand">>,{pkg,<<"quickrand">>,<<"2.0.5">>},1}, + {<<"rabbit_common">>,{pkg,<<"rabbit_common">>,<<"3.11.13">>},1}, + {<<"recon">>,{pkg,<<"recon">>,<<"2.5.3">>},2}, + {<<"setup">>,{pkg,<<"setup">>,<<"2.1.0">>},1}, + {<<"thoas">>,{pkg,<<"thoas">>,<<"1.0.0">>},2}, + {<<"uuid">>,{pkg,<<"uuid_erl">>,<<"2.0.5">>},0}]}. [ {pkg_hash,[ - {<<"amqp_client">>, <<"5ACCC1EF354E19B8200D48C72D2F68D6327672B8C8CBD1DCD1556264F348DAFD">>}, - {<<"bear">>, <<"16264309AE5D005D03718A5C82641FCC259C9E8F09ADEB6FD79CA4271168656F">>}, - {<<"exometer_core">>, <<"AB97E34A5D69AB14E6AE161DB4CCA5B5E655E635B842F830EE6AB2CBFCFDC30A">>}, - {<<"folsom">>, <<"A885F0AEEE4C84270954C88A55A5A473D6B2C7493E32FFDC5765412DD555A951">>}, - {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, - {<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>}, - {<<"hut">>, <<"08D46679523043424870723923971889E8A34D63B2F946A35B46CF921D1236E7">>}, - {<<"jsx">>, <<"D2F6E5F069C00266CAD52FB15D87C428579EA4D7D73A33669E12679E203329DD">>}, - {<<"lager">>, <<"2FBF823944CAA0FC10DF5EC13F3F047524A249BB32F0D801B7900C9610264286">>}, - {<<"parse_trans">>, <<"09765507A3C7590A784615CFD421D101AEC25098D50B89D7AA1D66646BC571C1">>}, - {<<"quickrand">>, <<"E3086A153EB13A057FC19192D05E2D4C6BB2BDBB55746A699BEAE9847AC17CA8">>}, - {<<"rabbit_common">>, <<"26B8C10BF8B7064FDF44792F6B2D8DE79F5E1BD6B36C4B1C318FB3953C1E5D86">>}, - {<<"ranch">>, <<"6B1FAB51B49196860B733A49C07604465A47BDB78AA10C1C16A3D199F7F8C881">>}, - {<<"recon">>, <<"2BCAD0CF621FB277CABBB6413159CD3AA30265C2DEE42C968697988B30108604">>}, - {<<"setup">>, <<"1203F4CDA11306C2E34434244576DED0A7BBFB0908D9A572356C809BD0CDF085">>}, - {<<"uuid">>, <<"252D12D1154BC75C40BC03E2E67714BD843DDFAD40FD3BC1541F65F622C9E7FC">>}]} + {<<"amqp_client">>, <<"DDCF0859E80E56081B2436DE0906D13FA616E62775A32422F34E0EFAEBEC0011">>}, + {<<"bear">>, <<"430419C1126B477686CDE843E88BA0F2C7DC5CDF0881C677500074F704339A99">>}, + {<<"credentials_obfuscation">>, <<"BECF48ED7C96938600F88F486031BCBF9B9C4E3353CC314CA131C347C4C7815C">>}, + {<<"exometer_core">>, <<"E480FBD9CF3BECFCA15805BDD42C39CD855CF701C2F0204BAD9C096C74FDCAA8">>}, + {<<"folsom">>, <<"50ECC998D2149939F1D5E0AA3E32788F8ED16A58E390D81B5C0BE4CC4EF25589">>}, + {<<"gproc">>, <<"853CCB7805E9ADA25D227A157BA966F7B34508F386A3E7E21992B1B484230699">>}, + {<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>}, + {<<"parse_trans">>, <<"6E6AA8167CB44CC8F39441D05193BE6E6F4E7C2946CB2759F015F8C56B76E5FF">>}, + {<<"quickrand">>, <<"06FCAD85CB47D5C85C51D6BC9C84A082501BA098A89D64AD0A2F69599E034C04">>}, + {<<"rabbit_common">>, <<"9797373B8CECB53D762EDA729BAD974D2A5952C8BB498D33E64790F2D42F9A16">>}, + {<<"recon">>, <<"739107B9050EA683C30E96DE050BC59248FD27EC147696F79A8797FF9FA17153">>}, + {<<"setup">>, <<"05F69185A5EB71474C9BC6BA892565651EC7507791F85632B7B914DBFE130510">>}, + {<<"thoas">>, <<"567C03902920827A18A89F05B79A37B5BF93553154B883E0131801600CF02CE0">>}, + {<<"uuid">>, <<"60FAEEB7EDFD40847ED13CB0DD1044BAABE4E79A00C0CA9C4D13A073914B1016">>}]} ]. diff --git a/src/turtle.app.src b/src/turtle.app.src index f3db479..acdb0aa 100644 --- a/src/turtle.app.src +++ b/src/turtle.app.src @@ -11,7 +11,6 @@ amqp_client, exometer_core, gproc, - lager, uuid ]}, {env,[ diff --git a/src/turtle_config.erl b/src/turtle_config.erl index f7dd910..c80ba58 100644 --- a/src/turtle_config.erl +++ b/src/turtle_config.erl @@ -17,25 +17,45 @@ read_params() -> -spec conn_params(map()) -> term(). % @todo fix this typespec conn_params(Ps) -> - #amqp_params_network { - username = username(Ps), - password = password(Ps), - virtual_host = virtual_host(Ps), + case maps:get(ssl_options,Ps,false) of + false -> + #amqp_params_network { + username = username(Ps), + password = password(Ps), + virtual_host = virtual_host(Ps), - channel_max = maps:get(channel_max, Ps, 0), - frame_max = maps:get(frame_max, Ps, 0), - heartbeat = maps:get(heartbeat, Ps, 15) + channel_max = maps:get(channel_max, Ps, 0), + frame_max = maps:get(frame_max, Ps, 0), + heartbeat = maps:get(heartbeat, Ps, 15) + + %% Not setting: + %% - ssl_options + %% - auth_mechanisms + %% - client_properties + }; + _ -> + #amqp_params_network { + username = username(Ps), + password = password(Ps), + virtual_host = virtual_host(Ps), + ssl_options = maps:get(ssl_options,Ps,[]), + + channel_max = maps:get(channel_max, Ps, 0), + frame_max = maps:get(frame_max, Ps, 0), + heartbeat = maps:get(heartbeat, Ps, 15) + + %% Not setting: + %% - auth_mechanisms + %% - client_properties + } + end. - %% Not setting: - %% - ssl_options - %% - auth_mechanisms - %% - client_properties - }. username(#{ username := U }) -> list_to_binary(U). password(#{ password := PW }) -> list_to_binary(PW). virtual_host(#{ virtual_host := VH }) -> list_to_binary(VH). +ssl_options(#{ ssl_options := SO }) -> SO. -spec validate_conn_name(term()) -> ok | unknown_conn_name. diff --git a/src/turtle_conn.erl b/src/turtle_conn.erl index 22cdc11..004b302 100644 --- a/src/turtle_conn.erl +++ b/src/turtle_conn.erl @@ -20,7 +20,8 @@ %% API -export([ conn/1, - close/1 + close/1, + new/1 ]). -export([ @@ -28,11 +29,12 @@ handle_call/3, handle_cast/2, handle_info/2, + stop/1, terminate/2, code_change/3 ]). --define(DEFAULT_RETRY_TIME, 15*1000). +-define(DEFAULT_RETRY_TIME, 5*1000). -define(DEFAULT_ATTEMPT_COUNT, 10). -type network_connection() :: {string(), inet:port_number()}. @@ -74,6 +76,12 @@ call(Loc, Msg) -> gen_server:call(Pid, Msg, 20*1000) end. +new(Configuration) -> + turtle_sup:add_connection(Configuration). + +stop(Name) -> + turtle_sup:stop_connection(Name). + %% CALLBACKS %% ------------------------------------------------------------------- @@ -98,12 +106,12 @@ handle_call(conn, _From, #state { connection = undefined } = State) -> handle_call(conn, _From, #state { connection = Conn } = State) -> {reply, Conn, State}; handle_call(Call, From, State) -> - lager:warning("Unknown call from ~p: ~p", [From, Call]), + logger:warning("Unknown call from ~p: ~p", [From, Call]), {reply, {error, unknown_call}, State}. %% @private handle_cast(Cast, State) -> - lager:warning("Unknown cast: ~p", [Cast]), + logger:warning("Unknown cast: ~p", [Cast]), {noreply, State}. %% @private @@ -124,21 +132,21 @@ handle_info(connect, #state { name = Name, reg(Name), {noreply, ConnectedState}; {error, unknown_host, #state { cg = CG } = NextState} -> - lager:error("Unknown host while connecting to RabbitMQ: ~p", + logger:error("Unknown host while connecting to RabbitMQ: ~p", [group_report(CG)]), {stop, {error, unknown_host}, NextState}; {error,econnrefused, #state { cg = CG } = NextState} -> - lager:info("AMQP Connection refused, retrying in ~Bs: ~p", + logger:info("AMQP Connection refused, retrying in ~Bs: ~p", [Retry div 1000, group_report(CG)]), erlang:send_after(Retry, self(), connect), {noreply, NextState}; {error, timeout, #state { cg = CG } = NextState} -> - lager:warning("Timeout while connecting to RabbitMQ, retrying in ~Bs: ~p", + logger:warning("Timeout while connecting to RabbitMQ, retrying in ~Bs: ~p", [Retry div 1000, group_report(CG)]), erlang:send_after(Retry, self(), connect), {noreply, NextState}; {error, Reason, #state { cg = CG } = NextState} -> - lager:warning("Error connecting to RabbitMQ, reason: ~p, retrying in ~Bs: ~p", + logger:warning("Error connecting to RabbitMQ, reason: ~p, retrying in ~Bs: ~p", [Reason, Retry div 1000, group_report(CG)]), erlang:send_after(Retry, self(), connect), {noreply, NextState} @@ -148,7 +156,7 @@ handle_info({timeout, TRef, deadline}, #state { deadline = TRef } = State) -> handle_info({'DOWN', MRef, process, _Pid, _Reason}, #state { monitor = MRef } = State) -> {stop, connection_died, State}; handle_info(Info, State) -> - lager:warning("Received unknown info-message in turtle_conn: ~p", [Info]), + logger:warning("Received unknown info-message in turtle_conn: ~p", [Info]), {noreply, State}. %% @private @@ -192,10 +200,10 @@ group_next(#conn_group { orig = [{_N, G} | _] = Orig, next = [] } = CG) -> group_next(#conn_group { orig_group = G, next = [{N, []} | Cns] } = CG) -> group_next(CG#conn_group { next = [{N, G} | Cns] }); group_next(#conn_group { attempts = 0, next = Next } = CG) -> - lager:warning("Turtle AMQP Connect: Exhausted every host in group, moving to next group"), + logger:warning("Turtle AMQP Connect: Exhausted every host in group, moving to next group"), case Next of [_] -> - lager:critical( + logger:critical( "Turtle AMQP Connect: " "Exhausted all connection groups, starting over with the first group"), group_next(CG#conn_group { next = [] }); diff --git a/src/turtle_publisher.erl b/src/turtle_publisher.erl index 50aeb7f..830de9c 100644 --- a/src/turtle_publisher.erl +++ b/src/turtle_publisher.erl @@ -5,11 +5,14 @@ -include_lib("amqp_client/include/amqp_client.hrl"). %% Lifetime --export([start_link/3, - start_link/4, - where/1, +-export([ + await/2, child_spec/4, - await/2 + stop/2, + new/2, + start_link/3, + start_link/4, + where/1 ]). %% API @@ -118,6 +121,17 @@ where(N) -> await(N, Timeout) -> gproc:await({n,l,{turtle,publisher,N}}, Timeout). +%% @doc new/2 creates a new publisher and adds to supervisor +%% @end +new(Supervisor,ChildSpec) -> + supervisor:start_child(Supervisor,ChildSpec). + +%% @doc close/2 terminates a publisher process +%% @end +stop(Supervisor,Name) -> + supervisor:terminate_child(Supervisor,Name), + supervisor:delete_child(Supervisor,Name). + %% @doc publish a message asynchronously to RabbitMQ %% %% The specification is that you have to provide all parameters, because experience @@ -186,17 +200,17 @@ handle_call({transfer_ownership, Pid}, _From, #state{name = Name} = State) -> gproc:goodbye(), {stop, normal, ok, State}; handle_call(Call, From, State) -> - lager:warning("Unknown call from ~p: ~p", [From, Call]), + logger:warning("Unknown call from ~p: ~p", [From, Call]), {reply, {error, unknown_call}, State}. %% @private handle_cast(Pub, {initializing, _, _, _, _} = Init) -> %% Messages cast to an initializing publisher are thrown away, but it shouldn't %% happen, so we log them - lager:warning("Publish while initializing: ~p", [Pub]), + logger:warning("Publish while initializing: ~p", [Pub]), {noreply, Init}; handle_cast(Pub, {initializing_takeover, _, _, _, _} = Init) -> - lager:warning("Publish while takeover initialization: ~p", [Pub]), + logger:warning("Publish while takeover initialization: ~p", [Pub]), {noreply, Init}; handle_cast({publish, Pub, Props, Payload}, #state { conn_name = ConnName, name = Name } = InState) -> @@ -209,7 +223,7 @@ handle_cast({publish, Pub, Props, Payload}, {noreply, State} end; handle_cast(Cast, State) -> - lager:warning("Unknown cast: ~p", [Cast]), + logger:warning("Unknown cast: ~p", [Cast]), {noreply, State}. %% @private @@ -282,10 +296,10 @@ handle_info({#'basic.deliver' { delivery_tag = Tag}, Content}, State) -> handle_info(#'basic.consume_ok'{}, State) -> {noreply, State}; handle_info(#'basic.cancel_ok'{}, State) -> - lager:info("Consumption canceled"), + logger:info("Consumption canceled"), {stop, normal, State}; handle_info(Info, State) -> - lager:warning("Received unknown info msg: ~p", [Info]), + logger:warning("Received unknown info msg: ~p", [Info]), {noreply, State}. %% @private diff --git a/src/turtle_service.erl b/src/turtle_service.erl index 583fb9f..8c146ce 100644 --- a/src/turtle_service.erl +++ b/src/turtle_service.erl @@ -10,7 +10,13 @@ -behaviour(supervisor). %% API --export([start_link/1, child_spec/1]). +-export([ + child_spec/1, + new/2, + start_link/1, + stop/2, + validate_config/1 + ]). %% Supervisor callbacks -export([init/1]). @@ -28,6 +34,17 @@ start_link(#{ name := Name } = Conf) -> validate_config(Conf), supervisor:start_link({via, gproc, {n,l,{turtle,service,Name}}}, ?MODULE, [Conf]). +%% @doc Create a new service during runtime +%% @end +new(Supervisor,ServiceChildSpec) -> + supervisor:start_child(Supervisor,ServiceChildSpec). + +%% @doc Stop a service during runtime +%% @end +stop(Supervisor,Name) -> + supervisor:terminate_child(Supervisor, Name), + supervisor:delete_child(Supervisor, Name). + %% @doc Generate a child specification for this supervisor %% The Configuration is a map with the following keys: %% diff --git a/src/turtle_service_mgr.erl b/src/turtle_service_mgr.erl index 1531098..5bf221d 100644 --- a/src/turtle_service_mgr.erl +++ b/src/turtle_service_mgr.erl @@ -78,12 +78,12 @@ handle_call({config_update, PoolName, Config}, _From, #state{} = State) -> add_subscribers(Pool, Config, K), {reply, ok, State#state{conf = Config}}; handle_call(Call, From, State) -> - lager:warning("Unknown call from ~p: ~p", [From, Call]), + logger:warning("Unknown call from ~p: ~p", [From, Call]), {reply, {error, unknown_call}, State}. %% @private handle_cast(Cast, State) -> - lager:warning("Unknown cast: ~p", [Cast]), + logger:warning("Unknown cast: ~p", [Cast]), {noreply, State}. %% @private @@ -100,7 +100,7 @@ handle_info({gproc, Ref, registered, {_, Pid, _}}, {initializing, Ref, #{ name : handle_info({'DOWN', MRef, process, _, Reason}, #state { conn_ref = MRef } = State) -> {stop, {error, {connection_down, Reason}}, State}; handle_info(Info, State) -> - lager:warning("Unknown info msg: ~p", [Info]), + logger:warning("Unknown info msg: ~p", [Info]), {noreply, State}. %% @private diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl index 6e8f01e..e14ce44 100644 --- a/src/turtle_subscriber.erl +++ b/src/turtle_subscriber.erl @@ -71,26 +71,26 @@ init([#{ %% @private handle_call(Call, From, State) -> - lager:warning("Unknown call from ~p: ~p", [From, Call]), + logger:warning("Unknown call from ~p: ~p", [From, Call]), {reply, {error, unknown_call}, State}. %% @private handle_cast(Cast, State) -> - lager:warning("Unknown cast: ~p", [Cast]), + logger:warning("Unknown cast: ~p", [Cast]), {noreply, State}. %% @private handle_info(#'basic.consume_ok'{}, State) -> {noreply, State}; handle_info(#'basic.cancel_ok'{}, State) -> - lager:info("Consumption canceled"), + logger:info("Consumption canceled"), {stop, normal, shutdown(rabbitmq_gone, State)}; handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = single } = State) -> handle_deliver_single(Msg, State); handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = bulk } = State) -> handle_deliver_bulk(Msg, State); handle_info(#'basic.return' {} = Return, #state { name = Name } = State) -> - lager:info("Channel ~p received a return from AMQP: ~p", [Name, Return]), + logger:info("Channel ~p received a return from AMQP: ~p", [Name, Return]), {noreply, State}; handle_info({channel_closed, Ch, Reason}, #state { channel = Ch } = State) -> Exit = case Reason of @@ -103,7 +103,7 @@ handle_info({channel_closed, Ch, Reason}, #state { channel = Ch } = State) -> handle_info({'DOWN', MRef, process, _Pid, _Reason}, #state { monitor = MRef } = State) -> {stop, channel_died, State}; handle_info(Info, #state { handle_info = undefined } = State) -> - lager:warning("Unknown info message: ~p", [Info]), + logger:warning("Unknown info message: ~p", [Info]), {noreply, State}; handle_info(Info, #state { handle_info = HandleInfo, invoke_state = IState } = State) -> S = turtle_time:monotonic_time(), @@ -112,10 +112,10 @@ handle_info(Info, #state { handle_info = HandleInfo, invoke_state = IState } = S {Cmds, IState2} when is_list(Cmds) -> handle_commands(S, Cmds, State#state { invoke_state = IState2 }) catch - Class:Error -> - lager:error("Handle info crashed: {~p, ~p}, stack: ~p", - [Class, Error, erlang:get_stacktrace()]), - {stop, {Class, Error}, State} + Class:Reason:Stacktrace -> + logger:error("Handle info crashed: {~p, ~p}, stack: ~p", + [Class, Reason, Stacktrace]), + {stop, {Class, Reason}, State} end. %% @private @@ -166,12 +166,12 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, {Cmds, S2} when is_list(Cmds) -> handle_commands(S, Cmds, State#state { invoke_state = S2 }) catch - Class:Error -> - lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", - [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]), - lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]), + Class:Reason:Stacktrace -> + logger:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", + [Class, Reason, Stacktrace, format_amqp_msg(Content)]), + logger:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]), ok = amqp_channel:call(Channel, #'basic.reject' { delivery_tag = Tag, requeue = false }), - {stop, {Class, Error}, State} + {stop, {Class, Reason}, State} end. handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, @@ -196,10 +196,10 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} end, handle_commands(S, Cmds, State#state { invoke_state = S2 }) catch - Class:Error -> - lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", - [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]), - lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]), + Class:Error:Stacktrace -> + logger:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", + [Class, Error, Stacktrace, format_amqp_msg(Content)]), + logger:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]), ok = amqp_channel:call(Channel, #'basic.reject' { delivery_tag = DTag, requeue = false }), {stop, {Class, Error}, State} end. @@ -300,7 +300,7 @@ handle_info(#{ handle_info := Handler }) -> Handler; handle_info(_) -> undefined. reply(_Ch, {_Tag, undefined, _CorrID}, _CType, _Msg) -> - lager:warning("Replying to target with no reply-to queue defined"), + logger:warning("Replying to target with no reply-to queue defined"), ok; reply(Ch, {_Tag, ReplyTo, CorrID}, CType, Msg) -> Publish = #'basic.publish' { @@ -316,7 +316,7 @@ await_cancel_ok() -> #'basic.cancel_ok'{} -> ok after 500 -> - lager:error("No basic.cancel_ok received"), + logger:error("No basic.cancel_ok received"), not_cancelled end. @@ -349,16 +349,16 @@ shutdown(Reason, #state { handle_info = HandleInfo, invoke_state = IState } = St State#state { invoke_state = IState2 }, Reason)} catch - Class:Error -> - lager:error("Handle info crashed: {~p, ~p}, stack: ~p", - [Class, Error, erlang:get_stacktrace()]), - {stop, {Class, Error}, State} + Class:Reason:Stacktrace -> + logger:error("Handle info crashed: {~p, ~p}, stack: ~p", + [Class, Reason, Stacktrace]), + {stop, {Class, Reason}, State} end. shutdown_process_commands(_S, [], State, _Reason) -> State; shutdown_process_commands(_S, Cmds, State, Reason) -> - lager:warning("Ignoring ~B commands due to ungraceful shutdown: ~p", + logger:warning("Ignoring ~B commands due to ungraceful shutdown: ~p", [length(Cmds), Reason]), State. diff --git a/src/turtle_sup.erl b/src/turtle_sup.erl index 809d83c..566862e 100644 --- a/src/turtle_sup.erl +++ b/src/turtle_sup.erl @@ -10,7 +10,16 @@ -include_lib("amqp_client/include/amqp_client.hrl"). %% API --export([start_link/0]). +-export([ + add_connection/1, + start_link/0, + stop_connection/1 + ]). + +%test +-export([ + remove_application_config/1 + ]). %% Supervisor callbacks -export([init/1]). @@ -24,6 +33,17 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +add_connection(Configuration) -> + Spec = conn_sup(Configuration), + add_application_config(Configuration), + supervisor:start_child(turtle_sup,Spec). + +stop_connection(Configuration) -> + Name = maps:get(conn_name, Configuration), + remove_application_config(Name), + supervisor:terminate_child(turtle_sup,Name), + supervisor:delete_child(turtle_sup,Name). + %%==================================================================== %% Supervisor callbacks %%==================================================================== @@ -54,3 +74,26 @@ conn_sup(#{conn_name := Name} = Ps) -> shutdown => 5000, type => worker }. + +add_application_config(Configuration) -> + CurrentConfig = application:get_env(turtle, connection_config, []), + NewConfig = CurrentConfig ++ [Configuration], + application:set_env(turtle,connection_config,NewConfig). + +remove_application_config(ConfigName) -> + CurrentConfig = application:get_env(turtle, connection_config, []), + ConfigMap = list_of_maps_to_map(CurrentConfig,conn_name), + NewConfigMap = maps:remove(ConfigName,ConfigMap), + NewConfig = maps:values(NewConfigMap), + application:set_env(turtle,connection_config,NewConfig). + +list_of_maps_to_map(ListOfMaps, Key) -> + list_of_maps_to_map(ListOfMaps, Key, #{}). + +list_of_maps_to_map([],_Key,MapAcc) -> + MapAcc; +list_of_maps_to_map(ListOfMaps,Key,MapAcc) -> + KeyValue = maps:get(Key,hd(ListOfMaps)), + NewMap = hd(ListOfMaps), + MapAcc@1 = maps:put(KeyValue,NewMap,MapAcc), + list_of_maps_to_map(tl(ListOfMaps),Key,MapAcc@1).