Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for callback functions around worker process events #153

Merged
merged 18 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ To start a new worker pool, you can either use `wpool:start_pool` (if you want t
* **strategy**: Not the worker selection strategy (discussed below) but the supervisor flags to be used in the supervisor over the individual workers (`wpool_process_sup`). Defaults to `{one_for_one, 5, 60}`
* **pool_sup_intensity** and **pool_sup_period**: The intensity and period for the supervisor that manages the worker pool system (`wpool_pool`). The strategy of this supervisor must be `one_for_all` but the intensity and period may be changed from their defaults of `5` and `60`.
* **queue_type**: Order in which requests will be stored and handled by workers. This option can take values `lifo` or `fifo`. Defaults to `fifo`.
* **enable_callbacks**: A boolean value determining if `event_manager` should be started for callback modules.
elbrujohalcon marked this conversation as resolved.
Show resolved Hide resolved
* **callbacks**: Initial list of callback modules implementing `wpool_process_callbacks` to be called on certain worker events.
This option only takes effect when `enable_callbacks` is set to **true**. Callback modules can also be added and removed later with `wpool_pool:add_callback_module/2` and `wpool_pool:remove_callback_module/2`.

#### Using the Workers
Since the workers are `gen_server`s, messages can be `call`ed or `cast`ed to them. To do that you can use `wpool:call` and `wpool:cast` as you would use the equivalent functions on `gen_server`.
Expand Down
41 changes: 39 additions & 2 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
-export([ next/2
, wpool_get/2
]).
-export([ add_callback_module/2
, remove_callback_module/2]).

%% Supervisor callbacks
-export([ init/1
Expand Down Expand Up @@ -253,6 +255,16 @@ wpool_size(Name) ->
-spec next(pos_integer(), wpool()) -> wpool().
next(Next, WPool) -> WPool#wpool{next = Next}.

%% @doc Registers Module as a wpool_process_callbacks handler on the pool's
%%      event manager. Only effective for pools started with the
%%      enable_callbacks option set to true.
-spec add_callback_module(wpool:name(), module()) -> ok | {error, term()}.
add_callback_module(Pool, Module) ->
  Handler = {wpool_process_callbacks, Module},
  gen_event:add_handler(event_manager_name(Pool), Handler, Module).

%% @doc Detaches the wpool_process_callbacks handler for Module from the
%%      pool's event manager; the inverse of add_callback_module/2.
-spec remove_callback_module(wpool:name(), module()) -> ok | {error, term()}.
remove_callback_module(Pool, Module) ->
  Handler = {wpool_process_callbacks, Module},
  gen_event:delete_handler(event_manager_name(Pool), Handler, Module).

%% @doc Get values from the worker pool record. Useful when using a custom
%% strategy function.
-spec wpool_get(atom(), wpool()) -> any(); ([atom()], wpool()) -> any().
Expand Down Expand Up @@ -288,6 +300,7 @@ init({Name, Options}) ->
TimeChecker = time_checker_name(Name),
QueueManager = queue_manager_name(Name),
ProcessSup = process_sup_name(Name),
EventManagerName = event_manager_name(Name),
_Wpool =
store_wpool(
#wpool{ name = Name
Expand Down Expand Up @@ -315,9 +328,19 @@ init({Name, Options}) ->
, [wpool_queue_manager]
},

EventManagerSpec =
{ EventManagerName
, {gen_event, start_link, [{local, EventManagerName}]}
, permanent
, brutal_kill
, worker
, dynamic
},

SupShutdown = proplists:get_value(pool_sup_shutdown, Options, brutal_kill),
WorkerOpts =
[{queue_manager, QueueManager}, {time_checker, TimeChecker} | Options],
[{queue_manager, QueueManager}, {time_checker, TimeChecker}
| Options] ++ maybe_event_manager(Options, {event_manager, EventManagerName}),
ProcessSupSpec =
{ ProcessSup
, {wpool_process_sup, start_link, [Name, ProcessSup, WorkerOpts]}
Expand All @@ -327,10 +350,14 @@ init({Name, Options}) ->
, [wpool_process_sup]
},

Children = [TimeCheckerSpec, QueueManagerSpec] ++
maybe_event_manager(Options, EventManagerSpec) ++
[ProcessSupSpec],

SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5),
SupPeriod = proplists:get_value(pool_sup_period, Options, 60),
SupStrategy = {one_for_all, SupIntensity, SupPeriod},
{ok, {SupStrategy, [TimeCheckerSpec, QueueManagerSpec, ProcessSupSpec]}}.
{ok, {SupStrategy, Children}}.

%% @private
-spec worker_name(wpool:name(), pos_integer()) -> atom().
Expand All @@ -345,6 +372,8 @@ process_sup_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-process-sup").
queue_manager_name(Sup) ->
list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-queue-manager").
%% Derives the registered name of the pool's event manager process,
%% following the same "<module>-<pool>-<role>" scheme as the other helpers.
event_manager_name(Sup) ->
  list_to_atom(?MODULE_STRING ++ "-" ++ atom_to_list(Sup) ++ "-event-manager").

worker_with_no_task(Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
Expand Down Expand Up @@ -484,3 +513,11 @@ set_random_fun() ->
end
end,
application:set_env(worker_pool, random_fun, RndFun).

%% Returns Item wrapped in a singleton list when callbacks are enabled in
%% Options, and an empty list otherwise — convenient for splicing optional
%% entries into child-spec or option lists with ++.
maybe_event_manager(Options, Item) ->
  case proplists:get_bool(enable_callbacks, Options) of
    true -> [Item];
    false -> []
  end.
7 changes: 7 additions & 0 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,20 @@ cast_call(Process, From, Call) ->
%% @private
-spec init({atom(), atom(), term(), [wpool:option()]}) -> {ok, state()}.
init({Name, Mod, InitArgs, Options}) ->
wpool_process_callbacks:notify(handle_init_start, Options, [Name]),

case Mod:init(InitArgs) of
{ok, ModState} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
, options = Options
}};
{ok, ModState, Timeout} ->
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
Expand All @@ -101,6 +105,7 @@ init({Name, Mod, InitArgs, Options}) ->
%% @private Tells the queue manager and the callback modules that this
%% worker is going down, then delegates shutdown to the wrapped module.
terminate(Reason, #state{mod = Mod, state = ModState, name = Name, options = Options}) ->
  ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options),
  wpool_process_callbacks:notify(handle_worker_death, Options, [Name, Reason]),
  Mod:terminate(Reason, ModState).

%% @private
Expand Down Expand Up @@ -225,3 +230,5 @@ handle_call(Call, From, State) ->
, State#state.name
, State#state.options),
Reply.


72 changes: 72 additions & 0 deletions src/wpool_process_callbacks.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
-module(wpool_process_callbacks).

-behaviour(gen_event).

%% gen_event callbacks

-export([ init/1
, handle_event/2
, handle_call/2
, handle_info/2
, code_change/3
, terminate/2]).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
, terminate/2]).
, terminate/2
]).


