Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for callback functions around worker process events #153

Merged
merged 18 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ To start a new worker pool, you can either use `wpool:start_pool` (if you want t
* **strategy**: Not the worker selection strategy (discussed below) but the supervisor flags to be used in the supervisor over the individual workers (`wpool_process_sup`). Defaults to `{one_for_one, 5, 60}`
* **pool_sup_intensity** and **pool_sup_period**: The intensity and period for the supervisor that manages the worker pool system (`wpool_pool`). The strategy of this supervisor must be `one_for_all` but the intensity and period may be changed from their defaults of `5` and `60`.
* **queue_type**: Order in which requests will be stored and handled by workers. This option can take values `lifo` or `fifo`. Defaults to `fifo`.
* **enable_callbacks**: A boolean value determining if `event_manager` should be started for callback modules.
elbrujohalcon marked this conversation as resolved.
Show resolved Hide resolved
Defaults to `false`.
* **callbacks**: Initial list of callback modules implementing `wpool_process_callbacks` to be called on certain worker events.
This options will only work if the `enable_callbacks` is set to **true**. Callbacks can be added and removed later by `wpool_pool:add_callback_module/2` and `wpool_pool:remove_callback_module/2`.

#### Using the Workers
Since the workers are `gen_server`s, messages can be `call`ed or `cast`ed to them. To do that you can use `wpool:call` and `wpool:cast` as you would use the equivalent functions on `gen_server`.
Expand Down
41 changes: 39 additions & 2 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
-export([ next/2
, wpool_get/2
]).
-export([ add_callback_module/2
, remove_callback_module/2]).

%% Supervisor callbacks
-export([ init/1
Expand Down Expand Up @@ -253,6 +255,16 @@ wpool_size(Name) ->
-spec next(pos_integer(), wpool()) -> wpool().
next(Next, WPool) -> WPool#wpool{next = Next}.

-spec add_callback_module(wpool:name(), module()) -> ok | {error, term()}.
add_callback_module(Pool, Module) ->
EventManager = event_manager_name(Pool),
wpool_process_callbacks:add_callback_module(EventManager, Module).

-spec remove_callback_module(wpool:name(), module()) -> ok | {error, term()}.
remove_callback_module(Pool, Module) ->
EventManager = event_manager_name(Pool),
wpool_process_callbacks:remove_callback_module(EventManager, Module).

%% @doc Get values from the worker pool record. Useful when using a custom
%% strategy function.
-spec wpool_get(atom(), wpool()) -> any(); ([atom()], wpool()) -> any().
Expand Down Expand Up @@ -288,6 +300,7 @@ init({Name, Options}) ->
TimeChecker = time_checker_name(Name),
QueueManager = queue_manager_name(Name),
ProcessSup = process_sup_name(Name),
EventManagerName = event_manager_name(Name),
_Wpool =
store_wpool(
#wpool{ name = Name
Expand Down Expand Up @@ -315,9 +328,19 @@ init({Name, Options}) ->
, [wpool_queue_manager]
},

EventManagerSpec =
{ EventManagerName
, {gen_event, start_link, [{local, EventManagerName}]}
, permanent
, brutal_kill
, worker
, dynamic
},

SupShutdown = proplists:get_value(pool_sup_shutdown, Options, brutal_kill),
WorkerOpts =
[{queue_manager, QueueManager}, {time_checker, TimeChecker} | Options],
[{queue_manager, QueueManager}, {time_checker, TimeChecker}
| Options] ++ maybe_event_manager(Options, {event_manager, EventManagerName}),
ProcessSupSpec =
{ ProcessSup
, {wpool_process_sup, start_link, [Name, ProcessSup, WorkerOpts]}
Expand All @@ -327,10 +350,14 @@ init({Name, Options}) ->
, [wpool_process_sup]
},

Children = [TimeCheckerSpec, QueueManagerSpec] ++
maybe_event_manager(Options, EventManagerSpec) ++
[ProcessSupSpec],

SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5),
SupPeriod = proplists:get_value(pool_sup_period, Options, 60),
SupStrategy = {one_for_all, SupIntensity, SupPeriod},
{ok, {SupStrategy, [TimeCheckerSpec, QueueManagerSpec, ProcessSupSpec]}}.
{ok, {SupStrategy, Children}}.

%% @private
-spec worker_name(wpool:name(), pos_integer()) -> atom().
Expand All @@ -345,6 +372,8 @@ process_sup_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-process-sup").
queue_manager_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-queue-manager").
event_manager_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-event-manager").

worker_with_no_task(Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
Expand Down Expand Up @@ -484,3 +513,11 @@ set_random_fun() ->
end
end,
application:set_env(worker_pool, random_fun, RndFun).

maybe_event_manager(Options, Item) ->
EnableEventManager = proplists:get_value(enable_callbacks, Options, false),
case EnableEventManager of
true ->
[Item];
_ -> []
end.
7 changes: 7 additions & 0 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,20 @@ cast_call(Process, From, Call) ->
%% @private
-spec init({atom(), atom(), term(), [wpool:option()]}) -> {ok, state()}.
init({Name, Mod, InitArgs, Options}) ->
wpool_process_callbacks:notify(handle_init_start, Options, [Name]),

case Mod:init(InitArgs) of
{ok, ModState} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
, options = Options
}};
{ok, ModState, Timeout} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
Expand All @@ -101,6 +105,7 @@ init({Name, Mod, InitArgs, Options}) ->
terminate(Reason, State) ->
#state{mod=Mod, state=ModState, name=Name, options=Options} = State,
ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options),
wpool_process_callbacks:notify(handle_worker_death, Options, [Name, Reason]),
Mod:terminate(Reason, ModState).

