Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prune batches from memtable by time range #4913

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions src/mito2/src/engine/prune_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,56 @@ async fn test_prune_memtable_complex_expr() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

/// Checks that a scan created before a later write only returns the rows that
/// were written before the scan started.
///
/// The region first receives the rows built by `build_rows(5, 8)`, then the scan
/// stream is created, and only afterwards the rows built by `build_rows(10, 12)`
/// are written. The collected output must contain only the first set of rows.
#[tokio::test]
async fn test_mem_range_prune() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    // Create the region and remember the schema used to write rows into it.
    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new().build();
    let schema = rows_schema(&create_request);
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    // First write: these rows exist before the scan is created.
    let rows_before_scan = Rows {
        schema: schema.clone(),
        rows: build_rows(5, 8),
    };
    put_rows(&engine, region_id, rows_before_scan).await;

    // Starts scan and gets the memtable time range.
    // The stream is deliberately not consumed until after the second write.
    let stream = engine
        .scan_to_stream(region_id, ScanRequest::default())
        .await
        .unwrap();

    // Second write: happens after the scan was created, so these rows must not
    // show up in the scan output.
    let rows_after_scan = Rows {
        schema,
        rows: build_rows(10, 12),
    };
    put_rows(&engine, region_id, rows_after_scan).await;

    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
+-------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}
8 changes: 6 additions & 2 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

pub mod bulk;
pub mod key_values;
Expand Down Expand Up @@ -355,8 +357,10 @@ impl MemtableRange {
}

/// Builds an iterator to read the range.
pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
self.context.builder.build()
/// Builds an iterator to read the range, keeping only rows inside `time_range`.
///
/// The iterator returned by the range's builder is wrapped in a
/// [`PruneTimeIterator`] constructed with `time_range`, so batches are pruned
/// by that range as they are read.
///
/// Returns the builder's error if building the underlying iterator fails.
pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
let iter = self.context.builder.build()?;
Ok(Box::new(PruneTimeIterator::new(iter, time_range)))
}
}

Expand Down
217 changes: 217 additions & 0 deletions src/mito2/src/read/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_time::Timestamp;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::BooleanVectorBuilder;

use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRangeContextRef;
use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};

Expand Down Expand Up @@ -112,3 +118,214 @@ impl PruneReader {
}
}
}

/// An iterator that prunes batches by time range.
///
/// It wraps another batch iterator, drops the rows whose timestamps fall
/// outside `time_range`, and skips batches that become empty after pruning.
pub(crate) struct PruneTimeIterator {
/// Source iterator whose batches are pruned.
iter: BoxedBatchIterator,
/// Time range to keep; both bounds are treated as inclusive when pruning.
time_range: FileTimeRange,
}

impl PruneTimeIterator {
    /// Wraps `iter` so that only rows whose timestamps fall inside `time_range`
    /// are yielded.
    pub(crate) fn new(iter: BoxedBatchIterator, time_range: FileTimeRange) -> Self {
        Self { iter, time_range }
    }

    /// Removes the rows of `batch` whose timestamps lie outside the time range.
    ///
    /// Both bounds of the range are inclusive. An empty batch is returned as is.
    /// Returns an error if filtering the batch fails.
    fn prune(&self, mut batch: Batch) -> Result<Batch> {
        if batch.is_empty() {
            return Ok(batch);
        }

        let (start, end) = self.time_range;

        // Fast path: the first and last timestamps of the batch are both inside
        // the (inclusive) range, so the batch is returned without filtering.
        let covers_batch =
            start <= batch.first_timestamp().unwrap() && batch.last_timestamp().unwrap() <= end;
        if covers_batch {
            return Ok(batch);
        }

        // Slow path: decide row by row.
        // The timestamp precision of the batch may differ from the one of the
        // time range, so every raw value is turned into a `Timestamp` carrying
        // the batch's own unit before it is compared with the bounds.
        // Safety: We know this is the timestamp type.
        let unit = batch
            .timestamps()
            .data_type()
            .as_timestamp()
            .unwrap()
            .unit();
        let mut keep = BooleanVectorBuilder::with_capacity(batch.timestamps().len());
        for &raw in batch.timestamps_native().unwrap() {
            let ts = Timestamp::new(raw, unit);
            keep.push(Some(start <= ts && ts <= end));
        }

        batch.filter(&keep.finish())?;
        Ok(batch)
    }

