Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auxiliary directory for supporting the multi-directory configuration. #294

Merged
merged 23 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8e63ffd
Basement for supporting multi-directory configurations.
LykxSassinator Feb 13, 2023
bb4b0ce
Add extra normal cases for multi-directory configurations.
LykxSassinator Feb 14, 2023
12ea46d
Supplement extra abnormal cases for multi-directories.
LykxSassinator Feb 15, 2023
8ba434a
Polish annotations.
LykxSassinator Feb 15, 2023
d56cd51
Supplement extra corner cases.
LykxSassinator Feb 15, 2023
642f703
Polish codes.
LykxSassinator Feb 16, 2023
5dcf1c5
Polish codes according to comments.
LykxSassinator Feb 16, 2023
51eb236
Polish test cases.
LykxSassinator Feb 16, 2023
0908226
Polish codes.
LykxSassinator Feb 21, 2023
25b508b
Polish codes according to comments
LykxSassinator Feb 22, 2023
4e1d280
Polish codes according to comments.
LykxSassinator Feb 23, 2023
3382594
Rename the naming `auxiliary_dir` to `spill_dir`.
LykxSassinator Feb 24, 2023
bde2eac
Clear unnecessary failpoints.
LykxSassinator Feb 24, 2023
0d93570
Bugfix when `rotate` the writable_file and `append_buffer.size()` > s…
LykxSassinator Feb 27, 2023
38ef1cb
Polish codes according to comments.
LykxSassinator Feb 27, 2023
8332c39
Polish codes.
LykxSassinator Feb 28, 2023
485d7b2
Bugfix for reusing when no enough space for the next LogBatch but sti…
LykxSassinator Mar 1, 2023
7dbbae9
Merge branch 'master' into auxiliary_directory
LykxSassinator Mar 1, 2023
50a7219
Bugfix for duplicate log files on different dirs and polish codes.
LykxSassinator Mar 1, 2023
6efefb1
Polish codes
LykxSassinator Mar 1, 2023
cc0d0dc
Polish codes and make a minor bugfix.
LykxSassinator Mar 2, 2023
b74c8a4
Revoke the works on tackling the second `pipe.append()` when exists n…
LykxSassinator Mar 2, 2023
b8ff5bd
Supplement the ut for testing empty LogItemBatch.
LykxSassinator Mar 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,20 @@ pub enum RecoveryMode {
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
/// Directory to store log files. Will create on startup if not exists.
/// Main directory to store log files. Will create on startup if not exists.
///
/// Default: ""
pub dir: String,

/// Auxiliary directory to store log files. Will create on startup if
/// set but not exists.
///
/// New logs will be put into this dir when the main `dir` is full
/// and has no spare space for new logs.
///
/// Default: None
pub spill_dir: Option<String>,

/// How to deal with file corruption during recovery.
///
/// Default: "tolerate-tail-corruption".
Expand Down Expand Up @@ -106,6 +115,7 @@ impl Default for Config {
#[allow(unused_mut)]
let mut cfg = Config {
dir: "".to_owned(),
spill_dir: None,
recovery_mode: RecoveryMode::TolerateTailCorruption,
recovery_read_block_size: ReadableSize::kb(16),
recovery_threads: 4,
Expand Down Expand Up @@ -209,12 +219,14 @@ mod tests {
let dump = toml::to_string_pretty(&value).unwrap();
let load = toml::from_str(&dump).unwrap();
assert_eq!(value, load);
assert!(load.spill_dir.is_none());
}

#[test]
fn test_custom() {
let custom = r#"
dir = "custom_dir"
spill-dir = "custom_spill_dir"
recovery-mode = "tolerate-tail-corruption"
bytes-per-sync = "2KB"
target-file-size = "1MB"
Expand All @@ -225,6 +237,7 @@ mod tests {
"#;
let mut load: Config = toml::from_str(custom).unwrap();
assert_eq!(load.dir, "custom_dir");
assert_eq!(load.spill_dir, Some("custom_spill_dir".to_owned()));
assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
assert_eq!(load.bytes_per_sync, Some(ReadableSize::kb(2)));
assert_eq!(load.target_file_size, ReadableSize::mb(1));
Expand Down
174 changes: 172 additions & 2 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::write_barrier::{WriteBarrier, Writer};
use crate::{perf_context, Error, GlobalStats, Result};

const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);
/// Max retry count for `write`.
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
const WRITE_MAX_RETRY_TIMES: u64 = 2;

pub struct Engine<F = DefaultFileSystem, P = FilePipeLog<F>>
where
Expand Down Expand Up @@ -142,7 +144,14 @@ where
let start = Instant::now();
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
debug_assert!(len > 0);
let block_handle = {

let mut attempt_count = 0_u64;
let block_handle = loop {
tabokie marked this conversation as resolved.
Show resolved Hide resolved
// Max retry count is limited to `WRITE_MAX_RETRY_TIMES`, that is, 2.
// If the first `append` retries because of a NOSPC error, the next `append`
// should succeed, unless there are several abnormal conditions in the IO
// device. In that case, `Engine::write` must return `Err`.
attempt_count += 1;
let mut writer = Writer::new(log_batch, sync);
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
// Snapshot and clear the current perf context temporarily, so the write group
// leader will collect the perf context diff later.
Expand Down Expand Up @@ -178,7 +187,27 @@ where
debug_assert_eq!(writer.perf_context_diff.write_wait_duration, Duration::ZERO);
perf_context += &writer.perf_context_diff;
set_perf_context(perf_context);
writer.finish()?
// Retry if `writer.finish()` returns the special `Error::TryAgain`,
// indicating that there may still be free space for this `LogBatch`.
match writer.finish() {
Ok(handle) => {
break handle;
}
Err(Error::TryAgain(_)) => {
if attempt_count >= WRITE_MAX_RETRY_TIMES {
// A special err, we will retry this LogBatch `append` by appending
// this writer to the next write group, and the current write leader
// will not hang on this write and will return timely.
return Err(Error::TryAgain(format!(
"Failed to write logbatch, exceed max_retry_count: ({})",
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
WRITE_MAX_RETRY_TIMES
)));
}
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
}
Err(e) => {
return Err(e);
}
}
};
let mut now = Instant::now();
log_batch.finish_write(block_handle);
Expand Down Expand Up @@ -2560,4 +2589,145 @@ mod tests {
// Replay of rewrite filtered.
assert!(engine.raft_groups().is_empty());
}

#[test]
fn test_start_engine_with_multi_dirs() {
    // Main directory of the engine.
    let dir = tempfile::Builder::new()
        .prefix("test_start_engine_with_multi_dirs_default")
        .tempdir()
        .unwrap();
    // Auxiliary (spill) directory, used once the main dir runs out of space.
    let spill_dir = tempfile::Builder::new()
        .prefix("test_start_engine_with_multi_dirs_spill")
        .tempdir()
        .unwrap();
    let paths = [
        dir.path().to_str().unwrap(),
        spill_dir.path().to_str().unwrap(),
    ];
    let file_system = Arc::new(DeleteMonitoredFileSystem::new());
    let entry_data = vec![b'x'; 512];

    // Preparations for multi-dirs: build a layout where append log files are
    // already distributed across both directories.
    {
        // Step 1: write data into the main directory.
        let cfg = Config {
            dir: paths[0].to_owned(),
            enable_log_recycle: false,
            // Tiny target size so every batch rotates to a new file.
            target_file_size: ReadableSize(1),
            ..Default::default()
        };
        let engine = RaftLogEngine::open_with_file_system(cfg, file_system.clone()).unwrap();
        for rid in 1..=10 {
            engine.append(rid, 1, 10, Some(&entry_data));
        }
        let mut file_count = engine.file_count(Some(LogQueue::Append));
        let halve_file_count = (file_count + 1) / 2;
        drop(engine);

        // Step 2: move roughly half of the append log files into the
        // `spill_dir` directory, simulating a pre-existing multi-dir layout.
        std::fs::read_dir(paths[0]).unwrap().for_each(|e| {
            let p = e.unwrap().path();
            // Stop moving once only `halve_file_count` files remain in the
            // main dir; skip anything that is not a regular file.
            if !p.is_file() || file_count < halve_file_count {
                return;
            }
            let file_name = p.file_name().unwrap().to_str().unwrap();
            // Only relocate files recognized as append-queue log files.
            if let Some(FileId {
                queue: LogQueue::Append,
                seq: _,
            }) = FileId::parse_file_name(file_name)
            {
                let mut dst_path = PathBuf::from(&paths[1]);
                dst_path.push(file_name);
                file_system.rename(p, dst_path).unwrap();
                file_count -= 1;
            }
        });
    }

    // Case 1: start an engine over both dirs with no recycling.
    let cfg = Config {
        dir: paths[0].to_owned(),
        spill_dir: Some(paths[1].to_owned()),
        target_file_size: ReadableSize(1),
        enable_log_recycle: false,
        ..Default::default()
    };
    let engine =
        RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
    // Every file in either directory should be tracked by the engine.
    assert_eq!(file_system.inner.file_count(), engine.file_count(None));
    let (start, end) = engine.file_span(LogQueue::Append);
    // Append data; the span should only grow at the tail.
    for rid in 1..=10 {
        engine.append(rid, 10, 20, Some(&entry_data));
    }
    let (start_1, end_1) = engine.file_span(LogQueue::Append);
    assert_eq!(start_1, start);
    assert!(end_1 > end);
    assert_eq!(file_system.inner.file_count(), engine.file_count(None));
    drop(engine);

    // Case 2: restart the engine with recycle and prefill.
    let cfg_2 = Config {
        enable_log_recycle: true,
        prefill_for_recycle: true,
        purge_threshold: ReadableSize(40),
        ..cfg
    };
    let engine =
        RaftLogEngine::open_with_file_system(cfg_2.clone(), file_system.clone()).unwrap();
    let file_count = file_system.inner.file_count();
    let (start_2, end_2) = engine.file_span(LogQueue::Append);
    // Prefill puts extra recycled files on disk that the engine does not
    // count as active log files.
    assert!(file_count > engine.file_count(None));
    assert_eq!((start_1, end_1), (start_2, end_2));
    // Append data; recycled files are reused, so the on-disk count is stable.
    for rid in 1..=10 {
        engine.append(rid, 20, 30, Some(&entry_data));
    }
    assert_eq!(file_count, file_system.inner.file_count());
    // Mark all data obsolete.
    for rid in 1..=10 {
        engine.clean(rid);
    }
    let (start_2, end_2) = engine.file_span(LogQueue::Append);
    assert_eq!(start_2, start_1);
    assert!(end_2 > end_1);
    assert_eq!(file_count, file_system.inner.file_count());
    // Purge and check the file span. As all logs were written into reused
    // recycled files, the overall file_count won't change.
    engine.purge_expired_files().unwrap();
    let (start_3, end_3) = engine.file_span(LogQueue::Append);
    assert!(start_3 > start_2);
    assert_eq!(start_3, end_3);
    let engine = engine.reopen();
    // Since `purge_threshold` is smaller than the real append volume, some
    // files had to be newly created for appending rather than obtained by
    // reusing recycled logs.
    assert!(file_count < file_system.inner.file_count());
    let file_count = file_system.inner.file_count();
    // Reuse all files for appending new data. Recycled files located in the
    // auxiliary directory (`spill-dir`) are reused as well.
    for rid in 1..=30 {
        engine.append(rid, 1, 10, Some(&entry_data));
    }
    assert_eq!(file_count, file_system.inner.file_count());
    let (start_4, end_4) = engine.file_span(LogQueue::Append);
    drop(engine);

    // Case 3: restart the engine with no recycle.
    let cfg_3 = Config {
        enable_log_recycle: false,
        prefill_for_recycle: false,
        purge_threshold: ReadableSize(40),
        ..cfg_2
    };
    let engine = RaftLogEngine::open_with_file_system(cfg_3, file_system.clone()).unwrap();
    assert_eq!((start_4, end_4), engine.file_span(LogQueue::Append));
    // All prefilled (recycled) files are cleared when recycling is disabled.
    assert!(file_count > file_system.inner.file_count());
    for rid in 1..=5 {
        engine.append(rid, 10, 20, Some(&entry_data));
    }
    assert!(end_4 < engine.file_span(LogQueue::Append).1);
}
}
4 changes: 4 additions & 0 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,15 @@ impl LogFd {
/// bytes were written.
pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult<usize> {
fail_point!("log_fd::write::zero", |_| { Ok(0) });
fail_point!("log_fd::write::no_space_err", |_| {
Err(from_nix_error(nix::Error::ENOSPC, "nospace"))
});
let mut written = 0;
while written < content.len() {
let bytes = match pwrite(self.0, &content[written..], offset as i64) {
Ok(bytes) => bytes,
Err(e) if e == Errno::EINTR => continue,
Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")),
Err(e) => return Err(from_nix_error(e, "pwrite")),
};
if bytes == 0 {
Expand Down
19 changes: 19 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub enum Error {
Codec(#[from] CodecError),
#[error("Protobuf Error: {0}")]
Protobuf(#[from] protobuf::ProtobufError),
#[error("TryAgain Error: {0}")]
TryAgain(String),
#[error("Entry Compacted")]
EntryCompacted,
#[error("Entry Not Found")]
Expand All @@ -30,3 +32,20 @@ pub enum Error {
}

/// Convenience result type used throughout this crate, with [`Error`] as the
/// error variant.
pub type Result<T> = ::std::result::Result<T, Error>;

/// Check whether the given error is a nospace error.
pub(crate) fn is_no_space_err(e: &Error) -> bool {
// TODO: make the following judgement more elegant when the error type
// `ErrorKind::StorageFull` is stable.
if_chain::if_chain! {
if let Error::Io(ref e) = e;
if let Some(err) = e.get_ref();
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
let err_msg = format!("{}", err);
if err_msg.contains("nospace");
then {
true
} else {
false
}
}
}
Loading