Skip to content

Commit

Permalink
fix: do not remove deletion markers when window time range overlaps (#…
Browse files Browse the repository at this point in the history
…3773)

* fix: do not remove deletion markers when window time range overlaps

* chore: fix some minor issues; add compaction test

* chore: add more test

* fix: nitpick master's nitpick
  • Loading branch information
v0y4g3r authored Apr 23, 2024
1 parent f764fd5 commit 778e195
Show file tree
Hide file tree
Showing 3 changed files with 342 additions and 18 deletions.
254 changes: 237 additions & 17 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -84,35 +85,41 @@ impl TwcsPicker {
/// fragmentation. For other windows, we allow at most 1 file at each window.
fn build_output(
&self,
time_windows: &BTreeMap<i64, Vec<FileHandle>>,
time_windows: &BTreeMap<i64, Window>,
active_window: Option<i64>,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
let files_in_window = &files.files;
// we only remove deletion markers once no file in current window overlaps with any other window.
let filter_deleted = !files.overlapping;

if let Some(active_window) = active_window
&& *window == active_window
{
if files.len() > self.max_active_window_files {
if files_in_window.len() > self.max_active_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1, // we only have two levels and always compact to l1
inputs: files.clone(),
inputs: files_in_window.clone(),
filter_deleted,
});
} else {
debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window);
}
} else {
// not active writing window
if files.len() > self.max_inactive_window_files {
if files_in_window.len() > self.max_inactive_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
inputs: files.clone(),
inputs: files_in_window.clone(),
filter_deleted,
});
} else {
debug!(
"No enough files, current: {}, max_inactive_window_files: {}",
files.len(),
files_in_window.len(),
self.max_inactive_window_files
)
}
Expand Down Expand Up @@ -195,24 +202,99 @@ impl Picker for TwcsPicker {
}
}

/// A group of files that were assigned to the same time window, together with the
/// combined time range covered by those files.
struct Window {
    /// Smallest start timestamp among the files in this window.
    start: Timestamp,
    /// Largest end timestamp among the files in this window.
    end: Timestamp,
    /// Files assigned to this window.
    files: Vec<FileHandle>,
    /// Key of the time window. [Window::new_with_file] initializes it to 0, so the
    /// creator is expected to assign the real value afterwards.
    time_window: i64,
    /// Whether the time range of this window overlaps with that of another window.
    overlapping: bool,
}

impl Window {
    /// Builds a [Window] that contains only `file`; the window's time range is the
    /// file's time range, `time_window` is 0 and `overlapping` is `false`.
    fn new_with_file(file: FileHandle) -> Self {
        let (first_start, first_end) = file.time_range();
        Window {
            start: first_start,
            end: first_end,
            files: vec![file],
            time_window: 0,
            overlapping: false,
        }
    }

    /// Returns the (inclusive) time range spanned by all files in this window.
    fn range(&self) -> (Timestamp, Timestamp) {
        let Window { start, end, .. } = self;
        (*start, *end)
    }

    /// Appends `file` to this window and widens the window's time range so that it
    /// also covers the file's time range.
    fn add_file(&mut self, file: FileHandle) {
        let (file_start, file_end) = file.time_range();
        self.files.push(file);
        self.start = self.start.min(file_start);
        self.end = self.end.max(file_end);
    }
}

/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
fn assign_to_windows<'a>(
files: impl Iterator<Item = &'a FileHandle>,
time_window_size: i64,
) -> BTreeMap<i64, Vec<FileHandle>> {
let mut windows: BTreeMap<i64, Vec<FileHandle>> = BTreeMap::new();
) -> BTreeMap<i64, Window> {
let mut windows: HashMap<i64, Window> = HashMap::new();
// Iterates all files and assign to time windows according to max timestamp
for file in files {
let (_, end) = file.time_range();
for f in files {
let (_, end) = f.time_range();
let time_window = end
.convert_to(TimeUnit::Second)
.unwrap()
.value()
.align_to_ceil_by_bucket(time_window_size)
.unwrap_or(i64::MIN);
windows.entry(time_window).or_default().push(file.clone());

match windows.entry(time_window) {
Entry::Occupied(mut e) => {
e.get_mut().add_file(f.clone());
}
Entry::Vacant(e) => {
let mut window = Window::new_with_file(f.clone());
window.time_window = time_window;
e.insert(window);
}
}
}
if windows.is_empty() {
return BTreeMap::new();
}
windows

let mut windows = windows.into_values().collect::<Vec<_>>();
windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));

let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.

for idx in 1..windows.len() {
let next_range = windows[idx].range();
if overlaps(&current_range, &next_range) {
windows[idx - 1].overlapping = true;
windows[idx].overlapping = true;
}
current_range = (
current_range.0.min(next_range.0),
current_range.1.max(next_range.1),
);
}

windows.into_iter().map(|w| (w.time_window, w)).collect()
}