%% @private
Expand Down Expand Up @@ -225,3 +230,5 @@ handle_call(Call, From, State) ->
, State#state.name
, State#state.options),
Reply.


101 changes: 101 additions & 0 deletions src/wpool_process_callbacks.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
-module(wpool_process_callbacks).

-behaviour(gen_event).

%% gen_event callbacks

-export([ init/1
, handle_event/2
, handle_call/2
, handle_info/2
, code_change/3
, terminate/2
]).

-export([ notify/3
, add_callback_module/2
, remove_callback_module/2
]).
-type state() :: module().

-type event() :: handle_init_start | handle_worker_creation | handle_worker_death.

-callback handle_init_start(wpool:name()) -> any().
-callback handle_worker_creation(wpool:name()) -> any().
-callback handle_worker_death(wpool:name(), term()) -> any().

-optional_callbacks([handle_init_start/1, handle_worker_creation/1, handle_worker_death/2]).

-spec init(module()) -> {ok, state()}.
init(Module) ->
{ok, Module}.

-spec handle_event({event(), [any()]}, state()) -> {ok, state()}.
handle_event({Event, Args}, Module) ->
call(Module, Event, Args),
{ok, Module};
handle_event(_, State) ->
{ok, State}.

-spec handle_call(any(), state()) -> {ok, ok, state()}.
handle_call(_, State) ->
{ok, ok, State}.

-spec handle_info(any(), state()) -> {ok, state()}.
handle_info(_, State) ->
{ok, State}.

-spec code_change(any(), state(), any()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

-spec terminate(any(), state()) -> ok.
terminate(_Reason, _State) ->
ok.

-spec notify(event(), [wpool:option()], [any()]) -> ok.
notify(Event, Options, Args) ->
case lists:keyfind(event_manager, 1, Options) of
{event_manager, EventMgr} ->
gen_event:notify(EventMgr, {Event, Args});
_ ->
elbrujohalcon marked this conversation as resolved.
Show resolved Hide resolved
ok
end.

-spec add_callback_module(wpool:name(), module()) -> ok | {error, any()}.
add_callback_module(EventManager, Module) ->
case ensure_loaded(Module) of
ok ->
gen_event:add_handler(EventManager,
{wpool_process_callbacks, Module}, Module);
Other ->
Other
end.


-spec remove_callback_module(wpool:name(), module()) -> ok | {error, any()}.
remove_callback_module(EventManager, Module) ->
gen_event:delete_handler(EventManager, {wpool_process_callbacks, Module}, Module).

call(Module, Event, Args) ->
try
case erlang:function_exported(Module, Event, length(Args)) of
true ->
erlang:apply(Module, Event, Args);
_ ->
ok
elbrujohalcon marked this conversation as resolved.
Show resolved Hide resolved
end
catch
E:R ->
error_logger:warning_msg("Could not call callback module, error:~p, reason:~p", [E, R])
end.

ensure_loaded(Module) ->
case code:ensure_loaded(Module) of
{module, Module} ->
ok;
{error, embedded} -> %% We are in embedded mode so the module was loaded if exists
ok;
Other ->
Other
end.
20 changes: 20 additions & 0 deletions src/wpool_process_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ start_link(Parent, Name, Options) ->
init({Name, Options}) ->
Workers = proplists:get_value(workers, Options, 100),
Strategy = proplists:get_value(strategy, Options, {one_for_one, 5, 60}),
_ = maybe_add_event_handler(Options),
michalwski marked this conversation as resolved.
Show resolved Hide resolved
{WorkerType, Worker, InitArgs} =
case proplists:get_value(worker_type, Options, gen_server) of
gen_server ->
Expand All @@ -55,3 +56,22 @@ init({Name, Options}) ->
, [Worker]
} || I <- lists:seq(1, Workers)],
{ok, {Strategy, WorkerSpecs}}.

maybe_add_event_handler(Options) ->
case proplists:get_value(event_manager, Options, undefined) of
undefined ->
ok;
EventMgr ->
lists:foreach(fun(M) -> add_initial_callback(EventMgr, M) end,
proplists:get_value(callbacks, Options, []))
end.

add_initial_callback(EventManager, Module) ->
case wpool_process_callbacks:add_callback_module(EventManager, Module) of
ok ->
ok;
Other ->
error_logger:warning_msg("The callback module:~p could not be loaded, reason:~p",
[Module, Other])
end.

1 change: 0 additions & 1 deletion test/wpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ broadcast(_Config) ->
meck:unload(x),
{comment, []}.


%% =============================================================================
%% Helpers
%% =============================================================================
Expand Down
Loading