-export([notify/3]).
-type state() :: module().

-type event() :: handle_init_start | handle_worker_creation | handle_worker_death.

-callback handle_init_start(wpool:name()) -> any().
-callback handle_worker_creation(wpool:name()) -> any().
-callback handle_worker_death(wpool:name(), term()) -> any().

-optional_callbacks([handle_init_start/1, handle_worker_creation/1, handle_worker_death/2]).

%% @private The handler state is simply the callback module to invoke.
-spec init(module()) -> {ok, state()}.
init(CallbackModule) ->
  {ok, CallbackModule}.

%% @private Dispatches a worker event to the callback module kept as state;
%% anything that does not look like {Event, Args} is ignored.
-spec handle_event({event(), [any()]}, state()) -> {ok, state()}.
handle_event({Event, Args}, CallbackModule) ->
  _ = call(CallbackModule, Event, Args),
  {ok, CallbackModule};
handle_event(_UnknownEvent, CallbackModule) ->
  {ok, CallbackModule}.

%% @private Synchronous calls are not part of this handler's protocol;
%% reply ok and keep the state untouched.
-spec handle_call(any(), state()) -> {ok, ok, state()}.
handle_call(_Request, CallbackModule) ->
  {ok, ok, CallbackModule}.

%% @private Stray messages carry no meaning for this handler; drop them.
-spec handle_info(any(), state()) -> {ok, state()}.
handle_info(_Info, CallbackModule) ->
  {ok, CallbackModule}.

%% @private Nothing to migrate on a code upgrade; carry the state over as-is.
-spec code_change(any(), state(), any()) -> {ok, state()}.
code_change(_OldVsn, CallbackModule, _Extra) ->
  {ok, CallbackModule}.

%% @private No resources to release when the handler is removed.
-spec terminate(any(), state()) -> ok.
terminate(_Reason, _CallbackModule) ->
  ok.

%% @doc Emits Event with Args through the pool's event manager when the
%%      worker options carry an event_manager entry; a silent no-op when
%%      callbacks were not enabled for the pool.
-spec notify(event(), [wpool:option()], [any()]) -> ok.
notify(Event, Options, Args) ->
  case lists:keyfind(event_manager, 1, Options) of
    {event_manager, EventManager} ->
      gen_event:notify(EventManager, {Event, Args});
    false ->
      ok
  end.

