Skip to content

Commit

Permalink
Merge pull request #155 from inaka/elbrujohalcon.backports
Browse files Browse the repository at this point in the history
Backports to 3.1
  • Loading branch information
michalwski authored Oct 11, 2018
2 parents 21c062d + c5affb4 commit e08b307
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 184 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
.erlang.mk
worker_pool.d
_build/
rebar.lock
.rebar/
all.coverdata
.classpath
Expand All @@ -18,4 +17,4 @@ erl_crash.dump
*.log
*~
.idea
*.iml
*.iml
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ before_install:
- ./ci before_install "${PWD:?}"/rebar3
install:
- ./ci install "${PWD:?}"/rebar3
- pip install --user codecov
script:
- ./ci script "${PWD:?}"/rebar3
cache:
directories:
- .plt
after_success:
- codecov
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Worker Pool [![Build Status](https://travis-ci.org/inaka/worker_pool.svg?branch=master)](https://travis-ci.org/inaka/worker_pool)
# Worker Pool [![Build Status](https://travis-ci.org/inaka/worker_pool.svg?branch=master)](https://travis-ci.org/inaka/worker_pool)[![codecov](https://codecov.io/gh/inaka/worker_pool/branch/master/graph/badge.svg)](https://codecov.io/gh/inaka/worker_pool)

<img src="http://img3.wikia.nocookie.net/__cb20140705120849/clubpenguin/images/thumb/f/ff/MINIONS.jpg/481px-MINIONS.jpg" align="right" style="float:right" height="400" />

Expand Down
7 changes: 0 additions & 7 deletions include/wpool.hrl

This file was deleted.

29 changes: 19 additions & 10 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
, warn_untyped_record
, debug_info]}.

{profiles, [
{test, [
{deps, [
{katana_test, "1.0.0"},
{mixer, "1.0.0", {pkg, inaka_mixer}},
{meck, "0.8.10"}
]}
]}
]}.
{profiles, [{test, [{deps, [ {katana_test, "1.0.1"}
, {katana, "0.4.0"}
, {mixer, "1.0.1", {pkg, inaka_mixer}}
, {meck, "0.8.11"}
]
}]
}]
}.

%% == Common Test ==

Expand All @@ -53,7 +52,17 @@

%% == Cover ==

{cover_enabled, true}.
{plugins , [coveralls,
{rebar3_codecov, "0.1.0"}
]}.

{cover_enabled , true}.
{cover_export_enabled , true}.

{provider_hooks,
[
{post, [{ct, {codecov, analyze}}]}
]}.

{cover_opts, [verbose]}.

Expand Down
1 change: 1 addition & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[].
15 changes: 10 additions & 5 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,15 @@
-export([ init/1
]).

-include("wpool.hrl").

-type wpool() :: #wpool{}.
-record(wpool, { name :: wpool:name()
, size :: pos_integer()
, next :: pos_integer()
, opts :: [wpool:option()]
, qmanager :: wpool_queue_manager:queue_mgr()
, born = os:timestamp() :: erlang:timestamp()
}).

-opaque wpool() :: #wpool{}.
-export_type([wpool/0]).

