Skip to content

Commit

Permalink
Use sidejob for publish.
Browse files Browse the repository at this point in the history
Add license to .erl files.
  • Loading branch information
mworrell committed Apr 9, 2018
1 parent 29f8501 commit 66d5252
Show file tree
Hide file tree
Showing 17 changed files with 433 additions and 86 deletions.
1 change: 1 addition & 0 deletions include/mqtt_sessions.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@

% The default mqtt pool name.
-define(MQTT_SESSIONS_DEFAULT, '-mqtt-').
-define(MQTT_SESSIONS_JOBS, mqtt_sessionjobs).
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
{mqtt_packet_map, "1.0.0alpha2"},
{zotonic_stdlib, "1.0.0"},
{jsx, "2.8.2"},
{sidejob, "2.1.0"},
{router, {git, "https://github.com/zotonic/router.git", {branch, "master"}}}
]}.

Expand Down
64 changes: 52 additions & 12 deletions src/mqtt_sessions.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
%% @author Marc Worrell <[email protected]>
%% @copyright 2018 Marc Worrell

%% Copyright 2018 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(mqtt_sessions).

-behaviour(application).
Expand Down Expand Up @@ -33,7 +50,10 @@
connect_transport/2,
connect_transport/3,
disconnect_transport/2,
disconnect_transport/3
disconnect_transport/3,

sidejobs_limit/0,
sidejobs_per_session/0
]).


Expand All @@ -57,6 +77,8 @@
topic/0
]).

-define(SIDEJOBS_PER_SESSION, 20).

-include("../include/mqtt_sessions.hrl").

%%====================================================================
Expand All @@ -69,13 +91,13 @@ start() ->

-spec start( application:start_type(), term() ) -> {ok, pid()} | {error, term()}.
start(_StartType, _StartArgs) ->
sidejob:new_resource(?MQTT_SESSIONS_JOBS, sidejob_supervisor, sidejobs_limit()),
mqtt_sessions_sup:start_link().

-spec stop( term() ) -> ok.
stop(_State) ->
ok.