%% @private Applies Module:Event(Args...) when the callback is exported.
%% All callbacks are optional, so a missing export is silently skipped
%% (function_exported/3 also returns false for unloaded modules).
%% Exceptions raised by a callback are trapped and logged — a faulty
%% callback module must not bring down the event handler — and the log
%% now identifies which module and event failed, so the problem can be
%% traced to its source.
call(Module, Event, Args) ->
  try
    case erlang:function_exported(Module, Event, length(Args)) of
      true -> erlang:apply(Module, Event, Args);
      false -> ok
    end
  catch
    E:R ->
      error_logger:warning_msg(
        "Could not call callback module ~p for event ~p, error:~p, reason:~p",
        [Module, Event, E, R])
  end.
11 changes: 11 additions & 0 deletions src/wpool_process_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ start_link(Parent, Name, Options) ->
init({Name, Options}) ->
Workers = proplists:get_value(workers, Options, 100),
Strategy = proplists:get_value(strategy, Options, {one_for_one, 5, 60}),
_ = maybe_add_event_handler(Options),
michalwski marked this conversation as resolved.
Show resolved Hide resolved
{WorkerType, Worker, InitArgs} =
case proplists:get_value(worker_type, Options, gen_server) of
gen_server ->
Expand All @@ -55,3 +56,13 @@ init({Name, Options}) ->
, [Worker]
} || I <- lists:seq(1, Workers)],
{ok, {Strategy, WorkerSpecs}}.

%% @private Registers every module listed in the callbacks option as a
%% wpool_process_callbacks handler on the pool's event manager. When no
%% event_manager option is present (callbacks disabled), does nothing.
%% The handlers are added purely for their side effect, so lists:foreach/2
%% is used instead of a list comprehension whose result would be discarded;
%% the function now uniformly returns ok (the caller ignores the result).
maybe_add_event_handler(Options) ->
  case proplists:get_value(event_manager, Options) of
    undefined ->
      ok;
    EventManager ->
      lists:foreach(
        fun(Module) ->
          gen_event:add_handler(EventManager, {wpool_process_callbacks, Module}, Module)
        end,
        proplists:get_value(callbacks, Options, []))
  end.

1 change: 0 additions & 1 deletion test/wpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ broadcast(_Config) ->
meck:unload(x),
{comment, []}.


%% =============================================================================
%% Helpers
%% =============================================================================
Expand Down
151 changes: 151 additions & 0 deletions test/wpool_process_callbacks_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
-module(wpool_process_callbacks_SUITE).

-type config() :: [{atom(), term()}].

-export([ all/0
]).
-export([ init_per_suite/1
, end_per_suite/1
]).
-export([ complete_callback_passed_when_starting_pool/1
, partial_callback_passed_when_starting_pool/1
, callback_can_be_added_and_removed_after_pool_is_started/1
, crashing_callback_does_not_affect_others/1
]).

%% @doc Every test case exercised by this suite.
-spec all() -> [atom()].
all() ->
  [ complete_callback_passed_when_starting_pool
  , partial_callback_passed_when_starting_pool
  , callback_can_be_added_and_removed_after_pool_is_started
  , crashing_callback_does_not_affect_others
  ].

%% @doc Boots the worker_pool application once for the whole suite.
-spec init_per_suite(config()) -> config().
init_per_suite(SuiteConfig) ->
  ok = wpool:start(),
  SuiteConfig.

%% @doc Shuts the worker_pool application back down after the suite.
-spec end_per_suite(config()) -> config().
end_per_suite(SuiteConfig) ->
  _ = wpool:stop(),
  SuiteConfig.


%% A callback module implementing all three optional callbacks receives
%% handle_init_start and handle_worker_creation once per worker at pool
%% startup, and handle_worker_death when a worker crashes.
-spec complete_callback_passed_when_starting_pool(config()) -> ok.
complete_callback_passed_when_starting_pool(_Config) ->
Pool = callbacks_test,
WorkersCount = 13,
%% Mock a full callback module (non_strict: module does not really exist).
meck:new(callbacks, [non_strict]),
meck:expect(callbacks, handle_init_start, fun(_AWorkerName) -> ok end),
meck:expect(callbacks, handle_worker_creation, fun(_AWorkerName) -> ok end),
meck:expect(callbacks, handle_worker_death, fun(_AWName, _Reason) -> ok end),
{ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount},
{enable_callbacks, true},
{worker, {crashy_server, []}},
{callbacks, [callbacks]}]),

%% Startup callbacks fire asynchronously: poll until one call per worker
%% has been observed.
WorkersCount = ktn_task:wait_for(function_calls(callbacks, handle_init_start,
['_']), WorkersCount),
WorkersCount = ktn_task:wait_for(function_calls(callbacks,
handle_worker_creation,
['_']), WorkersCount),
%% Crash one worker and expect exactly one death notification.
Worker = wpool_pool:random_worker(Pool),
Worker ! crash,
1 = ktn_task:wait_for(function_calls(callbacks, handle_worker_death,
['_', '_']), 1),
wpool:stop_pool(Pool),
meck:unload(callbacks),

