Skip to content

Commit

Permalink
Merge pull request inaka#100 from kennethlakin/request-fixes
Browse files Browse the repository at this point in the history
shotgun FSM fixes
  • Loading branch information
jfacorro committed Oct 23, 2015
2 parents e327cb0 + 566676f commit 1b5bb0d
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/shotgun.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[
{description,
"better than just a gun"},
{vsn, "0.1.12"},
{vsn, "0.1.13"},
{applications,
[kernel,
stdlib,
Expand Down
236 changes: 186 additions & 50 deletions src/shotgun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
-export([
start/0,
stop/0,
start_link/3,
start_link/4,
open/2,
open/3,
open/4,
close/1,
%% get
get/2,
Expand Down Expand Up @@ -48,13 +49,21 @@
]).

-export([
at_rest/3,
at_rest/2,
wait_response/2,
receive_data/2,
receive_chunk/2,
parse_event/1
]).

%Work request handlers
-export([
at_rest/3,
wait_response/3,
receive_data/3,
receive_chunk/3
]).

-type response() ::
#{
status_code => integer(),
Expand All @@ -75,6 +84,13 @@

-type connection_type() :: http | https.

-type open_opts() ::
#{
transport_opts => [],
timeout => pos_integer() | infinity }.
%% transport_opts are passed to Ranch's TCP transport, which is -itself-
%% a thin layer over gen_tcp. <br/>
%% timeout is passed to gun:await_up. Default if not specified is 5000 ms.

%% @doc Starts the application and all the ones it depends on.
-spec start() -> {ok, [atom()]}.
Expand All @@ -87,26 +103,40 @@ stop() ->
application:stop(shotgun).

%% @private
-spec start_link(string(), integer(), connection_type()) ->
-spec start_link(string(), integer(), connection_type(), open_opts()) ->
{ok, pid()} | ignore | {error, term()}.
start_link(Host, Port, Type) ->
gen_fsm:start_link(shotgun, [Host, Port, Type], []).
start_link(Host, Port, Type, Opts) ->
gen_fsm:start_link(shotgun, [Host, Port, Type, Opts], []).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @equiv get(Host, Port, http)
-spec open(Host :: string(), Port :: integer()) -> {ok, pid()}.
%% @equiv get(Host, Port, http, #{})
-spec open(Host :: string(), Port :: integer()) ->
{ok, pid()} | {error, gun_open_failed} | {error, gun_timeout}.
open(Host, Port) ->
open(Host, Port, http).

%% @doc Opens a connection of the type provided with the host in port specified.
-spec open(Host :: string(), Port :: integer(), Type :: connection_type()) ->
{ok, pid()}.
open(Host, Port, Type) ->
supervisor:start_child(shotgun_sup, [Host, Port, Type]).

{ok, pid()} | {error, gun_open_failed} | {error, gun_timeout};
(Host :: string(), Port :: integer(), Opts :: open_opts()) ->
{ok, pid()} | {error, gun_open_failed} | {error, gun_timeout}.
%% @equiv get(Host, Port, Type, #{}) or get(Host, Port, http, Opts)
open(Host, Port, Type) when is_atom(Type) ->
open(Host, Port, Type, #{});

open(Host, Port, Opts) when is_map(Opts) ->
open(Host, Port, http, Opts).

%% @doc Opens a connection of the type provided with the host and port
%% specified and the specified connection timeout and/or Ranch
%% transport options.
-spec open(Host :: string(), Port :: integer(), Type :: connection_type(),
Opts :: open_opts()) ->
{ok, pid()} | {error, gun_open_failed} | {error, gun_timeout}.
open(Host, Port, Type, Opts) ->
supervisor:start_child(shotgun_sup, [Host, Port, Type, Opts]).

%% @doc Closes the connection with the host.
-spec close(pid()) -> ok.
Expand Down Expand Up @@ -259,19 +289,34 @@ parse_event(EventBin) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @private
-spec init([term()]) -> term().
init([Host, Port, Type]) ->
-spec init([term()]) -> {ok, at_rest, map()}
| {stop, gun_open_timeout} | {stop, gun_open_failed}.
init([Host, Port, Type, Opts]) ->
GunType = case Type of
http -> tcp;
https -> ssl
end,
Opts = #{transport => GunType,
retry => 1,
retry_timeout => 1
},
{ok, Pid} = gun:open(Host, Port, Opts),
State = clean_state(),
{ok, at_rest, State#{pid => Pid}}.
TransportOpts = maps:get(transport_opts, Opts, []),
GunOpts = #{transport => GunType,
retry => 1,
retry_timeout => 1,
transport_opts => TransportOpts
},
Timeout = maps:get(timeout, Opts, 5000),
{ok, Pid} = gun:open(Host, Port, GunOpts),
case gun:await_up(Pid, Timeout) of
{ok, _} ->
State = clean_state(),
{ok, at_rest, State#{pid => Pid}};
%The only apparent timeout for gun:open is the connection timeout of the
%underlying transport. So, a timeout message here comes from gun:await_up.
{error, timeout} ->
{stop, gun_open_timeout};
%gun currently terminates with reason normal if gun:open fails to open
%the requested connection. This bubbles up through gun:await_up.
{error, normal} ->
{stop, gun_open_failed}
end.

%% @private
-spec handle_event(term(), atom(), term()) -> term().
Expand Down Expand Up @@ -322,29 +367,48 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) ->
%% gen_fsm states
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%See if we have work. If we do, dispatch.
%If we don't, stay in at_rest.
%% @private
at_rest(timeout, State) ->
case get_work(State) of
no_work ->
{next_state, at_rest, State};
{ok, Work, NewState} ->
ok = gen_fsm:send_event(self(), Work),
{next_state, at_rest, NewState}
end;
at_rest({get_async, {HandleEvent, AsyncMode}, Args, From},
State = #{pid := Pid}) ->
StreamRef = do_http_verb(get, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
{next_state, wait_response, NewState};
at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
pid => Pid,
stream => StreamRef,
from => From
},
{next_state, wait_response, NewState}.

-spec at_rest(term(), pid(), term()) -> term().
at_rest({get_async, {HandleEvent, AsyncMode}, Args}, From, #{pid := Pid}) ->
StreamRef = do_http_verb(get, Pid, Args),
CleanState = clean_state(),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
{next_state, wait_response, NewState};
at_rest({HttpVerb, Args}, From, #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(),
NewState = CleanState#{
pid => Pid,
stream => StreamRef,
from => From
},
{next_state, wait_response, NewState}.
at_rest(Event, From, State) ->
enqueue_work_or_stop(at_rest, Event, From, State).

%% @private
-spec wait_response(term(), pid(), term()) -> term().
wait_response(Event, From, State) ->
enqueue_work_or_stop(wait_response, Event, From, State).

%% @private
-spec wait_response(term(), term()) -> term().
Expand All @@ -363,7 +427,7 @@ wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers},
true ->
gen_fsm:reply(From, {ok, Response})
end,
{next_state, at_rest, State#{responses => NewResponses}};
{next_state, at_rest, State#{responses => NewResponses}, 0};
wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers},
#{from := From, stream := StreamRef, async := Async} = State) ->
StateName =
Expand All @@ -383,10 +447,15 @@ wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers},
wait_response({gun_error, _Pid, _StreamRef, Error},
#{from := From} = State) ->
gen_fsm:reply(From, {error, Error}),
{next_state, at_rest, State};
{next_state, at_rest, State, 0};
wait_response(Event, State) ->
{stop, {unexpected, Event}, State}.

%% @private
-spec receive_data(term(), pid(), term()) -> term().
receive_data(Event, From, State) ->
enqueue_work_or_stop(receive_data, Event, From, State).

%% @private
%% @doc Regular response
-spec receive_data(term(), term()) -> term().
Expand All @@ -400,15 +469,20 @@ receive_data({gun_data, _Pid, _StreamRef, fin, Data},
#{data := DataAcc, from := From, status_code
:= StatusCode, headers := Headers} = State) ->
NewData = <<DataAcc/binary, Data/binary>>,
Result= {ok, #{status_code => StatusCode,
Result = {ok, #{status_code => StatusCode,
headers => Headers,
body => NewData
}},
gen_fsm:reply(From, Result),
{next_state, at_rest, State};
{next_state, at_rest, State, 0};
receive_data({gun_error, _Pid, StreamRef, _Reason},
#{stream := StreamRef} = State) ->
{next_state, at_rest, State}.
{next_state, at_rest, State, 0}.

%% @private
-spec receive_chunk(term(), pid(), term()) -> term().
receive_chunk(Event, From, State) ->
enqueue_work_or_stop(receive_chunk, Event, From, State).

%% @private
%% @doc Chunked data response
Expand All @@ -419,19 +493,29 @@ receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data}, State) ->
NewState = manage_chunk(IsFin, StreamRef, Data, State),
case IsFin of
fin ->
{next_state, at_rest, NewState};
{next_state, at_rest, NewState, 0};
nofin ->
{next_state, receive_chunk, NewState}
end;
receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) ->
{next_state, at_rest, State}.
{next_state, at_rest, State, 0}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Private
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @private
clean_state() ->
clean_state(queue:new()).

%% @private
-spec clean_state(map()) -> map();
(queue:queue()) -> map().
clean_state(State) when is_map(State) ->
clean_state(get_pending_reqs(State));

%% @private
clean_state(Reqs) ->
#{
pid => undefined,
stream => undefined,
Expand All @@ -443,7 +527,8 @@ clean_state() ->
headers => undefined,
async => false,
async_mode => binary,
buffer => <<"">>
buffer => <<"">>,
pending_requests => Reqs
}.

%% @private
Expand Down Expand Up @@ -540,3 +625,54 @@ sse_events(IsFin, Data, State = #{buffer := Buffer}) ->
%% @private
check_uri([$/ | _]) -> ok;
check_uri(_) -> throw(missing_slash_uri).

%% @private
enqueue_work_or_stop(FSM = at_rest, Event, From, State) ->
enqueue_work_or_stop(FSM, Event, From, State, 0);
enqueue_work_or_stop(FSM, Event, From, State) ->
enqueue_work_or_stop(FSM, Event, From, State, infinity).

%% @private
enqueue_work_or_stop(FSM, Event, From, State, Timeout) ->
case create_work(Event, From) of
{ok, Work} ->
NewState = append_work(Work, State),
{next_state, FSM, NewState, Timeout};
not_work ->
{stop, {unexpected, Event}, State}
end.

%% @private
create_work({M = get_async, {HandleEvent, AsyncMode}, Args}, From) ->
{ok, {M, {HandleEvent, AsyncMode}, Args, From}};
create_work({M, Args}, From)
when M == get orelse M == post
orelse M == delete orelse M == head
orelse M == options orelse M == patch
orelse M == put ->
{ok, {M, Args, From}};
create_work(_, _) ->
not_work.

%% @private
get_work(State) ->
PendingReqs = maps:get(pending_requests, State),
case queue:is_empty(PendingReqs) of
true ->
no_work;
false ->
{{value, Work}, Rest} = queue:out(PendingReqs),
NewState = State#{pending_requests => Rest},
{ok, Work, NewState}
end.

%% @private
append_work(Work, State) ->
PendingReqs = get_pending_reqs(State),
NewPending = queue:in(Work, PendingReqs),
maps:put(pending_requests, NewPending, State).

%% @private
get_pending_reqs(State) ->
maps:get(pending_requests, State).

0 comments on commit 1b5bb0d

Please sign in to comment.