Skip to content

Commit

Permalink
Move checkpoint to another process
Browse files Browse the repository at this point in the history
  • Loading branch information
cam committed May 24, 2021
1 parent d9b8217 commit e5507e5
Show file tree
Hide file tree
Showing 15 changed files with 622 additions and 175 deletions.
3 changes: 2 additions & 1 deletion crates/big_data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ rustler::init!(
nif::lookup_elem,
nif::remove,
nif::remove_row,
nif::remove_row_ids
nif::remove_row_ids,
nif::big_key_list,
],
load = nif::on_load
);
16 changes: 16 additions & 0 deletions crates/big_data/src/nif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ impl NifBigData {
fn to_list(&self, big_key: &str) -> Option<Vec<&RowData>> {
self.get(big_key).map(|big_data| big_data.to_list())
}
fn big_key_list(&self) -> Vec<&String>{
let mut l = Vec::new();
for key in self.data.keys(){
l.push(key);
}
l
}
// clear all big_data
fn clear(&mut self) {
self.data.clear();
Expand Down Expand Up @@ -244,6 +251,15 @@ fn get<'a>(
}
}
#[rustler::nif]
fn big_key_list(
env: Env,
resource: ResourceArc<NifBigDataResource>,
) -> NifResult<Term> {
let read = resource.read();
let list = read.big_key_list();
Ok((list).encode(env))
}
#[rustler::nif]
fn get_range<'a>(
env: Env<'a>,
resource: ResourceArc<NifBigDataResource>,
Expand Down
19 changes: 16 additions & 3 deletions include/big_data.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,29 @@
file_name :: maybe(file:filename()),
file_num = 1 :: pos_integer(),
file_size = 0 :: pos_integer(),
log_meta_ref :: reference(),
wal_buffer_tid :: reference(),
checkpoint_seq = 0 :: pos_integer(),
last_checkpoint_time = 0 :: pos_integer(),
id_seq = 0 :: pos_integer(),
log_seq = 0 :: pos_integer(),
max_size_bytes = ?BD_WAL_MAX_SIZE_BYTES :: pos_integer(),
write_strategy = default :: wal_write_strategy(),
file_modes = [raw, write, read, binary] :: list(),
stop_from :: maybe(from())}).

-define(BD_NOTFOUND, notfound).
% "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789").
-define(BD_CHAR_TUPLE,
{97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114,
115, 116, 117, 118, 119, 120, 121, 122, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76,
77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 48, 49, 50, 51, 52, 53, 54, 55,
56, 57}).
-define(BD_RAND_BYTES(N),
erlang:list_to_binary([erlang:element(
rand:uniform(
erlang:size(?BD_CHAR_TUPLE)),
?BD_CHAR_TUPLE)
|| _ <- lists:seq(1, N)])).
-define(BD_COUNTER_ID_SEQ, 1).
-define(BD_COUNTER_CHECKPOINT_SEQ, 2).
-define(BD_COUNTER_LIST, [id_seq, checkpoint_seq]).

-endif.
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ warn_unused_import,
warnings_as_errors
]}.
{deps, [lager,
{eredis, {git, "https://github.com/wooga/eredis.git", {tag, "v1.2.0"}}}
{eredis, {git, "https://github.com/wooga/eredis.git", {tag, "v1.2.0"}}},
observer_cli
]}.
{eunit_opts, [export_all]}.% same as options for eunit:test(Tests, ...)
{eunit_tests, []}. % same as Tests argument in eunit:test(Tests, ...)
Expand Down
12 changes: 9 additions & 3 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
{ref,"9ad91f149310a7d002cb966f62b7e2c3330abb04"}},
0},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.9.1">>},0}]}.
{<<"lager">>,{pkg,<<"lager">>,<<"3.9.1">>},0},
{<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.6.1">>},0},
{<<"recon">>,{pkg,<<"recon">>,<<"2.5.1">>},1}]}.
[
{pkg_hash,[
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"lager">>, <<"5885BC71308CD38F9D025C8ECDE4E5CCE1CE8565F80BFC6199865C845D6DBE95">>}]},
{<<"lager">>, <<"5885BC71308CD38F9D025C8ECDE4E5CCE1CE8565F80BFC6199865C845D6DBE95">>},
{<<"observer_cli">>, <<"D176F967C978AB8B8A29C35C12524F78B7BB36FD4E9B8276DD75C9CB56E07E42">>},
{<<"recon">>, <<"430FFA60685AC1EFDFB1FE4C97B8767C92D0D92E6E7C3E8621559BA77598678A">>}]},
{pkg_hash_ext,[
{<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>},
{<<"lager">>, <<"3F59BA75A04A99E5F18BF91C89F46DCE536F83C6CB415FE26E6E75A62BEF37DC">>}]}
{<<"lager">>, <<"3F59BA75A04A99E5F18BF91C89F46DCE536F83C6CB415FE26E6E75A62BEF37DC">>},
{<<"observer_cli">>, <<"3418E319764B9DFF1F469E43CBDFFD7FD54EA47CBF765027C557ABD146A19FB3">>},
{<<"recon">>, <<"5721C6B6D50122D8F68CCCAC712CAA1231F97894BAB779EFF5FF0F886CB44648">>}]}
].
133 changes: 133 additions & 0 deletions src/bd_bench.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
%%%-------------------------------------------------------------------
%%% @author yangcancai

