
Commit 78c33ea

Encode to both representations in the batch builder

Also folding in the BatchBuffer... it's no longer a clean abstraction
boundary.
bkirwi committed Dec 17, 2024
1 parent 5ebea69 commit 78c33ea
Showing 3 changed files with 93 additions and 56 deletions.
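For orientation, the two representations in question are the variants of `BlobTraceUpdates` (see the third file below): `Row` carries only the Codec-encoded rows, while `Both` additionally carries the structured columns for the same updates. A simplified, self-contained sketch, with stub types standing in for the real ones in `mz_persist::indexed::columnar`:

```rust
// Simplified sketch, not the real definitions.
#[allow(dead_code)]
struct ColumnarRecords; // Codec-encoded ((K, V), T, D) tuples
#[allow(dead_code)]
struct ColumnarRecordsStructuredExt; // structured (Arrow) key/val columns

#[allow(dead_code)]
enum BlobTraceUpdates {
    // Codec-encoded rows only.
    Row(ColumnarRecords),
    // Codec-encoded rows plus structured columns for the same updates.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
}

fn main() {}
```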
124 changes: 69 additions & 55 deletions src/persist-client/src/batch.rs
@@ -30,10 +30,13 @@ use futures_util::{stream, FutureExt};
 use mz_dyncfg::Config;
 use mz_ore::cast::CastFrom;
 use mz_ore::{instrument, soft_panic_or_log};
-use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsBuilder};
+use mz_persist::indexed::columnar::{
+    ColumnarRecords, ColumnarRecordsBuilder, ColumnarRecordsStructuredExt,
+};
 use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
 use mz_persist::location::Blob;
 use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
+use mz_persist_types::columnar::{ColumnEncoder, Schema2};
 use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
 use mz_persist_types::schema::SchemaId;
 use mz_persist_types::stats::{trim_to_budget, truncate_bytes, TruncateBound, TRUNCATE_LEN};
@@ -563,6 +566,7 @@ where
     T: Timestamp + Lattice + Codec64,
 {
     pub(crate) metrics: Arc<Metrics>,
+    writer_schemas: Schemas<K, V>,

     inline_desc: Description<T>,
     inclusive_upper: Antichain<Reverse<T>>,
@@ -571,7 +575,12 @@
     pub(crate) key_buf: Vec<u8>,
     pub(crate) val_buf: Vec<u8>,

-    buffer: BatchBuffer,
+    buffer: ColumnarRecordsBuilder,
+    buffer_ext: Option<(
+        <K::Schema as Schema2<K>>::Encoder,
+        <V::Schema as Schema2<V>>::Encoder,
+    )>,

     pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
 }
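The new `buffer_ext` field names its encoder types with fully qualified paths. A minimal, self-contained analogue of that pattern (the `Encodes` trait and types here are illustrative, not the real `Schema2`):

```rust
// `<K::Schema as Schema2<K>>::Encoder` is a fully qualified associated-type
// path: it names the encoder type that a particular schema impl produces.
trait Encodes<T> {
    type Encoder;
}

#[allow(dead_code)]
struct U64Schema;
struct U64Encoder;

impl Encodes<u64> for U64Schema {
    type Encoder = U64Encoder;
}

// The `<Type as Trait>::Assoc` form pins down which impl's associated type
// is meant, exactly as in the struct field above.
type KeyEncoder = <U64Schema as Encodes<u64>>::Encoder;

fn main() {
    let _encoder: KeyEncoder = U64Encoder;
}
```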

@@ -587,14 +596,28 @@
         inline_desc: Description<T>,
         metrics: Arc<Metrics>,
     ) -> Self {
-        let buffer = BatchBuffer::new(Arc::clone(&metrics), builder.parts.cfg.blob_target_size);
+        let buffer = ColumnarRecordsBuilder::default();
+        let writer_schemas = builder.write_schemas.clone();
+        let buffer_ext = builder
+            .parts
+            .cfg
+            .batch_columnar_format
+            .is_structured()
+            .then(|| {
+                (
+                    writer_schemas.key.encoder().expect("valid schema"),
+                    writer_schemas.val.encoder().expect("valid schema"),
+                )
+            });
         Self {
             metrics,
             inline_desc,
             inclusive_upper: Antichain::new(),
             key_buf: vec![],
             val_buf: vec![],
             buffer,
+            writer_schemas,
+            buffer_ext,
             builder,
         }
     }
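The constructor builds the structured encoders only when the configured columnar format calls for them, using `bool::then`. A self-contained sketch of that gating pattern (flag and payload are illustrative):

```rust
// `bool::then` maps a flag to an `Option` of lazily built state: the closure
// (and so encoder construction) runs only when the flag is true.
fn encoders(structured_enabled: bool) -> Option<(Vec<String>, Vec<String>)> {
    structured_enabled.then(|| (Vec::new(), Vec::new()))
}

fn main() {
    assert!(encoders(false).is_none()); // codec-only path
    assert!(encoders(true).is_some()); // codec + structured path
}
```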
Expand Down Expand Up @@ -628,8 +651,15 @@ where
}
}

let records = self.buffer.finish(&self.metrics.columnar);
let structured = self.buffer_ext.map(|(k, v)| ColumnarRecordsStructuredExt {
key: Arc::new(k.finish()),
val: Arc::new(v.finish()),
});
let part = BlobTraceUpdates::new(records, structured);

self.builder
.flush_part(self.inline_desc.clone(), self.buffer.drain())
.flush_part(self.inline_desc.clone(), part)
.await;

self.builder
Expand Down Expand Up @@ -675,13 +705,43 @@ where
});
}
self.inclusive_upper.insert(Reverse(ts.clone()));
let update = (
(self.key_buf.as_slice(), self.val_buf.as_slice()),
Codec64::encode(ts),
Codec64::encode(diff),
);
assert!(self.buffer.push(update), "single update overflowed an i32");
if let Some((ks, vs)) = &mut self.buffer_ext {
ks.append(key);
vs.append(val);
}

let added = if let Some(full_batch) =
self.buffer
.push(&self.key_buf, &self.val_buf, ts.clone(), diff.clone())
{
let added = if self.buffer.total_bytes() > self.builder.parts.cfg.blob_target_size {
let records = mem::take(&mut self.buffer).finish(&self.metrics.columnar);
let structured =
self.buffer_ext
.as_mut()
.map(|(ks, vs)| ColumnarRecordsStructuredExt {
key: Arc::new(
mem::replace(
ks,
self.writer_schemas.key.encoder().expect("valid schema"),
)
.finish(),
),
val: Arc::new(
mem::replace(
vs,
self.writer_schemas.val.encoder().expect("valid schema"),
)
.finish(),
),
});
self.builder
.flush_part(self.inline_desc.clone(), full_batch)
.flush_part(
self.inline_desc.clone(),
BlobTraceUpdates::new(records, structured),
)
.await;
Added::RecordAndParts
} else {
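The drain logic above leans on `mem::take` (for the `Default`-implementing `ColumnarRecordsBuilder`) and `mem::replace` (for the encoders, which need a fresh schema-derived replacement) because `finish` consumes its receiver by value. A self-contained sketch of the pattern:

```rust
use std::mem;

// Drain-and-reset: swap a fresh builder into the field so the full one can
// be consumed by value without leaving the field uninitialized.
#[derive(Default)]
struct Builder(Vec<u8>);

impl Builder {
    fn push(&mut self, byte: u8) {
        self.0.push(byte);
    }
    // Consumes the builder, like the encoders' `finish` above.
    fn finish(self) -> Vec<u8> {
        self.0
    }
}

fn main() {
    let mut buffer = Builder::default();
    buffer.push(1);
    buffer.push(2);
    let part = mem::replace(&mut buffer, Builder::default()).finish();
    assert_eq!(part, vec![1, 2]);
    assert!(buffer.0.is_empty()); // ready to accumulate the next part
}
```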
Expand Down Expand Up @@ -854,52 +914,6 @@ pub(crate) fn validate_schema<K: Codec, V: Codec>(
        .unwrap_or_else(|err| panic!("constructing batch with mismatched val schema: {}", err));
 }

-#[derive(Debug)]
-struct BatchBuffer {
-    metrics: Arc<Metrics>,
-    blob_target_size: usize,
-    records_builder: ColumnarRecordsBuilder,
-}
-
-impl BatchBuffer {
-    fn new(metrics: Arc<Metrics>, blob_target_size: usize) -> Self {
-        BatchBuffer {
-            metrics,
-            blob_target_size,
-            records_builder: ColumnarRecordsBuilder::default(),
-        }
-    }
-
-    fn push<T: Codec64, D: Codec64>(
-        &mut self,
-        key: &[u8],
-        val: &[u8],
-        ts: T,
-        diff: D,
-    ) -> Option<BlobTraceUpdates> {
-        let update = ((key, val), ts.encode(), diff.encode());
-        assert!(
-            self.records_builder.push(update),
-            "single update overflowed an i32"
-        );
-
-        // if we've filled up a batch part, flush out to blob to keep our memory usage capped.
-        if self.records_builder.total_bytes() >= self.blob_target_size {
-            Some(self.drain())
-        } else {
-            None
-        }
-    }
-
-    fn drain(&mut self) -> BlobTraceUpdates {
-        // TODO: we're in a position to do a very good estimate here, instead of using the default.
-        let builder = mem::take(&mut self.records_builder);
-        let records = builder.finish(&self.metrics.columnar);
-        assert_eq!(self.records_builder.len(), 0);
-        BlobTraceUpdates::Row(records)
-    }
-}
-

 #[derive(Debug)]
 enum WritingRuns<T> {
     /// Building a single run with the specified ordering. Parts are expected to be internally
2 changes: 1 addition & 1 deletion src/persist-types/src/columnar.rs
@@ -112,7 +112,7 @@ pub trait Schema2<T>: Debug + Send + Sync {
     /// Type that is able to decode values of `T` from [`Self::ArrowColumn`].
     type Decoder: ColumnDecoder<T> + Debug + Send + Sync;
     /// Type that is able to encoder values of `T`.
-    type Encoder: ColumnEncoder<T, FinishedColumn = Self::ArrowColumn> + Debug;
+    type Encoder: ColumnEncoder<T, FinishedColumn = Self::ArrowColumn> + Debug + Send + Sync;

     /// Returns a type that is able to decode instances of `T` from the provider column.
     fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error>;
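The tightened bound is presumably required because the batch builder now holds these encoders across `.await` points, so the futures that own it must stay `Send`. A self-contained sketch of why a stored encoder's `Send`-ness matters (types illustrative):

```rust
// A struct is `Send` only if all of its fields are; a non-`Send` encoder
// (say, one holding an `Rc`) would make any future that owns the builder
// non-`Send` too.
fn assert_send<T: Send>(_: T) {}

struct Builder<E> {
    _encoder: E,
}

fn main() {
    assert_send(Builder { _encoder: 42u64 }); // `u64: Send`: compiles
    // assert_send(Builder { _encoder: std::rc::Rc::new(42) });
    // ^ uncommenting fails to compile: `Rc<i32>` is not `Send`.
}
```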
Expand Down
23 changes: 23 additions & 0 deletions src/persist/src/indexed/encoding.rs
@@ -204,6 +204,29 @@ pub enum BlobTraceUpdates {
 }

 impl BlobTraceUpdates {
+    /// Create a new [BlobTraceUpdates] from the provided parts.
+    pub fn new(
+        codec: ColumnarRecords,
+        structured: Option<ColumnarRecordsStructuredExt>,
+    ) -> BlobTraceUpdates {
+        match structured {
+            None => Self::Row(codec),
+            Some(structured) => {
+                assert_eq!(
+                    codec.len(),
+                    structured.key.len(),
+                    "key length should match codec data"
+                );
+                assert_eq!(
+                    codec.len(),
+                    structured.val.len(),
+                    "val length should match codec data"
+                );
+                Self::Both(codec, structured)
+            }
+        }
+    }
+
     /// The number of updates.
     pub fn len(&self) -> usize {
         self.records().len()
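A self-contained model of the constructor's behavior (stub `Vec<u8>` data standing in for the real columnar types): the variant is chosen by whether structured columns are present, and a length mismatch panics.

```rust
#[derive(Debug, PartialEq)]
enum Updates {
    Row(Vec<u8>),
    Both(Vec<u8>, Vec<u8>),
}

// Mirrors `BlobTraceUpdates::new`: both representations must describe the
// same number of updates, or the assert fires.
fn new_updates(codec: Vec<u8>, structured: Option<Vec<u8>>) -> Updates {
    match structured {
        None => Updates::Row(codec),
        Some(structured) => {
            assert_eq!(codec.len(), structured.len(), "length should match");
            Updates::Both(codec, structured)
        }
    }
}

fn main() {
    assert_eq!(new_updates(vec![1, 2], None), Updates::Row(vec![1, 2]));
    assert_eq!(
        new_updates(vec![1, 2], Some(vec![3, 4])),
        Updates::Both(vec![1, 2], vec![3, 4])
    );
}
```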
