Skip to content

Commit

Permalink
Merge pull request #153 from inaka/rel-3.1-worker-events-callback
Browse files Browse the repository at this point in the history
Add support for callback functions around worker process events
  • Loading branch information
michalwski authored Oct 24, 2018
2 parents e08b307 + e6bd8a3 commit e94a682
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 3 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ To start a new worker pool, you can either use `wpool:start_pool` (if you want t
* **strategy**: Not the worker selection strategy (discussed below) but the supervisor flags to be used in the supervisor over the individual workers (`wpool_process_sup`). Defaults to `{one_for_one, 5, 60}`
* **pool_sup_intensity** and **pool_sup_period**: The intensity and period for the supervisor that manages the worker pool system (`wpool_pool`). The strategy of this supervisor must be `one_for_all` but the intensity and period may be changed from their defaults of `5` and `60`.
* **queue_type**: Order in which requests will be stored and handled by workers. This option can take values `lifo` or `fifo`. Defaults to `fifo`.
* **enable_callbacks**: A boolean value determining if `event_manager` should be started for callback modules.
Defaults to `false`.
* **callbacks**: Initial list of callback modules implementing `wpool_process_callbacks` to be called on certain worker events.
This options will only work if the `enable_callbacks` is set to **true**. Callbacks can be added and removed later by `wpool_pool:add_callback_module/2` and `wpool_pool:remove_callback_module/2`.

#### Using the Workers
Since the workers are `gen_server`s, messages can be `call`ed or `cast`ed to them. To do that you can use `wpool:call` and `wpool:cast` as you would use the equivalent functions on `gen_server`.
Expand Down
41 changes: 39 additions & 2 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
-export([ next/2
, wpool_get/2
]).
-export([ add_callback_module/2
, remove_callback_module/2]).

%% Supervisor callbacks
-export([ init/1
Expand Down Expand Up @@ -253,6 +255,16 @@ wpool_size(Name) ->
-spec next(pos_integer(), wpool()) -> wpool().
next(Next, WPool) -> WPool#wpool{next = Next}.

-spec add_callback_module(wpool:name(), module()) -> ok | {error, term()}.
add_callback_module(Pool, Module) ->
EventManager = event_manager_name(Pool),
wpool_process_callbacks:add_callback_module(EventManager, Module).

-spec remove_callback_module(wpool:name(), module()) -> ok | {error, term()}.
remove_callback_module(Pool, Module) ->
EventManager = event_manager_name(Pool),
wpool_process_callbacks:remove_callback_module(EventManager, Module).

%% @doc Get values from the worker pool record. Useful when using a custom
%% strategy function.
-spec wpool_get(atom(), wpool()) -> any(); ([atom()], wpool()) -> any().
Expand Down Expand Up @@ -288,6 +300,7 @@ init({Name, Options}) ->
TimeChecker = time_checker_name(Name),
QueueManager = queue_manager_name(Name),
ProcessSup = process_sup_name(Name),
EventManagerName = event_manager_name(Name),
_Wpool =
store_wpool(
#wpool{ name = Name
Expand Down Expand Up @@ -315,9 +328,19 @@ init({Name, Options}) ->
, [wpool_queue_manager]
},

EventManagerSpec =
{ EventManagerName
, {gen_event, start_link, [{local, EventManagerName}]}
, permanent
, brutal_kill
, worker
, dynamic
},

SupShutdown = proplists:get_value(pool_sup_shutdown, Options, brutal_kill),
WorkerOpts =
[{queue_manager, QueueManager}, {time_checker, TimeChecker} | Options],
[{queue_manager, QueueManager}, {time_checker, TimeChecker}
| Options] ++ maybe_event_manager(Options, {event_manager, EventManagerName}),
ProcessSupSpec =
{ ProcessSup
, {wpool_process_sup, start_link, [Name, ProcessSup, WorkerOpts]}
Expand All @@ -327,10 +350,14 @@ init({Name, Options}) ->
, [wpool_process_sup]
},

Children = [TimeCheckerSpec, QueueManagerSpec] ++
maybe_event_manager(Options, EventManagerSpec) ++
[ProcessSupSpec],

SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5),
SupPeriod = proplists:get_value(pool_sup_period, Options, 60),
SupStrategy = {one_for_all, SupIntensity, SupPeriod},
{ok, {SupStrategy, [TimeCheckerSpec, QueueManagerSpec, ProcessSupSpec]}}.
{ok, {SupStrategy, Children}}.

%% @private
-spec worker_name(wpool:name(), pos_integer()) -> atom().
Expand All @@ -345,6 +372,8 @@ process_sup_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-process-sup").
queue_manager_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-queue-manager").
event_manager_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-event-manager").

worker_with_no_task(Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
Expand Down Expand Up @@ -484,3 +513,11 @@ set_random_fun() ->
end
end,
application:set_env(worker_pool, random_fun, RndFun).

maybe_event_manager(Options, Item) ->
EnableEventManager = proplists:get_value(enable_callbacks, Options, false),
case EnableEventManager of
true ->
[Item];
_ -> []
end.
7 changes: 7 additions & 0 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,20 @@ cast_call(Process, From, Call) ->
%% @private
-spec init({atom(), atom(), term(), [wpool:option()]}) -> {ok, state()}.
init({Name, Mod, InitArgs, Options}) ->
wpool_process_callbacks:notify(handle_init_start, Options, [Name]),

case Mod:init(InitArgs) of
{ok, ModState} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
, options = Options
}};
{ok, ModState, Timeout} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
Expand All @@ -101,6 +105,7 @@ init({Name, Mod, InitArgs, Options}) ->
terminate(Reason, State) ->
#state{mod=Mod, state=ModState, name=Name, options=Options} = State,
ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options),
wpool_process_callbacks:notify(handle_worker_death, Options, [Name, Reason]),
Mod:terminate(Reason, ModState).

