diff --git a/README.md b/README.md index 3287f93..4967ffa 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,10 @@ To start a new worker pool, you can either use `wpool:start_pool` (if you want t * **strategy**: Not the worker selection strategy (discussed below) but the supervisor flags to be used in the supervisor over the individual workers (`wpool_process_sup`). Defaults to `{one_for_one, 5, 60}` * **pool_sup_intensity** and **pool_sup_period**: The intensity and period for the supervisor that manages the worker pool system (`wpool_pool`). The strategy of this supervisor must be `one_for_all` but the intensity and period may be changed from their defaults of `5` and `60`. * **queue_type**: Order in which requests will be stored and handled by workers. This option can take values `lifo` or `fifo`. Defaults to `fifo`. +* **enable_callbacks**: A boolean value determining if `event_manager` should be started for callback modules. + Defaults to `false`. +* **callbacks**: Initial list of callback modules implementing `wpool_process_callbacks` to be called on certain worker events. + This options will only work if the `enable_callbacks` is set to **true**. Callbacks can be added and removed later by `wpool_pool:add_callback_module/2` and `wpool_pool:remove_callback_module/2`. #### Using the Workers Since the workers are `gen_server`s, messages can be `call`ed or `cast`ed to them. To do that you can use `wpool:call` and `wpool:cast` as you would use the equivalent functions on `gen_server`. diff --git a/src/wpool_pool.erl b/src/wpool_pool.erl index 14b77c9..16f8b78 100644 --- a/src/wpool_pool.erl +++ b/src/wpool_pool.erl @@ -44,6 +44,8 @@ -export([ next/2 , wpool_get/2 ]). +-export([ add_callback_module/2 + , remove_callback_module/2]). %% Supervisor callbacks -export([ init/1 @@ -253,6 +255,16 @@ wpool_size(Name) -> -spec next(pos_integer(), wpool()) -> wpool(). next(Next, WPool) -> WPool#wpool{next = Next}. +-spec add_callback_module(wpool:name(), module()) -> ok | {error, term()}. +add_callback_module(Pool, Module) -> + EventManager = event_manager_name(Pool), + wpool_process_callbacks:add_callback_module(EventManager, Module). + +-spec remove_callback_module(wpool:name(), module()) -> ok | {error, term()}. +remove_callback_module(Pool, Module) -> + EventManager = event_manager_name(Pool), + wpool_process_callbacks:remove_callback_module(EventManager, Module). + %% @doc Get values from the worker pool record. Useful when using a custom %% strategy function. -spec wpool_get(atom(), wpool()) -> any(); ([atom()], wpool()) -> any(). @@ -288,6 +300,7 @@ init({Name, Options}) -> TimeChecker = time_checker_name(Name), QueueManager = queue_manager_name(Name), ProcessSup = process_sup_name(Name), + EventManagerName = event_manager_name(Name), _Wpool = store_wpool( #wpool{ name = Name @@ -315,9 +328,19 @@ init({Name, Options}) -> , [wpool_queue_manager] }, + EventManagerSpec = + { EventManagerName + , {gen_event, start_link, [{local, EventManagerName}]} + , permanent + , brutal_kill + , worker + , dynamic + }, + SupShutdown = proplists:get_value(pool_sup_shutdown, Options, brutal_kill), WorkerOpts = - [{queue_manager, QueueManager}, {time_checker, TimeChecker} | Options], + [{queue_manager, QueueManager}, {time_checker, TimeChecker} + | Options] ++ maybe_event_manager(Options, {event_manager, EventManagerName}), ProcessSupSpec = { ProcessSup , {wpool_process_sup, start_link, [Name, ProcessSup, WorkerOpts]} @@ -327,10 +350,14 @@ init({Name, Options}) -> , [wpool_process_sup] }, + Children = [TimeCheckerSpec, QueueManagerSpec] ++ + maybe_event_manager(Options, EventManagerSpec) ++ + [ProcessSupSpec], + SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5), SupPeriod = proplists:get_value(pool_sup_period, Options, 60), SupStrategy = {one_for_all, SupIntensity, SupPeriod}, - {ok, {SupStrategy, [TimeCheckerSpec, QueueManagerSpec, ProcessSupSpec]}}. + {ok, {SupStrategy, Children}}. %% @private -spec worker_name(wpool:name(), pos_integer()) -> atom(). @@ -345,6 +372,8 @@ process_sup_name(Sup) -> list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-process-sup"). queue_manager_name(Sup) -> list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-queue-manager"). +event_manager_name(Sup) -> + list_to_atom(?MODULE_STRING ++ [$-|atom_to_list(Sup)] ++ "-event-manager"). worker_with_no_task(Wpool) -> %% Moving the beginning of the list to a random point to ensure that clients @@ -484,3 +513,11 @@ set_random_fun() -> end end, application:set_env(worker_pool, random_fun, RndFun). + +maybe_event_manager(Options, Item) -> + EnableEventManager = proplists:get_value(enable_callbacks, Options, false), + case EnableEventManager of + true -> + [Item]; + _ -> [] + end. diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 9f367ca..8709cd1 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -77,9 +77,12 @@ cast_call(Process, From, Call) -> %% @private -spec init({atom(), atom(), term(), [wpool:option()]}) -> {ok, state()}. init({Name, Mod, InitArgs, Options}) -> + wpool_process_callbacks:notify(handle_init_start, Options, [Name]), + case Mod:init(InitArgs) of {ok, ModState} -> ok = wpool_utils:notify_queue_manager(new_worker, Name, Options), + wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]), {ok, #state{ name = Name , mod = Mod , state = ModState @@ -87,6 +90,7 @@ init({Name, Mod, InitArgs, Options}) -> }}; {ok, ModState, Timeout} -> ok = wpool_utils:notify_queue_manager(new_worker, Name, Options), + wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]), {ok, #state{ name = Name , mod = Mod , state = ModState @@ -101,6 +105,7 @@ init({Name, Mod, InitArgs, Options}) -> terminate(Reason, State) -> #state{mod=Mod, state=ModState, name=Name, options=Options} = State, ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options), + wpool_process_callbacks:notify(handle_worker_death, Options, [Name, Reason]), Mod:terminate(Reason, ModState). %% @private @@ -225,3 +230,5 @@ handle_call(Call, From, State) -> , State#state.name , State#state.options), Reply. + + diff --git a/src/wpool_process_callbacks.erl b/src/wpool_process_callbacks.erl new file mode 100644 index 0000000..7809588 --- /dev/null +++ b/src/wpool_process_callbacks.erl @@ -0,0 +1,101 @@ +-module(wpool_process_callbacks). + +-behaviour(gen_event). + +%% gen_event callbacks + +-export([ init/1 + , handle_event/2 + , handle_call/2 + , handle_info/2 + , code_change/3 + , terminate/2 + ]). + +-export([ notify/3 + , add_callback_module/2 + , remove_callback_module/2 + ]). +-type state() :: module(). + +-type event() :: handle_init_start | handle_worker_creation | handle_worker_death. + +-callback handle_init_start(wpool:name()) -> any(). +-callback handle_worker_creation(wpool:name()) -> any(). +-callback handle_worker_death(wpool:name(), term()) -> any(). + +-optional_callbacks([handle_init_start/1, handle_worker_creation/1, handle_worker_death/2]). + +-spec init(module()) -> {ok, state()}. +init(Module) -> + {ok, Module}. + +-spec handle_event({event(), [any()]}, state()) -> {ok, state()}. +handle_event({Event, Args}, Module) -> + call(Module, Event, Args), + {ok, Module}; +handle_event(_, State) -> + {ok, State}. + +-spec handle_call(any(), state()) -> {ok, ok, state()}. +handle_call(_, State) -> + {ok, ok, State}. + +-spec handle_info(any(), state()) -> {ok, state()}. +handle_info(_, State) -> + {ok, State}. + +-spec code_change(any(), state(), any()) -> {ok, state()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +-spec terminate(any(), state()) -> ok. +terminate(_Reason, _State) -> + ok. + +-spec notify(event(), [wpool:option()], [any()]) -> ok. +notify(Event, Options, Args) -> + case lists:keyfind(event_manager, 1, Options) of + {event_manager, EventMgr} -> + gen_event:notify(EventMgr, {Event, Args}); + _ -> + ok + end. + +-spec add_callback_module(wpool:name(), module()) -> ok | {error, any()}. +add_callback_module(EventManager, Module) -> + case ensure_loaded(Module) of + ok -> + gen_event:add_handler(EventManager, + {wpool_process_callbacks, Module}, Module); + Other -> + Other + end. + + +-spec remove_callback_module(wpool:name(), module()) -> ok | {error, any()}. +remove_callback_module(EventManager, Module) -> + gen_event:delete_handler(EventManager, {wpool_process_callbacks, Module}, Module). + +call(Module, Event, Args) -> + try + case erlang:function_exported(Module, Event, length(Args)) of + true -> + erlang:apply(Module, Event, Args); + _ -> + ok + end + catch + E:R -> + error_logger:warning_msg("Could not call callback module, error:~p, reason:~p", [E, R]) + end. + +ensure_loaded(Module) -> + case code:ensure_loaded(Module) of + {module, Module} -> + ok; + {error, embedded} -> %% We are in embedded mode so the module was loaded if exists + ok; + Other -> + Other + end. diff --git a/src/wpool_process_sup.erl b/src/wpool_process_sup.erl index d32a88b..0486824 100644 --- a/src/wpool_process_sup.erl +++ b/src/wpool_process_sup.erl @@ -34,6 +34,7 @@ start_link(Parent, Name, Options) -> init({Name, Options}) -> Workers = proplists:get_value(workers, Options, 100), Strategy = proplists:get_value(strategy, Options, {one_for_one, 5, 60}), + maybe_add_event_handler(Options), {WorkerType, Worker, InitArgs} = case proplists:get_value(worker_type, Options, gen_server) of gen_server -> @@ -55,3 +56,22 @@ init({Name, Options}) -> , [Worker] } || I <- lists:seq(1, Workers)], {ok, {Strategy, WorkerSpecs}}. + +maybe_add_event_handler(Options) -> + case proplists:get_value(event_manager, Options, undefined) of + undefined -> + ok; + EventMgr -> + lists:foreach(fun(M) -> add_initial_callback(EventMgr, M) end, + proplists:get_value(callbacks, Options, [])) + end. + +add_initial_callback(EventManager, Module) -> + case wpool_process_callbacks:add_callback_module(EventManager, Module) of + ok -> + ok; + Other -> + error_logger:warning_msg("The callback module:~p could not be loaded, reason:~p", + [Module, Other]) + end. + diff --git a/test/wpool_SUITE.erl b/test/wpool_SUITE.erl index 77e7658..54b6a12 100644 --- a/test/wpool_SUITE.erl +++ b/test/wpool_SUITE.erl @@ -379,7 +379,6 @@ broadcast(_Config) -> meck:unload(x), {comment, []}. - %% ============================================================================= %% Helpers %% ============================================================================= diff --git a/test/wpool_process_callbacks_SUITE.erl b/test/wpool_process_callbacks_SUITE.erl new file mode 100644 index 0000000..0b9613b --- /dev/null +++ b/test/wpool_process_callbacks_SUITE.erl @@ -0,0 +1,176 @@ +-module(wpool_process_callbacks_SUITE). + +-type config() :: [{atom(), term()}]. + +-export([ all/0 + ]). +-export([ init_per_suite/1 + , end_per_suite/1 + ]). +-export([ complete_callback_passed_when_starting_pool/1 + , partial_callback_passed_when_starting_pool/1 + , callback_can_be_added_and_removed_after_pool_is_started/1 + , crashing_callback_does_not_affect_others/1 + , non_existsing_module_does_not_affect_others/1 + ]). + +-spec all() -> [atom()]. +all() -> + [ complete_callback_passed_when_starting_pool + , partial_callback_passed_when_starting_pool + , callback_can_be_added_and_removed_after_pool_is_started + , crashing_callback_does_not_affect_others + , non_existsing_module_does_not_affect_others + ]. + +-spec init_per_suite(config()) -> config(). +init_per_suite(Config) -> + ok = wpool:start(), + Config. + +-spec end_per_suite(config()) -> config(). +end_per_suite(Config) -> + wpool:stop(), + Config. + + +-spec complete_callback_passed_when_starting_pool(config()) -> ok. +complete_callback_passed_when_starting_pool(_Config) -> + Pool = callbacks_test, + WorkersCount = 13, + meck:new(callbacks, [non_strict]), + meck:expect(callbacks, handle_init_start, fun(_AWorkerName) -> ok end), + meck:expect(callbacks, handle_worker_creation, fun(_AWorkerName) -> ok end), + meck:expect(callbacks, handle_worker_death, fun(_AWName, _Reason) -> ok end), + {ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount}, + {enable_callbacks, true}, + {worker, {crashy_server, []}}, + {callbacks, [callbacks]}]), + + WorkersCount = ktn_task:wait_for(function_calls(callbacks, handle_init_start, + ['_']), WorkersCount), + WorkersCount = ktn_task:wait_for(function_calls(callbacks, + handle_worker_creation, + ['_']), WorkersCount), + Worker = wpool_pool:random_worker(Pool), + Worker ! crash, + 1 = ktn_task:wait_for(function_calls(callbacks, handle_worker_death, + ['_', '_']), 1), + wpool:stop_pool(Pool), + meck:unload(callbacks), + + ok. + +-spec partial_callback_passed_when_starting_pool(config) -> ok. +partial_callback_passed_when_starting_pool(_Config) -> + Pool = partial_callbacks_test, + WorkersCount = 7, + meck:new(callbacks, [non_strict]), + meck:expect(callbacks, handle_worker_creation, fun(_AWorkerName) -> ok end), + meck:expect(callbacks, handle_worker_death, fun(_AWName, _Reason) -> ok end), + {ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount}, + {enable_callbacks, true}, + {callbacks, [callbacks]}]), + WorkersCount = ktn_task:wait_for(function_calls(callbacks, + handle_worker_creation, + ['_']), WorkersCount), + wpool:stop_pool(Pool), + meck:unload(callbacks), + + ok. + +-spec callback_can_be_added_and_removed_after_pool_is_started(config()) -> ok. +callback_can_be_added_and_removed_after_pool_is_started(_Config) -> + Pool = after_start_callbacks_test, + WorkersCount = 3, + meck:new(callbacks, [non_strict]), + meck:expect(callbacks, handle_worker_death, fun(_AWName, _Reason) -> ok end), + meck:new(callbacks2, [non_strict]), + meck:expect(callbacks2, handle_worker_death, fun(_AWName, _Reason) -> ok end), + {ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount}, + {worker, {crashy_server, []}}, + {enable_callbacks, true}]), + %% Now we are adding 2 callback modules + _ = wpool_pool:add_callback_module(Pool, callbacks), + _ = wpool_pool:add_callback_module(Pool, callbacks2), + Worker = wpool_pool:random_worker(Pool), + Worker ! crash, + + %% they both are called + 1 = ktn_task:wait_for(function_calls(callbacks, handle_worker_death, + ['_', '_']), 1), + 1 = ktn_task:wait_for(function_calls(callbacks2, handle_worker_death, + ['_', '_']), 1), + + %% then the first module is removed + _ = wpool_pool:remove_callback_module(Pool, callbacks), + Worker2 = wpool_pool:random_worker(Pool), + Worker2 ! crash, + + %% and only the scond one is called + 1 = ktn_task:wait_for(function_calls(callbacks, handle_worker_death, + ['_', '_']), 1), + 2 = ktn_task:wait_for(function_calls(callbacks2, handle_worker_death, + ['_', '_']), 2), + + wpool:stop_pool(Pool), + meck:unload(callbacks), + meck:unload(callbacks2), + + ok. + + +-spec crashing_callback_does_not_affect_others(config()) -> ok. +crashing_callback_does_not_affect_others(_Config) -> + Pool = crashing_callbacks_test, + WorkersCount = 3, + meck:new(callbacks, [non_strict]), + meck:expect(callbacks, handle_worker_creation, fun(_AWorkerName) -> ok end), + meck:new(callbacks2, [non_strict]), + meck:expect(callbacks2, handle_worker_creation, + fun(AWorkerName) -> {not_going_to_work} = AWorkerName end), + {ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount}, + {worker, {crashy_server, []}}, + {enable_callbacks, true}, + {callbacks, [callbacks, callbacks2]}]), + + WorkersCount = ktn_task:wait_for(function_calls(callbacks, + handle_worker_creation, + ['_']), WorkersCount), + WorkersCount = ktn_task:wait_for(function_calls(callbacks2, + handle_worker_creation, + ['_']), WorkersCount), + + wpool:stop_pool(Pool), + meck:unload(callbacks), + meck:unload(callbacks2), + + ok. + + +-spec non_existsing_module_does_not_affect_others(config()) -> ok. +non_existsing_module_does_not_affect_others(_Config) -> + Pool = non_existing_callbacks_test, + WorkersCount = 4, + meck:new(callbacks, [non_strict]), + meck:expect(callbacks, handle_worker_creation, fun(_AWorkerName) -> ok end), + {ok, _Pid} = wpool:start_pool(Pool, [{workers, WorkersCount}, + {worker, {crashy_server, []}}, + {enable_callbacks, true}, + {callbacks, [callbacks, non_existing_m]} + ]), + + {error, nofile} = wpool_pool:add_callback_module(Pool, non_existing_m2), + + WorkersCount = ktn_task:wait_for(function_calls(callbacks, + handle_worker_creation, + ['_']), WorkersCount), + + wpool:stop_pool(Pool), + meck:unload(callbacks), + + ok. +function_calls(Module, Function, MeckMatchSpec) -> + fun() -> + meck:num_calls(Module, Function, MeckMatchSpec) + end.