Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SSL configuration #46

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
direnv 2.32.1
erlang 24.3.4.8
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ become worse.

- Upgrade dependencies; exometer_core and lager

* *Version 1.9.5* - SSL and Dynamic Creation

- Can now specify ssl_options in connection_config in config.system

- Can now dynamically create Connections, Services, and Publishers
during runtime instead of only in configuration or Supervisors:

- turtle_conn:new(Name,Config)
- turtle_service:new(Supervisor,ServiceSpec)
- turtle_publisher:new(Supervisor,PublisherSpec)

* *Version 1.9.4* - Introduce Connection Name Validation

- Connection names are now validated when a publisher or a service
Expand Down
15 changes: 7 additions & 8 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
{erl_opts, [
debug_info,
{parse_transform, lager_transform}
debug_info
]}.

{deps, [
{lager, "3.6.7"},
{rabbit_common, "3.7.12"},
{amqp_client, "3.7.12"},
{exometer_core, "1.5.7"},
{gproc, "0.8.0"},
{uuid, "1.7.1", {pkg, uuid_erl}}
{amqp_client, "3.11.13"},
{amqp_client, "3.11.13"},
{exometer_core, "1.6.2"},
{exometer_core , ".*", {git, "https://github.com/Feuerlabs/exometer_core.git", {branch, "master"}}},
{gproc, "0.9.0"},
{uuid, "2.0.5", {pkg, uuid_erl}}
]}.