/// Tells whether two inclusive timestamp ranges share at least one point.
///
/// The range with the smaller start is taken as the "earlier" one; the ranges overlap
/// when the other range starts no later than the earlier range ends.
fn overlaps(l: &(Timestamp, Timestamp), r: &(Timestamp, Timestamp)) -> bool {
    if l.0 <= r.0 {
        // `l` starts first (or at the same time): `r` must start before `l` ends.
        r.0 <= l.1
    } else {
        // `r` starts first: `l` must start before `r` ends.
        l.0 <= r.1
    }
}

/// Finds the latest active writing window among all files.
Expand Down Expand Up @@ -344,6 +426,7 @@ impl TwcsCompactionTask {
sst_layer.clone(),
&output.inputs,
append_mode,
output.filter_deleted,
)
.await?;
let file_meta_opt = sst_layer
Expand Down Expand Up @@ -572,6 +655,8 @@ pub(crate) struct CompactionOutput {
pub output_level: Level,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
/// Whether to remove deletion markers.
pub filter_deleted: bool,
}

/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
Expand All @@ -580,10 +665,12 @@ async fn build_sst_reader(
sst_layer: AccessLayerRef,
inputs: &[FileHandle],
append_mode: bool,
filter_deleted: bool,
) -> error::Result<BoxedBatchReader> {
let scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?)
.with_files(inputs.to_vec())
.with_append_mode(append_mode)
.with_filter_deleted(filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true);
SeqScan::new(scan_input).build_reader().await
Expand Down Expand Up @@ -642,7 +729,7 @@ mod tests {
.iter(),
3,
);
assert_eq!(5, windows.get(&0).unwrap().len());
assert_eq!(5, windows.get(&0).unwrap().files.len());

let files = [FileId::random(); 3];
let windows = assign_to_windows(
Expand All @@ -656,15 +743,148 @@ mod tests {
);
assert_eq!(
files[0],
windows.get(&0).unwrap().first().unwrap().file_id()
windows.get(&0).unwrap().files.first().unwrap().file_id()
);
assert_eq!(
files[1],
windows.get(&3).unwrap().first().unwrap().file_id()
windows.get(&3).unwrap().files.first().unwrap().file_id()
);
assert_eq!(
files[2],
windows.get(&12).unwrap().first().unwrap().file_id()
windows.get(&12).unwrap().files.first().unwrap().file_id()
);
}

/// Expected state of a single window:
/// (Window value, overlapping, files' time ranges in window).
///
/// The time ranges are compared against the window's actual file ranges after those
/// have been sorted by (start, end), so they must be listed in that order.
type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

/// Runs `assign_to_windows` over files built from `file_time_ranges` and verifies that
/// every window listed in `expected_files` exists, carries the expected `overlapping`
/// flag and holds exactly the expected file time ranges.
fn check_assign_to_windows_with_overlapping(
    file_time_ranges: &[(i64, i64)],
    time_window: i64,
    expected_files: &[ExpectedWindowSpec],
) {
    // One file handle with a fresh random id per input range.
    let handles: Vec<_> = file_time_ranges
        .iter()
        .map(|&(start, end)| new_file_handle(FileId::random(), start, end, 0))
        .collect();

    let assigned = assign_to_windows(handles.iter(), time_window);

    for (window_key, expect_overlapping, expect_ranges) in expected_files {
        let window = assigned.get(window_key).unwrap();
        assert_eq!(*expect_overlapping, window.overlapping);

        let mut actual_ranges: Vec<(i64, i64)> = Vec::with_capacity(window.files.len());
        for handle in &window.files {
            let (start, end) = handle.time_range();
            actual_ranges.push((start.value(), end.value()));
        }
        // Order inside a window is unspecified; sort by (start, end) before comparing.
        actual_ranges.sort_unstable();
        assert_eq!(expect_ranges, &actual_ranges);
    }
}

#[test]
fn test_assign_to_windows_with_overlapping() {
    // Window size shared by every case.
    const TIME_WINDOW: i64 = 2;

    // (input file time ranges, expected (window value, overlapping, sorted file ranges))
    type Case = (Vec<(i64, i64)>, Vec<(i64, bool, Vec<(i64, i64)>)>);

    let cases: Vec<Case> = vec![
        // Disjoint files: no window overlaps.
        (
            vec![(0, 999), (1000, 1999), (2000, 2999)],
            vec![
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        ),
        // The file in window 2 reaches back into the range of window 0.
        (
            vec![(0, 1), (0, 999), (100, 2999)],
            vec![
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        ),
        // Disjoint files spread over three windows.
        (
            vec![(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            vec![
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        ),
        // A file spanning the whole range makes every window overlap.
        (
            vec![
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            vec![
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        ),
        // A file in window 4 starts inside the range of window 2.
        (
            vec![
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            vec![
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        ),
        (
            vec![
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (2999, 3999), // window 4
            ],
            vec![
                // window 2 overlaps with window 4
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        ),
        (
            vec![
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (0, 1000),    // window 2
            ],
            vec![
                // only window 0 overlaps with window 2.
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        ),
    ];

    for (file_time_ranges, expected_windows) in &cases {
        check_assign_to_windows_with_overlapping(file_time_ranges, TIME_WINDOW, expected_windows);
    }
}

Expand Down
Loading

0 comments on commit 778e195

Please sign in to comment.