Skip to content

Commit

Permalink
[#124] Handle SSE for 'Content-Type: text/event-stream' per RFC
Browse files Browse the repository at this point in the history
  • Loading branch information
jfacorro committed Dec 1, 2015
1 parent f045cdd commit 846c229
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 24 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ PROJECT = shotgun
DEPS = gun
dep_gun = git https://github.com/ninenines/gun.git 427230d

SHELL_DEPS = sync
SHELL_DEPS = sync recon
dep_sync = git git://github.com/inaka/sync.git 0.1.3
dep_recon = git https://github.com/ferd/recon 2.2.1

include erlang.mk

Expand Down
70 changes: 47 additions & 23 deletions src/shotgun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,11 @@ at_rest({HttpVerb, {_, _, Body} = Args, From}, State = #{pid := Pid}) ->
-spec wait_response(term(), term()) -> term().
wait_response({'DOWN', _, _, _, Reason}, _State) ->
exit(Reason);
wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers},
#{from := From,
async := Async,
responses := Responses} = State) ->
wait_response({gun_response, _Pid, StreamRef, fin, StatusCode, Headers},
#{ from := From
, stream := StreamRef
, async := Async
, responses := Responses} = State) ->
Response = #{status_code => StatusCode, headers => Headers},
NewResponses =
case Async of
Expand All @@ -504,23 +505,24 @@ wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers},
queue:in(Response, Responses)
end,
{next_state, at_rest, State#{responses => NewResponses}, 0};
wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers},
#{from := From, stream := StreamRef, async := Async} = State) ->
StateName =
case lists:keyfind(<<"transfer-encoding">>, 1, Headers) of
{<<"transfer-encoding">>, <<"chunked">>} when Async == true->
Result = {ok, StreamRef},
gen_fsm:reply(From, Result),
receive_chunk;
_ ->
receive_data
end,
wait_response({gun_response, _Pid, StreamRef, nofin, StatusCode, Headers},
#{ from := From
, stream := StreamRef
, async := Async} = State) ->
StateName = case is_chunked(Headers) orelse is_event_stream(Headers) of
true when Async ->
gen_fsm:reply(From, {ok, StreamRef}),
receive_chunk;
false ->
receive_data
end,
{ next_state
, StateName
, State#{status_code := StatusCode, headers := Headers}
};
wait_response({gun_error, _Pid, _StreamRef, Error},
#{from := From} = State) ->
wait_response({gun_error, _Pid, StreamRef, Error},
#{ from := From
, stream := StreamRef} = State) ->
gen_fsm:reply(From, {error, Error}),
{next_state, at_rest, State, 0};
wait_response(body_chunked,
Expand All @@ -536,12 +538,16 @@ wait_response(Event, State) ->
receive_data({'DOWN', _, _, _, _Reason}, _State) ->
error(incomplete);
receive_data({gun_data, _Pid, StreamRef, nofin, Data},
#{stream := StreamRef, data := DataAcc} = State) ->
#{ stream := StreamRef
, data := DataAcc} = State) ->
NewData = <<DataAcc/binary, Data/binary>>,
{next_state, receive_data, State#{data => NewData}};
receive_data({gun_data, _Pid, _StreamRef, fin, Data},
#{data := DataAcc, from := From, status_code
:= StatusCode, headers := Headers} = State) ->
receive_data({gun_data, _Pid, StreamRef, fin, Data},
#{ data := DataAcc
, from := From
, stream := StreamRef
, status_code := StatusCode
, headers := Headers} = State) ->
NewData = <<DataAcc/binary, Data/binary>>,
Result = {ok, #{status_code => StatusCode,
headers => Headers,
Expand All @@ -558,15 +564,17 @@ receive_data({gun_error, _Pid, StreamRef, _Reason},
-spec receive_chunk(term(), term()) -> term().
receive_chunk({'DOWN', _, _, _, _Reason}, _State) ->
error(incomplete);
receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data}, State) ->
receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data},
#{stream := StreamRef} = State) ->
NewState = manage_chunk(IsFin, StreamRef, Data, State),
case IsFin of
fin ->
{next_state, at_rest, NewState, 0};
nofin ->
{next_state, receive_chunk, NewState}
end;
receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) ->
receive_chunk({gun_error, _Pid, StreamRef, _Reason},
#{stream := StreamRef} = State) ->
{next_state, at_rest, State, 0}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand Down Expand Up @@ -677,6 +685,22 @@ encode_basic_auth([], []) ->
encode_basic_auth(Username, Password) ->
base64:encode(Username ++ [$: | Password]).

%% @private
-spec is_chunked([proplists:property()]) -> boolean().
is_chunked(Headers) ->
<<"chunked">> =:= keyfind(<<"transfer-encoding">>, Headers).

-spec is_event_stream([proplists:property()]) -> boolean().
is_event_stream(Headers) ->
<<"text/event-stream">> =:= keyfind(<<"content-type">>, Headers).

%% @private
keyfind(Key, List) ->
case lists:keyfind(Key, 1, List) of
{_, Value} -> Value;
_ -> undefined
end.

%% @private
sse_events(IsFin, Data, State = #{buffer := Buffer}) ->
NewBuffer = <<Buffer/binary, Data/binary>>,
Expand Down

0 comments on commit 846c229

Please sign in to comment.