{overrides,
Expand Down
62 changes: 30 additions & 32 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
{"1.1.0",
[{<<"amqp_client">>,{pkg,<<"amqp_client">>,<<"3.7.12">>},0},
{<<"bear">>,{pkg,<<"bear">>,<<"0.8.7">>},2},
{<<"exometer_core">>,{pkg,<<"exometer_core">>,<<"1.5.7">>},0},
{<<"folsom">>,{pkg,<<"folsom">>,<<"0.8.7">>},1},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0},
{<<"hut">>,{pkg,<<"hut">>,<<"1.2.1">>},1},
{<<"jsx">>,{pkg,<<"jsx">>,<<"2.9.0">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.6.7">>},0},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},1},
{<<"quickrand">>,{pkg,<<"quickrand">>,<<"1.7.5">>},1},
{<<"rabbit_common">>,{pkg,<<"rabbit_common">>,<<"3.7.12">>},0},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.7.1">>},1},
{<<"recon">>,{pkg,<<"recon">>,<<"2.3.6">>},1},
{<<"setup">>,{pkg,<<"setup">>,<<"2.0.2">>},1},
{<<"uuid">>,{pkg,<<"uuid_erl">>,<<"1.7.1">>},0}]}.
[{<<"amqp_client">>,{pkg,<<"amqp_client">>,<<"3.11.13">>},0},
{<<"bear">>,{pkg,<<"bear">>,<<"1.0.0">>},2},
{<<"credentials_obfuscation">>,
{pkg,<<"credentials_obfuscation">>,<<"3.2.0">>},
2},
{<<"exometer_core">>,{pkg,<<"exometer_core">>,<<"1.6.2">>},0},
{<<"folsom">>,{pkg,<<"folsom">>,<<"1.0.0">>},1},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0},
{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},1},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.4.1">>},1},
{<<"quickrand">>,{pkg,<<"quickrand">>,<<"2.0.5">>},1},
{<<"rabbit_common">>,{pkg,<<"rabbit_common">>,<<"3.11.13">>},1},
{<<"recon">>,{pkg,<<"recon">>,<<"2.5.3">>},2},
{<<"setup">>,{pkg,<<"setup">>,<<"2.1.0">>},1},
{<<"thoas">>,{pkg,<<"thoas">>,<<"1.0.0">>},2},
{<<"uuid">>,{pkg,<<"uuid_erl">>,<<"2.0.5">>},0}]}.
[
{pkg_hash,[
{<<"amqp_client">>, <<"5ACCC1EF354E19B8200D48C72D2F68D6327672B8C8CBD1DCD1556264F348DAFD">>},
{<<"bear">>, <<"16264309AE5D005D03718A5C82641FCC259C9E8F09ADEB6FD79CA4271168656F">>},
{<<"exometer_core">>, <<"AB97E34A5D69AB14E6AE161DB4CCA5B5E655E635B842F830EE6AB2CBFCFDC30A">>},
{<<"folsom">>, <<"A885F0AEEE4C84270954C88A55A5A473D6B2C7493E32FFDC5765412DD555A951">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"hut">>, <<"08D46679523043424870723923971889E8A34D63B2F946A35B46CF921D1236E7">>},
{<<"jsx">>, <<"D2F6E5F069C00266CAD52FB15D87C428579EA4D7D73A33669E12679E203329DD">>},
{<<"lager">>, <<"2FBF823944CAA0FC10DF5EC13F3F047524A249BB32F0D801B7900C9610264286">>},
{<<"parse_trans">>, <<"09765507A3C7590A784615CFD421D101AEC25098D50B89D7AA1D66646BC571C1">>},
{<<"quickrand">>, <<"E3086A153EB13A057FC19192D05E2D4C6BB2BDBB55746A699BEAE9847AC17CA8">>},
{<<"rabbit_common">>, <<"26B8C10BF8B7064FDF44792F6B2D8DE79F5E1BD6B36C4B1C318FB3953C1E5D86">>},
{<<"ranch">>, <<"6B1FAB51B49196860B733A49C07604465A47BDB78AA10C1C16A3D199F7F8C881">>},
{<<"recon">>, <<"2BCAD0CF621FB277CABBB6413159CD3AA30265C2DEE42C968697988B30108604">>},
{<<"setup">>, <<"1203F4CDA11306C2E34434244576DED0A7BBFB0908D9A572356C809BD0CDF085">>},
{<<"uuid">>, <<"252D12D1154BC75C40BC03E2E67714BD843DDFAD40FD3BC1541F65F622C9E7FC">>}]}
{<<"amqp_client">>, <<"DDCF0859E80E56081B2436DE0906D13FA616E62775A32422F34E0EFAEBEC0011">>},
{<<"bear">>, <<"430419C1126B477686CDE843E88BA0F2C7DC5CDF0881C677500074F704339A99">>},
{<<"credentials_obfuscation">>, <<"BECF48ED7C96938600F88F486031BCBF9B9C4E3353CC314CA131C347C4C7815C">>},
{<<"exometer_core">>, <<"E480FBD9CF3BECFCA15805BDD42C39CD855CF701C2F0204BAD9C096C74FDCAA8">>},
{<<"folsom">>, <<"50ECC998D2149939F1D5E0AA3E32788F8ED16A58E390D81B5C0BE4CC4EF25589">>},
{<<"gproc">>, <<"853CCB7805E9ADA25D227A157BA966F7B34508F386A3E7E21992B1B484230699">>},
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
{<<"parse_trans">>, <<"6E6AA8167CB44CC8F39441D05193BE6E6F4E7C2946CB2759F015F8C56B76E5FF">>},
{<<"quickrand">>, <<"06FCAD85CB47D5C85C51D6BC9C84A082501BA098A89D64AD0A2F69599E034C04">>},
{<<"rabbit_common">>, <<"9797373B8CECB53D762EDA729BAD974D2A5952C8BB498D33E64790F2D42F9A16">>},
{<<"recon">>, <<"739107B9050EA683C30E96DE050BC59248FD27EC147696F79A8797FF9FA17153">>},
{<<"setup">>, <<"05F69185A5EB71474C9BC6BA892565651EC7507791F85632B7B914DBFE130510">>},
{<<"thoas">>, <<"567C03902920827A18A89F05B79A37B5BF93553154B883E0131801600CF02CE0">>},
{<<"uuid">>, <<"60FAEEB7EDFD40847ED13CB0DD1044BAABE4E79A00C0CA9C4D13A073914B1016">>}]}
].
1 change: 0 additions & 1 deletion src/turtle.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
amqp_client,
exometer_core,
gproc,
lager,
uuid
]},
{env,[
Expand Down
44 changes: 32 additions & 12 deletions src/turtle_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,45 @@ read_params() ->

-spec conn_params(map()) -> term(). % @todo fix this typespec
conn_params(Ps) ->
#amqp_params_network {
username = username(Ps),
password = password(Ps),
virtual_host = virtual_host(Ps),
case maps:get(ssl_options,Ps,false) of
false ->
#amqp_params_network {
username = username(Ps),
password = password(Ps),
virtual_host = virtual_host(Ps),

channel_max = maps:get(channel_max, Ps, 0),
frame_max = maps:get(frame_max, Ps, 0),
heartbeat = maps:get(heartbeat, Ps, 15)
channel_max = maps:get(channel_max, Ps, 0),
frame_max = maps:get(frame_max, Ps, 0),
heartbeat = maps:get(heartbeat, Ps, 15)

%% Not setting:
%% - ssl_options
%% - auth_mechanisms
%% - client_properties
};
_ ->
#amqp_params_network {
username = username(Ps),
password = password(Ps),
virtual_host = virtual_host(Ps),
ssl_options = maps:get(ssl_options,Ps,[]),

channel_max = maps:get(channel_max, Ps, 0),
frame_max = maps:get(frame_max, Ps, 0),
heartbeat = maps:get(heartbeat, Ps, 15)

%% Not setting:
%% - auth_mechanisms
%% - client_properties
}
end.