ok.

%% @doc A callback module may implement only a subset of the optional
%% callbacks; here only handle_worker_creation and handle_worker_death are
%% mocked, and pool startup must still notify the one that exists.
%% NOTE: the spec previously read `(config)` — the singleton atom type —
%% instead of the intended `(config())` type; fixed for Dialyzer.
-spec partial_callback_passed_when_starting_pool(config()) -> ok.
partial_callback_passed_when_starting_pool(_Config) ->
  Pool = partial_callbacks_test,
  WorkersCount = 7,
  meck:new(callbacks, [non_strict]),
  meck:expect(callbacks, handle_worker_creation, fun(_AWorkerName) -> ok end),
  meck:expect(callbacks, handle_worker_death, fun(_AWName, _Reason) -> ok end),
  {ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount},
                                       {enable_callbacks, true},
                                       {callbacks, [callbacks]}]),
  %% Wait until every worker's creation has been reported.
  WorkersCount = ktn_task:wait_for(function_calls(callbacks,
                                                  handle_worker_creation,
                                                  ['_']), WorkersCount),
  wpool:stop_pool(Pool),
  meck:unload(callbacks),

  ok.

%% Callback modules can be attached and detached at runtime via
%% wpool_pool:add_callback_module/2 / remove_callback_module/2; after
%% removal a module stops receiving events while the remaining one
%% continues to be notified.
-spec callback_can_be_added_and_removed_after_pool_is_started(config()) -> ok.
callback_can_be_added_and_removed_after_pool_is_started(_Config) ->
Pool = after_start_callbacks_test,
WorkersCount = 3,
meck:new(callbacks, [non_strict]),
meck:expect(callbacks, handle_worker_death, fun(_AWName, _Reason) -> ok end),
meck:new(callbacks2, [non_strict]),
meck:expect(callbacks2, handle_worker_death, fun(_AWName, _Reason) -> ok end),
%% The pool starts with callbacks enabled but no modules registered.
{ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount},
{worker, {crashy_server, []}},
{enable_callbacks, true}]),
%% Now we are adding 2 callback modules
_ = wpool_pool:add_callback_module(Pool, callbacks),
_ = wpool_pool:add_callback_module(Pool, callbacks2),
Worker = wpool_pool:random_worker(Pool),
Worker ! crash,

%% they both are called
1 = ktn_task:wait_for(function_calls(callbacks, handle_worker_death,
['_', '_']), 1),
1 = ktn_task:wait_for(function_calls(callbacks2, handle_worker_death,
['_', '_']), 1),

%% then the first module is removed
_ = wpool_pool:remove_callback_module(Pool, callbacks),
Worker2 = wpool_pool:random_worker(Pool),
Worker2 ! crash,

%% and only the second one is called
%% (callbacks stays at 1 call; callbacks2 reaches 2)
1 = ktn_task:wait_for(function_calls(callbacks, handle_worker_death,
['_', '_']), 1),
2 = ktn_task:wait_for(function_calls(callbacks2, handle_worker_death,
['_', '_']), 2),

wpool:stop_pool(Pool),
meck:unload(callbacks),
meck:unload(callbacks2),

ok.


%% A callback module whose implementation raises (callbacks2 deliberately
%% fails a match) must not prevent other registered modules from being
%% notified: both modules still see one handle_worker_creation per worker.
-spec crashing_callback_does_not_affect_others(config()) -> ok.
crashing_callback_does_not_affect_others(_Config) ->
Pool = crashing_callbacks_test,
WorkersCount = 3,
meck:new(callbacks, [non_strict]),
meck:expect(callbacks, handle_worker_creation, fun(_AWorkerName) -> ok end),
meck:new(callbacks2, [non_strict]),
%% This expectation always crashes with a badmatch.
meck:expect(callbacks2, handle_worker_creation,
fun(AWorkerName) -> {not_going_to_work} = AWorkerName end),
{ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount},
{worker, {crashy_server, []}},
{enable_callbacks, true},
{callbacks, [callbacks, callbacks2]}]),

%% Both modules were invoked for every worker despite callbacks2 crashing.
WorkersCount = ktn_task:wait_for(function_calls(callbacks,
handle_worker_creation,
['_']), WorkersCount),
WorkersCount = ktn_task:wait_for(function_calls(callbacks2,
handle_worker_creation,
['_']), WorkersCount),

wpool:stop_pool(Pool),
meck:unload(callbacks),
meck:unload(callbacks2),

ok.

%% Wraps meck:num_calls/3 in a zero-arity thunk, suitable for polling with
%% ktn_task:wait_for/2 until the expected call count is reached.
function_calls(Module, Function, MeckMatchSpec) ->
  fun() -> meck:num_calls(Module, Function, MeckMatchSpec) end.