Skip to content

Commit

Permalink
Improve recover wal
Browse files Browse the repository at this point in the history
  • Loading branch information
cam committed May 25, 2021
1 parent e5507e5 commit 520ae8c
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ test: epmd dialyzer
./rebar3 do ct --dir test/ct --config test/ct/ct.config --sys_config config/test.config -v

eunit: epmd
./rebar3 do eunit -v
./rebar3 do eunit --dir test/eunit -v

ct: epmd
sh crates/build_crates.sh clippy
Expand Down
Binary file modified rebar3
Binary file not shown.
9 changes: 8 additions & 1 deletion src/bd_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,15 @@ overview() ->
lists:zip(?FIELDS, L)),
Tps = Time div Cmd,
TotalTime = persistent_term:get(bd_bench_e) - persistent_term:get(bd_bench_s),
S = TotalTime div 1000,
Sec = case S of
0 ->
1;
S ->
S
end,
R#{aver => Tps div 1000,
tps => Cmd div (TotalTime div 1000),
tps => Cmd div Sec,
total_time => TotalTime}.

wait_loop([]) ->
Expand Down
1 change: 1 addition & 0 deletions src/bd_checkpoint.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ sync() ->
gen_server:call(?MODULE, checkpoint).

sync(Timeout) ->
?DEBUG("Checkpoint sync timeout:~p", [Timeout]),
gen_server:call(?MODULE, checkpoint, Timeout).