%% Not setting:
%% - ssl_options
%% - auth_mechanisms
%% - client_properties
}.


username(#{ username := U }) -> list_to_binary(U).
password(#{ password := PW }) -> list_to_binary(PW).
virtual_host(#{ virtual_host := VH }) -> list_to_binary(VH).
ssl_options(#{ ssl_options := SO }) -> SO.


-spec validate_conn_name(term()) -> ok | unknown_conn_name.
Expand Down
30 changes: 19 additions & 11 deletions src/turtle_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@
%% API
-export([
conn/1,
close/1
close/1,
new/1
]).

-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
stop/1,
terminate/2,
code_change/3
]).

-define(DEFAULT_RETRY_TIME, 15*1000).
-define(DEFAULT_RETRY_TIME, 5*1000).
-define(DEFAULT_ATTEMPT_COUNT, 10).

-type network_connection() :: {string(), inet:port_number()}.
Expand Down Expand Up @@ -74,6 +76,12 @@ call(Loc, Msg) ->
gen_server:call(Pid, Msg, 20*1000)
end.

new(Configuration) ->
turtle_sup:add_connection(Configuration).

stop(Name) ->
turtle_sup:stop_connection(Name).

%% CALLBACKS
%% -------------------------------------------------------------------

Expand All @@ -98,12 +106,12 @@ handle_call(conn, _From, #state { connection = undefined } = State) ->
handle_call(conn, _From, #state { connection = Conn } = State) ->
{reply, Conn, State};
handle_call(Call, From, State) ->
lager:warning("Unknown call from ~p: ~p", [From, Call]),
logger:warning("Unknown call from ~p: ~p", [From, Call]),
{reply, {error, unknown_call}, State}.

%% @private
handle_cast(Cast, State) ->
lager:warning("Unknown cast: ~p", [Cast]),
logger:warning("Unknown cast: ~p", [Cast]),
{noreply, State}.

%% @private
Expand All @@ -124,21 +132,21 @@ handle_info(connect, #state { name = Name,
reg(Name),
{noreply, ConnectedState};
{error, unknown_host, #state { cg = CG } = NextState} ->
lager:error("Unknown host while connecting to RabbitMQ: ~p",
logger:error("Unknown host while connecting to RabbitMQ: ~p",
[group_report(CG)]),
{stop, {error, unknown_host}, NextState};
{error,econnrefused, #state { cg = CG } = NextState} ->
lager:info("AMQP Connection refused, retrying in ~Bs: ~p",
logger:info("AMQP Connection refused, retrying in ~Bs: ~p",
[Retry div 1000, group_report(CG)]),
erlang:send_after(Retry, self(), connect),
{noreply, NextState};
{error, timeout, #state { cg = CG } = NextState} ->
lager:warning("Timeout while connecting to RabbitMQ, retrying in ~Bs: ~p",
logger:warning("Timeout while connecting to RabbitMQ, retrying in ~Bs: ~p",
[Retry div 1000, group_report(CG)]),
erlang:send_after(Retry, self(), connect),
{noreply, NextState};
{error, Reason, #state { cg = CG } = NextState} ->
lager:warning("Error connecting to RabbitMQ, reason: ~p, retrying in ~Bs: ~p",
logger:warning("Error connecting to RabbitMQ, reason: ~p, retrying in ~Bs: ~p",
[Reason, Retry div 1000, group_report(CG)]),
erlang:send_after(Retry, self(), connect),
{noreply, NextState}
Expand All @@ -148,7 +156,7 @@ handle_info({timeout, TRef, deadline}, #state { deadline = TRef } = State) ->
handle_info({'DOWN', MRef, process, _Pid, _Reason}, #state { monitor = MRef } = State) ->
{stop, connection_died, State};
handle_info(Info, State) ->
lager:warning("Received unknown info-message in turtle_conn: ~p", [Info]),
logger:warning("Received unknown info-message in turtle_conn: ~p", [Info]),
{noreply, State}.

%% @private
Expand Down Expand Up @@ -192,10 +200,10 @@ group_next(#conn_group { orig = [{_N, G} | _] = Orig, next = [] } = CG) ->
group_next(#conn_group { orig_group = G, next = [{N, []} | Cns] } = CG) ->
group_next(CG#conn_group { next = [{N, G} | Cns] });
group_next(#conn_group { attempts = 0, next = Next } = CG) ->
lager:warning("Turtle AMQP Connect: Exhausted every host in group, moving to next group"),
logger:warning("Turtle AMQP Connect: Exhausted every host in group, moving to next group"),
case Next of
[_] ->
lager:critical(
logger:critical(
"Turtle AMQP Connect: "
"Exhausted all connection groups, starting over with the first group"),
group_next(CG#conn_group { next = [] });
Expand Down
34 changes: 24 additions & 10 deletions src/turtle_publisher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
-include_lib("amqp_client/include/amqp_client.hrl").

%% Lifetime
-export([start_link/3,
start_link/4,
where/1,
-export([
await/2,
child_spec/4,
await/2
stop/2,
new/2,
start_link/3,
start_link/4,
where/1
]).

%% API
Expand Down Expand Up @@ -118,6 +121,17 @@ where(N) ->
await(N, Timeout) ->
gproc:await({n,l,{turtle,publisher,N}}, Timeout).

%% @doc new/2 creates a new publisher and adds to supervisor
%% @end
new(Supervisor,ChildSpec) ->
supervisor:start_child(Supervisor,ChildSpec).

%% @doc close/2 terminates a publisher process
%% @end
stop(Supervisor,Name) ->
supervisor:terminate_child(Supervisor,Name),
supervisor:delete_child(Supervisor,Name).

%% @doc publish a message asynchronously to RabbitMQ
%%
%% The specification is that you have to provide all parameters, because experience
Expand Down Expand Up @@ -186,17 +200,17 @@ handle_call({transfer_ownership, Pid}, _From, #state{name = Name} = State) ->
gproc:goodbye(),
{stop, normal, ok, State};
handle_call(Call, From, State) ->
lager:warning("Unknown call from ~p: ~p", [From, Call]),
logger:warning("Unknown call from ~p: ~p", [From, Call]),
{reply, {error, unknown_call}, State}.

%% @private
handle_cast(Pub, {initializing, _, _, _, _} = Init) ->
%% Messages cast to an initializing publisher are thrown away, but it shouldn't
%% happen, so we log them
lager:warning("Publish while initializing: ~p", [Pub]),
logger:warning("Publish while initializing: ~p", [Pub]),
{noreply, Init};
handle_cast(Pub, {initializing_takeover, _, _, _, _} = Init) ->
lager:warning("Publish while takeover initialization: ~p", [Pub]),
logger:warning("Publish while takeover initialization: ~p", [Pub]),
{noreply, Init};
handle_cast({publish, Pub, Props, Payload},
#state { conn_name = ConnName, name = Name } = InState) ->
Expand All @@ -209,7 +223,7 @@ handle_cast({publish, Pub, Props, Payload},
{noreply, State}
end;
handle_cast(Cast, State) ->
lager:warning("Unknown cast: ~p", [Cast]),
logger:warning("Unknown cast: ~p", [Cast]),
{noreply, State}.

%% @private
Expand Down Expand Up @@ -282,10 +296,10 @@ handle_info({#'basic.deliver' { delivery_tag = Tag}, Content}, State) ->
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State};
handle_info(#'basic.cancel_ok'{}, State) ->
lager:info("Consumption canceled"),
logger:info("Consumption canceled"),
{stop, normal, State};
handle_info(Info, State) ->
lager:warning("Received unknown info msg: ~p", [Info]),
logger:warning("Received unknown info msg: ~p", [Info]),
{noreply, State}.

%% @private
Expand Down
19 changes: 18 additions & 1 deletion src/turtle_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
-behaviour(supervisor).

%% API
-export([start_link/1, child_spec/1]).
-export([
child_spec/1,
new/2,
start_link/1,
stop/2,
validate_config/1
]).

%% Supervisor callbacks
-export([init/1]).
Expand All @@ -28,6 +34,17 @@ start_link(#{ name := Name } = Conf) ->
validate_config(Conf),
supervisor:start_link({via, gproc, {n,l,{turtle,service,Name}}}, ?MODULE, [Conf]).

%% @doc Create a new service during runtime
%% @end
new(Supervisor,ServiceChildSpec) ->
supervisor:start_child(Supervisor,ServiceChildSpec).

%% @doc Stop a service during runtime
%% @end
stop(Supervisor,Name) ->
supervisor:terminate_child(Supervisor, Name),
supervisor:delete_child(Supervisor, Name).

%% @doc Generate a child specification for this supervisor
%% The Configuration is a map with the following keys:
%%
Expand Down
Loading