Skip to content

Commit

Permalink
Add pv/3 iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
seriyps committed Oct 25, 2024
1 parent b51b5e6 commit e850d9f
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ jobs:
- name: Run Proper Tests
run: rebar3 proper -c

- name: Run EUnit Tests
run: rebar3 eunit -c

- name: Coverage
run: rebar3 cover --verbose --min_coverage 80 # zip/3 can only be fully tested on OTP-26+

Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,29 @@ OTP `lists` module.
Functions `iterator_pmap:pmap/2` and `iterator_pmap:pmap/3` provide parallel version
of `iterator:map/2`: it takes iterator as input and returns a new iterator where map function
is executed for each input element in parallel on a pool of worker processes.
While elements of input are processed in parallel, the ordering of elements is preserved.
The `ordered` parameter controls if the parallel map should preserve the order of the original
iterator or it is allowed to reshuffle the elements (so it outputs elements which are processed
faster - earlier, increasing the throughput).

Another non-standard function is `pv/3` (from `man pv` - "pipe view"). A pass-through iterator
that can be added somewhere in the pipeline to periodically (either every `for_each_n` elements
or every `every_s` seconds) report the current progress of a long-running iterator:

```erlang
I0 = ...,
I1 = iterator:pv(
fun(SampleElement, TimePassed, ItemsPassed, TotalItems) ->
TimeS = erlang:convert_time_unit(TimePassed, native, second),
?LOG_INFO("Processed ~p items. Pace is ~p per-second. Current item: ~p",
[TotalItems, ItemsPassed / TimeS, SampleElement])
end,
#{for_each_n => 1000,
every_s => 30},
I0),
...
```
This example will log current progress either every 30 seconds or after processing every 1000
elements (whichever triggers first).

## Setup

Expand Down
78 changes: 78 additions & 0 deletions src/iterator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
map/2,
mapfoldl/3,
nthtail/2,
pv/3,
sublist/2,
takewhile/2,
zip/3
Expand Down Expand Up @@ -485,6 +486,83 @@ maybe_next(done) ->
maybe_next(#iter{} = Iter) ->
next(Iter).

-record(
pv,
{f :: fun((any(), integer()) -> any()),
for_each_n :: pos_integer(),
every_s :: pos_integer(),
last_report_time :: integer(),
last_report_n :: pos_integer(),
total_n :: pos_integer(),
inner_i :: iterator:iterator(any())}).


%% @doc Passthrough iterator one can use to periodically report the progress of the inner iterator.
%% Name comes from `pv' (pipe view) Unix utility. See `man pv'.
%% @param F function to call when one of the conditions triggers. Function arguments:
%% - `Data' - current element of the inner iterator (sample)
%% - `TimePassed' - time passed since the last report in native units
%% - `ItemsPassed' - number of items passed since the last report
%% - `TotalItems' - total number of items passed since the start
%% `TimePassed' + `ItemsPassed' are convenient to calculate the speed of the stream.
%% @param Opts trigger condition options:
%% - `for_each_n' - trigger every N-th element
%% - `every_s' - trigger every S seconds
%% @param InnerIter inner iterator to wrap
%%
%% Keep in mind that whichever trigger condition is met first, the `F' function will be called and
%% counters/timers will reset. So if you set `for_each_n' to 1000 and `every_s' to 30, then the
%% function will be called either as counter reaches 1000 or 30 seconds pass since the last call.
%%
%% If it takes more than `every_s' seconds to process a single element, the function will be called
%% with additional delay.
-spec pv(fun((Type, TimePassed :: integer(), ItemsPassed :: integer(), TotalItems::integer()) -> any()),
#{for_each_n => pos_integer(),
every_s => pos_integer()},
iterator:iterator(Type)) -> iterator:iterator(Type)
when
Type :: any().
pv(F, Opts, InnerIter) when is_function(F, 4) ->
Start = erlang:monotonic_time(),
State = #pv{f = F,
every_s = maps:get(every_s, Opts, 30),
for_each_n = maps:get(for_each_n, Opts, 1000),
last_report_time = Start,
last_report_n = 0,
total_n = 0,
inner_i = InnerIter},
iterator:new(fun yield_pv/1, State).

yield_pv(#pv{f = F,
every_s = TimeTrigger,
for_each_n = CountTrigger,
last_report_time = LastReportT,
last_report_n = LastReportN,
total_n = N,
inner_i = InnerIter} = St) ->
case iterator:next(InnerIter) of
{ok, Data, NewInnerIter} ->
NextN = N + 1,
ItemsProcessed = NextN - LastReportN,
CountCondition = ItemsProcessed >= CountTrigger,
Now = erlang:monotonic_time(),
TimePassed = Now - LastReportT,
TimeCondition = erlang:convert_time_unit(TimePassed, native, second) >= TimeTrigger,
if CountCondition orelse TimeCondition ->
F(Data, TimePassed, ItemsProcessed, NextN),
{Data, St#pv{last_report_time = Now,
last_report_n = NextN,
total_n = NextN,
inner_i = NewInnerIter}};
true ->
{Data, St#pv{total_n = NextN,
inner_i = NewInnerIter}}
end;
done ->
done
end.


%% @doc Iterator over .eterm file (file containing dot-terminated Erlang terms)
%% XXX: never abandon this iterator from long-running processes! It would leak file descriptor!
%% Either consume it to the end or close with `iterator:close/1' explicitly.
Expand Down
46 changes: 46 additions & 0 deletions test/iterator_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
%% @doc Unit-tests for `iterator'.

-module(iterator_tests).

-include_lib("eunit/include/eunit.hrl").

pv_each_n_test() ->
ForEachN = 5,
I0 = iterator:from_list(lists:seq(1, 50)),
Counter = counters:new(1, []),
I1 = iterator:pv(fun(_, _Time, NItems, TotalItems) ->
ok = counters:add(Counter, 1, ForEachN),
?assertEqual(TotalItems, counters:get(Counter, 1)),
?assertEqual(ForEachN, NItems)
end,
#{
for_each_n => ForEachN,
every_s => 120 % large so it never triggers
},
I0),
iterator:to_list(I1).

%% XXX: ths test can be flaky because it relies on the sleep time
pv_every_s_test() ->
EveryS = 1,
Size = 70,
Sleep = 50,
ApproxPerBatch = (EveryS * 1000) div Sleep,
I0 = iterator:from_list(lists:seq(1, Size)),
I1 = iterator:map(fun(X) ->
timer:sleep(Sleep),
X
end, I0),
I2 = iterator:pv(fun(_, Time, NItems, _TotalItems) ->
TimeMs = erlang:convert_time_unit(Time, native, millisecond),
%% We can't assert exact values here because of the sleep
?assert(abs(TimeMs - (EveryS * 1000)) < 30, [{time, TimeMs}]),
?assert(abs(NItems - ApproxPerBatch) < 4, [{n_items, NItems}]),
io:format("ok!~n", [])
end,
#{
for_each_n => 100, % large so it never triggers
every_s => EveryS
},
I1),
iterator:to_list(I2).

0 comments on commit e850d9f

Please sign in to comment.