diff --git a/Cargo.lock b/Cargo.lock index 387706a3fbcb6..68cdd85def441 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12005,6 +12005,7 @@ dependencies = [ "tokio-retry", "tokio-stream 0.1.15", "tracing", + "tracing-subscriber", "tracing-test", "url", "workspace-hack", diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index 40ebafbd4eb15..fde6ae0d93a50 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -385,6 +385,7 @@ fn deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result Result> { let len = data.get_u32_le(); + tracing::trace!("len: {len}"); let mut bytes = vec![0; len as usize]; data.copy_to_slice(&mut bytes); String::from_utf8(bytes) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 9bb1a696fad54..80430bf1fe941 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -88,6 +88,7 @@ risingwave_hummock_test = { path = "../storage/hummock_test", features = [ "test", ] } serde_yaml = "0.9" +tracing-subscriber = "0.3.17" tracing-test = "0.2" [features] diff --git a/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs index c9a13c659aede..668beec911f04 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs @@ -339,7 +339,7 @@ impl LogStoreBufferSender { { if *flushed { // Since we iterate from new data to old data, when we meet a flushed data, the - // rest should all be flushed. + // rest should have been flushed. 
break; } flush_fn(chunk, *epoch, *start_seq_id, *end_seq_id)?; diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 3acec1c807cc2..2e0be368e460f 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -32,11 +32,11 @@ use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde; use crate::common::log_store_impl::kv_log_store::writer::KvLogStoreWriter; use crate::executor::monitor::StreamingMetrics; -mod buffer; -mod reader; +pub(crate) mod buffer; +pub mod reader; pub(crate) mod serde; #[cfg(test)] -mod test_utils; +pub mod test_utils; mod writer; pub(crate) use reader::{REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY}; @@ -55,7 +55,7 @@ pub(crate) const FIRST_SEQ_ID: SeqIdType = 0; pub(crate) type ReaderTruncationOffsetType = (u64, Option); #[derive(Clone)] -pub(crate) struct KvLogStoreReadMetrics { +pub struct KvLogStoreReadMetrics { pub storage_read_count: LabelGuardedIntCounter<5>, pub storage_read_size: LabelGuardedIntCounter<5>, } @@ -190,7 +190,7 @@ impl KvLogStoreMetrics { } #[cfg(test)] - fn for_test() -> Self { + pub(crate) fn for_test() -> Self { KvLogStoreMetrics { storage_write_count: LabelGuardedIntCounter::test_int_counter(), storage_write_size: LabelGuardedIntCounter::test_int_counter(), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 797ec77a163a4..e43b01882d0cc 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -51,7 +51,9 @@ use crate::common::log_store_impl::kv_log_store::buffer::{ use crate::common::log_store_impl::kv_log_store::serde::{ merge_log_store_item_stream, KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde, }; -use 
crate::common::log_store_impl::kv_log_store::KvLogStoreMetrics; +use crate::common::log_store_impl::kv_log_store::{ + KvLogStoreMetrics, KvLogStoreReadMetrics, SeqIdType, +}; pub(crate) const REWIND_BASE_DELAY: Duration = Duration::from_secs(1); pub(crate) const REWIND_BACKOFF_FACTOR: u64 = 2; @@ -197,7 +199,7 @@ impl KvLogStoreReader { } } -struct AutoRebuildStateStoreReadIter { +pub struct AutoRebuildStateStoreReadIter { state_store: S, iter: S::Iter, // call to get whether to rebuild the iter. Once return true, the closure should reset itself. @@ -230,7 +232,7 @@ impl bool> AutoRebuildStateStoreReadIter } } -mod timeout_auto_rebuild { +pub(crate) mod timeout_auto_rebuild { use std::time::{Duration, Instant}; use risingwave_hummock_sdk::key::TableKeyRange; @@ -240,7 +242,7 @@ mod timeout_auto_rebuild { use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter; - pub(super) type TimeoutAutoRebuildIter = + pub(crate) type TimeoutAutoRebuildIter = AutoRebuildStateStoreReadIter bool + Send>; pub(super) async fn iter_with_timeout_rebuild( @@ -345,57 +347,14 @@ impl KvLogStoreReader { ) -> impl Future< Output = LogStoreResult>>>>, > + Send { - let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch { - // start from the next epoch of last_persisted_epoch - Included( - self.serde - .serialize_pk_epoch_prefix(last_persisted_epoch.next_epoch()), - ) - } else { - Unbounded - }; - let range_end = self.serde.serialize_pk_epoch_prefix( - self.first_write_epoch - .expect("should have set first write epoch"), - ); - - let serde = self.serde.clone(); - let table_id = self.table_id; - let read_metrics = self.metrics.persistent_log_read_metrics.clone(); - let streams_future = try_join_all(serde.vnodes().iter_vnodes().map(|vnode| { - let key_range = prefixed_range_with_vnode( - (range_start.clone(), Excluded(range_end.clone())), - vnode, - ); - let state_store = self.state_store.clone(); - async move { - // rebuild the iter every 10 
minutes to avoid pinning hummock version for too long - iter_with_timeout_rebuild( - state_store, - key_range, - HummockEpoch::MAX, - ReadOptions { - // This stream lives too long, the connection of prefetch object may break. So use a short connection prefetch. - prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(), - cache_policy: CachePolicy::Fill(CacheHint::Low), - table_id, - ..Default::default() - }, - Duration::from_secs(10 * 60), - ) - .await - } - })); - - streams_future.map_err(Into::into).map_ok(|streams| { - // TODO: set chunk size by config - Box::pin(merge_log_store_item_stream( - streams, - serde, - 1024, - read_metrics, - )) - }) + read_persisted_log_store( + &self.serde, + self.table_id, + &self.metrics, + self.state_store.clone(), + self.first_write_epoch.expect("should have init"), + last_persisted_epoch, + ) } } @@ -510,50 +469,17 @@ impl LogReader for KvLogStoreReader { let state_store = self.state_store.clone(); let table_id = self.table_id; let read_metrics = self.metrics.flushed_buffer_read_metrics.clone(); - async move { - let iters = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| { - let range_start = - serde.serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id)); - let range_end = - serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id)); - let state_store = &state_store; - - // Use MAX EPOCH here because the epoch to consume may be below the safe - // epoch - async move { - Ok::<_, anyhow::Error>( - state_store - .iter( - (Included(range_start), Included(range_end)), - HummockEpoch::MAX, - ReadOptions { - prefetch_options: - PrefetchOptions::prefetch_for_large_range_scan(), - cache_policy: CachePolicy::Fill(CacheHint::Low), - table_id, - ..Default::default() - }, - ) - .await?, - ) - } - })) - .instrument_await("Wait Create Iter Stream") - .await?; - - let chunk = serde - .deserialize_stream_chunk( - iters, - start_seq_id, - end_seq_id, - item_epoch, - &read_metrics, - ) - .instrument_await("Deserialize 
Stream Chunk") - .await?; - - Ok((chunk_id, chunk, item_epoch)) - } + read_flushed_chunk( + serde, + state_store, + vnode_bitmap, + chunk_id, + start_seq_id, + end_seq_id, + item_epoch, + table_id, + read_metrics, + ) .boxed() }; @@ -672,6 +598,107 @@ impl LogReader for KvLogStoreReader { } } +#[allow(clippy::too_many_arguments)] +pub(crate) async fn read_flushed_chunk( + serde: LogStoreRowSerde, + state_store: impl StateStoreRead, + vnode_bitmap: Bitmap, + chunk_id: ChunkId, + start_seq_id: SeqIdType, + end_seq_id: SeqIdType, + item_epoch: u64, + table_id: TableId, + read_metrics: KvLogStoreReadMetrics, +) -> LogStoreResult<(ChunkId, StreamChunk, u64)> { + tracing::trace!("reading flushed chunk from buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"); + let iters = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| { + let range_start = serde.serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id)); + let range_end = serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id)); + let state_store = &state_store; + + // Use MAX EPOCH here because the epoch to consume may be below the safe + // epoch + async move { + Ok::<_, anyhow::Error>( + state_store + .iter( + (Included(range_start), Included(range_end)), + HummockEpoch::MAX, + ReadOptions { + prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(), + cache_policy: CachePolicy::Fill(CacheHint::Low), + table_id, + ..Default::default() + }, + ) + .await?, + ) + } + })) + .instrument_await("Wait Create Iter Stream") + .await?; + + let chunk = serde + .deserialize_stream_chunk(iters, start_seq_id, end_seq_id, item_epoch, &read_metrics) + .instrument_await("Deserialize Stream Chunk") + .await?; + + Ok((chunk_id, chunk, item_epoch)) +} + +pub(crate) fn read_persisted_log_store( + serde: &LogStoreRowSerde, + table_id: TableId, + metrics: &KvLogStoreMetrics, + state_store: S, + first_write_epoch: u64, + last_persisted_epoch: Option, +) -> impl Future>>>>> + + Send 
{ + let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch { + // start from the next epoch of last_persisted_epoch + Included(serde.serialize_pk_epoch_prefix(last_persisted_epoch.next_epoch())) + } else { + Unbounded + }; + let range_end = serde.serialize_pk_epoch_prefix(first_write_epoch); + + let serde = serde.clone(); + let read_metrics = metrics.persistent_log_read_metrics.clone(); + let streams_future = try_join_all(serde.vnodes().iter_vnodes().map(|vnode| { + let key_range = + prefixed_range_with_vnode((range_start.clone(), Excluded(range_end.clone())), vnode); + let state_store = state_store.clone(); + async move { + // rebuild the iter every 10 minutes to avoid pinning hummock version for too long + iter_with_timeout_rebuild( + state_store, + key_range, + HummockEpoch::MAX, + ReadOptions { + // This stream lives too long, the connection of prefetch object may break. So use a short connection prefetch. + prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(), + cache_policy: CachePolicy::Fill(CacheHint::Low), + table_id, + ..Default::default() + }, + Duration::from_secs(10 * 60), + ) + .await + } + })); + + streams_future.map_err(Into::into).map_ok(|streams| { + // TODO: set chunk size by config + Box::pin(merge_log_store_item_stream( + streams, + serde, + 1024, + read_metrics, + )) + }) +} + #[cfg(test)] mod tests { use std::collections::{Bound, HashSet}; diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 80d2a7418c216..2e57cf085ffc0 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -110,6 +110,7 @@ mod row_merge; #[cfg(test)] mod integration_tests; +mod sync_kv_log_store; pub mod test_utils; mod utils; @@ -390,6 +391,10 @@ impl Barrier { .map_or(false, |actors| actors.contains(&actor_id)) } + pub fn is_checkpoint(&self) -> bool { + self.kind == BarrierKind::Checkpoint + } + /// Get the initial split assignments for the actor with `actor_id`. 
/// /// This should only be called on the initial barrier received by the executor. It must be diff --git a/src/stream/src/executor/sync_kv_log_store.rs b/src/stream/src/executor/sync_kv_log_store.rs new file mode 100644 index 0000000000000..f491ffebf4848 --- /dev/null +++ b/src/stream/src/executor/sync_kv_log_store.rs @@ -0,0 +1,1045 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This contains the synced kv log store implementation. +//! It's meant to buffer a large number of records emitted from upstream, +//! to avoid overwhelming the downstream executor. +//! +//! The synced kv log store polls two futures: +//! +//! 1. Upstream: upstream message source +//! +//! It will write stream messages to the log store buffer. e.g. `Message::Barrier`, `Message::Chunk`, ... +//! When writing a stream chunk, if the log store buffer is full, it will: +//! a. Flush the buffer to the log store. +//! b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`) +//! which can read the corresponding chunks in the log store. +//! We will compact adjacent references, +//! so it can read multiple chunks if there's a build up. +//! +//! On receiving barriers, it will: +//! a. Apply truncation to historical data in the logstore. +//! b. Flush and checkpoint the logstore data. +//! +//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore. +//! +//! 
It will read all historical data from the logstore first. This can be done just by +//! constructing a state store stream, which will read all data until the latest epoch. +//! This is a static snapshot of data. +//! For any subsequently flushed chunks, we will read them via +//! `flushed_chunk_future`. See the next paragraph below. +//! +//! Next, we will poll `flushed_chunk_future` (if one already exists); see below for what it is +//! and how it's constructed. +//! +//! Finally we will pop the earliest item in the buffer. +//! - If it's a chunk, yield it. +//! - If it's a watermark, yield it. +//! - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`), +//! we will read the corresponding chunks in the log store. +//! This is done by constructing a `flushed_chunk_future` which will read the log store +//! using the `seq_id`. +//! - If it's a barrier, skip it, +//! because barriers are directly propagated from the upstream when polling it. +//! +//! TODO(kwannoel): +//! - [] Add dedicated metrics for sync log store, namespace according to the upstream. +//! - [] Add tests +//! - [] Handle watermark r/w +//! 
- [] Handle paused stream + +use std::collections::VecDeque; +use std::pin::Pin; + +use await_tree::InstrumentAwait; +use futures::future::BoxFuture; +use futures::{FutureExt, TryStreamExt}; +use futures_async_stream::try_stream; +use itertools::Itertools; +use risingwave_common::array::StreamChunk; +use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; +use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_common::hash::VnodeBitmapExt; +use risingwave_common_estimate_size::EstimateSize; +use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult}; +use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_storage::store::{ + InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, SealCurrentEpochOptions, +}; +use risingwave_storage::StateStore; +use tokio::select; + +use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem; +use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter; +use crate::common::log_store_impl::kv_log_store::reader::{ + read_flushed_chunk, read_persisted_log_store, +}; +use crate::common::log_store_impl::kv_log_store::serde::{ + KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde, +}; +use crate::common::log_store_impl::kv_log_store::{ + FlushInfo, KvLogStoreMetrics, KvLogStoreReadMetrics, ReaderTruncationOffsetType, SeqIdType, + FIRST_SEQ_ID, +}; +use crate::executor::prelude::*; +use crate::executor::{ + Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult, +}; + +type StateStoreStream = Pin>>>; +type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, u64)>>; + +struct SyncedKvLogStoreExecutor { + actor_context: ActorContextRef, + table_id: TableId, + read_metrics: KvLogStoreReadMetrics, + metrics: KvLogStoreMetrics, + serde: LogStoreRowSerde, + seq_id: SeqIdType, + truncation_offset: Option, + + // Upstream + upstream: Executor, + + 
// Log store state + flushed_chunk_future: Option, + state_store: S, + local_state_store: S::Local, + buffer: SyncedLogStoreBuffer, +} +// Stream interface +impl SyncedKvLogStoreExecutor { + #[allow(clippy::too_many_arguments, dead_code)] + pub async fn new( + actor_context: ActorContextRef, + table_id: u32, + read_metrics: KvLogStoreReadMetrics, + metrics: KvLogStoreMetrics, + serde: LogStoreRowSerde, + seq_id: SeqIdType, + state_store: S, + buffer_max_size: usize, + upstream: Executor, + ) -> Self { + let local_state_store = state_store + .new_local(NewLocalOptions { + table_id: TableId { table_id }, + op_consistency_level: OpConsistencyLevel::Inconsistent, + table_option: TableOption { + retention_seconds: None, + }, + is_replicated: false, + vnodes: serde.vnodes().clone(), + }) + .await; + Self { + actor_context, + table_id: TableId::new(table_id), + read_metrics, + metrics: metrics.clone(), + serde, + seq_id, + truncation_offset: None, + flushed_chunk_future: None, + state_store, + local_state_store, + buffer: SyncedLogStoreBuffer { + buffer: VecDeque::new(), + max_size: buffer_max_size, + next_chunk_id: 0, + metrics, + }, + upstream, + } + } +} + +// Stream interface +impl SyncedKvLogStoreExecutor { + #[try_stream(ok = Message, error = StreamExecutorError)] + pub async fn execute_inner(mut self) { + let mut input = self.upstream.execute(); + let barrier = expect_first_barrier(&mut input).await?; + yield Message::Barrier(barrier.clone()); + let mut state_store_stream = Some( + Self::init( + &barrier, + &mut self.local_state_store, + &self.serde, + self.table_id, + &self.metrics, + self.state_store.clone(), + ) + .await?, + ); + loop { + if let Some(msg) = Self::next( + self.actor_context.id, + &mut input, + self.table_id, + &self.read_metrics, + &mut self.serde, + &mut self.truncation_offset, + &mut state_store_stream, + &mut self.flushed_chunk_future, + &self.state_store, + &mut self.buffer, + &mut self.local_state_store, + &mut self.metrics, + &mut 
self.seq_id, + ) + .await? + { + yield msg; + } + } + } + + async fn init( + barrier: &Barrier, + local_state_store: &mut S::Local, + serde: &LogStoreRowSerde, + table_id: TableId, + metrics: &KvLogStoreMetrics, + state_store: S, + ) -> StreamExecutorResult> { + let init_epoch_pair = barrier.epoch; + local_state_store + .init(InitOptions::new(init_epoch_pair)) + .await?; + let state_store_stream = read_persisted_log_store( + serde, + table_id, + metrics, + state_store, + barrier.epoch.prev, + None, + ) + .await?; + Ok(state_store_stream) + } + + #[allow(clippy::too_many_arguments)] + async fn next( + actor_id: ActorId, + input: &mut BoxedMessageStream, + table_id: TableId, + read_metrics: &KvLogStoreReadMetrics, + serde: &mut LogStoreRowSerde, + truncation_offset: &mut Option, + state_store_stream: &mut Option>, + flushed_chunk_future: &mut Option, + state_store: &S, + buffer: &mut SyncedLogStoreBuffer, + + local_state_store: &mut S::Local, + metrics: &mut KvLogStoreMetrics, + seq_id: &mut SeqIdType, + ) -> StreamExecutorResult> { + select! { + biased; + // poll from upstream + // Prefer this arm to let barrier bypass. + upstream_item = input.next() => { + match upstream_item { + None => Ok(None), + Some(upstream_item) => { + match upstream_item? 
{ + Message::Barrier(barrier) => { + Self::write_barrier( + local_state_store, + serde, + barrier.clone(), + metrics, + *truncation_offset, + seq_id, + buffer, + actor_id, + ).await?; + let should_update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id).is_some(); + if should_update_vnode_bitmap { + *state_store_stream = Some(read_persisted_log_store( + serde, + table_id, + metrics, + state_store.clone(), + barrier.epoch.prev, + None, + ).await?); + } + Ok(Some(Message::Barrier(barrier))) + } + Message::Chunk(chunk) => { + let start_seq_id = *seq_id; + *seq_id += chunk.cardinality() as SeqIdType; + let end_seq_id = *seq_id - 1; + Self::write_chunk( + metrics, + serde, + start_seq_id, + end_seq_id, + buffer, + chunk, + local_state_store, + ).await?; + Ok(None) + } + // TODO(kwannoel): This should be written to the logstore, + // it will not bypass like barrier. + Message::Watermark(_watermark) => Ok(None), + } + } + } + } + + // read from log store + logstore_item = Self::try_next_item( + table_id, + read_metrics, + serde, + truncation_offset, + state_store_stream, + flushed_chunk_future, + state_store, + buffer + ) => { + let logstore_item = logstore_item?; + Ok(logstore_item.map(Message::Chunk)) + } + } + } +} + +// Read methods +impl SyncedKvLogStoreExecutor { + #[allow(clippy::too_many_arguments)] + async fn try_next_item( + table_id: TableId, + read_metrics: &KvLogStoreReadMetrics, + serde: &LogStoreRowSerde, + truncation_offset: &mut Option, + + // state + log_store_state: &mut Option>, + read_flushed_chunk_future: &mut Option, + state_store: &S, + buffer: &mut SyncedLogStoreBuffer, + ) -> StreamExecutorResult> { + // 1. read state store + if let Some(chunk) = Self::try_next_state_store_item(log_store_state).await? { + return Ok(Some(chunk)); + } + + // 2. read existing flushed chunk future + if let Some(chunk) = Self::try_next_flushed_chunk_future(read_flushed_chunk_future).await? { + return Ok(Some(chunk)); + } + + // 3. 
read buffer + if let Some(chunk) = Self::try_next_buffer_item( + truncation_offset, + read_flushed_chunk_future, + serde, + state_store, + buffer, + table_id, + read_metrics, + ) + .await? + { + return Ok(Some(chunk)); + } + Ok(None) + } + + async fn try_next_state_store_item( + state_store_stream_opt: &mut Option>, + ) -> StreamExecutorResult> { + if let Some(state_store_stream) = state_store_stream_opt { + match state_store_stream + .try_next() + .instrument_await("try_next item") + .await? + { + Some((_epoch, item)) => match item { + KvLogStoreItem::StreamChunk(chunk) => Ok(Some(chunk)), + KvLogStoreItem::Barrier { .. } => Ok(None), + }, + None => { + *state_store_stream_opt = None; + Ok(None) + } + } + } else { + Ok(None) + } + } + + async fn try_next_flushed_chunk_future( + flushed_chunk_future: &mut Option, + ) -> StreamExecutorResult> { + if let Some(future) = flushed_chunk_future { + match future.await { + Ok((_, chunk, _)) => { + *flushed_chunk_future = None; + Ok(Some(chunk)) + } + Err(e) => { + // Clear the finished future so it is never polled again, then propagate the read error. + *flushed_chunk_future = None; + Err(e.into()) + } + } + } else { + Ok(None) + } + } + + async fn try_next_buffer_item( + truncation_offset: &mut Option, + read_flushed_chunk_future: &mut Option, + serde: &LogStoreRowSerde, + state_store: &S, + buffer: &mut SyncedLogStoreBuffer, + table_id: TableId, + read_metrics: &KvLogStoreReadMetrics, + ) -> StreamExecutorResult> { + let Some((item_epoch, item)) = buffer.pop_front() else { + return Ok(None); + }; + match item { + LogStoreBufferItem::StreamChunk { + chunk, end_seq_id, .. 
+ } => { + truncation_offset.replace((item_epoch, Some(end_seq_id))); + Ok(Some(chunk)) + } + LogStoreBufferItem::Flushed { + vnode_bitmap, + start_seq_id, + end_seq_id, + chunk_id, + } => { + truncation_offset.replace((item_epoch, Some(end_seq_id))); + let serde = serde.clone(); + let read_metrics = read_metrics.clone(); + let read_flushed_chunk_fut = read_flushed_chunk( + serde, + state_store.clone(), + vnode_bitmap, + chunk_id, + start_seq_id, + end_seq_id, + item_epoch, + table_id, + read_metrics, + ) + .boxed(); + *read_flushed_chunk_future = Some(read_flushed_chunk_fut); + Self::try_next_flushed_chunk_future(read_flushed_chunk_future).await + } + LogStoreBufferItem::Barrier { next_epoch, .. } => { + // FIXME(kwannoel): Is `next_epoch` correct for truncation?? + truncation_offset.replace((next_epoch, None)); + Ok(None) + } + LogStoreBufferItem::UpdateVnodes(_) => Ok(None), + } + } +} + +// Write methods +impl SyncedKvLogStoreExecutor { + #[allow(clippy::too_many_arguments)] + async fn write_barrier( + state_store: &mut S::Local, + serde: &mut LogStoreRowSerde, + barrier: Barrier, + metrics: &mut KvLogStoreMetrics, + truncation_offset: Option, + seq_id: &mut SeqIdType, + buffer: &mut SyncedLogStoreBuffer, + actor_id: ActorId, + ) -> StreamExecutorResult<()> { + let epoch = state_store.epoch(); + let mut flush_info = FlushInfo::new(); + + // FIXME(kwannoel): Handle paused stream. + for vnode in serde.vnodes().iter_vnodes() { + let (key, value) = serde.serialize_barrier(epoch, vnode, barrier.is_checkpoint()); + flush_info.flush_one(key.estimated_size() + value.estimated_size()); + state_store.insert(key, value, None)?; + } + + // FIXME(kwannoel): Flush all unflushed chunks + // As an optimization we can also change it into flushed items instead. + // This will reduce memory consumption of logstore. 
+
+        flush_info.report(metrics);
+
+        // Apply truncation
+        let watermark = truncation_offset.map(|truncation_offset| {
+            VnodeWatermark::new(
+                serde.vnodes().clone(),
+                serde.serialize_truncation_offset_watermark(truncation_offset),
+            )
+        });
+        state_store.flush().await?;
+        let watermark = watermark.into_iter().collect_vec();
+        state_store.seal_current_epoch(
+            barrier.epoch.curr,
+            SealCurrentEpochOptions {
+                table_watermarks: Some((WatermarkDirection::Ascending, watermark)),
+                switch_op_consistency_level: None,
+            },
+        );
+
+        // Add to buffer
+        buffer.buffer.push_back((
+            epoch,
+            LogStoreBufferItem::Barrier {
+                is_checkpoint: barrier.is_checkpoint(),
+                next_epoch: barrier.epoch.curr,
+            },
+        ));
+        buffer.next_chunk_id = 0;
+        buffer.update_unconsumed_buffer_metrics();
+
+        // Apply Vnode Update
+        if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_id) {
+            state_store.update_vnode_bitmap(vnode_bitmap.clone());
+            serde.update_vnode_bitmap(vnode_bitmap.clone());
+            buffer
+                .buffer
+                .push_back((epoch, LogStoreBufferItem::UpdateVnodes(vnode_bitmap)));
+        }
+
+        *seq_id = FIRST_SEQ_ID;
+        Ok(())
+    }
+
+    /// Handles one incoming chunk covering seq ids `start_seq_id..=end_seq_id`.
+    ///
+    /// The chunk is either kept in the in-memory `buffer`, or - when the buffer hands it back
+    /// because it does not fit - written to `state_store` and represented in the buffer by a
+    /// `Flushed` placeholder carrying the metadata needed to read it back.
+    async fn write_chunk(
+        metrics: &KvLogStoreMetrics,
+        serde: &LogStoreRowSerde,
+        start_seq_id: SeqIdType,
+        end_seq_id: SeqIdType,
+        buffer: &mut SyncedLogStoreBuffer,
+        chunk: StreamChunk,
+        state_store: &mut S::Local,
+    ) -> StreamExecutorResult<()> {
+        // `Some(chunk)` means the buffer refused the chunk and it must be persisted instead.
+        if let Some(chunk_to_flush) =
+            buffer.add_or_flush_chunk(start_seq_id, end_seq_id, chunk, state_store)
+        {
+            let new_vnode_bitmap = flush_chunk(
+                metrics,
+                serde,
+                start_seq_id,
+                end_seq_id,
+                state_store,
+                chunk_to_flush,
+            )
+            .await?;
+            buffer.add_flushed_item_to_buffer(
+                start_seq_id,
+                end_seq_id,
+                new_vnode_bitmap,
+                state_store.epoch(),
+            );
+        }
+        Ok(())
+    }
+}
+
+/// In-memory queue of log store items that have been written but not yet consumed.
+struct SyncedLogStoreBuffer {
+    /// Pending items in arrival order, each tagged with the epoch it belongs to.
+    /// New items are appended at the back and consumed from the front.
+    buffer: VecDeque<(u64, LogStoreBufferItem)>,
+    /// Threshold used by `add_or_flush_chunk` to decide whether a chunk is buffered or flushed.
+    max_size: usize,
+    /// Id assigned to the next buffered entry; reset to 0 whenever a barrier is buffered.
+    next_chunk_id: ChunkId,
+    /// Metrics updated every time the buffer content changes.
+    metrics: KvLogStoreMetrics,
+}
+
+/// Writes every row of `chunk` into `state_store` and flushes it.
+///
+/// Row `i` of the chunk is assigned seq id `start_seq_id + i`, which must not exceed
+/// `end_seq_id` (the range is inclusive). Returns the bitmap of vnodes that received at
+/// least one row.
+async fn flush_chunk(
+    metrics: &KvLogStoreMetrics,
+    serde: &LogStoreRowSerde,
+    start_seq_id: SeqIdType,
+    end_seq_id: SeqIdType,
+    state_store: &mut impl LocalStateStore,
+    chunk: StreamChunk,
+) -> StreamExecutorResult<Bitmap> {
+    tracing::trace!("Flushing chunk: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}");
+    let epoch = state_store.epoch();
+    let mut vnode_bitmap_builder = BitmapBuilder::zeroed(serde.vnodes().len());
+    let mut flush_info = FlushInfo::new();
+    for (i, (op, row)) in chunk.rows().enumerate() {
+        let seq_id = start_seq_id + (i as SeqIdType);
+        assert!(seq_id <= end_seq_id);
+        let (vnode, key, value) = serde.serialize_data_row(epoch, seq_id, op, row);
+        vnode_bitmap_builder.set(vnode.to_index(), true);
+        flush_info.flush_one(key.estimated_size() + value.estimated_size());
+        state_store.insert(key, value, None)?;
+    }
+    flush_info.report(metrics);
+    state_store.flush().await?;
+
+    Ok(vnode_bitmap_builder.finish())
+}
+
+impl SyncedLogStoreBuffer {
+    /// Tries to keep `chunk` in memory.
+    ///
+    /// Returns `None` when the chunk was appended to the buffer, or `Some(chunk)` when the
+    /// buffer is considered full and the caller has to flush the chunk to the state store.
+    fn add_or_flush_chunk(
+        &mut self,
+        start_seq_id: SeqIdType,
+        end_seq_id: SeqIdType,
+        chunk: StreamChunk,
+        state_store: &mut impl LocalStateStore,
+    ) -> Option<StreamChunk> {
+        // NOTE(review): `current_size` is the number of buffered *items* while `chunk_size` is
+        // a number of *rows*; confirm that comparing their sum to `max_size` is intended.
+        let current_size = self.buffer.len();
+        let chunk_size = chunk.cardinality();
+        let epoch = state_store.epoch();
+
+        let should_flush_chunk = current_size + chunk_size >= self.max_size;
+        if should_flush_chunk {
+            Some(chunk)
+        } else {
+            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
+            None
+        }
+    }
+
+    /// After flushing a chunk, we will preserve a `FlushedItem` inside the buffer.
+    /// This doesn't contain any data, but it contains the metadata to read the flushed chunk.
+    ///
+    /// If the most recently buffered entry is already a `Flushed` placeholder, it is extended
+    /// to cover the new seq id range and vnodes; otherwise a new placeholder is appended.
+    fn add_flushed_item_to_buffer(
+        &mut self,
+        start_seq_id: SeqIdType,
+        end_seq_id: SeqIdType,
+        new_vnode_bitmap: Bitmap,
+        epoch: u64,
+    ) {
+        // Entries are always appended with `push_back`, so the entry that may be extended is
+        // the one at the back. Extending the front entry would skip over anything buffered
+        // in between.
+        if let Some((
+            item_epoch,
+            LogStoreBufferItem::Flushed {
+                end_seq_id: prev_end_seq_id,
+                vnode_bitmap,
+                ..
+            },
+        )) = self.buffer.back_mut()
+        {
+            assert!(
+                *prev_end_seq_id < start_seq_id,
+                "prev end_seq_id {} should be smaller than current start_seq_id {}",
+                *prev_end_seq_id,
+                start_seq_id
+            );
+            assert_eq!(
+                epoch, *item_epoch,
+                "epoch of newly added flushed item must be the same as the last flushed item"
+            );
+            *prev_end_seq_id = end_seq_id;
+            *vnode_bitmap |= new_vnode_bitmap;
+        } else {
+            let chunk_id = self.next_chunk_id;
+            self.next_chunk_id += 1;
+            self.buffer.push_back((
+                epoch,
+                LogStoreBufferItem::Flushed {
+                    start_seq_id,
+                    end_seq_id,
+                    vnode_bitmap: new_vnode_bitmap,
+                    chunk_id,
+                },
+            ));
+            tracing::trace!("Adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}");
+        }
+        // FIXME(kwannoel): Seems these metrics are updated _after_ the flush info is reported.
+        self.update_unconsumed_buffer_metrics();
+    }
+
+    /// Appends `chunk` itself (not yet flushed) to the back of the buffer under a fresh
+    /// chunk id.
+    fn add_chunk_to_buffer(
+        &mut self,
+        chunk: StreamChunk,
+        start_seq_id: SeqIdType,
+        end_seq_id: SeqIdType,
+        epoch: u64,
+    ) {
+        let chunk_id = self.next_chunk_id;
+        self.next_chunk_id += 1;
+        self.buffer.push_back((
+            epoch,
+            LogStoreBufferItem::StreamChunk {
+                chunk,
+                start_seq_id,
+                end_seq_id,
+                flushed: false,
+                chunk_id,
+            },
+        ));
+        self.update_unconsumed_buffer_metrics();
+    }
+
+    /// Removes and returns the oldest buffered entry together with its epoch, if any.
+    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
+        self.buffer.pop_front()
+    }
+
+    /// Recomputes the "unconsumed" gauges (epoch count, row count, item count, min epoch)
+    /// from the current buffer content.
+    fn update_unconsumed_buffer_metrics(&self) {
+        let mut epoch_count = 0;
+        let mut row_count = 0;
+        for (_, item) in &self.buffer {
+            match item {
+                LogStoreBufferItem::StreamChunk { chunk, .. } => {
+                    row_count += chunk.cardinality();
+                }
+                LogStoreBufferItem::Flushed {
+                    start_seq_id,
+                    end_seq_id,
+                    ..
+                } => {
+                    // The seq id range is inclusive on both ends, hence the `+ 1`.
+                    row_count += (end_seq_id - start_seq_id + 1) as usize;
+                }
+                LogStoreBufferItem::Barrier { .. } => {
+                    epoch_count += 1;
+                }
+                LogStoreBufferItem::UpdateVnodes(_) => {}
+            }
+        }
+        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
+        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
+        self.metrics
+            .buffer_unconsumed_item_count
+            .set(self.buffer.len() as _);
+        // An empty buffer reports the default epoch value (0).
+        self.metrics.buffer_unconsumed_min_epoch.set(
+            self.buffer
+                .front()
+                .map(|(epoch, _)| *epoch)
+                .unwrap_or_default() as _,
+        );
+    }
+}
+
+impl<S> Execute for SyncedKvLogStoreExecutor<S>
+where
+    S: StateStore,
+{
+    fn execute(self: Box<Self>) -> BoxedMessageStream {
+        self.execute_inner().boxed()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use pretty_assertions::assert_eq;
+    use risingwave_common::catalog::Field;
+    use risingwave_common::hash::VirtualNode;
+    use risingwave_common::test_prelude::*;
+    use risingwave_common::util::epoch::test_epoch;
+    use risingwave_storage::memory::MemoryStateStore;
+
+    use super::*;
+    use crate::common::log_store_impl::kv_log_store::test_utils::{
+        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
+    };
+    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
+    use crate::executor::test_utils::MockSource;
+
+    fn init_logger() {
+        let _ = tracing_subscriber::fmt()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .with_ansi(false)
+            .try_init();
+    }
+
+    // test read/write buffer
+    #[tokio::test]
+    async fn test_read_write_buffer() {
+        init_logger();
+        let schema = Schema {
+            fields: vec![
+                Field::unnamed(DataType::Int64),
+                Field::unnamed(DataType::Int64),
+            ],
+        };
+        let pk_indices = vec![0];
+        let (mut tx, source) = MockSource::channel();
+        let source = source.into_executor(schema.clone(), pk_indices.clone());
+
+        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));
+
+        let pk_info = &KV_LOG_STORE_V2_INFO;
+        let table = gen_test_log_store_table(pk_info);
+
+        let log_store_executor = SyncedKvLogStoreExecutor::new(
+            ActorContext::for_test(123),
+            table.id,
+            KvLogStoreReadMetrics::for_test(),
+            KvLogStoreMetrics::for_test(),
+            LogStoreRowSerde::new(&table, vnodes, pk_info),
+            0,
+            MemoryStateStore::new(),
+            10,
+            source,
+        )
+        .await
+        .boxed();
+
+        // Init
+        tx.push_barrier(test_epoch(1), false);
+
+        let chunk_1 = StreamChunk::from_pretty(
+            " I I
+            + 5 10
+            + 6 10
+            + 8 10
+            + 9 10
+            + 10 11",
+        );
+
+        let chunk_2 = StreamChunk::from_pretty(
+            " I I
+            - 5 10
+            - 6 10
+            - 8 10
+            U- 9 10
+            U+ 10 11",
+        );
+
+        tx.push_chunk(chunk_1.clone());
+        tx.push_chunk(chunk_2.clone());
+
+        let mut stream = log_store_executor.execute();
+
+        match stream.next().await {
+            Some(Ok(Message::Barrier(barrier))) => {
+                assert_eq!(barrier.epoch.curr, test_epoch(1));
+            }
+            other => panic!("Expected a barrier message, got {:?}", other),
+        }
+
+        match stream.next().await {
+            Some(Ok(Message::Chunk(chunk))) => {
+                assert_eq!(chunk, chunk_1);
+            }
+            other => panic!("Expected a chunk message, got {:?}", other),
+        }
+
+        match stream.next().await {
+            Some(Ok(Message::Chunk(chunk))) => {
+                assert_eq!(chunk, chunk_2);
+            }
+            other => panic!("Expected a chunk message, got {:?}", other),
+        }
+
+        tx.push_barrier(test_epoch(2), false);
+
+        match stream.next().await {
+            Some(Ok(Message::Barrier(barrier))) => {
+                assert_eq!(barrier.epoch.curr, test_epoch(2));
+            }
+            other => panic!("Expected a barrier message, got {:?}", other),
+        }
+    }
+
+    // test barrier persisted read
+    //
+    // sequence of events (earliest -> latest):
+    // barrier(1) -> chunk(1) -> chunk(2) -> barrier(2) -> poll(4) items
+    // expected poll order: barrier(1), barrier(2), chunk(1), chunk(2)
+    // * poll just means we read from the executor stream.
+    #[tokio::test]
+    async fn test_barrier_persisted_read() {
+        init_logger();
+        let schema = Schema {
+            fields: vec![
+                Field::unnamed(DataType::Int64),
+                Field::unnamed(DataType::Int64),
+            ],
+        };
+        let pk_indices = vec![0];
+        let (mut tx, source) = MockSource::channel();
+        let source = source.into_executor(schema.clone(), pk_indices.clone());
+
+        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));
+
+        let pk_info = &KV_LOG_STORE_V2_INFO;
+        let table = gen_test_log_store_table(pk_info);
+
+        let log_store_executor = SyncedKvLogStoreExecutor::new(
+            ActorContext::for_test(123),
+            table.id,
+            KvLogStoreReadMetrics::for_test(),
+            KvLogStoreMetrics::for_test(),
+            LogStoreRowSerde::new(&table, vnodes, pk_info),
+            0,
+            MemoryStateStore::new(),
+            10,
+            source,
+        )
+        .await
+        .boxed();
+
+        // Init
+        tx.push_barrier(test_epoch(1), false);
+
+        let chunk_1 = StreamChunk::from_pretty(
+            " I I
+            + 5 10
+            + 6 10
+            + 8 10
+            + 9 10
+            + 10 11",
+        );
+
+        let chunk_2 = StreamChunk::from_pretty(
+            " I I
+            - 5 10
+            - 6 10
+            - 8 10
+            U- 10 11
+            U+ 10 10",
+        );
+
+        tx.push_chunk(chunk_1.clone());
+        tx.push_chunk(chunk_2.clone());
+
+        tx.push_barrier(test_epoch(2), false);
+
+        let mut stream = log_store_executor.execute();
+
+        match stream.next().await {
+            Some(Ok(Message::Barrier(barrier))) => {
+                assert_eq!(barrier.epoch.curr, test_epoch(1));
+            }
+            other => panic!("Expected a barrier message, got {:?}", other),
+        }
+
+        match stream.next().await {
+            Some(Ok(Message::Barrier(barrier))) => {
+                assert_eq!(barrier.epoch.curr, test_epoch(2));
+            }
+            other => panic!("Expected a barrier message, got {:?}", other),
+        }
+
+        match stream.next().await {
+            Some(Ok(Message::Chunk(chunk))) => {
+                assert_eq!(chunk, chunk_1);
+            }
+            other => panic!("Expected a chunk message, got {:?}", other),
+        }
+
+        match stream.next().await {
+            Some(Ok(Message::Chunk(chunk))) => {
+                assert_eq!(chunk, chunk_2);
+            }
+            other => panic!("Expected a chunk message, got {:?}", other),
+        }
+    }
+
+    // When we hit buffer max_chunk, we only store placeholder `FlushedItem`.
+    // So we just let capacity = 0, and we will always flush incoming chunks to state store.
+    #[tokio::test]
+    async fn test_max_chunk_persisted_read() {
+        init_logger();
+
+        let pk_info = &KV_LOG_STORE_V2_INFO;
+        let column_descs = test_payload_schema(pk_info);
+        let fields = column_descs
+            .into_iter()
+            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
+            .collect_vec();
+        let schema = Schema { fields };
+        let pk_indices = vec![0];
+        let (mut tx, source) = MockSource::channel();
+        let source = source.into_executor(schema.clone(), pk_indices.clone());
+
+        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));
+
+        let table = gen_test_log_store_table(pk_info);
+
+        let log_store_executor = SyncedKvLogStoreExecutor::new(
+            ActorContext::for_test(123),
+            table.id,
+            KvLogStoreReadMetrics::for_test(),
+            KvLogStoreMetrics::for_test(),
+            LogStoreRowSerde::new(&table, vnodes, pk_info),
+            0,
+            MemoryStateStore::new(),
+            0,
+            source,
+        )
+        .await
+        .boxed();
+
+        // Init
+        tx.push_barrier(test_epoch(1), false);
+
+        let chunk_1 = StreamChunk::from_pretty(
+            " I T
+            + 5 10
+            + 6 10
+            + 8 10
+            + 9 10
+            + 10 11",
+        );
+
+        let chunk_2 = StreamChunk::from_pretty(
+            " I T
+            - 5 10
+            - 6 10
+            - 8 10
+            U- 10 11
+            U+ 10 10",
+        );
+
+        tx.push_chunk(chunk_1.clone());
+        tx.push_chunk(chunk_2.clone());
+
+        tx.push_barrier(test_epoch(2), false);
+
+        let mut stream = log_store_executor.execute();
+
+        for i in 1..=2 {
+            match stream.next().await {
+                Some(Ok(Message::Barrier(barrier))) => {
+                    assert_eq!(barrier.epoch.curr, test_epoch(i));
+                }
+                other => panic!("Expected a barrier message, got {:?}", other),
+            }
+        }
+
+        match stream.next().await {
+            Some(Ok(Message::Chunk(actual))) => {
+                let expected = StreamChunk::from_pretty(
+                    " I T
+                    + 5 10
+                    + 6 10
+                    + 8 10
+                    + 9 10
+                    + 10 11
+                    - 5 10
+                    - 6 10
+                    - 8 10
+                    U- 10 11
+                    U+ 10 10",
+                );
+                assert!(
+                    check_stream_chunk_eq(&actual, &expected),
+                    "Expected: {:#?}, got: {:#?}",
+                    expected,
+                    actual
+                );
+            }
+            other => panic!("Expected a chunk message, got {:?}", other),
+        }
+    }
+}
diff --git a/src/stream/src/from_proto/hash_join.rs b/src/stream/src/from_proto/hash_join.rs
index 8e75cbc57c38a..d768093ba9dda 100644
--- a/src/stream/src/from_proto/hash_join.rs
+++ b/src/stream/src/from_proto/hash_join.rs
@@ -198,7 +198,7 @@ impl HashKeyDispatcher for HashJoinExecutorDispatcherArgs {
         /// This macro helps to fill the const generic type parameter.
         macro_rules! build {
             ($join_type:ident) => {
-                Ok(HashJoinExecutor::::new(
+                HashJoinExecutor::::new(
                     self.ctx,
                     self.info,
                     self.source_l,
@@ -219,10 +219,10 @@ impl HashKeyDispatcher for HashJoinExecutorDispatcherArgs {
                     self.chunk_size,
                     self.high_join_amplification_threshold,
                 )
-                .boxed())
+                .boxed()
             };
         }
-        match self.join_type_proto {
+        let executor = match self.join_type_proto {
             JoinTypeProto::AsofInner
             | JoinTypeProto::AsofLeftOuter
             | JoinTypeProto::Unspecified => unreachable!(),
@@ -234,7 +234,8 @@ impl HashKeyDispatcher for HashJoinExecutorDispatcherArgs {
             JoinTypeProto::LeftAnti => build!(LeftAnti),
             JoinTypeProto::RightSemi => build!(RightSemi),
             JoinTypeProto::RightAnti => build!(RightAnti),
-        }
+        };
+        Ok(executor)
     }
 
     fn data_types(&self) -> &[DataType] {