From ef055848b2394d00e523bc43bdb960910840a387 Mon Sep 17 00:00:00 2001 From: Juan Facorro Date: Mon, 2 Nov 2015 15:20:22 -0300 Subject: [PATCH] [#110] Return data as a binary --- src/shotgun.erl | 147 ++++++++++++++++++++++++------------------------ 1 file changed, 72 insertions(+), 75 deletions(-) diff --git a/src/shotgun.erl b/src/shotgun.erl index 592446b..ead516f 100644 --- a/src/shotgun.erl +++ b/src/shotgun.erl @@ -280,19 +280,20 @@ events(Pid) -> -spec parse_event(binary()) -> event(). parse_event(EventBin) -> Lines = binary:split(EventBin, <<"\n">>, [global]), - FoldFun = fun(Line, #{data := DataList} = Event) -> - case Line of - <<"data: ", Data/binary>> -> - Event#{data => [Data | DataList]}; - <<"id: ", Id/binary>> -> - Event#{id => Id}; - <<"event: ", EventName/binary>> -> - Event#{event => EventName}; - <<_Comment/binary>> -> - Event - end - end, - lists:foldr(FoldFun, #{data => []}, Lines). + FoldFun = + fun(Line, #{data := Data} = Event) -> + case Line of + <<"data: ", NewData/binary>> -> + Event#{data => <>}; + <<"id: ", Id/binary>> -> + Event#{id => Id}; + <<"event: ", EventName/binary>> -> + Event#{event => EventName}; + <<_Comment/binary>> -> + Event + end + end, + lists:foldl(FoldFun, #{data => <<>>}, Lines). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% gen_fsm callbacks @@ -385,44 +386,44 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) -> %% @private -spec at_rest(any(), state()) -> {next_state, atom(), state()}. at_rest(timeout, State) -> - case get_work(State) of - no_work -> - {next_state, at_rest, State}; - {ok, Work, NewState} -> - ok = gen_fsm:send_event(self(), Work), - {next_state, at_rest, NewState} - end; + case get_work(State) of + no_work -> + {next_state, at_rest, State}; + {ok, Work, NewState} -> + ok = gen_fsm:send_event(self(), Work), + {next_state, at_rest, NewState} + end; at_rest({get_async, {HandleEvent, AsyncMode}, Args, From}, State = #{pid := Pid}) -> - StreamRef = do_http_verb(get, Pid, Args), - CleanState = clean_state(State), - NewState = CleanState#{ - from => From, - pid => Pid, - stream => StreamRef, - handle_event => HandleEvent, - async => true, - async_mode => AsyncMode - }, - {next_state, wait_response, NewState}; + StreamRef = do_http_verb(get, Pid, Args), + CleanState = clean_state(State), + NewState = CleanState#{ + from => From, + pid => Pid, + stream => StreamRef, + handle_event => HandleEvent, + async => true, + async_mode => AsyncMode + }, + {next_state, wait_response, NewState}; at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) -> - StreamRef = do_http_verb(HttpVerb, Pid, Args), - CleanState = clean_state(State), - NewState = CleanState#{ - pid => Pid, - stream => StreamRef, - from => From - }, - {next_state, wait_response, NewState}. + StreamRef = do_http_verb(HttpVerb, Pid, Args), + CleanState = clean_state(State), + NewState = CleanState#{ + pid => Pid, + stream => StreamRef, + from => From + }, + {next_state, wait_response, NewState}. -spec at_rest(term(), pid(), term()) -> term(). at_rest(Event, From, State) -> - enqueue_work_or_stop(at_rest, Event, From, State). + enqueue_work_or_stop(at_rest, Event, From, State). %% @private -spec wait_response(term(), pid(), term()) -> term(). wait_response(Event, From, State) -> - enqueue_work_or_stop(wait_response, Event, From, State). + enqueue_work_or_stop(wait_response, Event, From, State). %% @private -spec wait_response(term(), term()) -> term(). @@ -468,7 +469,7 @@ wait_response(Event, State) -> %% @private -spec receive_data(term(), pid(), term()) -> term(). receive_data(Event, From, State) -> - enqueue_work_or_stop(receive_data, Event, From, State). + enqueue_work_or_stop(receive_data, Event, From, State). %% @private %% @doc Regular response @@ -496,7 +497,7 @@ receive_data({gun_error, _Pid, StreamRef, _Reason}, %% @private -spec receive_chunk(term(), pid(), term()) -> term(). receive_chunk(Event, From, State) -> - enqueue_work_or_stop(receive_chunk, Event, From, State). + enqueue_work_or_stop(receive_chunk, Event, From, State). %% @private %% @doc Chunked data response @@ -519,16 +520,12 @@ receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @private -clean_state() -> - clean_state(queue:new()). +clean_state() -> clean_state(queue:new()). %% @private --spec clean_state(map()) -> map(); - (queue:queue()) -> map(). +-spec clean_state(map()) -> map(); (queue:queue()) -> map(). clean_state(State) when is_map(State) -> - clean_state(get_pending_reqs(State)); - -%% @private + clean_state(get_pending_reqs(State)); clean_state(Reqs) -> #{ pid => undefined, @@ -642,50 +639,50 @@ check_uri(_) -> throw(missing_slash_uri). %% @private enqueue_work_or_stop(FSM = at_rest, Event, From, State) -> - enqueue_work_or_stop(FSM, Event, From, State, 0); + enqueue_work_or_stop(FSM, Event, From, State, 0); enqueue_work_or_stop(FSM, Event, From, State) -> - enqueue_work_or_stop(FSM, Event, From, State, infinity). + enqueue_work_or_stop(FSM, Event, From, State, infinity). %% @private enqueue_work_or_stop(FSM, Event, From, State, Timeout) -> - case create_work(Event, From) of - {ok, Work} -> - NewState = append_work(Work, State), - {next_state, FSM, NewState, Timeout}; - not_work -> - {stop, {unexpected, Event}, State} - end. + case create_work(Event, From) of + {ok, Work} -> + NewState = append_work(Work, State), + {next_state, FSM, NewState, Timeout}; + not_work -> + {stop, {unexpected, Event}, State} + end. %% @private create_work({M = get_async, {HandleEvent, AsyncMode}, Args}, From) -> - {ok, {M, {HandleEvent, AsyncMode}, Args, From}}; + {ok, {M, {HandleEvent, AsyncMode}, Args, From}}; create_work({M, Args}, From) when M == get orelse M == post orelse M == delete orelse M == head orelse M == options orelse M == patch orelse M == put -> - {ok, {M, Args, From}}; + {ok, {M, Args, From}}; create_work(_, _) -> - not_work. + not_work. %% @private get_work(State) -> - PendingReqs = maps:get(pending_requests, State), - case queue:is_empty(PendingReqs) of - true -> - no_work; - false -> - {{value, Work}, Rest} = queue:out(PendingReqs), - NewState = State#{pending_requests => Rest}, - {ok, Work, NewState} - end. + PendingReqs = maps:get(pending_requests, State), + case queue:is_empty(PendingReqs) of + true -> + no_work; + false -> + {{value, Work}, Rest} = queue:out(PendingReqs), + NewState = State#{pending_requests => Rest}, + {ok, Work, NewState} + end. %% @private append_work(Work, State) -> - PendingReqs = get_pending_reqs(State), - NewPending = queue:in(Work, PendingReqs), - maps:put(pending_requests, NewPending, State). + PendingReqs = get_pending_reqs(State), + NewPending = queue:in(Work, PendingReqs), + maps:put(pending_requests, NewPending, State). %% @private get_pending_reqs(State) -> - maps:get(pending_requests, State). + maps:get(pending_requests, State).