-spec find_session( session_ref() ) -> {ok, pid()} | {error, notfound}.
find_session( ClientId ) ->
find_session(?MQTT_SESSIONS_DEFAULT, ClientId).
Expand Down Expand Up @@ -112,39 +134,40 @@ publish(Pool, #{ type := publish, topic := Topic } = Msg, UserContext) when is_a
{error, eacces}
end;
publish(Topic, Payload, UserContext) when is_list(Topic), is_binary(Topic) ->
publish(?MQTT_SESSIONS_DEFAULT, Topic, Payload, [], UserContext).
publish(?MQTT_SESSIONS_DEFAULT, Topic, Payload, #{}, UserContext).

-spec publish( atom(), topic(), term(), term() ) -> ok | {error, eacces}.
publish(Pool, Topic, Payload, UserContext) ->
publish(Pool, Topic, Payload, [], UserContext).
publish(Pool, Topic, Payload, #{}, UserContext).

-spec publish( atom(), topic(), term(), list(), term() ) -> ok | {error, eacces}.
-spec publish( atom(), topic(), term(), map(), term() ) -> ok | {error, eacces}.
publish(Pool, Topic, Payload, Options, UserContext) ->
Msg = #{
type => publish,
payload => Payload,
topic => maybe_split_topic(Topic),
qos => proplists:get_value(qos, Options, 0),
retain => proplists:get_value(retain, Options, false)
qos => maps:get(qos, Options, 0),
retain => maps:get(retain, Options, false),
properties => maps:get(properties, Options, #{})
},
publish(Pool, Msg, UserContext).


-spec subscribe( topic(), term() ) -> ok | {error, eacces}.
subscribe(TopicFilter, UserContext) ->
subscribe(?MQTT_SESSIONS_DEFAULT, TopicFilter, self(), self(), [], UserContext).
subscribe(?MQTT_SESSIONS_DEFAULT, TopicFilter, self(), self(), #{}, UserContext).

-spec subscribe( atom(), topic(), term() ) -> ok | {error, eacces}.
subscribe(Pool, TopicFilter, UserContext) ->
subscribe(Pool, TopicFilter, self(), self(), [], UserContext).
subscribe(Pool, TopicFilter, self(), self(), #{}, UserContext).

-spec subscribe( atom(), topic(), mfa() | pid(), term() ) -> ok | {error, eacces}.
subscribe(Pool, TopicFilter, {_, _, _} = MFA, UserContext) ->
subscribe(Pool, TopicFilter, MFA, self(), [], UserContext);
subscribe(Pool, TopicFilter, MFA, self(), #{}, UserContext);
subscribe(Pool, TopicFilter, Pid, UserContext) when is_pid(Pid) ->
subscribe(Pool, TopicFilter, Pid, Pid, [], UserContext).
subscribe(Pool, TopicFilter, Pid, Pid, #{}, UserContext).

-spec subscribe( atom(), topic(), pid()|mfa(), pid(), list(), term() ) -> ok | {error, eacces}.
-spec subscribe( atom(), topic(), pid()|mfa(), pid(), map(), term() ) -> ok | {error, eacces}.
subscribe(Pool, TopicFilter, Receiver, OwnerPid, Options, UserContext) ->
Runtime = runtime(),
Topic1 = maybe_split_topic(TopicFilter),
Expand Down Expand Up @@ -250,3 +273,20 @@ disconnect_transport(Pool, ClientId, Pid) ->
Error
end.


%% @doc Limit the number of sidejobs for message dispatching.
-spec sidejobs_limit() -> pos_integer().
sidejobs_limit() ->
case application:get_env(mqtt_sessions, sidejobs_limit) of
{ok, N} -> N;
undefined -> erlang:max(erlang:system_info(process_limit) div 10, 10000)
end.

-spec sidejobs_per_session() -> pos_integer().
sidejobs_per_session() ->
case application:get_env(mqtt_sessions, sidejobs_per_session) of
{ok, N} -> N;
undefined -> ?SIDEJOBS_PER_SESSION
end.


17 changes: 17 additions & 0 deletions src/mqtt_sessions_incoming.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
%% @author Marc Worrell <[email protected]>
%% @copyright 2018 Marc Worrell

%% Copyright 2018 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(mqtt_sessions_incoming).

-export([
Expand Down
137 changes: 137 additions & 0 deletions src/mqtt_sessions_job.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
%% @doc Sidejobs for handling topic subscriptions
%% @author Marc Worrell <[email protected]>
%% @copyright 2018 Marc Worrell

%% Copyright 2018 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(mqtt_sessions_job).

-export([
publish/5,
publish_retained/6,
publish_job/6,
publish_retained_job/6,
publish1/6
]).


-include_lib("router/include/router.hrl").
-include_lib("../include/mqtt_sessions.hrl").

-spec publish( atom(), mqtt_sessions:topic(), list(), mqtt_packet_map:mqtt_message(), term()) -> ok | {error, overload}.
publish(_Pool, _Topic, [], _Msg, _PublisherContext) ->
ok;
publish(Pool, Topic, Routes, Msg, PublisherContext) ->
case sidejob_supervisor:spawn(
?MQTT_SESSIONS_JOBS,
{?MODULE, publish_job, [ Pool, Topic, Routes, Msg, PublisherContext, self() ]})
of
{ok, JobPid} ->
self() ! {publish_job, JobPid};
{error, overload} ->
lager:debug("MQTT sidejob overload, delaying job ~p ...", [ Topic ]),
timer:sleep(100),
publish(Pool, Topic, Routes, Msg, PublisherContext)
end.

-spec publish_retained( atom(), mqtt_sessions:topic(), list(), pid(), list(), term()) -> ok | {error, overload}.
publish_retained(_Pool, _TopicFilter, [], _Subscriber, _Options, _SubscriberContext) ->
ok;
publish_retained(Pool, TopicFilter, Ms, Subscriber, Options, SubscriberContext) ->
case sidejob_supervisor:spawn(
?MQTT_SESSIONS_JOBS,
{?MODULE, publish_retained_job, [ Pool, TopicFilter, Ms, Subscriber, Options, SubscriberContext ]})
of
{ok, _JobPid} ->
ok;
{error, overload} ->
lager:debug("MQTT sidejob overload, delaying retained job ~p ...", [ TopicFilter ]),
timer:sleep(100),
publish_retained(Pool, TopicFilter, Ms, Subscriber, Options, SubscriberContext)
end.


publish_job(Pool, Topic, Routes, Msg, PublisherContext, PublishedPid) ->
lists:foreach(
fun(Route) ->
publish1(Pool, Topic, Route, Msg, PublisherContext, PublishedPid)
end,
Routes).

publish_retained_job(Pool, TopicFilter, Ms, Subscriber, Options, SubscriberContext) ->
Runtime = mqtt_sessions:runtime(),
lists:foreach(
fun({#{ topic := Topic } = Msg, PublisherContext}) ->
case Runtime:is_allowed(subscribe, Topic, Msg, SubscriberContext) of
true ->
Bound = bind(Topic, TopicFilter),
Dest = {Subscriber, undefined, Options},
publish1(Pool, Topic, #route{ bound_args = Bound, destination = Dest }, Msg, PublisherContext, none);
false ->
ok
end
end,
Ms).


publish1(Pool, Topic, #route{ bound_args = Bound, destination = Dest }, Msg, PublisherContext, PublisherPid) ->
case is_no_local(Dest, PublisherPid) of
true ->
ok;
false ->
{Callback, _OwnerPid, Options} = Dest,
Msg1 = case maps:get(retain, Msg, false) of
true ->
case maps:get(retain_as_published, Msg, false) of
false -> Msg#{ retain => false };
true -> Msg
end;
false -> Msg
end,
MqttMsg = Options#{
type => publish,
pool => Pool,
topic => Topic,
topic_bindings => Bound,
message => Msg1,
publisher_context => PublisherContext
},
callback(Callback, MqttMsg)
end.


callback({io, format, A}, MqttMsg) ->
erlang:apply(io, format, A ++ [ [ MqttMsg ] ]);
callback({M, F, A} , MqttMsg) ->
erlang:apply(M, F, A ++ [ MqttMsg ]);
callback(Pid, MqttMsg) when is_pid(Pid) ->
Pid ! {mqtt_msg, MqttMsg}.

is_no_local(_, none) -> false;
is_no_local({_Callback, OwnerPid, #{ no_local := true }}, OwnerPid) -> true;
is_no_local(_Destination, _Pid) -> false.

%% Bind variables from the match to the path
bind(Path, Match) ->
bind(Path, Match, []).

bind([], [], Acc) ->
lists:reverse(Acc);
bind(P, [<<"#">>], Acc) ->
lists:reverse([{'#', P}|Acc]);
bind([H|Path], [<<"+">>|Match], Acc) ->
bind(Path, Match, [H|Acc]);
bind([_|Path], [_|Match], Acc) ->
bind(Path, Match, Acc).
17 changes: 17 additions & 0 deletions src/mqtt_sessions_payload.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
%% @author Marc Worrell <[email protected]>
%% @copyright 2018 Marc Worrell

%% Copyright 2018 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(mqtt_sessions_payload).

-export([
Expand Down
17 changes: 17 additions & 0 deletions src/mqtt_sessions_pool_sup.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
%% @doc Start a supervisor with a session registry and a topic tree
%% @author Marc Worrell <[email protected]>
%% @copyright 2018 Marc Worrell

%% Copyright 2018 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.


-module(mqtt_sessions_pool_sup).

Expand Down
Loading

0 comments on commit 66d5252

Please sign in to comment.