
refactor #20262 (Draft)

Wants to merge 56 commits into base: main.

Commits (56):
326641a add interfaces (kwannoel, Jan 10, 2025)
7c64aea interim (kwannoel, Jan 13, 2025)
0256115 separate select (kwannoel, Jan 13, 2025)
e1d8e76 poll writer and reader (kwannoel, Jan 14, 2025)
4dbfd23 directly read from upstream (kwannoel, Jan 14, 2025)
a3accdc match different upstream items (kwannoel, Jan 15, 2025)
28d90f8 docs (kwannoel, Jan 15, 2025)
6b8b872 add skeleton for read path (kwannoel, Jan 15, 2025)
ec68692 refactor common fn (kwannoel, Jan 16, 2025)
229f7b5 handle full read path (kwannoel, Jan 16, 2025)
f26a979 write barrier (kwannoel, Jan 16, 2025)
2a8f518 write chunk (kwannoel, Jan 17, 2025)
a8fc266 wired up write_barrier (kwannoel, Jan 17, 2025)
1ac6f68 wire up write_chunk (kwannoel, Jan 17, 2025)
774a055 track metrics in buffer (kwannoel, Jan 17, 2025)
e99f87a truncate offset after read (kwannoel, Jan 17, 2025)
307a06c refactor into functions (kwannoel, Jan 17, 2025)
b67baf4 add constructor (kwannoel, Jan 17, 2025)
c7f6ae5 fmt (kwannoel, Jan 17, 2025)
7d2230f remove unnecessary async (kwannoel, Jan 17, 2025)
287d50e do not lock mutex across await point (kwannoel, Jan 17, 2025)
1203fef fix warnings (kwannoel, Jan 17, 2025)
5139adb fmt (kwannoel, Jan 20, 2025)
3cca011 refactor to executor (kwannoel, Jan 20, 2025)
cf6e86e renaming (kwannoel, Jan 20, 2025)
f620e9d impl execute (kwannoel, Jan 20, 2025)
a6cd2e3 fix warn + docs (kwannoel, Jan 20, 2025)
9a7779b refactor persisted logstore (kwannoel, Jan 20, 2025)
5468b2e init state store iter (kwannoel, Jan 20, 2025)
458f02e fix warn (kwannoel, Jan 20, 2025)
c8f2ea8 pass executor rather than msg stream (kwannoel, Jan 20, 2025)
c269c1f defer state_store clone (kwannoel, Jan 20, 2025)
31b535f add vnodes (kwannoel, Jan 20, 2025)
c26800d test outline pt 1 (kwannoel, Jan 21, 2025)
c7b6b94 add more test lines (kwannoel, Jan 21, 2025)
9e7b62b check test results (kwannoel, Jan 21, 2025)
93b10a7 fix calls (kwannoel, Jan 21, 2025)
ca0c40e make test compile (kwannoel, Jan 21, 2025)
c0e006b yield first barrier (kwannoel, Jan 22, 2025)
a82fcf5 logging (kwannoel, Jan 22, 2025)
e90e4d2 bias to upstream side (kwannoel, Jan 22, 2025)
e28e3fe fmt (kwannoel, Jan 22, 2025)
123046c allow unused (kwannoel, Jan 22, 2025)
6bca386 fix test lints (kwannoel, Jan 23, 2025)
e6ded4e fix (kwannoel, Jan 23, 2025)
5a8b58a use prev (kwannoel, Jan 27, 2025)
f0fe699 fix deps (kwannoel, Jan 27, 2025)
2b3dfd3 remove LS (kwannoel, Jan 27, 2025)
1e4ab0f use pub(crate) instead of pub (kwannoel, Jan 27, 2025)
c0e70d0 use expect_first_barrier + yield before init state store (kwannoel, Jan 27, 2025)
e0360a4 fix warn (kwannoel, Jan 27, 2025)
8cc063a apply vnode update to write path (kwannoel, Jan 27, 2025)
5cccf42 update read path (kwannoel, Jan 27, 2025)
484a5f8 no need lock (kwannoel, Jan 27, 2025)
78bce70 fix test (kwannoel, Jan 28, 2025)
00fffd6 refactor (kwannoel, Jan 22, 2025)
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions src/common/src/util/value_encoding/mod.rs
@@ -385,6 +385,7 @@ fn deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result<ScalarI

 fn deserialize_str(data: &mut impl Buf) -> Result<Box<str>> {
     let len = data.get_u32_le();
+    tracing::trace!("len: {len}");
     let mut bytes = vec![0; len as usize];
     data.copy_to_slice(&mut bytes);
     String::from_utf8(bytes)
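For context, here is a minimal round-trip sketch (not part of the diff) of the layout that deserialize_str parses: a little-endian u32 length prefix followed by the UTF-8 payload. serialize_str is a hypothetical stand-in for the real encoder.

use bytes::{Buf, BufMut};

// Hypothetical encoder matching the layout `deserialize_str` expects.
fn serialize_str(s: &str, buf: &mut impl BufMut) {
    buf.put_u32_le(s.len() as u32);
    buf.put_slice(s.as_bytes());
}

// Simplified decoder mirroring the function in the diff.
fn deserialize_str(data: &mut impl Buf) -> Box<str> {
    let len = data.get_u32_le();
    let mut bytes = vec![0; len as usize];
    data.copy_to_slice(&mut bytes);
    String::from_utf8(bytes).expect("valid utf8").into_boxed_str()
}

fn main() {
    let mut buf = Vec::new();
    serialize_str("hello", &mut buf);
    let mut slice = buf.as_slice();
    assert_eq!(&*deserialize_str(&mut slice), "hello");
}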
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
@@ -88,6 +88,7 @@ risingwave_hummock_test = { path = "../storage/hummock_test", features = [
     "test",
 ] }
 serde_yaml = "0.9"
+tracing-subscriber = "0.3.17"
 tracing-test = "0.2"

 [features]
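A likely use of the new tracing-subscriber dev-dependency is making the tracing::trace! calls added in this PR visible in test output. A hypothetical setup sketch (the function name is illustrative, not from this PR):

// Install a subscriber once so `tracing::trace!` output shows up in tests.
fn init_tracing_for_tests() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .with_test_writer()
        .try_init(); // ignore the error if a subscriber is already installed
}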
src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
@@ -339,7 +339,7 @@ impl LogStoreBufferSender {
         {
             if *flushed {
                 // Since we iterate from new data to old data, when we meet a flushed data, the
-                // rest should all be flushed.
+                // rest should have been flushed.
                 break;
             }
             flush_fn(chunk, *epoch, *start_seq_id, *end_seq_id)?;
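A minimal sketch (hypothetical types, not from the PR) of the invariant behind the early break above: the buffer is iterated from newest to oldest, and flushed entries form a contiguous old suffix, so scanning can stop at the first flushed entry.

struct Entry {
    flushed: bool,
    payload: u64,
}

// `entries` is ordered newest-first.
fn flush_pending(entries: &mut [Entry], mut flush_fn: impl FnMut(u64)) {
    for entry in entries.iter_mut() {
        if entry.flushed {
            // Everything older than this entry has been flushed already.
            break;
        }
        flush_fn(entry.payload);
        entry.flushed = true;
    }
}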
10 changes: 5 additions & 5 deletions src/stream/src/common/log_store_impl/kv_log_store/mod.rs
@@ -32,11 +32,11 @@ use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
 use crate::common::log_store_impl::kv_log_store::writer::KvLogStoreWriter;
 use crate::executor::monitor::StreamingMetrics;

-mod buffer;
-mod reader;
+pub(crate) mod buffer;
+pub mod reader;
 pub(crate) mod serde;
 #[cfg(test)]
-mod test_utils;
+pub mod test_utils;
 mod writer;

 pub(crate) use reader::{REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY};
@@ -55,7 +55,7 @@ pub(crate) const FIRST_SEQ_ID: SeqIdType = 0;
 pub(crate) type ReaderTruncationOffsetType = (u64, Option<SeqIdType>);

 #[derive(Clone)]
-pub(crate) struct KvLogStoreReadMetrics {
+pub struct KvLogStoreReadMetrics {
     pub storage_read_count: LabelGuardedIntCounter<5>,
     pub storage_read_size: LabelGuardedIntCounter<5>,
 }
@@ -190,7 +190,7 @@ impl KvLogStoreMetrics {
     }

     #[cfg(test)]
-    fn for_test() -> Self {
+    pub(crate) fn for_test() -> Self {
         KvLogStoreMetrics {
             storage_write_count: LabelGuardedIntCounter::test_int_counter(),
             storage_write_size: LabelGuardedIntCounter::test_int_counter(),
225 changes: 126 additions & 99 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -51,7 +51,9 @@ use crate::common::log_store_impl::kv_log_store::buffer::{
 use crate::common::log_store_impl::kv_log_store::serde::{
     merge_log_store_item_stream, KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
 };
-use crate::common::log_store_impl::kv_log_store::KvLogStoreMetrics;
+use crate::common::log_store_impl::kv_log_store::{
+    KvLogStoreMetrics, KvLogStoreReadMetrics, SeqIdType,
+};

 pub(crate) const REWIND_BASE_DELAY: Duration = Duration::from_secs(1);
 pub(crate) const REWIND_BACKOFF_FACTOR: u64 = 2;
@@ -197,7 +199,7 @@ impl<S: StateStoreRead> KvLogStoreReader<S> {
     }
 }

-struct AutoRebuildStateStoreReadIter<S: StateStoreRead, F> {
+pub struct AutoRebuildStateStoreReadIter<S: StateStoreRead, F> {
     state_store: S,
     iter: S::Iter,
     // call to get whether to rebuild the iter. Once return true, the closure should reset itself.
@@ -230,7 +232,7 @@ impl<S: StateStoreRead, F: FnMut() -> bool> AutoRebuildStateStoreReadIter<S, F>
     }
 }

-mod timeout_auto_rebuild {
+pub(crate) mod timeout_auto_rebuild {
     use std::time::{Duration, Instant};

     use risingwave_hummock_sdk::key::TableKeyRange;
@@ -240,7 +242,7 @@ mod timeout_auto_rebuild {

     use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter;

-    pub(super) type TimeoutAutoRebuildIter<S: StateStoreRead> =
+    pub(crate) type TimeoutAutoRebuildIter<S: StateStoreRead> =
         AutoRebuildStateStoreReadIter<S, impl FnMut() -> bool + Send>;

     pub(super) async fn iter_with_timeout_rebuild<S: StateStoreRead>(
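The struct comment above says the rebuild closure should reset itself once it returns true. A hypothetical sketch of such a self-resetting predicate, along the lines of what a timeout-based rebuild needs (names are illustrative):

use std::time::{Duration, Instant};

// Returns true at most once per interval and resets its own deadline when it fires.
fn rebuild_every(interval: Duration) -> impl FnMut() -> bool + Send {
    let mut deadline = Instant::now() + interval;
    move || {
        if Instant::now() >= deadline {
            deadline = Instant::now() + interval; // reset after signalling a rebuild
            true
        } else {
            false
        }
    }
}

// Usage sketch: a ten-minute trigger, as the reader below uses.
// let mut should_rebuild = rebuild_every(Duration::from_secs(10 * 60));
// if should_rebuild() { /* rebuild the state store iter */ }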
@@ -345,57 +347,14 @@ impl<S: StateStoreRead + Clone> KvLogStoreReader<S> {
     ) -> impl Future<
         Output = LogStoreResult<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>,
     > + Send {
-        let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch {
-            // start from the next epoch of last_persisted_epoch
-            Included(
-                self.serde
-                    .serialize_pk_epoch_prefix(last_persisted_epoch.next_epoch()),
-            )
-        } else {
-            Unbounded
-        };
-        let range_end = self.serde.serialize_pk_epoch_prefix(
-            self.first_write_epoch
-                .expect("should have set first write epoch"),
-        );
-
-        let serde = self.serde.clone();
-        let table_id = self.table_id;
-        let read_metrics = self.metrics.persistent_log_read_metrics.clone();
-        let streams_future = try_join_all(serde.vnodes().iter_vnodes().map(|vnode| {
-            let key_range = prefixed_range_with_vnode(
-                (range_start.clone(), Excluded(range_end.clone())),
-                vnode,
-            );
-            let state_store = self.state_store.clone();
-            async move {
-                // rebuild the iter every 10 minutes to avoid pinning hummock version for too long
-                iter_with_timeout_rebuild(
-                    state_store,
-                    key_range,
-                    HummockEpoch::MAX,
-                    ReadOptions {
-                        // This stream lives too long, the connection of prefetch object may break. So use a short connection prefetch.
-                        prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(),
-                        cache_policy: CachePolicy::Fill(CacheHint::Low),
-                        table_id,
-                        ..Default::default()
-                    },
-                    Duration::from_secs(10 * 60),
-                )
-                .await
-            }
-        }));
-
-        streams_future.map_err(Into::into).map_ok(|streams| {
-            // TODO: set chunk size by config
-            Box::pin(merge_log_store_item_stream(
-                streams,
-                serde,
-                1024,
-                read_metrics,
-            ))
-        })
+        read_persisted_log_store(
+            &self.serde,
+            self.table_id,
+            &self.metrics,
+            self.state_store.clone(),
+            self.first_write_epoch.expect("should have init"),
+            last_persisted_epoch,
+        )
     }
 }

@@ -510,50 +469,17 @@ impl<S: StateStoreRead + Clone> LogReader for KvLogStoreReader<S> {
             let state_store = self.state_store.clone();
             let table_id = self.table_id;
             let read_metrics = self.metrics.flushed_buffer_read_metrics.clone();
-            async move {
-                let iters = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
-                    let range_start =
-                        serde.serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id));
-                    let range_end =
-                        serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id));
-                    let state_store = &state_store;
-
-                    // Use MAX EPOCH here because the epoch to consume may be below the safe
-                    // epoch
-                    async move {
-                        Ok::<_, anyhow::Error>(
-                            state_store
-                                .iter(
-                                    (Included(range_start), Included(range_end)),
-                                    HummockEpoch::MAX,
-                                    ReadOptions {
-                                        prefetch_options:
-                                            PrefetchOptions::prefetch_for_large_range_scan(),
-                                        cache_policy: CachePolicy::Fill(CacheHint::Low),
-                                        table_id,
-                                        ..Default::default()
-                                    },
-                                )
-                                .await?,
-                        )
-                    }
-                }))
-                .instrument_await("Wait Create Iter Stream")
-                .await?;
-
-                let chunk = serde
-                    .deserialize_stream_chunk(
-                        iters,
-                        start_seq_id,
-                        end_seq_id,
-                        item_epoch,
-                        &read_metrics,
-                    )
-                    .instrument_await("Deserialize Stream Chunk")
-                    .await?;
-
-                Ok((chunk_id, chunk, item_epoch))
-            }
+            read_flushed_chunk(
+                serde,
+                state_store,
+                vnode_bitmap,
+                chunk_id,
+                start_seq_id,
+                end_seq_id,
+                item_epoch,
+                table_id,
+                read_metrics,
+            )
+            .boxed()
         };
@@ -672,6 +598,107 @@ impl<S: StateStoreRead + Clone> LogReader for KvLogStoreReader<S> {
     }
 }

+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn read_flushed_chunk(
+    serde: LogStoreRowSerde,
+    state_store: impl StateStoreRead,
+    vnode_bitmap: Bitmap,
+    chunk_id: ChunkId,
+    start_seq_id: SeqIdType,
+    end_seq_id: SeqIdType,
+    item_epoch: u64,
+    table_id: TableId,
+    read_metrics: KvLogStoreReadMetrics,
+) -> LogStoreResult<(ChunkId, StreamChunk, u64)> {
+    tracing::trace!("reading flushed chunk from buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}");
+    let iters = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
+        let range_start = serde.serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id));
+        let range_end = serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id));
+        let state_store = &state_store;
+
+        // Use MAX EPOCH here because the epoch to consume may be below the safe
+        // epoch
+        async move {
+            Ok::<_, anyhow::Error>(
+                state_store
+                    .iter(
+                        (Included(range_start), Included(range_end)),
+                        HummockEpoch::MAX,
+                        ReadOptions {
+                            prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
+                            cache_policy: CachePolicy::Fill(CacheHint::Low),
+                            table_id,
+                            ..Default::default()
+                        },
+                    )
+                    .await?,
+            )
+        }
+    }))
+    .instrument_await("Wait Create Iter Stream")
+    .await?;
+
+    let chunk = serde
+        .deserialize_stream_chunk(iters, start_seq_id, end_seq_id, item_epoch, &read_metrics)
+        .instrument_await("Deserialize Stream Chunk")
+        .await?;
+
+    Ok((chunk_id, chunk, item_epoch))
+}
+
+pub(crate) fn read_persisted_log_store<S: StateStoreRead + Clone>(
+    serde: &LogStoreRowSerde,
+    table_id: TableId,
+    metrics: &KvLogStoreMetrics,
+    state_store: S,
+    first_write_epoch: u64,
+    last_persisted_epoch: Option<u64>,
+) -> impl Future<Output = LogStoreResult<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>>
+       + Send {
+    let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch {
+        // start from the next epoch of last_persisted_epoch
+        Included(serde.serialize_pk_epoch_prefix(last_persisted_epoch.next_epoch()))
+    } else {
+        Unbounded
+    };
+    let range_end = serde.serialize_pk_epoch_prefix(first_write_epoch);
+
+    let serde = serde.clone();
+    let read_metrics = metrics.persistent_log_read_metrics.clone();
+    let streams_future = try_join_all(serde.vnodes().iter_vnodes().map(|vnode| {
+        let key_range =
+            prefixed_range_with_vnode((range_start.clone(), Excluded(range_end.clone())), vnode);
+        let state_store = state_store.clone();
+        async move {
+            // rebuild the iter every 10 minutes to avoid pinning hummock version for too long
+            iter_with_timeout_rebuild(
+                state_store,
+                key_range,
+                HummockEpoch::MAX,
+                ReadOptions {
+                    // This stream lives too long, the connection of prefetch object may break. So use a short connection prefetch.
+                    prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(),
+                    cache_policy: CachePolicy::Fill(CacheHint::Low),
+                    table_id,
+                    ..Default::default()
+                },
+                Duration::from_secs(10 * 60),
+            )
+            .await
+        }
+    }));
+
+    streams_future.map_err(Into::into).map_ok(|streams| {
+        // TODO: set chunk size by config
+        Box::pin(merge_log_store_item_stream(
+            streams,
+            serde,
+            1024,
+            read_metrics,
+        ))
+    })
+}
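A simplified sketch of the epoch range that read_persisted_log_store scans: from just past the last persisted epoch (or unbounded on first read) up to, but excluding, the first write epoch of the current instance. Here `+ 1` merely stands in for next_epoch(), which advances epochs differently in the real code.

use std::ops::Bound::{self, Excluded, Included, Unbounded};

// Illustrative helper, not from the PR: derive the persisted-log scan bounds.
fn persisted_epoch_range(
    last_persisted_epoch: Option<u64>,
    first_write_epoch: u64,
) -> (Bound<u64>, Bound<u64>) {
    let start = match last_persisted_epoch {
        // Resume strictly after what was already handed downstream.
        Some(epoch) => Included(epoch + 1),
        None => Unbounded,
    };
    // Stop before the epochs being written by the current run.
    (start, Excluded(first_write_epoch))
}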

 #[cfg(test)]
 mod tests {
     use std::collections::{Bound, HashSet};
5 changes: 5 additions & 0 deletions src/stream/src/executor/mod.rs
@@ -110,6 +110,7 @@ mod row_merge;

 #[cfg(test)]
 mod integration_tests;
+mod sync_kv_log_store;
 pub mod test_utils;
 mod utils;

@@ -390,6 +391,10 @@ impl Barrier {
             .map_or(false, |actors| actors.contains(&actor_id))
     }

+    pub fn is_checkpoint(&self) -> bool {
+        self.kind == BarrierKind::Checkpoint
+    }
+
     /// Get the initial split assignments for the actor with `actor_id`.
     ///
     /// This should only be called on the initial barrier received by the executor. It must be
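A hypothetical usage sketch for the new is_checkpoint helper: a log store typically persists and truncates state only on checkpoint barriers. BarrierKind and Barrier below are simplified stand-ins for the real streaming types, and on_barrier is illustrative.

#[derive(PartialEq)]
enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint,
}

struct Barrier {
    kind: BarrierKind,
}

impl Barrier {
    fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }
}

fn on_barrier(barrier: &Barrier) {
    if barrier.is_checkpoint() {
        // flush buffered rows and truncate the consumed log range
        println!("checkpoint: persist and truncate log store state");
    }
}

fn main() {
    on_barrier(&Barrier { kind: BarrierKind::Checkpoint });
}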