Skip to content

Commit

Permalink
Permits client requests in any FSM state.
Browse files Browse the repository at this point in the history
This commit adds a request queue to the shotgun FSM and three argument
versions of each FSM state function whose only purpose is to ether
stick valid client request data into the work queue and get back to
waiting, or stop the FSM if the client request data was invalid.
create_work/2 describes valid and invalid requests.

This lets us have multiple callers -directly or indirectly- call
shotgun:request while a gun request is still pending. Previously,
if the call happened when we were in any state other than at_rest,
the FSM would probably crash. This also means that if one's request
times out, one can immediately re-submit that request without
crashing shotgun. However, there *are* a few caveats:

* shotgun only processes one request at a time.
* Your new request is put on the end of the request queue.
* Your *previous* request is not cancelled. If shotgun never got
  around to servicing it, or is not done servicing it, then your
  old request that you no longer care about will be serviced before
  your new request.
* If the process that initiated a timed-out request is still alive,
  it will get a message in its mailbox when the request eventually
  succeeds. It also might get a message when the request eventually
  fails, I'm not sure.

The guts of at_rest/3 have been moved to at_rest/2. at_rest/3 now only
sticks client request data into the work queue and triggers the call
of at_rest/2.

Every transition from another state to at_rest now includes a timeout
value of 0. The purpose of at_rest(timeout, State) is to check for work
and either dispatch it to the FSM, or go back to the idle at_rest state.

There are also a few code style changes to appease Elvis.
  • Loading branch information
kennethlakin committed Oct 16, 2015
1 parent 46ddcdc commit 566676f
Showing 1 changed file with 138 additions and 34 deletions.
172 changes: 138 additions & 34 deletions src/shotgun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,21 @@
]).

-export([
at_rest/3,
at_rest/2,
wait_response/2,
receive_data/2,
receive_chunk/2,
parse_event/1
]).

%Work request handlers
-export([
at_rest/3,
wait_response/3,
receive_data/3,
receive_chunk/3
]).

-type response() ::
#{
status_code => integer(),
Expand All @@ -80,7 +88,8 @@
#{
transport_opts => [],
timeout => pos_integer() | infinity }.
%% transport_opts are passed to Ranch's TCP transport, which is -itself- a thin layer over gen_tcp. <br/>
%% transport_opts are passed to Ranch's TCP transport, which is -itself-
%% a thin layer over gen_tcp. <br/>
%% timeout is passed to gun:await_up. Default if not specified is 5000 ms.

%% @doc Starts the application and all the ones it depends on.
Expand Down Expand Up @@ -120,8 +129,11 @@ open(Host, Port, Type) when is_atom(Type) ->
open(Host, Port, Opts) when is_map(Opts) ->
open(Host, Port, http, Opts).