%% ===================================================================
Expand Down Expand Up @@ -198,8 +204,7 @@ stats(Wpool, Sup) ->
{T + MQL, [{N, WS} | L]}
end
end, {0, []}, lists:seq(1, Wpool#wpool.size)),
ManagerStats = wpool_queue_manager:stats(Wpool#wpool.name),
PendingTasks = proplists:get_value(pending_tasks, ManagerStats),
PendingTasks = wpool_queue_manager:pending_task_count(Wpool#wpool.qmanager),
[ {pool, Sup}
, {supervisor, erlang:whereis(Sup)}
, {options, lists:ukeysort(1, proplists:unfold(Wpool#wpool.opts))}
Expand Down
15 changes: 15 additions & 0 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
, handle_call/3
, handle_cast/2
, handle_info/2
, format_status/2
]).

%%%===================================================================
Expand Down Expand Up @@ -124,6 +125,20 @@ handle_info(Info, State) ->
{stop, Reason, State#state{state = NewState}}
end.

%% @private
-spec format_status(normal | terminate, [[{_, _}] | state(), ...]) -> term().
format_status(Opt, [PDict, State]) ->
case erlang:function_exported(State#state.mod, format_status, 2) of
false ->
case Opt of % This is copied from gen_server:format_status/4
terminate -> State#state.state;
normal -> [{data, [{"State", State#state.state}]}]
end;
true ->
wpool_utils:do_try(
fun() -> (State#state.mod):format_status(Opt, [PDict, State#state.state]) end)
end.

%%%===================================================================
%%% real (i.e. interesting) callbacks
%%%===================================================================
Expand Down
39 changes: 10 additions & 29 deletions src/wpool_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
, worker_dead/2
, worker_ready/2
, worker_busy/2
]).
-export([ stats/1
, pending_task_count/1
]).

%% gen_server callbacks
Expand All @@ -40,8 +39,6 @@
, handle_info/2
]).

-include("wpool.hrl").

-record(state, { wpool :: wpool:name()
, clients :: queue:queue({cast|{pid(), _}, term()})
, workers :: gb_sets:set(atom())
Expand Down Expand Up @@ -126,23 +123,11 @@ worker_busy(QueueManager, Worker) ->
worker_dead(QueueManager, Worker) ->
gen_server:cast(QueueManager, {worker_dead, Worker}).

%% @doc Returns statistics for this queue.
-spec stats(wpool:name()) ->
proplists:proplist() | {error, {invalid_pool, wpool:name()}}.
stats(PoolName) ->
case ets:lookup(wpool_pool, PoolName) of
[] -> {error, {invalid_pool, PoolName}};
[#wpool{qmanager = QueueManager, size = PoolSize, born = Born}] ->
{AvailableWorkers, PendingTasks} =
gen_server:call(QueueManager, worker_counts),
BusyWorkers = PoolSize - AvailableWorkers,
[ {pool_age_in_secs, age_in_seconds(Born)}
, {pool_size, PoolSize}
, {pending_tasks, PendingTasks}
, {available_workers, AvailableWorkers}
, {busy_workers, BusyWorkers}
]
end.
%% @doc Retrieves the number of pending tasks (used for stats)
%% @see wpool_pool:stats/1
-spec pending_task_count(queue_mgr()) -> non_neg_integer().
pending_task_count(QueueManager) ->
gen_server:call(QueueManager, pending_task_count).

%%%===================================================================
%%% gen_server callbacks
Expand Down Expand Up @@ -211,7 +196,7 @@ handle_cast({cast_to_available_worker, Cast}, State) ->
end.

-type call_request() ::
{available_worker, infinity|pos_integer()} | worker_counts.
{available_worker, infinity|pos_integer()} | pending_task_count.
%% @private
-spec handle_call(call_request(), from(), state()) ->
{reply, {ok, atom()}, state()} | {noreply, state()}.
Expand All @@ -238,10 +223,8 @@ handle_call(
{noreply, State}
end
end;
handle_call(worker_counts, _From, State) ->
#state{workers = AvailableWorkers} = State,
Available = gb_sets:size(AvailableWorkers),
{reply, {Available, get(pending_tasks)}, State}.
handle_call(pending_task_count, _From, State) ->
{reply, get(pending_tasks), State}.

%% @private
-spec handle_info(any(), state()) -> {noreply, state()}.
Expand Down Expand Up @@ -277,8 +260,6 @@ dec(Key) -> put(Key, get(Key) - 1).

now_in_microseconds() -> timer:now_diff(os:timestamp(), {0, 0, 0}).

age_in_seconds(Born) -> timer:now_diff(os:timestamp(), Born) div 1000000.

expires(Timeout) ->
case Timeout of
infinity -> infinity;
Expand All @@ -292,4 +273,4 @@ monitor_worker(Worker, Client, State = #state{monitors = Mons}) ->
queue_out(Clients, fifo) ->
queue:out(Clients);
queue_out(Clients, lifo) ->
queue:out_r(Clients).
queue:out_r(Clients).
12 changes: 10 additions & 2 deletions test/echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
, handle_call/3
, handle_cast/2
, handle_info/2
, handle_continue/2
, format_status/2
]).

-dialyzer([no_behaviours]).
Expand All @@ -48,6 +50,12 @@ handle_info(Info, _State) -> Info.
handle_cast(Cast, _State) -> Cast.

-type from() :: {pid(), reference()}.
-spec handle_call(state | Call, from(), State) -> {reply, State, State} | Call.
handle_call(state, _From, State) -> {reply, State, State};
-spec handle_call(Call, from(), term()) -> Call.
handle_call(Call, _From, _State) -> Call.

-spec handle_continue(Continue, term()) -> Continue.
handle_continue(Continue, _State) -> Continue.

-spec format_status(normal | terminate, [[{_, _}] | State, ...]) ->
{formatted_state, State}.
format_status(_, [_PDict, State]) -> {formatted_state, State}.
79 changes: 44 additions & 35 deletions test/wpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,17 @@ too_much_overrun(_Config) ->

ct:comment("Start a long running task..."),
ok = wpool:cast(wpool_SUITE_too_much_overrun, {timer, sleep, [5000]}),
timer:sleep(100),
{dictionary, Dict} = erlang:process_info(Worker, dictionary),
{TaskId, _, _} = proplists:get_value(wpool_task, Dict),
TaskId =
ktn_task:wait_for_success(
fun() ->
{dictionary, Dict} = erlang:process_info(Worker, dictionary),
{TId, _, _} = proplists:get_value(wpool_task, Dict),
TId
end),

ct:comment("Simulate overrun warning..."),
TCPid ! {check, Worker, TaskId, 9999999999, infinity}, % huge runtime… no more overruns
% huge runtime => no more overruns
TCPid ! {check, Worker, TaskId, 9999999999, infinity},

ct:comment("Get overrun message..."),
_ = receive
Expand Down Expand Up @@ -188,7 +193,8 @@ kill_on_overrun(_Config) ->
_ = receive
{overrun1, Message2} ->
max_overrun_limit = proplists:get_value(alert, Message2),
wpool_SUITE_kill_on_overrun_pool = proplists:get_value(pool, Message2),
wpool_SUITE_kill_on_overrun_pool =
proplists:get_value(pool, Message2),
WPid2 = proplists:get_value(worker, Message2),
true = is_pid(WPid2),
false = erlang:is_process_alive(WPid2)
Expand Down Expand Up @@ -262,37 +268,41 @@ stats(_Config) ->

% Start a long task on every worker
Sleep = {timer, sleep, [2000]},
[wpool:cast(wpool_SUITE_stats_pool, Sleep, next_worker) ||
_ <- lists:seq(1, 10)],

timer:sleep(100),

% Checks ...
WorkingStats = wpool:stats(wpool_SUITE_stats_pool),
wpool_SUITE_stats_pool = Get(pool, WorkingStats),
PoolPid = Get(supervisor, WorkingStats),
Options = Get(options, WorkingStats),
10 = Get(size, WorkingStats),
1 = Get(next_worker, WorkingStats),
WorkingWorkers = Get(workers, WorkingStats),
10 = length(WorkingWorkers),
[ begin
WorkerStats = Get(I, WorkingWorkers),
0 = Get(message_queue_len, WorkerStats),
{timer, sleep, 1} = Get(current_function, WorkerStats),
{timer, sleep, 1, _} = Get(current_location, WorkerStats),
{cast, Sleep} = Get(task, WorkerStats),
true = is_number(Get(runtime, WorkerStats))
end || I <- lists:seq(1, 10)],
[wpool:cast(wpool_SUITE_stats_pool, Sleep, next_worker)
|| _ <- lists:seq(1, 10)],

ok =
ktn_task:wait_for_success(
fun() ->
WorkingStats = wpool:stats(wpool_SUITE_stats_pool),
wpool_SUITE_stats_pool = Get(pool, WorkingStats),
PoolPid = Get(supervisor, WorkingStats),
Options = Get(options, WorkingStats),
10 = Get(size, WorkingStats),
1 = Get(next_worker, WorkingStats),
WorkingWorkers = Get(workers, WorkingStats),
10 = length(WorkingWorkers),
[ begin
WorkerStats = Get(I, WorkingWorkers),
0 = Get(message_queue_len, WorkerStats),
{timer, sleep, 1} = Get(current_function, WorkerStats),
{timer, sleep, 1, _} = Get(current_location, WorkerStats),
{cast, Sleep} = Get(task, WorkerStats),
true = is_number(Get(runtime, WorkerStats))
end || I <- lists:seq(1, 10)],
ok
end),

wpool:stop_sup_pool(wpool_SUITE_stats_pool),

timer:sleep(5000),

no_workers =
try wpool:stats(wpool_SUITE_stats_pool)
catch _:E -> E
end,
ktn_task:wait_for(
fun() ->
try wpool:stats(wpool_SUITE_stats_pool)
catch
_:E -> E
end
end, no_workers, 100, 50),

{comment, []}.

Expand Down Expand Up @@ -332,7 +342,6 @@ complete_coverage(_Config) ->
ok = gen_server:cast(TCPid, cast),

ct:comment("Queue Manager"),
{error, {invalid_pool, invalid}} = wpool_queue_manager:stats(invalid),
QMPid = get_queue_manager(PoolPid),
QMPid ! info,
{ok, QMState} = wpool_queue_manager:init([{pool, pool}]),
Expand All @@ -353,8 +362,8 @@ broadcast(_Config) ->
% Broadcast x:x() execution to workers.
wpool:broadcast(Pool, {x, x, []}),
% Give some time for the workers to perform the calls.
timer:sleep(1000),
WorkersCount = meck:num_calls(x, x, '_'),
WorkersCount =
ktn_task:wait_for(fun() -> meck:num_calls(x, x, '_') end, WorkersCount),

ct:comment("Check they all are \"working\""),
% Make all the workers sleep for 1.5 seconds
Expand Down
4 changes: 3 additions & 1 deletion test/wpool_meta_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ init_per_suite(Config) ->
[ {application, worker_pool}
%% Until the next version of katana-test fixes the missing test deps in plt
%% issue, we can't use the default warnings that include 'unknown' here.
, {dialyzer_warnings, [error_handling, race_conditions, unmatched_returns, no_return]}
, { dialyzer_warnings
, [error_handling, race_conditions, unmatched_returns, no_return]
}
| Config
].

Expand Down
Loading

0 comments on commit e08b307

Please sign in to comment.