async() ->
Expand Down
55 changes: 47 additions & 8 deletions src/bd_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ recover_wal(#{dir := Dir} = Config) ->
[public, set, {keypos, #bd_wal.id}, {write_concurrency, true}]),
bd_checkpoint:set_tid(Tid),
ok = make_dir(Dir),
CheckpointSeq = bd_checkpoint:lookup_checkpoint_seq(),
WalFiles =
lists:sort(
filelib:wildcard(
filename:join(Dir, "*.wal"))),
maybe_not_checkpoint_files(sort_file(filelib:wildcard(
filename:join(Dir, "*.wal"))),
CheckpointSeq),
%% read chunk to wal_buffer
CheckpointSeq = bd_checkpoint:lookup_checkpoint_seq(),
?DEBUG("CheckpointSeq = ~p", [CheckpointSeq]),
bd_counter:put(?BD_COUNTER_CHECKPOINT_SEQ, CheckpointSeq),
NewIdSeq =
Expand All @@ -212,13 +212,13 @@ recover_wal(#{dir := Dir} = Config) ->
close_existing(Fd),
IdSeq
end,
0,
CheckpointSeq,
WalFiles),
bd_counter:put(?BD_COUNTER_ID_SEQ, NewIdSeq),
CurFileNum = extract_file_num(lists:reverse(WalFiles)),
%% proccess all action
process_all_action(Tid),
bd_checkpoint:sync(60000),
bd_checkpoint:sync(600000),
State =
#bd_log_wal_state{file_num = CurFileNum,
data_dir = Dir,
Expand Down Expand Up @@ -289,7 +289,7 @@ open_and_read_header(File) ->
%% recover wal chunck to ets table
recover_wal_chunk(Tid, CheckpointSeq, Fd, ChunkSz) ->
Chunk = read_from_wal_file(Fd, ChunkSz),
recover_frames(Tid, CheckpointSeq, Fd, Chunk, ChunkSz, 0).
recover_frames(Tid, CheckpointSeq, Fd, Chunk, ChunkSz, CheckpointSeq).

recover_frames(_,
_CheckpointSeq,
Expand Down Expand Up @@ -425,11 +425,13 @@ close_existing(Fd) ->

roll_over(#bd_log_wal_state{fd = Fd,
data_dir = DataDir,
id_seq = IdSeq,
file_num = FileNum} =
State) ->
close_existing(Fd),
Num = FileNum + 1,
FileName = filename:join(DataDir, zpad_filename("", "wal", Num)),
FileName =
filename:join(DataDir, zpad_filename(erlang:integer_to_list(IdSeq + 1), "wal", Num)),
S = State#bd_log_wal_state{file_num = Num, file_name = FileName},
open_wal(S).

Expand Down Expand Up @@ -461,3 +463,40 @@ zpad_filename("", Ext, Num) ->
zpad_filename(Prefix, Ext, Num) ->
lists:flatten(
io_lib:format("~s_~8..0B.~s", [Prefix, Num, Ext])).

sort_file(Files) ->
lists:sort(fun sort_file/2, Files).

sort_file(A, B) ->
case string:split(A, "_") of
[_, A1] ->
[_, B1] = string:split(B, "_"),
A1 < B1;
[A1] ->
[B1] = string:split(B, "_"),
A1 < B1
end.

maybe_not_checkpoint_files([], _) ->
[];
maybe_not_checkpoint_files(Files, CheckpointSeq) ->
N = maybe_not_checkpoint_files(Files, CheckpointSeq, 1),
lists:sublist(Files, N, erlang:length(Files)).

maybe_not_checkpoint_files([], _, N) ->
N - 1;
maybe_not_checkpoint_files([File | Rest], CheckpointSeq, N) ->
B = case string:split(
filename:basename(File), "_")
of
[_] ->
true;
[First, _] ->
erlang:list_to_integer(First) >= CheckpointSeq
end,
case B of
true ->
N;
false ->
maybe_not_checkpoint_files(Rest, CheckpointSeq, N + 1)
end.
2 changes: 1 addition & 1 deletion test/ct/bd_log_wal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ recover(Config) ->
?assertEqual(1, bd_checkpoint:checkpoint_seq()),
?assertEqual([], ets:tab2list(S#bd_log_wal_state.wal_buffer_tid)),
?assertEqual(1, bd_checkpoint:lookup_checkpoint_seq()),
?assertEqual(filename:join(Dir, "00000002.wal"), S#bd_log_wal_state.file_name),
?assertEqual(filename:join(Dir, "1_00000002.wal"), S#bd_log_wal_state.file_name),
?assertEqual([#row_data{row_id = <<"1">>,
time = 1,
term = {a, 1}}],
Expand Down
102 changes: 102 additions & 0 deletions test/eunit/big_log_wal_test.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
%%%-------------------------------------------------------------------
%%% @author yangcancai

%%% Copyright (c) 2021 by yangcancai([email protected]), All Rights Reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% https://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%

%%% @doc
%%%
%%% @end
%%% Created : 2021-05-25T03:12:27+00:00
%%%-------------------------------------------------------------------
-module(big_log_wal_test).

-include_lib("eunit/include/eunit.hrl").

sort_file_test() ->
Files = bd_log_wal:sort_file(["data/1_00000001.wal", "data/1111_00000002.wal"]),
?assertEqual(Files, ["data/1_00000001.wal", "data/1111_00000002.wal"]),
Files = bd_log_wal:sort_file(["data/1111_00000002.wal", "data/1_00000001.wal"]),
?assertEqual(Files, ["data/1_00000001.wal", "data/1111_00000002.wal"]),
Fiels1 = bd_log_wal:sort_file(["data/00000001.wal", "data/00000002.wal"]),
?assertEqual(Fiels1, ["data/00000001.wal", "data/00000002.wal"]),
Fiels1 = bd_log_wal:sort_file(["data/00000002.wal", "data/00000001.wal"]),
ok.

zpad_extract_num_test() ->
?assertEqual(1, bd_log_wal:zpad_extract_num(["data/1_00000001.wal"])),
?assertEqual(1, bd_log_wal:zpad_extract_num(["data/00000001.wal"])),
ok.

maybe_not_checkpoint_files_test() ->
Files = bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal"], 0),
?assertEqual(Files, ["data/1_00000001.wal"]),
?assertEqual(["data/1_00000001.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal"], 1)),
?assertEqual(["data/1_00000001.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal"], 2)),

?assertEqual(["data/1_00000001.wal", "data/2_00000002.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal"],
0)),

?assertEqual(["data/1_00000001.wal", "data/2_00000002.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal"],
1)),
?assertEqual(["data/2_00000002.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal"],
2)),

?assertEqual(["data/2_00000002.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal"],
3)),
?assertEqual(["data/1_00000001.wal", "data/2_00000002.wal", "data/4_00000004.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal",
"data/4_00000004.wal"],
0)),
?assertEqual(["data/1_00000001.wal", "data/2_00000002.wal", "data/4_00000004.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal",
"data/4_00000004.wal"],
1)),
?assertEqual(["data/2_00000002.wal", "data/4_00000004.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal",
"data/4_00000004.wal"],
2)),

?assertEqual(["data/4_00000004.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal",
"data/4_00000004.wal"],
3)),
?assertEqual(["data/4_00000004.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal",
"data/4_00000004.wal"],
4)),

?assertEqual(["data/4_00000004.wal"],
bd_log_wal:maybe_not_checkpoint_files(["data/1_00000001.wal",
"data/2_00000002.wal",
"data/4_00000004.wal"],
5)),

ok.

0 comments on commit 520ae8c

Please sign in to comment.