%% @doc Opens a connection of the type provided with the host and port specified and the specified connection timeout and/or Ranch transport options.
-spec open(Host :: string(), Port :: integer(), Type :: connection_type(), Opts :: open_opts()) ->
%% @doc Opens a connection of the type provided with the host and port
%% specified and the specified connection timeout and/or Ranch
%% transport options.
-spec open(Host :: string(), Port :: integer(), Type :: connection_type(),
Opts :: open_opts()) ->
{ok, pid()} | {error, gun_open_failed} | {error, gun_timeout}.
open(Host, Port, Type, Opts) ->
supervisor:start_child(shotgun_sup, [Host, Port, Type, Opts]).
Expand Down Expand Up @@ -277,7 +289,8 @@ parse_event(EventBin) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @private
-spec init([term()]) -> {ok, at_rest, map()} | {stop, gun_open_timeout} | {stop, gun_open_failed}.
-spec init([term()]) -> {ok, at_rest, map()}
| {stop, gun_open_timeout} | {stop, gun_open_failed}.
init([Host, Port, Type, Opts]) ->
GunType = case Type of
http -> tcp;
Expand Down Expand Up @@ -354,29 +367,48 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) ->
%% gen_fsm states
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%See if we have work. If we do, dispatch.
%If we don't, stay in at_rest.
%% @private
at_rest(timeout, State) ->
case get_work(State) of
no_work ->
{next_state, at_rest, State};
{ok, Work, NewState} ->
ok = gen_fsm:send_event(self(), Work),
{next_state, at_rest, NewState}
end;
at_rest({get_async, {HandleEvent, AsyncMode}, Args, From},
State = #{pid := Pid}) ->
StreamRef = do_http_verb(get, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
{next_state, wait_response, NewState};
at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
pid => Pid,
stream => StreamRef,
from => From
},
{next_state, wait_response, NewState}.

-spec at_rest(term(), pid(), term()) -> term().
at_rest({get_async, {HandleEvent, AsyncMode}, Args}, From, #{pid := Pid}) ->
StreamRef = do_http_verb(get, Pid, Args),
CleanState = clean_state(),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
{next_state, wait_response, NewState};
at_rest({HttpVerb, Args}, From, #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(),
NewState = CleanState#{
pid => Pid,
stream => StreamRef,
from => From
},
{next_state, wait_response, NewState}.
at_rest(Event, From, State) ->
enqueue_work_or_stop(at_rest, Event, From, State).

%% @private
-spec wait_response(term(), pid(), term()) -> term().
wait_response(Event, From, State) ->
enqueue_work_or_stop(wait_response, Event, From, State).

%% @private
-spec wait_response(term(), term()) -> term().
Expand All @@ -395,7 +427,7 @@ wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers},
true ->
gen_fsm:reply(From, {ok, Response})
end,
{next_state, at_rest, State#{responses => NewResponses}};
{next_state, at_rest, State#{responses => NewResponses}, 0};
wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers},
#{from := From, stream := StreamRef, async := Async} = State) ->
StateName =
Expand All @@ -415,10 +447,15 @@ wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers},
wait_response({gun_error, _Pid, _StreamRef, Error},
#{from := From} = State) ->
gen_fsm:reply(From, {error, Error}),
{next_state, at_rest, State};
{next_state, at_rest, State, 0};
wait_response(Event, State) ->
{stop, {unexpected, Event}, State}.

%% @private
-spec receive_data(term(), pid(), term()) -> term().
receive_data(Event, From, State) ->
enqueue_work_or_stop(receive_data, Event, From, State).

%% @private
%% @doc Regular response
-spec receive_data(term(), term()) -> term().
Expand All @@ -432,15 +469,20 @@ receive_data({gun_data, _Pid, _StreamRef, fin, Data},
#{data := DataAcc, from := From, status_code
:= StatusCode, headers := Headers} = State) ->
NewData = <<DataAcc/binary, Data/binary>>,
Result= {ok, #{status_code => StatusCode,
Result = {ok, #{status_code => StatusCode,
headers => Headers,
body => NewData
}},
gen_fsm:reply(From, Result),
{next_state, at_rest, State};
{next_state, at_rest, State, 0};
receive_data({gun_error, _Pid, StreamRef, _Reason},
#{stream := StreamRef} = State) ->
{next_state, at_rest, State}.
{next_state, at_rest, State, 0}.

%% @private
-spec receive_chunk(term(), pid(), term()) -> term().
receive_chunk(Event, From, State) ->
enqueue_work_or_stop(receive_chunk, Event, From, State).

%% @private
%% @doc Chunked data response
Expand All @@ -451,19 +493,29 @@ receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data}, State) ->
NewState = manage_chunk(IsFin, StreamRef, Data, State),
case IsFin of
fin ->
{next_state, at_rest, NewState};
{next_state, at_rest, NewState, 0};
nofin ->
{next_state, receive_chunk, NewState}
end;
receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) ->
{next_state, at_rest, State}.
{next_state, at_rest, State, 0}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Private
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @private
clean_state() ->
clean_state(queue:new()).

%% @private
-spec clean_state(map()) -> map();
(queue:queue()) -> map().
clean_state(State) when is_map(State) ->
clean_state(get_pending_reqs(State));

%% @private
clean_state(Reqs) ->
#{
pid => undefined,
stream => undefined,
Expand All @@ -475,7 +527,8 @@ clean_state() ->
headers => undefined,
async => false,
async_mode => binary,
buffer => <<"">>
buffer => <<"">>,
pending_requests => Reqs
}.

%% @private
Expand Down Expand Up @@ -572,3 +625,54 @@ sse_events(IsFin, Data, State = #{buffer := Buffer}) ->
%% @private
check_uri([$/ | _]) -> ok;
check_uri(_) -> throw(missing_slash_uri).

%% @private
enqueue_work_or_stop(FSM = at_rest, Event, From, State) ->
enqueue_work_or_stop(FSM, Event, From, State, 0);
enqueue_work_or_stop(FSM, Event, From, State) ->
enqueue_work_or_stop(FSM, Event, From, State, infinity).

%% @private
enqueue_work_or_stop(FSM, Event, From, State, Timeout) ->
case create_work(Event, From) of
{ok, Work} ->
NewState = append_work(Work, State),
{next_state, FSM, NewState, Timeout};
not_work ->
{stop, {unexpected, Event}, State}
end.

%% @private
create_work({M = get_async, {HandleEvent, AsyncMode}, Args}, From) ->
{ok, {M, {HandleEvent, AsyncMode}, Args, From}};
create_work({M, Args}, From)
when M == get orelse M == post
orelse M == delete orelse M == head
orelse M == options orelse M == patch
orelse M == put ->
{ok, {M, Args, From}};
create_work(_, _) ->
not_work.

%% @private
get_work(State) ->
PendingReqs = maps:get(pending_requests, State),
case queue:is_empty(PendingReqs) of
true ->
no_work;
false ->
{{value, Work}, Rest} = queue:out(PendingReqs),
NewState = State#{pending_requests => Rest},
{ok, Work, NewState}
end.

%% @private
append_work(Work, State) ->
PendingReqs = get_pending_reqs(State),
NewPending = queue:in(Work, PendingReqs),
maps:put(pending_requests, NewPending, State).

%% @private
get_pending_reqs(State) ->
maps:get(pending_requests, State).

0 comments on commit 566676f

Please sign in to comment.