%% @private
Expand Down Expand Up @@ -225,3 +230,5 @@ handle_call(Call, From, State) ->
, State#state.name
, State#state.options),
Reply.


101 changes: 101 additions & 0 deletions src/wpool_process_callbacks.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
-module(wpool_process_callbacks).

-behaviour(gen_event).

%% gen_event callbacks

-export([ init/1
, handle_event/2
, handle_call/2
, handle_info/2
, code_change/3
, terminate/2
]).

-export([ notify/3
, add_callback_module/2
, remove_callback_module/2
]).
-type state() :: module().

-type event() :: handle_init_start | handle_worker_creation | handle_worker_death.

-callback handle_init_start(wpool:name()) -> any().
-callback handle_worker_creation(wpool:name()) -> any().
-callback handle_worker_death(wpool:name(), term()) -> any().

-optional_callbacks([handle_init_start/1, handle_worker_creation/1, handle_worker_death/2]).

-spec init(module()) -> {ok, state()}.
init(Module) ->
{ok, Module}.

-spec handle_event({event(), [any()]}, state()) -> {ok, state()}.
handle_event({Event, Args}, Module) ->
call(Module, Event, Args),
{ok, Module};
handle_event(_, State) ->
{ok, State}.

-spec handle_call(any(), state()) -> {ok, ok, state()}.
handle_call(_, State) ->
{ok, ok, State}.

-spec handle_info(any(), state()) -> {ok, state()}.
handle_info(_, State) ->
{ok, State}.

-spec code_change(any(), state(), any()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

-spec terminate(any(), state()) -> ok.
terminate(_Reason, _State) ->
ok.

-spec notify(event(), [wpool:option()], [any()]) -> ok.
notify(Event, Options, Args) ->
case lists:keyfind(event_manager, 1, Options) of
{event_manager, EventMgr} ->
gen_event:notify(EventMgr, {Event, Args});
_ ->
ok
end.

-spec add_callback_module(wpool:name(), module()) -> ok | {error, any()}.
add_callback_module(EventManager, Module) ->
case ensure_loaded(Module) of
ok ->
gen_event:add_handler(EventManager,
{wpool_process_callbacks, Module}, Module);
Other ->
Other
end.


-spec remove_callback_module(wpool:name(), module()) -> ok | {error, any()}.
remove_callback_module(EventManager, Module) ->
gen_event:delete_handler(EventManager, {wpool_process_callbacks, Module}, Module).

call(Module, Event, Args) ->
try
case erlang:function_exported(Module, Event, length(Args)) of
true ->
erlang:apply(Module, Event, Args);
_ ->
ok
end
catch
E:R ->
error_logger:warning_msg("Could not call callback module, error:~p, reason:~p", [E, R])
end.

ensure_loaded(Module) ->
case code:ensure_loaded(Module) of
{module, Module} ->
ok;
{error, embedded} -> %% We are in embedded mode so the module was loaded if exists
ok;
Other ->
Other
end.
20 changes: 20 additions & 0 deletions src/wpool_process_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ start_link(Parent, Name, Options) ->
init({Name, Options}) ->
Workers = proplists:get_value(workers, Options, 100),
Strategy = proplists:get_value(strategy, Options, {one_for_one, 5, 60}),
maybe_add_event_handler(Options),
{WorkerType, Worker, InitArgs} =
case proplists:get_value(worker_type, Options, gen_server) of
gen_server ->
Expand All @@ -55,3 +56,22 @@ init({Name, Options}) ->
, [Worker]
} || I <- lists:seq(1, Workers)],
{ok, {Strategy, WorkerSpecs}}.

maybe_add_event_handler(Options) ->
case proplists:get_value(event_manager, Options, undefined) of
undefined ->
ok;
EventMgr ->
lists:foreach(fun(M) -> add_initial_callback(EventMgr, M) end,
proplists:get_value(callbacks, Options, []))
end.

add_initial_callback(EventManager, Module) ->
case wpool_process_callbacks:add_callback_module(EventManager, Module) of
ok ->
ok;
Other ->
error_logger:warning_msg("The callback module:~p could not be loaded, reason:~p",
[Module, Other])
end.

1 change: 0 additions & 1 deletion test/wpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ broadcast(_Config) ->
meck:unload(x),
{comment, []}.


%% =============================================================================
%% Helpers
%% =============================================================================
Expand Down
Loading

0 comments on commit e94a682

Please sign in to comment.