diff --git a/src/shotgun.app.src b/src/shotgun.app.src index 4008ca6..3e104e4 100644 --- a/src/shotgun.app.src +++ b/src/shotgun.app.src @@ -2,7 +2,7 @@ [ {description, "better than just a gun"}, - {vsn, "0.1.12"}, + {vsn, "0.1.13"}, {applications, [kernel, stdlib, diff --git a/src/shotgun.erl b/src/shotgun.erl index 6cc082e..9c16fbf 100644 --- a/src/shotgun.erl +++ b/src/shotgun.erl @@ -12,9 +12,10 @@ -export([ start/0, stop/0, - start_link/3, + start_link/4, open/2, open/3, + open/4, close/1, %% get get/2, @@ -48,13 +49,21 @@ ]). -export([ - at_rest/3, + at_rest/2, wait_response/2, receive_data/2, receive_chunk/2, parse_event/1 ]). +%Work request handlers +-export([ + at_rest/3, + wait_response/3, + receive_data/3, + receive_chunk/3 + ]). + -type response() :: #{ status_code => integer(), @@ -75,6 +84,13 @@ -type connection_type() :: http | https. +-type open_opts() :: + #{ + transport_opts => [], + timeout => pos_integer() | infinity }. +%% transport_opts are passed to Ranch's TCP transport, which is -itself- +%% a thin layer over gen_tcp.
+%% timeout is passed to gun:await_up. Default if not specified is 5000 ms. %% @doc Starts the application and all the ones it depends on. -spec start() -> {ok, [atom()]}. @@ -87,26 +103,40 @@ stop() -> application:stop(shotgun). %% @private --spec start_link(string(), integer(), connection_type()) -> +-spec start_link(string(), integer(), connection_type(), open_opts()) -> {ok, pid()} | ignore | {error, term()}. -start_link(Host, Port, Type) -> - gen_fsm:start_link(shotgun, [Host, Port, Type], []). +start_link(Host, Port, Type, Opts) -> + gen_fsm:start_link(shotgun, [Host, Port, Type, Opts], []). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% API %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% @equiv get(Host, Port, http) --spec open(Host :: string(), Port :: integer()) -> {ok, pid()}. +%% @equiv get(Host, Port, http, #{}) +-spec open(Host :: string(), Port :: integer()) -> + {ok, pid()} | {error, gun_open_failed} | {error, gun_timeout}. open(Host, Port) -> open(Host, Port, http). -%% @doc Opens a connection of the type provided with the host in port specified. -spec open(Host :: string(), Port :: integer(), Type :: connection_type()) -> - {ok, pid()}. -open(Host, Port, Type) -> - supervisor:start_child(shotgun_sup, [Host, Port, Type]). - + {ok, pid()} | {error, gun_open_failed} | {error, gun_timeout}; + (Host :: string(), Port :: integer(), Opts :: open_opts()) -> + {ok, pid()} | {error, gun_open_failed} | {error, gun_timeout}. +%% @equiv get(Host, Port, Type, #{}) or get(Host, Port, http, Opts) +open(Host, Port, Type) when is_atom(Type) -> + open(Host, Port, Type, #{}); + +open(Host, Port, Opts) when is_map(Opts) -> + open(Host, Port, http, Opts). + +%% @doc Opens a connection of the type provided with the host and port +%% specified and the specified connection timeout and/or Ranch +%% transport options. +-spec open(Host :: string(), Port :: integer(), Type :: connection_type(), + Opts :: open_opts()) -> + {ok, pid()} | {error, gun_open_failed} | {error, gun_timeout}. +open(Host, Port, Type, Opts) -> + supervisor:start_child(shotgun_sup, [Host, Port, Type, Opts]). %% @doc Closes the connection with the host. -spec close(pid()) -> ok. @@ -259,19 +289,34 @@ parse_event(EventBin) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @private --spec init([term()]) -> term(). -init([Host, Port, Type]) -> +-spec init([term()]) -> {ok, at_rest, map()} + | {stop, gun_open_timeout} | {stop, gun_open_failed}. +init([Host, Port, Type, Opts]) -> GunType = case Type of http -> tcp; https -> ssl end, - Opts = #{transport => GunType, - retry => 1, - retry_timeout => 1 - }, - {ok, Pid} = gun:open(Host, Port, Opts), - State = clean_state(), - {ok, at_rest, State#{pid => Pid}}. + TransportOpts = maps:get(transport_opts, Opts, []), + GunOpts = #{transport => GunType, + retry => 1, + retry_timeout => 1, + transport_opts => TransportOpts + }, + Timeout = maps:get(timeout, Opts, 5000), + {ok, Pid} = gun:open(Host, Port, GunOpts), + case gun:await_up(Pid, Timeout) of + {ok, _} -> + State = clean_state(), + {ok, at_rest, State#{pid => Pid}}; + %The only apparent timeout for gun:open is the connection timeout of the + %underlying transport. So, a timeout message here comes from gun:await_up. + {error, timeout} -> + {stop, gun_open_timeout}; + %gun currently terminates with reason normal if gun:open fails to open + %the requested connection. This bubbles up through gun:await_up. + {error, normal} -> + {stop, gun_open_failed} + end. %% @private -spec handle_event(term(), atom(), term()) -> term(). @@ -322,29 +367,48 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) -> %% gen_fsm states %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%See if we have work. If we do, dispatch. +%If we don't, stay in at_rest. %% @private +at_rest(timeout, State) -> + case get_work(State) of + no_work -> + {next_state, at_rest, State}; + {ok, Work, NewState} -> + ok = gen_fsm:send_event(self(), Work), + {next_state, at_rest, NewState} + end; +at_rest({get_async, {HandleEvent, AsyncMode}, Args, From}, + State = #{pid := Pid}) -> + StreamRef = do_http_verb(get, Pid, Args), + CleanState = clean_state(State), + NewState = CleanState#{ + from => From, + pid => Pid, + stream => StreamRef, + handle_event => HandleEvent, + async => true, + async_mode => AsyncMode + }, + {next_state, wait_response, NewState}; +at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) -> + StreamRef = do_http_verb(HttpVerb, Pid, Args), + CleanState = clean_state(State), + NewState = CleanState#{ + pid => Pid, + stream => StreamRef, + from => From + }, + {next_state, wait_response, NewState}. + -spec at_rest(term(), pid(), term()) -> term(). -at_rest({get_async, {HandleEvent, AsyncMode}, Args}, From, #{pid := Pid}) -> - StreamRef = do_http_verb(get, Pid, Args), - CleanState = clean_state(), - NewState = CleanState#{ - from => From, - pid => Pid, - stream => StreamRef, - handle_event => HandleEvent, - async => true, - async_mode => AsyncMode - }, - {next_state, wait_response, NewState}; -at_rest({HttpVerb, Args}, From, #{pid := Pid}) -> - StreamRef = do_http_verb(HttpVerb, Pid, Args), - CleanState = clean_state(), - NewState = CleanState#{ - pid => Pid, - stream => StreamRef, - from => From - }, - {next_state, wait_response, NewState}. +at_rest(Event, From, State) -> + enqueue_work_or_stop(at_rest, Event, From, State). + +%% @private +-spec wait_response(term(), pid(), term()) -> term(). +wait_response(Event, From, State) -> + enqueue_work_or_stop(wait_response, Event, From, State). %% @private -spec wait_response(term(), term()) -> term(). @@ -363,7 +427,7 @@ wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers}, true -> gen_fsm:reply(From, {ok, Response}) end, - {next_state, at_rest, State#{responses => NewResponses}}; + {next_state, at_rest, State#{responses => NewResponses}, 0}; wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers}, #{from := From, stream := StreamRef, async := Async} = State) -> StateName = @@ -383,10 +447,15 @@ wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers}, wait_response({gun_error, _Pid, _StreamRef, Error}, #{from := From} = State) -> gen_fsm:reply(From, {error, Error}), - {next_state, at_rest, State}; + {next_state, at_rest, State, 0}; wait_response(Event, State) -> {stop, {unexpected, Event}, State}. +%% @private +-spec receive_data(term(), pid(), term()) -> term(). +receive_data(Event, From, State) -> + enqueue_work_or_stop(receive_data, Event, From, State). + %% @private %% @doc Regular response -spec receive_data(term(), term()) -> term(). @@ -400,15 +469,20 @@ receive_data({gun_data, _Pid, _StreamRef, fin, Data}, #{data := DataAcc, from := From, status_code := StatusCode, headers := Headers} = State) -> NewData = <>, - Result= {ok, #{status_code => StatusCode, + Result = {ok, #{status_code => StatusCode, headers => Headers, body => NewData }}, gen_fsm:reply(From, Result), - {next_state, at_rest, State}; + {next_state, at_rest, State, 0}; receive_data({gun_error, _Pid, StreamRef, _Reason}, #{stream := StreamRef} = State) -> - {next_state, at_rest, State}. + {next_state, at_rest, State, 0}. + +%% @private +-spec receive_chunk(term(), pid(), term()) -> term(). +receive_chunk(Event, From, State) -> + enqueue_work_or_stop(receive_chunk, Event, From, State). %% @private %% @doc Chunked data response @@ -419,12 +493,12 @@ receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data}, State) -> NewState = manage_chunk(IsFin, StreamRef, Data, State), case IsFin of fin -> - {next_state, at_rest, NewState}; + {next_state, at_rest, NewState, 0}; nofin -> {next_state, receive_chunk, NewState} end; receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) -> - {next_state, at_rest, State}. + {next_state, at_rest, State, 0}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Private @@ -432,6 +506,16 @@ receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) -> %% @private clean_state() -> + clean_state(queue:new()). + +%% @private +-spec clean_state(map()) -> map(); + (queue:queue()) -> map(). +clean_state(State) when is_map(State) -> + clean_state(get_pending_reqs(State)); + +%% @private +clean_state(Reqs) -> #{ pid => undefined, stream => undefined, @@ -443,7 +527,8 @@ clean_state() -> headers => undefined, async => false, async_mode => binary, - buffer => <<"">> + buffer => <<"">>, + pending_requests => Reqs }. %% @private @@ -540,3 +625,54 @@ sse_events(IsFin, Data, State = #{buffer := Buffer}) -> %% @private check_uri([$/ | _]) -> ok; check_uri(_) -> throw(missing_slash_uri). + +%% @private +enqueue_work_or_stop(FSM = at_rest, Event, From, State) -> + enqueue_work_or_stop(FSM, Event, From, State, 0); +enqueue_work_or_stop(FSM, Event, From, State) -> + enqueue_work_or_stop(FSM, Event, From, State, infinity). + +%% @private +enqueue_work_or_stop(FSM, Event, From, State, Timeout) -> + case create_work(Event, From) of + {ok, Work} -> + NewState = append_work(Work, State), + {next_state, FSM, NewState, Timeout}; + not_work -> + {stop, {unexpected, Event}, State} + end. + +%% @private +create_work({M = get_async, {HandleEvent, AsyncMode}, Args}, From) -> + {ok, {M, {HandleEvent, AsyncMode}, Args, From}}; +create_work({M, Args}, From) + when M == get orelse M == post + orelse M == delete orelse M == head + orelse M == options orelse M == patch + orelse M == put -> + {ok, {M, Args, From}}; +create_work(_, _) -> + not_work. + +%% @private +get_work(State) -> + PendingReqs = maps:get(pending_requests, State), + case queue:is_empty(PendingReqs) of + true -> + no_work; + false -> + {{value, Work}, Rest} = queue:out(PendingReqs), + NewState = State#{pending_requests => Rest}, + {ok, Work, NewState} + end. + +%% @private +append_work(Work, State) -> + PendingReqs = get_pending_reqs(State), + NewPending = queue:in(Work, PendingReqs), + maps:put(pending_requests, NewPending, State). + +%% @private +get_pending_reqs(State) -> + maps:get(pending_requests, State). +