Skip to content

Commit

Permalink
Merge pull request #31147 from teskje/storage-exchange-sequencer
Browse files Browse the repository at this point in the history
storage: use exchange-based sequencing to broadcast internal commands
  • Loading branch information
teskje authored Jan 28, 2025
2 parents 3921fec + 508f73d commit 8f331b7
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 60 deletions.
5 changes: 2 additions & 3 deletions src/storage/src/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub trait HealthOperator {

/// A default `HealthOperator` for use in normal cases.
pub struct DefaultWriter {
pub command_tx: Rc<RefCell<dyn InternalCommandSender>>,
pub command_tx: InternalCommandSender,
pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}

Expand Down Expand Up @@ -293,8 +293,7 @@ impl HealthOperator for DefaultWriter {

fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
self.command_tx
.borrow_mut()
.broadcast(InternalStorageCommand::SuspendAndRestart {
.send(InternalStorageCommand::SuspendAndRestart {
// Suspend and restart is expected to operate on the primary object and
// not any of the sub-objects
id,
Expand Down
216 changes: 196 additions & 20 deletions src/storage/src/internal_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
//! Types for cluster-internal control messages that can be broadcast to all
//! workers from individual operators/workers.
use std::collections::BTreeMap;
use std::time::Instant;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::mpsc;

use mz_repr::{GlobalId, Row};
use mz_rocksdb::config::SharedWriteBufferManager;
Expand All @@ -18,8 +20,11 @@ use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use serde::{Deserialize, Serialize};
use timely::communication::Allocate;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::generic::{source, OutputHandle};
use timely::dataflow::operators::{Broadcast, Operator};
use timely::progress::Antichain;
use timely::synchronization::Sequencer;
use timely::scheduling::{Activator, Scheduler};
use timely::worker::Worker as TimelyWorker;

use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};
Expand Down Expand Up @@ -120,31 +125,202 @@ pub enum InternalStorageCommand {
},
}

/// Allows broadcasting [`internal commands`](InternalStorageCommand) to all
/// workers.
pub trait InternalCommandSender {
/// A sender broadcasting [`InternalStorageCommand`]s to all workers.
#[derive(Clone)]
pub struct InternalCommandSender {
tx: mpsc::Sender<InternalStorageCommand>,
activator: Rc<RefCell<Option<Activator>>>,
}

impl InternalCommandSender {
/// Broadcasts the given command to all workers.
fn broadcast(&mut self, internal_cmd: InternalStorageCommand);
pub fn send(&self, cmd: InternalStorageCommand) {
if self.tx.send(cmd).is_err() {
panic!("internal command channel disconnected");
}

/// Returns the next available command, if any. This returns `None` when
/// there are currently no commands but there might be commands again in the
/// future.
fn next(&mut self) -> Option<InternalStorageCommand>;
self.activator.borrow().as_ref().map(|a| a.activate());
}
}

impl InternalCommandSender for Sequencer<InternalStorageCommand> {
fn broadcast(&mut self, internal_cmd: InternalStorageCommand) {
self.push(internal_cmd);
}
/// A receiver for [`InternalStorageCommand`]s broadcasted by workers.
pub struct InternalCommandReceiver {
rx: mpsc::Receiver<InternalStorageCommand>,
}

fn next(&mut self) -> Option<InternalStorageCommand> {
Iterator::next(self)
impl InternalCommandReceiver {
/// Returns the next available command, if any.
///
/// This returns `None` when there are currently no commands but there might be commands again
/// in the future.
pub fn try_recv(&self) -> Option<InternalStorageCommand> {
match self.rx.try_recv() {
Ok(cmd) => Some(cmd),
Err(mpsc::TryRecvError::Empty) => None,
Err(mpsc::TryRecvError::Disconnected) => {
panic!("internal command channel disconnected")
}
}
}
}

pub(crate) fn setup_command_sequencer<'w, A: Allocate>(
timely_worker: &'w mut TimelyWorker<A>,
) -> Sequencer<InternalStorageCommand> {
// TODO(aljoscha): Use something based on `mz_ore::NowFn`?
Sequencer::new(timely_worker, Instant::now())
) -> (InternalCommandSender, InternalCommandReceiver) {
let (input_tx, input_rx) = mpsc::channel();
let (output_tx, output_rx) = mpsc::channel();
let activator = Rc::new(RefCell::new(None));

timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
let activator = Rc::clone(&activator);
move |scope| {
// Create a stream of commands received from `input_rx`.
//
// The output commands are tagged by worker ID and command index, allowing downstream
// operators to ensure their correct relative order.
let stream = source(scope, "command_sequencer::source", |cap, info| {
*activator.borrow_mut() = Some(scope.activator_for(info.address));

let worker_id = scope.index();
let mut cmd_index = 0;
let mut capability = Some(cap);

move |output: &mut OutputHandle<_, _, _>| {
let Some(cap) = &capability else {
return;
};

let mut session = output.session(cap);
loop {
match input_rx.try_recv() {
Ok(command) => {
let cmd = IndexedCommand {
index: cmd_index,
command,
};
session.give((worker_id, cmd));
cmd_index += 1;
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
// Drop our capability to shut down.
capability = None;
break;
}
}
}
}
});

// Sequence all commands through a single worker to establish a unique order.
//
// The output commands are tagged by a command index, allowing downstream operators to
// ensure their correct relative order.
let stream = stream.unary_frontier(
Exchange::new(|_| 0),
"command_sequencer::sequencer",
|cap, _info| {
let mut cmd_index = 0;
let mut capability = Some(cap);

// For each worker, keep an ordered list of pending commands, as well as the
// current index of the next command.
let mut pending_commands = vec![(BTreeSet::new(), 0); scope.peers()];

move |input, output: &mut OutputHandle<_, _, _>| {
let Some(cap) = &capability else {
return;
};

while let Some((_cap, data)) = input.next() {
for (worker_id, cmd) in data.drain(..) {
pending_commands[worker_id].0.insert(cmd);
}
}

let mut session = output.session(cap);
for (commands, next_idx) in &mut pending_commands {
while commands.first().is_some_and(|c| c.index == *next_idx) {
let mut cmd = commands.pop_first().unwrap();
cmd.index = cmd_index;
session.give(cmd);

*next_idx += 1;
cmd_index += 1;
}
}

if input.frontier().is_empty() {
// Drop our capability to shut down.
capability = None;
}
}
},
);

// Broadcast the ordered commands to all workers.
let stream = stream.broadcast();

// Sink the stream back into `output_tx`.
stream.sink(Pipeline, "command_sequencer::sink", {
// Keep an ordered list of pending commands, as well as the current index of the
// next command.
let mut pending_commands = BTreeSet::new();
let mut next_idx = 0;

move |input| {
while let Some((_cap, data)) = input.next() {
pending_commands.extend(data.drain(..));
}

while pending_commands
.first()
.is_some_and(|c| c.index == next_idx)
{
let cmd = pending_commands.pop_first().unwrap();
let _ = output_tx.send(cmd.command);
next_idx += 1;
}
}
});
}
});

let tx = InternalCommandSender {
tx: input_tx,
activator,
};
let rx = InternalCommandReceiver { rx: output_rx };

(tx, rx)
}

// An [`InternalStorageCommand`] tagged with an index.
//
// This is a `(u64, InternalStorageCommand)` in spirit, but implements `Ord` (which
// `InternalStorageCommand` doesn't) by looking only at the index.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IndexedCommand {
index: u64,
command: InternalStorageCommand,
}

impl PartialEq for IndexedCommand {
fn eq(&self, other: &Self) -> bool {
self.cmp(other).is_eq()
}
}

impl Eq for IndexedCommand {}

impl PartialOrd for IndexedCommand {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for IndexedCommand {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.index.cmp(&other.index)
}
}
4 changes: 2 additions & 2 deletions src/storage/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ pub fn build_ingestion_dataflow<A: Allocate>(
"source",
&health_stream,
crate::healthcheck::DefaultWriter {
command_tx: Rc::clone(&storage_state.internal_cmd_tx),
command_tx: storage_state.internal_cmd_tx.clone(),
updates: Rc::clone(&storage_state.object_status_updates),
},
storage_state
Expand Down Expand Up @@ -456,7 +456,7 @@ pub fn build_export_dataflow<A: Allocate>(
"sink",
&health_stream,
crate::healthcheck::DefaultWriter {
command_tx: Rc::clone(&storage_state.internal_cmd_tx),
command_tx: storage_state.internal_cmd_tx.clone(),
updates: Rc::clone(&storage_state.object_status_updates),
},
storage_state
Expand Down
6 changes: 2 additions & 4 deletions src/storage/src/render/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

//! Logic related to the creation of dataflow sinks.
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -55,7 +54,7 @@ pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
SnapshotMode::Exclude
};

let command_tx = Rc::clone(&storage_state.internal_cmd_tx);
let command_tx = storage_state.internal_cmd_tx.clone();

let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
scope,
Expand All @@ -75,8 +74,7 @@ pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
Box::pin(async move {
let error = format!("storage_sink: {error}");
tracing::info!("{error}");
let mut command_tx = command_tx.borrow_mut();
command_tx.broadcast(InternalStorageCommand::SuspendAndRestart {
command_tx.send(InternalStorageCommand::SuspendAndRestart {
id: sink_id,
reason: error,
});
Expand Down
Loading

0 comments on commit 8f331b7

Please sign in to comment.