    /// Prunes batches from the inner iterator until one of them is still
    /// non-empty after pruning and returns it.
    ///
    /// Returns `Ok(None)` once the inner iterator is exhausted. An error from the
    /// inner iterator or from pruning is returned immediately.
    fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
        loop {
            // Fetch the next batch first so the mutable borrow of the inner
            // iterator has ended before `prune` borrows `self`.
            let batch = match self.iter.next() {
                Some(batch) => batch?,
                None => return Ok(None),
            };

            let pruned = self.prune(batch)?;
            if !pruned.is_empty() {
                return Ok(Some(pruned));
            }
        }
    }
}

impl Iterator for PruneTimeIterator {
    type Item = Result<Batch>;

    /// Yields the next batch that is non-empty after pruning.
    ///
    /// Returns `None` when no such batch is left; an error encountered while
    /// fetching or pruning a batch is yielded as `Some(Err(_))`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_non_empty_batch() {
            Ok(Some(batch)) => Some(Ok(batch)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}

#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::new_batch;

    /// Pruning an iterator without any batch yields nothing.
    #[test]
    fn test_prune_time_iter_empty() {
        let range = (
            Timestamp::new_millisecond(0),
            Timestamp::new_millisecond(1000),
        );
        let pruned: Vec<_> = PruneTimeIterator::new(Box::new(std::iter::empty()), range)
            .map(|batch| batch.unwrap())
            .collect();
        assert!(pruned.is_empty());
    }

    /// Pruning keeps the rows inside the inclusive range and drops batches that
    /// end up empty.
    #[test]
    fn test_prune_time_iter_filter() {
        // Each closure builds a fresh copy of one two-row input batch so the
        // same definition can serve as input and as expected output.
        let batch_10_11 = || {
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            )
        };
        let batch_15_16 = || {
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            )
        };
        let batch_17_18 = || {
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            )
        };
        let input = [batch_10_11(), batch_15_16(), batch_17_18()];

        // Runs the whole input through a `PruneTimeIterator` restricted to
        // `[start, end]` (in milliseconds) and collects the surviving batches.
        let prune_range = |start: i64, end: i64| -> Vec<_> {
            let source = input.clone().into_iter().map(Ok);
            PruneTimeIterator::new(
                Box::new(source),
                (
                    Timestamp::new_millisecond(start),
                    Timestamp::new_millisecond(end),
                ),
            )
            .map(|batch| batch.unwrap())
            .collect()
        };

        // Upper bound cuts the second batch and removes the third one entirely.
        assert_eq!(
            prune_range(10, 15),
            [
                batch_10_11(),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );

        // Lower bound cuts the first batch; the rest is untouched.
        assert_eq!(
            prune_range(11, 20),
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111]),
                batch_15_16(),
                batch_17_18(),
            ]
        );

        // The range covers every row, so all batches pass through unchanged.
        assert_eq!(
            prune_range(10, 18),
            [batch_10_11(), batch_15_16(), batch_17_18()]
        );
    }
}
4 changes: 3 additions & 1 deletion src/mito2/src/read/scan_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::error::Result;
use crate::read::range::RowGroupIndex;
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::reader::ReaderMetrics;

struct PartitionMetricsInner {
Expand Down Expand Up @@ -128,13 +129,14 @@ pub(crate) fn scan_mem_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let ranges = stream_ctx.build_mem_ranges(index);
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
let iter = range.build_iter()?;
let iter = range.build_iter(time_range)?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

let mut source = Source::Iter(iter);
Expand Down
7 changes: 6 additions & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,12 @@ fn build_sources(
sources.reserve(range_meta.row_group_indices.len());
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
let stream = scan_mem_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
range_meta.time_range,
);
Box::pin(stream) as _
} else {
let read_type = if compaction {
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl UnorderedScan {
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
for await batch in stream {
yield batch;
}
Expand Down
10 changes: 0 additions & 10 deletions src/mito2/src/test_util/memtable_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,6 @@ impl MemtableBuilder for EmptyMemtableBuilder {
}
}

/// Iterator builder for tests whose `build` always returns an empty iterator.
#[derive(Default)]
pub(crate) struct EmptyIterBuilder {}

impl IterBuilder for EmptyIterBuilder {
/// Always succeeds and returns an iterator that yields no batches.
fn build(&self) -> Result<BoxedBatchIterator> {
Ok(Box::new(std::iter::empty()))
}
}

/// Creates a region metadata to test memtable with default pk.
///
/// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`.
Expand Down