%%% Copyright (c) 2021 by yangcancai([email protected]), All Rights Reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% https://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%

%%% @doc
%%%
%%% @end
%%% Created : 2021-05-19T07:19:22+00:00
%%%-------------------------------------------------------------------
-module(bd_bench).

-author("yangcancai").

-include("big_data.hrl").

-export([run/0, run/2, overview/0]).

-define(TotalCmdIx, 1).
-define(TotalTime, 2).
-define(FIELDS, [total_command, sum_time]).
-define(SIZE, erlang:length(?FIELDS)).

run() ->
run(100, 100).

run(N, Degree) when is_integer(N) ->
run(#{bucket_counter => N,
degree => Degree,
min_row => 1,
max_row => 300,
mix_row_bytes => 100,
max_row_bytes => 500}).

run(#{bucket_counter := BucketCounter, degree := Degree} = Config) ->
spawn_link(fun() ->
Each = BucketCounter div Degree,
Pid = self(),
Size = ?SIZE,
Ref = counters:new(Size, []),
persistent_term:put(bd_bench_s, erlang:system_time(1000)),
persistent_term:put(counters, Ref),
Pids =
[spawn_link(fun() -> loop(Pid, Each, Config) end)
|| _ <- lists:seq(1, Degree)],
wait_loop(Pids),
Rs = overview(),
io:format("Overview = ~p~n", [Rs])
end).

overview() ->
Ref = persistent_term:get(counters),
L = [counters:get(Ref, I) || I <- lists:seq(1, ?SIZE)],
R = #{total_command := Cmd, sum_time := Time} =
maps:from_list(
lists:zip(?FIELDS, L)),
Tps = Time div Cmd,
TotalTime = persistent_term:get(bd_bench_e) - persistent_term:get(bd_bench_s),
R#{aver => Tps div 1000,
tps => Cmd div (TotalTime div 1000),
total_time => TotalTime}.

wait_loop([]) ->
persistent_term:put(bd_bench_e, erlang:system_time(1000)),
ok;
wait_loop(Pids) ->
receive
{Pid, done} ->
?DEBUG("Bech done pid = ~p", [Pid]),
wait_loop(lists:delete(Pid, Pids));
{'EXIT', Pid, _} ->
?DEBUG("Bech exit pid = ~p", [Pid]),
wait_loop(lists:delete(Pid, Pids))
end.

loop(Parent,
Each,
#{min_row := MixRow,
max_row := MaxRow,
mix_row_bytes := MixRowBytes,
max_row_bytes := MaxRowBytes}) ->
Row = gen_row(MixRow, MaxRow),
Ref = persistent_term:get(counters),
%% insert
[begin
Bucket = gen_bucket(),
[begin
RowBytes = gen_data(MixRowBytes, MaxRowBytes),
counters:add(Ref, ?TotalCmdIx, 1),
{T, _Rs} =
timer:tc(fun() ->
big_data:command(#bd_wal{action = insert,
args =
[Bucket,
erlang:integer_to_binary(RowID),
erlang:system_time(1000),
RowBytes]})
end),
counters:add(Ref, ?TotalTime, T)
end
|| RowID <- lists:seq(1, Row)]
end
|| _ <- lists:seq(1, Each)],
Parent ! {self(), done}.

gen_row(MixRow, MaxRow) ->
rand(MixRow, MaxRow).

gen_bucket() ->
rand_bytes(32).

gen_data(MixRowBytes, MaxRowBytes) ->
RowBytes = rand(MixRowBytes, MaxRowBytes),
rand_bytes(RowBytes).

rand(Mix, Max) ->
rand:uniform(Max - Mix) + Mix.

rand_bytes(N) ->
?BD_RAND_BYTES(N).
Loading

0 comments on commit e5507e5

Please sign in to comment.