diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs
index d528b8e241402..a919b77b336a8 100644
--- a/src/persist-client/src/batch.rs
+++ b/src/persist-client/src/batch.rs
@@ -243,7 +243,6 @@ where
         let updates = updates
             .decode::(&self.metrics.columnar)
             .expect("valid inline part");
-        let key_lower = updates.key_lower().to_vec();
         let diffs_sum = diffs_sum::(updates.updates.records()).expect("inline parts are not empty");
         let mut write_schemas = write_schemas.clone();
@@ -264,7 +263,6 @@ where
             Arc::clone(isolated_runtime),
             updates,
             run_meta.order.unwrap_or(RunOrder::Unordered),
-            key_lower,
             ts_rewrite.clone(),
             D::encode(&diffs_sum),
             write_schemas,
@@ -381,7 +379,7 @@ pub(crate) const BATCH_COLUMNAR_FORMAT: Config<&'static str> = Config::new(
 pub(crate) const BATCH_COLUMNAR_FORMAT_PERCENT: Config = Config::new(
     "persist_batch_columnar_format_percent",
-    0,
+    100,
     "Percent of parts to write using 'persist_batch_columnar_format', falling back to 'row'.",
 );
 
@@ -405,7 +403,7 @@ pub(crate) const RECORD_SCHEMA_ID: Config = Config::new(
 pub(crate) const STRUCTURED_ORDER: Config = Config::new(
     "persist_batch_structured_order",
-    false,
+    true,
     "If enabled, output compaction batches in structured-data order.",
 );
 
@@ -420,7 +418,7 @@ pub(crate) const STRUCTURED_ORDER_UNTIL_SHARD: Config<&'static str> = Config::ne
 pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config = Config::new(
     "persist_batch_structured_key_lower_len",
-    0,
+    256,
     "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
     (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
 );
@@ -564,26 +562,16 @@ where
     V: Codec,
     T: Timestamp + Lattice + Codec64,
 {
-    // TODO: Merge BatchBuilderInternal back into BatchBuilder once we no longer
-    // need this separate schemas nonsense for compaction.
-    //
-    // In the meantime:
-    // - Compaction uses `BatchBuilderInternal` directly, providing the real
-    //   schema for stats, but with the builder's schema set to a fake Vec
-    //   one.
-    // - User writes use `BatchBuilder` with both this `stats_schemas` and
-    //   `builder._schemas` the same.
-    //
-    // Instead of this BatchBuilder{,Internal} split, I initially tried to just
-    // split the `add` and `finish` methods into versions that could override
-    // the stats schema, but there are ownership issues with that approach that
-    // I think are unresolvable.
+    pub(crate) metrics: Arc,
+
+    inline_desc: Description,
+    inclusive_upper: Antichain>,
     // Reusable buffers for encoding data. Should be cleared after use!
-    pub(crate) metrics: Arc,
     pub(crate) key_buf: Vec,
     pub(crate) val_buf: Vec,
+    buffer: BatchBuffer,
     pub(crate) builder: BatchBuilderInternal,
 }
@@ -594,15 +582,63 @@ where
     T: Timestamp + Lattice + Codec64,
     D: Semigroup + Codec64,
 {
+    pub(crate) fn new(
+        builder: BatchBuilderInternal,
+        inline_desc: Description,
+        metrics: Arc,
+    ) -> Self {
+        let buffer = BatchBuffer::new(Arc::clone(&metrics), builder.parts.cfg.blob_target_size);
+        Self {
+            metrics,
+            inline_desc,
+            inclusive_upper: Antichain::new(),
+            key_buf: vec![],
+            val_buf: vec![],
+            buffer,
+            builder,
+        }
+    }
+
     /// Finish writing this batch and return a handle to the written batch.
     ///
     /// This fails if any of the updates in this batch are beyond the given
     /// `upper`.
     pub async fn finish(
-        self,
+        mut self,
         registered_upper: Antichain,
     ) -> Result, InvalidUsage> {
-        self.builder.finish(registered_upper).await
+        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
+            return Err(InvalidUsage::InvalidBounds {
+                lower: self.inline_desc.lower().clone(),
+                upper: registered_upper,
+            });
+        }
+
+        // When since is less than or equal to lower, the upper is a strict bound
+        // on the updates' timestamps because no advancement has been performed. Because user
+        // batches are always unadvanced, this ensures that new updates are recorded with valid
+        // timestamps. Otherwise, we can make no assumptions about the timestamps.
+        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
+            for ts in self.inclusive_upper.iter() {
+                if registered_upper.less_equal(&ts.0) {
+                    return Err(InvalidUsage::UpdateBeyondUpper {
+                        ts: ts.0.clone(),
+                        expected_upper: registered_upper.clone(),
+                    });
+                }
+            }
+        }
+
+        self.builder
+            .flush_part(self.inline_desc.clone(), self.buffer.drain())
+            .await;
+
+        self.builder
+            .finish(Description::new(
+                self.inline_desc.lower().clone(),
+                registered_upper,
+                self.inline_desc.since().clone(),
+            ))
+            .await
     }
 
     /// Adds the given update to the batch.
@@ -631,13 +667,29 @@ where
             Some(key),
             Some(val),
         );
-        let result = self
-            .builder
-            .add(&self.key_buf, &self.val_buf, ts, diff)
-            .await;
+
+        if !self.inline_desc.lower().less_equal(ts) {
+            return Err(InvalidUsage::UpdateNotBeyondLower {
+                ts: ts.clone(),
+                lower: self.inline_desc.lower().clone(),
+            });
+        }
+        self.inclusive_upper.insert(Reverse(ts.clone()));
+
+        let added = if let Some(full_batch) =
+            self.buffer
+                .push(&self.key_buf, &self.val_buf, ts.clone(), diff.clone())
+        {
+            self.builder
+                .flush_part(self.inline_desc.clone(), full_batch)
+                .await;
+            Added::RecordAndParts
+        } else {
+            Added::Record
+        };
         self.key_buf.clear();
         self.val_buf.clear();
-        result
+        Ok(added)
     }
 }
@@ -648,23 +700,15 @@ where
     V: Codec,
     T: Timestamp + Lattice + Codec64,
 {
-    lower: Antichain,
-    inclusive_upper: Antichain>,
-
     shard_id: ShardId,
     version: Version,
     blob: Arc,
     metrics: Arc,
     write_schemas: Schemas,
-    buffer: BatchBuffer,
-
     num_updates: usize,
     parts: BatchParts,
-    since: Antichain,
-    inline_upper: Antichain,
-
     // These provide a bit more safety against appending a batch with the wrong
     // type to a shard.
     _phantom: PhantomData,
 }
@@ -678,36 +722,22 @@ where
     D: Semigroup + Codec64,
 {
     pub(crate) fn new(
-        cfg: BatchBuilderConfig,
+        _cfg: BatchBuilderConfig,
         parts: BatchParts,
         metrics: Arc,
         write_schemas: Schemas,
-        lower: Antichain,
         blob: Arc,
         shard_id: ShardId,
         version: Version,
-        since: Antichain,
-        inline_upper: Option>,
     ) -> Self {
         Self {
-            lower,
-            inclusive_upper: Antichain::new(),
             blob,
-            buffer: BatchBuffer::new(Arc::clone(&metrics), cfg.blob_target_size),
             metrics,
             write_schemas,
             num_updates: 0,
             parts,
             shard_id,
             version,
-            since,
-            // TODO: The default case would ideally be `{t + 1 for t in self.inclusive_upper}` but
-            // there's nothing that lets us increment a timestamp. An empty
-            // antichain is guaranteed to correctly bound the data in this
-            // part, but it doesn't really tell us anything. Figure out how
-            // to make a tighter bound, possibly by changing the part
-            // description to be an _inclusive_ upper.
-            inline_upper: inline_upper.unwrap_or_else(|| Antichain::new()),
             _phantom: PhantomData,
         }
     }
@@ -718,33 +748,9 @@ where
     /// `upper`.
     #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
     pub async fn finish(
-        mut self,
-        registered_upper: Antichain,
+        self,
+        registered_desc: Description,
     ) -> Result, InvalidUsage> {
-        if PartialOrder::less_than(&registered_upper, &self.lower) {
-            return Err(InvalidUsage::InvalidBounds {
-                lower: self.lower.clone(),
-                upper: registered_upper,
-            });
-        }
-        // when since is less than or equal to lower, the upper is a strict bound on the updates'
-        // timestamp because no compaction has been performed. Because user batches are always
-        // uncompacted, this ensures that new updates are recorded with valid timestamps.
-        // Otherwise, we can make no assumptions about the timestamps
-        if PartialOrder::less_equal(&self.since, &self.lower) {
-            for ts in self.inclusive_upper.iter() {
-                if registered_upper.less_equal(&ts.0) {
-                    return Err(InvalidUsage::UpdateBeyondUpper {
-                        ts: ts.0.clone(),
-                        expected_upper: registered_upper.clone(),
-                    });
-                }
-            }
-        }
-
-        let remainder = self.buffer.drain();
-        self.flush_part(remainder).await;
-
         let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
         let shard_metrics = Arc::clone(&self.parts.shard_metrics);
         // If we haven't switched over to the new schema_id field yet, keep writing the old one.
@@ -773,7 +779,7 @@ where
             });
             run_parts.extend(parts);
         }
-        let desc = Description::new(self.lower, registered_upper, self.since);
+        let desc = registered_desc;
 
         let batch = Batch::new(
             batch_delete_enabled,
@@ -787,85 +793,12 @@ where
         Ok(batch)
     }
 
-    /// Adds the given update to the batch.
-    ///
-    /// The update timestamp must be greater or equal to `lower` that was given
-    /// when creating this [BatchBuilder].
-    pub async fn add(
-        &mut self,
-        key: &[u8],
-        val: &[u8],
-        ts: &T,
-        diff: &D,
-    ) -> Result> {
-        if !self.lower.less_equal(ts) {
-            return Err(InvalidUsage::UpdateNotBeyondLower {
-                ts: ts.clone(),
-                lower: self.lower.clone(),
-            });
-        }
-
-        self.inclusive_upper.insert(Reverse(ts.clone()));
-
-        match self.buffer.push(key, val, ts.clone(), diff.clone()) {
-            Some(part_to_flush) => {
-                self.flush_part(part_to_flush).await;
-                Ok(Added::RecordAndParts)
-            }
-            None => Ok(Added::Record),
-        }
-    }
-
-    /// Adds a batch of updates all at once. The caller takes responsibility for ensuring
-    /// that the data is appropriately chunked.
-    ///
-    /// The update timestamps must be greater or equal to `lower` that was given
-    /// when creating this [BatchBuilder].
-    pub async fn flush_many(&mut self, updates: BlobTraceUpdates) -> Result<(), InvalidUsage> {
-        for ts in updates.records().timestamps().iter().flatten() {
-            let ts = T::decode(ts.to_le_bytes());
-            if !self.lower.less_equal(&ts) {
-                return Err(InvalidUsage::UpdateNotBeyondLower {
-                    ts,
-                    lower: self.lower.clone(),
-                });
-            }
-
-            self.inclusive_upper.insert(Reverse(ts));
-        }
-
-        // If there have been any individual updates added, flush those first.
-        // This is a noop if there are no such updates... and at time of writing there
-        // never are, since no callers mix these two methods.
-        // TODO: consider moving the individual updates to BatchBuilder so we can
-        // avoid handling this case.
-        let previous = self.buffer.drain();
-        self.flush_part(previous).await;
-
-        self.flush_part(updates).await;
-
-        Ok(())
-    }
-
     /// Flushes the current part to Blob storage, first consolidating and then
     /// columnar encoding the updates. It is the caller's responsibility to
     /// chunk `current_part` to be no greater than
     /// [BatchBuilderConfig::blob_target_size], and must absolutely be less than
     /// [mz_persist::indexed::columnar::KEY_VAL_DATA_MAX_LEN]
-    async fn flush_part(&mut self, columnar: BlobTraceUpdates) {
-        let key_lower = {
-            let key_bytes = columnar.records().keys();
-            if key_bytes.is_empty() {
-                &[]
-            } else if self.parts.expected_order() == RunOrder::Codec {
-                key_bytes.value(0)
-            } else {
-                ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
-            }
-        };
-        let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
-            .expect("lower bound always exists");
-
+    pub async fn flush_part(&mut self, part_desc: Description, columnar: BlobTraceUpdates) {
         let num_updates = columnar.len();
         if num_updates == 0 {
             return;
@@ -874,14 +807,7 @@ where
         let start = Instant::now();
         self.parts
-            .write(
-                &self.write_schemas,
-                key_lower,
-                columnar,
-                self.inline_upper.clone(),
-                self.since.clone(),
-                diffs_sum,
-            )
+            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
             .await;
         self.metrics
             .compaction
@@ -993,7 +919,6 @@ pub(crate) struct BatchParts {
     metrics: Arc,
     shard_metrics: Arc,
     shard_id: ShardId,
-    lower: Antichain,
     blob: Arc,
     isolated_runtime: Arc,
     next_index: u64,
@@ -1009,7 +934,6 @@ impl BatchParts {
         metrics: Arc,
         shard_metrics: Arc,
         shard_id: ShardId,
-        lower: Antichain,
         blob: Arc,
         isolated_runtime: Arc,
         batch_metrics: &BatchWriteMetrics,
@@ -1098,7 +1022,6 @@ impl BatchParts {
             metrics,
             shard_metrics,
             shard_id,
-            lower,
            blob,
             isolated_runtime,
             next_index: 0,
@@ -1113,7 +1036,6 @@ impl BatchParts {
         metrics: Arc,
         shard_metrics: Arc,
         shard_id: ShardId,
-        lower: Antichain,
         blob: Arc,
         isolated_runtime: Arc,
         batch_metrics: &BatchWriteMetrics,
@@ -1161,7 +1083,6 @@ impl BatchParts {
             metrics,
             shard_metrics,
             shard_id,
-            lower,
             blob,
             isolated_runtime,
             next_index: 0,
@@ -1180,13 +1101,10 @@ impl BatchParts {
     pub(crate) async fn write(
         &mut self,
         write_schemas: &Schemas,
-        key_lower: Vec,
+        desc: Description,
         mut updates: BlobTraceUpdates,
-        upper: Antichain,
-        since: Antichain,
         diffs_sum: D,
     ) {
-        let desc = Description::new(self.lower.clone(), upper, since);
         let batch_metrics = self.batch_metrics.clone();
         let index = self.next_index;
         self.next_index += 1;
@@ -1278,7 +1196,6 @@ impl BatchParts {
                     Arc::clone(&self.isolated_runtime),
                     part,
                     self.expected_order(),
-                    key_lower,
                     ts_rewrite,
                     D::encode(&diffs_sum),
                     write_schemas.clone(),
@@ -1349,7 +1266,6 @@ impl BatchParts {
         isolated_runtime: Arc,
         mut updates: BlobTraceBatchPart,
         run_order: RunOrder,
-        key_lower: Vec,
         ts_rewrite: Option>,
         diffs_sum: [u8; 8],
         write_schemas: Schemas,
@@ -1360,6 +1276,19 @@ impl BatchParts {
         let metrics_ = Arc::clone(&metrics);
         let schema_id = write_schemas.id;
 
+        let key_lower = {
+            let key_bytes = updates.updates.records().keys();
+            if key_bytes.is_empty() {
+                &[]
+            } else if run_order == RunOrder::Codec {
+                key_bytes.value(0)
+            } else {
+                ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
+            }
+        };
+        let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
+            .expect("lower bound always exists");
+
         let (stats, structured_key_lower, (buf, encode_time)) = isolated_runtime
             .spawn_named(|| "batch::encode_part", async move {
                 // Only encode our updates in a structured format if required, it's expensive.
@@ -1725,14 +1654,13 @@ mod tests {
             .await;
 
         // A new builder has no writing or finished parts.
-        let builder = write.builder(Antichain::from_elem(0));
-        let mut builder = builder.builder;
+        let mut builder = write.builder(Antichain::from_elem(0));
 
         fn assert_writing(
-            builder: &BatchBuilderInternal,
+            builder: &BatchBuilder,
             expected_finished: &[bool],
         ) {
-            let WritingRuns::Compacting(run) = &builder.parts.writing_runs else {
+            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
                 unreachable!("ordered run!")
             };
@@ -1745,33 +1673,25 @@ mod tests {
         // We set blob_target_size to 0, so the first update gets forced out
         // into a run.
         let ((k, v), t, d) = &data[0];
-        let key = k.encode_to_vec();
-        let val = v.encode_to_vec();
-        builder.add(&key, &val, t, d).await.expect("invalid usage");
+        builder.add(k, v, t, d).await.expect("invalid usage");
         assert_writing(&builder, &[false]);
 
         // We set batch_builder_max_outstanding_parts to 2, so we are allowed to
         // pipeline a second part.
         let ((k, v), t, d) = &data[1];
-        let key = k.encode_to_vec();
-        let val = v.encode_to_vec();
-        builder.add(&key, &val, t, d).await.expect("invalid usage");
+        builder.add(k, v, t, d).await.expect("invalid usage");
         assert_writing(&builder, &[false, false]);
 
         // But now that we have 3 parts, the add call back-pressures until the
         // first one finishes.
         let ((k, v), t, d) = &data[2];
-        let key = k.encode_to_vec();
-        let val = v.encode_to_vec();
-        builder.add(&key, &val, t, d).await.expect("invalid usage");
+        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[true, false, false]);
 
         // Finally, pushing a fourth part will cause the first three to spill out into
         // a new compacted run.
         let ((k, v), t, d) = &data[3];
-        let key = k.encode_to_vec();
-        let val = v.encode_to_vec();
-        builder.add(&key, &val, t, d).await.expect("invalid usage");
+        builder.add(k, v, t, d).await.expect("invalid usage");
         assert_writing(&builder, &[false, false]);
 
         // Finish off the batch and verify that the keys and such get plumbed
diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs
index 7fb2b0097e165..a10f4f703ff78 100644
--- a/src/persist-client/src/internal/compact.rs
+++ b/src/persist-client/src/internal/compact.rs
@@ -23,6 +23,7 @@ use futures_util::{StreamExt, TryFutureExt};
 use mz_dyncfg::Config;
 use mz_ore::cast::CastFrom;
 use mz_ore::error::ErrorExt;
+use mz_persist::indexed::columnar::ColumnarRecords;
 use mz_persist::indexed::encoding::BlobTraceUpdates;
 use mz_persist::location::Blob;
 use mz_persist_types::{Codec, Codec64};
@@ -781,7 +782,6 @@ where
             Arc::clone(&metrics),
             Arc::clone(&shard_metrics),
             *shard_id,
-            desc.lower().clone(),
             Arc::clone(&blob),
             Arc::clone(&isolated_runtime),
             &metrics.compaction.batch,
@@ -791,12 +791,9 @@ where
             parts,
             Arc::clone(&metrics),
             write_schemas.clone(),
-            desc.lower().clone(),
             Arc::clone(&blob),
             shard_id.clone(),
             cfg.version.clone(),
-            desc.since().clone(),
-            Some(desc.upper().clone()),
         );
 
         // Duplicating a large codepath here during the migration.
@@ -867,7 +864,7 @@ where
                     write_schemas.val.as_ref(),
                     &metrics.columnar,
                 )?;
-                batch.flush_many(updates).await?;
+                batch.flush_part(desc.clone(), updates).await;
             }
         } else {
             let mut consolidator = Consolidator::::new(
@@ -898,27 +895,42 @@ where
                 metrics.compaction.not_all_prefetched.inc();
             }
 
-            // Reuse the allocations for individual keys and values
-            let mut key_vec = vec![];
-            let mut val_vec = vec![];
             loop {
-                let fetch_start = Instant::now();
-                let Some(updates) = consolidator.next().await? else {
+                let mut chunks = vec![];
+                let mut total_bytes = 0;
+                // We attempt to pull chunks out of the consolidator that match our target size,
+                // but it's possible that we may get smaller chunks... for example, if not all
+                // parts have been fetched yet. Loop until we've got enough data to justify flushing
+                // it out to blob (or we run out of data).
+                while total_bytes < cfg.batch.blob_target_size {
+                    let fetch_start = Instant::now();
+                    let Some(chunk) = consolidator
+                        .next_chunk(
+                            cfg.compaction_yield_after_n_updates,
+                            cfg.batch.blob_target_size - total_bytes,
+                        )
+                        .await?
+                    else {
+                        break;
+                    };
+                    timings.part_fetching += fetch_start.elapsed();
+                    total_bytes += chunk.goodbytes();
+                    chunks.push(chunk.records().clone());
+                    tokio::task::yield_now().await;
+                }
+
+                if chunks.is_empty() {
                     break;
-                };
-                timings.part_fetching += fetch_start.elapsed();
-                for ((k, v), t, d) in updates.take(cfg.compaction_yield_after_n_updates) {
-                    key_vec.clear();
-                    key_vec.extend_from_slice(k);
-                    val_vec.clear();
-                    val_vec.extend_from_slice(v);
-                    crate::batch::validate_schema(&write_schemas, &key_vec, &val_vec, None, None);
-                    batch.add(&key_vec, &val_vec, &t, &d).await?;
                 }
-                tokio::task::yield_now().await;
+
+                // In the hopefully-common case of a single chunk, this will not copy.
+                let updates = ColumnarRecords::concat(&chunks, &metrics.columnar);
+                batch
+                    .flush_part(desc.clone(), BlobTraceUpdates::Row(updates))
+                    .await;
             }
         }
-        let mut batch = batch.finish(desc.upper().clone()).await?;
+        let mut batch = batch.finish(desc.clone()).await?;
 
         // We use compaction as a method of getting inline writes out of state,
         // to make room for more inline writes. This happens in
diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs
index eb7f8d23fdafa..a6eeb24b6f1c3 100644
--- a/src/persist-client/src/internal/encoding.rs
+++ b/src/persist-client/src/internal/encoding.rs
@@ -1417,7 +1417,7 @@ impl RustType for HollowRunRef
             ts_rewrite: None,
             format: None,
             schema_id: None,
-            structured_key_lower: None,
+            structured_key_lower: self.structured_key_lower.into_proto(),
             deprecated_schema_id: None,
         };
         part
@@ -1435,7 +1435,7 @@ impl RustType for HollowRunRef
             hollow_bytes: proto.encoded_size_bytes.into_rust()?,
             max_part_bytes: run_proto.max_part_bytes.into_rust()?,
             key_lower: proto.key_lower.to_vec(),
-            structured_key_lower: None,
+            structured_key_lower: proto.structured_key_lower.into_rust()?,
             _phantom_data: Default::default(),
         })
     }
diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index 228a5e5142de8..8af5174254471 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -1745,6 +1745,7 @@ pub mod datadriven {
         let output = args.expect_str("output");
         let lower = args.expect_antichain("lower");
         let upper = args.expect_antichain("upper");
+        assert!(PartialOrder::less_than(&lower, &upper));
         let since = args
             .optional_antichain("since")
             .unwrap_or_else(|| Antichain::from_elem(0));
@@ -1780,7 +1781,6 @@ pub mod datadriven {
             Arc::clone(&datadriven.client.metrics),
             Arc::clone(&datadriven.machine.applier.shard_metrics),
             datadriven.shard_id,
-            lower.clone(),
             Arc::clone(&datadriven.client.blob),
             Arc::clone(&datadriven.client.isolated_runtime),
             &datadriven.client.metrics.user,
@@ -1790,19 +1790,15 @@ pub mod datadriven {
             parts,
             Arc::clone(&datadriven.client.metrics),
             schemas.clone(),
-            lower,
             Arc::clone(&datadriven.client.blob),
             datadriven.shard_id.clone(),
             datadriven.client.cfg.build_version.clone(),
-            since,
-            Some(upper.clone()),
         );
-        let mut builder = BatchBuilder {
+        let mut builder = BatchBuilder::new(
             builder,
-            metrics: Arc::clone(&datadriven.client.metrics),
-            key_buf: vec![],
-            val_buf: vec![],
-        };
+            Description::new(lower, upper.clone(), since),
+            Arc::clone(&datadriven.client.metrics),
+        );
         for ((k, ()), t, d) in updates {
             builder.add(&k, &(), &t, &d).await.expect("invalid batch");
         }
diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs
index 5516369bc5939..7ffd34b3e94bc 100644
--- a/src/persist-client/src/write.rs
+++ b/src/persist-client/src/write.rs
@@ -620,7 +620,6 @@ where
             Arc::clone(&self.metrics),
             Arc::clone(&self.machine.applier.shard_metrics),
             self.shard_id(),
-            lower.clone(),
             Arc::clone(&self.blob),
             Arc::clone(&self.isolated_runtime),
             &self.metrics.user,
@@ -633,7 +632,6 @@ where
             Arc::clone(&self.metrics),
             Arc::clone(&self.machine.applier.shard_metrics),
             self.shard_id(),
-            lower.clone(),
             Arc::clone(&self.blob),
             Arc::clone(&self.isolated_runtime),
             &self.metrics.user,
@@ -644,19 +642,15 @@ where
             parts,
             Arc::clone(&self.metrics),
             self.write_schemas.clone(),
-            lower,
             Arc::clone(&self.blob),
             self.machine.shard_id().clone(),
             self.cfg.build_version.clone(),
-            Antichain::from_elem(T::minimum()),
-            None,
         );
-        BatchBuilder {
+        BatchBuilder::new(
             builder,
-            metrics: Arc::clone(&self.metrics),
-            key_buf: vec![],
-            val_buf: vec![],
-        }
+            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
+            Arc::clone(&self.metrics),
+        )
     }
 
     /// Uploads the given `updates` as one `Batch` to the blob store and returns
diff --git a/src/persist-client/tests/machine/compaction_bounded b/src/persist-client/tests/machine/compaction_bounded
index fd03f4410f66c..df48d1340c5e9 100644
--- a/src/persist-client/tests/machine/compaction_bounded
+++ b/src/persist-client/tests/machine/compaction_bounded
@@ -40,7 +40,7 @@ part 0
 
 # compact b0 and b1 with enough memory for both runs, but a target size that can only hold 2 parts/keys in mem at a time.
 # most importantly, we expect there to be a single run, as each input has a single run of ordered parts
-compact output=b0_1 inputs=(b0,b1) lower=0 upper=6 since=6 target_size=50 memory_bound=10000
+compact output=b0_1 inputs=(b0,b1) lower=0 upper=6 since=6 target_size=30 memory_bound=10000
 ----
 parts=2 len=4
@@ -81,7 +81,7 @@ f 8 1
 part 0
 
 # compact b0, b1, b2 with enough memory for all runs, but a target size that can only hold 2 keys in mem at a time.
-compact output=b0_1_2 inputs=(b0,b1,b2) lower=0 upper=9 since=8 target_size=50 memory_bound=1000
+compact output=b0_1_2 inputs=(b0,b1,b2) lower=0 upper=9 since=8 target_size=30 memory_bound=1000
 ----
 parts=3 len=6
@@ -153,7 +153,7 @@ set-batch-parts-size input=b2 size=50
 ----
 ok
 
-compact output=b0_1_2 inputs=(b0,b1,b2) lower=0 upper=9 since=8 target_size=50 memory_bound=200
+compact output=b0_1_2 inputs=(b0,b1,b2) lower=0 upper=9 since=8 target_size=30 memory_bound=200
 ----
 parts=4 len=7
@@ -193,7 +193,7 @@ set-batch-parts-size input=b2 size=1000
 ----
 ok
 
-compact output=b0_1_2 inputs=(b0,b1,b2) lower=0 upper=9 since=8 target_size=50 memory_bound=200
+compact output=b0_1_2 inputs=(b0,b1,b2) lower=0 upper=9 since=8 target_size=30 memory_bound=200
 ----
 parts=4 len=7
@@ -223,7 +223,7 @@ set-batch-parts-size input=b0_1_2 size=1000
 ----
 ok
 
-compact output=b0_1_2_iter1 inputs=(b0_1_2) lower=0 upper=9 since=8 target_size=100 memory_bound=1000
+compact output=b0_1_2_iter1 inputs=(b0_1_2) lower=0 upper=9 since=8 target_size=60 memory_bound=1000
 ----
 parts=2 len=6
@@ -245,7 +245,7 @@ set-batch-parts-size input=b0_1_2_iter1 size=1000
 ----
 ok
 
-compact output=b0_1_2_iter2 inputs=(b0_1_2_iter1) lower=0 upper=9 since=8 target_size=100 memory_bound=1000
+compact output=b0_1_2_iter2 inputs=(b0_1_2_iter1) lower=0 upper=9 since=8 target_size=60 memory_bound=1000
 ----
 parts=2 len=6
diff --git a/src/persist/src/indexed/encoding.rs b/src/persist/src/indexed/encoding.rs
index 067de30623379..a78957713afe0 100644
--- a/src/persist/src/indexed/encoding.rs
+++ b/src/persist/src/indexed/encoding.rs
@@ -65,7 +65,7 @@ pub enum BatchColumnarFormat {
 impl BatchColumnarFormat {
     /// Returns a default value for [`BatchColumnarFormat`].
     pub const fn default() -> Self {
-        BatchColumnarFormat::Row
+        BatchColumnarFormat::Both(2)
     }
 
     /// Returns a [`BatchColumnarFormat`] for a given `&str`, falling back to a default value if
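
Editor's note on the batch.rs change above: BatchBuilder now owns the batch's lower/since (its inline Description) and validates the registered upper itself in finish(), instead of BatchBuilderInternal tracking those bounds. The standalone sketch below restates that validation rule using plain timely types and u64 timestamps; the function name and error enum are illustrative stand-ins for persist's InvalidUsage errors, not code from this patch.

    // Minimal sketch, assuming u64 timestamps and only the `timely` crate.
    use timely::PartialOrder;
    use timely::progress::Antichain;

    #[derive(Debug)]
    enum BoundsError {
        // Hypothetical analogues of InvalidUsage::{InvalidBounds, UpdateBeyondUpper}.
        InvalidBounds { lower: Antichain<u64>, upper: Antichain<u64> },
        UpdateBeyondUpper { ts: u64, expected_upper: Antichain<u64> },
    }

    fn validate_registered_upper(
        lower: &Antichain<u64>,
        since: &Antichain<u64>,
        update_timestamps: &[u64],
        registered_upper: &Antichain<u64>,
    ) -> Result<(), BoundsError> {
        // The registered upper may never regress below the batch's lower bound.
        if PartialOrder::less_than(registered_upper, lower) {
            return Err(BoundsError::InvalidBounds {
                lower: lower.clone(),
                upper: registered_upper.clone(),
            });
        }
        // Only while since <= lower (no advancement has happened) is the upper a
        // strict bound on every update timestamp, so only then is it enforced.
        if PartialOrder::less_equal(since, lower) {
            for ts in update_timestamps {
                if registered_upper.less_equal(ts) {
                    return Err(BoundsError::UpdateBeyondUpper {
                        ts: *ts,
                        expected_upper: registered_upper.clone(),
                    });
                }
            }
        }
        Ok(())
    }

    fn main() {
        let lower = Antichain::from_elem(0u64);
        let since = Antichain::from_elem(0u64);
        // An update at ts=5 is rejected by an upper of 5 but accepted by an upper of 6.
        assert!(validate_registered_upper(&lower, &since, &[5], &Antichain::from_elem(5)).is_err());
        assert!(validate_registered_upper(&lower, &since, &[5], &Antichain::from_elem(6)).is_ok());
    }

Once since has advanced past lower (as in compacted batches), timestamps may already have been forwarded, so no per-update upper check is possible; that mirrors why the patch only applies the UpdateBeyondUpper check when since <= lower.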