Skip to content

Commit

Permalink
Merge pull request inaka#112 from inaka/jfacorro.110.event_data_as_a_…
Browse files Browse the repository at this point in the history
…single_binary

[inaka#110] Return data as a binary
  • Loading branch information
Brujo Benavides committed Nov 5, 2015
2 parents 6009a23 + ef05584 commit 5e0602e
Showing 1 changed file with 72 additions and 75 deletions.
147 changes: 72 additions & 75 deletions src/shotgun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -280,19 +280,20 @@ events(Pid) ->
-spec parse_event(binary()) -> event().
parse_event(EventBin) ->
Lines = binary:split(EventBin, <<"\n">>, [global]),
FoldFun = fun(Line, #{data := DataList} = Event) ->
case Line of
<<"data: ", Data/binary>> ->
Event#{data => [Data | DataList]};
<<"id: ", Id/binary>> ->
Event#{id => Id};
<<"event: ", EventName/binary>> ->
Event#{event => EventName};
<<_Comment/binary>> ->
Event
end
end,
lists:foldr(FoldFun, #{data => []}, Lines).
FoldFun =
fun(Line, #{data := Data} = Event) ->
case Line of
<<"data: ", NewData/binary>> ->
Event#{data => <<Data/binary, NewData/binary, "\n">>};
<<"id: ", Id/binary>> ->
Event#{id => Id};
<<"event: ", EventName/binary>> ->
Event#{event => EventName};
<<_Comment/binary>> ->
Event
end
end,
lists:foldl(FoldFun, #{data => <<>>}, Lines).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_fsm callbacks
Expand Down Expand Up @@ -385,44 +386,44 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) ->
%% @private
-spec at_rest(any(), state()) -> {next_state, atom(), state()}.
at_rest(timeout, State) ->
case get_work(State) of
no_work ->
{next_state, at_rest, State};
{ok, Work, NewState} ->
ok = gen_fsm:send_event(self(), Work),
{next_state, at_rest, NewState}
end;
case get_work(State) of
no_work ->
{next_state, at_rest, State};
{ok, Work, NewState} ->
ok = gen_fsm:send_event(self(), Work),
{next_state, at_rest, NewState}
end;
at_rest({get_async, {HandleEvent, AsyncMode}, Args, From},
State = #{pid := Pid}) ->
StreamRef = do_http_verb(get, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
{next_state, wait_response, NewState};
StreamRef = do_http_verb(get, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
{next_state, wait_response, NewState};
at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
pid => Pid,
stream => StreamRef,
from => From
},
{next_state, wait_response, NewState}.
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
pid => Pid,
stream => StreamRef,
from => From
},
{next_state, wait_response, NewState}.

-spec at_rest(term(), pid(), term()) -> term().
at_rest(Event, From, State) ->
enqueue_work_or_stop(at_rest, Event, From, State).
enqueue_work_or_stop(at_rest, Event, From, State).

%% @private
-spec wait_response(term(), pid(), term()) -> term().
wait_response(Event, From, State) ->
enqueue_work_or_stop(wait_response, Event, From, State).
enqueue_work_or_stop(wait_response, Event, From, State).

%% @private
-spec wait_response(term(), term()) -> term().
Expand Down Expand Up @@ -468,7 +469,7 @@ wait_response(Event, State) ->
%% @private
-spec receive_data(term(), pid(), term()) -> term().
receive_data(Event, From, State) ->
enqueue_work_or_stop(receive_data, Event, From, State).
enqueue_work_or_stop(receive_data, Event, From, State).

%% @private
%% @doc Regular response
Expand Down Expand Up @@ -496,7 +497,7 @@ receive_data({gun_error, _Pid, StreamRef, _Reason},
%% @private
-spec receive_chunk(term(), pid(), term()) -> term().
receive_chunk(Event, From, State) ->
enqueue_work_or_stop(receive_chunk, Event, From, State).
enqueue_work_or_stop(receive_chunk, Event, From, State).

%% @private
%% @doc Chunked data response
Expand All @@ -519,16 +520,12 @@ receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @private
clean_state() ->
clean_state(queue:new()).
clean_state() -> clean_state(queue:new()).

%% @private
-spec clean_state(map()) -> map();
(queue:queue()) -> map().
-spec clean_state(map()) -> map(); (queue:queue()) -> map().
clean_state(State) when is_map(State) ->
clean_state(get_pending_reqs(State));

%% @private
clean_state(get_pending_reqs(State));
clean_state(Reqs) ->
#{
pid => undefined,
Expand Down Expand Up @@ -642,50 +639,50 @@ check_uri(_) -> throw(missing_slash_uri).

%% @private
enqueue_work_or_stop(FSM = at_rest, Event, From, State) ->
enqueue_work_or_stop(FSM, Event, From, State, 0);
enqueue_work_or_stop(FSM, Event, From, State, 0);
enqueue_work_or_stop(FSM, Event, From, State) ->
enqueue_work_or_stop(FSM, Event, From, State, infinity).
enqueue_work_or_stop(FSM, Event, From, State, infinity).

%% @private
enqueue_work_or_stop(FSM, Event, From, State, Timeout) ->
case create_work(Event, From) of
{ok, Work} ->
NewState = append_work(Work, State),
{next_state, FSM, NewState, Timeout};
not_work ->
{stop, {unexpected, Event}, State}
end.
case create_work(Event, From) of
{ok, Work} ->
NewState = append_work(Work, State),
{next_state, FSM, NewState, Timeout};
not_work ->
{stop, {unexpected, Event}, State}
end.

%% @private
create_work({M = get_async, {HandleEvent, AsyncMode}, Args}, From) ->
{ok, {M, {HandleEvent, AsyncMode}, Args, From}};
{ok, {M, {HandleEvent, AsyncMode}, Args, From}};
create_work({M, Args}, From)
when M == get orelse M == post
orelse M == delete orelse M == head
orelse M == options orelse M == patch
orelse M == put ->
{ok, {M, Args, From}};
{ok, {M, Args, From}};
create_work(_, _) ->
not_work.
not_work.

%% @private
get_work(State) ->
PendingReqs = maps:get(pending_requests, State),
case queue:is_empty(PendingReqs) of
true ->
no_work;
false ->
{{value, Work}, Rest} = queue:out(PendingReqs),
NewState = State#{pending_requests => Rest},
{ok, Work, NewState}
end.
PendingReqs = maps:get(pending_requests, State),
case queue:is_empty(PendingReqs) of
true ->
no_work;
false ->
{{value, Work}, Rest} = queue:out(PendingReqs),
NewState = State#{pending_requests => Rest},
{ok, Work, NewState}
end.

%% @private
append_work(Work, State) ->
PendingReqs = get_pending_reqs(State),
NewPending = queue:in(Work, PendingReqs),
maps:put(pending_requests, NewPending, State).
PendingReqs = get_pending_reqs(State),
NewPending = queue:in(Work, PendingReqs),
maps:put(pending_requests, NewPending, State).

%% @private
get_pending_reqs(State) ->
maps:get(pending_requests, State).
maps:get(pending_requests, State).

0 comments on commit 5e0602e

Please sign in to comment.