implement message passing of commit data as part of checkpointing.
jacksonrnewhouse committed Oct 21, 2023
1 parent 08e43b8 commit c5af51e
Showing 20 changed files with 298 additions and 60 deletions.
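
The change threads per-table commit payloads through the checkpoint/commit cycle: subtasks report committing_data in their checkpoint metadata, the controller folds it into OperatorCheckpointMetadata and CommittingState, and the new Commit RPC plus ControlMessage::Commit carry it back out to every worker so operators can finish the commit with that data in hand. The sketch below is not part of the diff; it just simulates that aggregate-then-fan-out shape with plain std collections (all names are illustrative):

use std::collections::HashMap;

// operator id -> table name -> subtask index -> serialized commit payload
type CommitData = HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>;

// Aggregation step (controller side): fold one subtask's committing data into the global map.
fn record_subtask(all: &mut CommitData, operator: &str, subtask: u32, tables: HashMap<String, Vec<u8>>) {
    for (table, bytes) in tables {
        all.entry(operator.to_string())
            .or_default()
            .entry(table)
            .or_default()
            .insert(subtask, bytes);
    }
}

// Fan-out step (worker side): each operator only sees its own slice of the map.
fn commit_for_operator<'a>(all: &'a CommitData, operator: &str) -> Option<&'a HashMap<String, HashMap<u32, Vec<u8>>>> {
    all.get(operator)
}

fn main() {
    let mut all = CommitData::new();
    record_subtask(&mut all, "sink_1", 0, HashMap::from([("s".to_string(), b"part-0".to_vec())]));
    record_subtask(&mut all, "sink_1", 1, HashMap::from([("s".to_string(), b"part-1".to_vec())]));
    if let Some(tables) = commit_for_operator(&all, "sink_1") {
        println!("sink_1 commits {} table(s)", tables.len());
    }
}

Note that on the worker-facing side the diff keys this map by the single-char table id rather than the table name, as the ControlMessage change further down shows.
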
23 changes: 9 additions & 14 deletions arroyo-controller/src/job_controller/mod.rs
@@ -8,8 +8,8 @@ use crate::types::public::StopMode as SqlStopMode;
use anyhow::bail;
use arroyo_datastream::Program;
use arroyo_rpc::grpc::{
worker_grpc_client::WorkerGrpcClient, CheckpointReq, JobFinishedReq, LoadCompactedDataReq,
StopExecutionReq, StopMode, TaskCheckpointEventType,
worker_grpc_client::WorkerGrpcClient, CheckpointReq, CommitReq, JobFinishedReq,
LoadCompactedDataReq, StopExecutionReq, StopMode, TaskCheckpointEventType,
};
use arroyo_state::{BackingStore, StateBackend};
use arroyo_types::{to_micros, WorkerId};
@@ -433,6 +433,7 @@ impl RunningJobModel {
DbCheckpointState::committing,
)
.await?;
let committing_data = committing_state.committing_data();
self.checkpoint_state =
Some(CheckpointingOrCommittingState::Committing(committing_state));
info!(
@@ -443,12 +444,9 @@
for worker in self.workers.values_mut() {
worker
.connect
.checkpoint(Request::new(CheckpointReq {
timestamp: to_micros(SystemTime::now()),
min_epoch: self.min_epoch,
.commit(Request::new(CommitReq {
epoch: self.epoch,
then_stop: false,
is_commit: true,
committing_data: committing_data.clone(),
}))
.await?;
}
@@ -707,21 +705,18 @@ impl JobController {
}

pub async fn send_commit_messages(&mut self) -> anyhow::Result<()> {
let Some(CheckpointingOrCommittingState::Committing(_committing)) =
let Some(CheckpointingOrCommittingState::Committing(committing)) =
&self.model.checkpoint_state
else {
bail!("should be committing")
};
for worker in self.model.workers.values_mut() {
worker
.connect
.checkpoint(Request::new(CheckpointReq {
timestamp: to_micros(SystemTime::now()),
min_epoch: self.model.min_epoch,
.commit(CommitReq {
epoch: self.model.epoch,
then_stop: false,
is_commit: true,
}))
committing_data: committing.committing_data(),
})
.await?;
}
Ok(())
19 changes: 17 additions & 2 deletions arroyo-controller/src/states/scheduling.rs
@@ -14,7 +14,9 @@ use tonic::{transport::Channel, Request};
use tracing::{error, info, warn};

use anyhow::anyhow;
use arroyo_state::{parquet::get_storage_env_vars, BackingStore, StateBackend};
use arroyo_state::{
committing_state::CommittingState, parquet::get_storage_env_vars, BackingStore, StateBackend,
};

use crate::{
job_controller::JobController,
@@ -368,6 +370,7 @@ impl State for Scheduling {
metadata.min_epoch = min_epoch;
if needs_commits {
let mut commit_subtasks = HashSet::new();
let mut committing_data = HashMap::new();
for operator_id in &metadata.operator_ids {
let operator_metadata =
StateBackend::load_operator_metadata(&ctx.config.id, operator_id, epoch)
@@ -378,6 +381,18 @@
operator_id, ctx.config.id
);
};
if let Some(commit_data) = operator_metadata.commit_data {
committing_data.insert(
operator_id.clone(),
commit_data
.committing_data
.into_iter()
.map(|(table, commit_data_map)| {
(table, commit_data_map.commit_data_by_subtask)
})
.collect(),
);
}
if operator_metadata.has_state
&& operator_metadata
.tables
@@ -396,7 +411,7 @@
}
}
}
committing_state = Some((id, commit_subtasks));
committing_state = Some(CommittingState::new(id, commit_subtasks, committing_data));
}
StateBackend::write_checkpoint_metadata(metadata).await;
}
6 changes: 3 additions & 3 deletions arroyo-macro/src/lib.rs
@@ -529,8 +529,8 @@ fn impl_stream_node_type(
match control_message {
arroyo_rpc::ControlMessage::Checkpoint(_) => tracing::warn!("shouldn't receive checkpoint"),
arroyo_rpc::ControlMessage::Stop { mode: _ } => tracing::warn!("shouldn't receive stop"),
arroyo_rpc::ControlMessage::Commit { epoch } => {
self.handle_commit(epoch, &mut ctx).await;
arroyo_rpc::ControlMessage::Commit { epoch, commit_data } => {
self.handle_commit(epoch, commit_data, &mut ctx).await;
},
arroyo_rpc::ControlMessage::LoadCompacted { compacted } => {
ctx.load_compacted(compacted).await;
@@ -802,7 +802,7 @@ fn impl_stream_node_type(

if !methods.contains("handle_commit") {
defs.push(quote! {
async fn handle_commit(&mut self, epoch: u32, ctx: &mut Context<#out_k, #out_t>) {
async fn handle_commit(&mut self, epoch: u32, commit_data: std::collections::HashMap<char, std::collections::HashMap<u32, Vec<u8>>>, ctx: &mut Context<#out_k, #out_t>) {
tracing::warn!("default handling of commit with epoch {:?}", epoch);
}
})
22 changes: 22 additions & 0 deletions arroyo-rpc/proto/rpc.proto
@@ -224,6 +224,9 @@ message SubtaskCheckpointMetadata {
uint64 bytes = 7;

repeated BackendData backend_data = 8;
// this is data that should be sent along with the commit message
// to all subtasks. The key is the table name.
map<string,bytes> committing_data = 9;
}

message BackendData {
@@ -246,6 +249,7 @@

repeated BackendData backend_data = 10;
uint64 bytes = 11;
OperatorCommitData commit_data = 12;
}

enum TableType {
@@ -315,6 +319,23 @@
message CheckpointResp {
}

message CommitReq {
uint32 epoch = 1;
map<string, OperatorCommitData> committing_data = 2;
}

message CommitResp {
}

message OperatorCommitData {
// map from table name to commit data for that table.
map<string, TableCommitData> committing_data = 1;
}

message TableCommitData {
map<uint32, bytes> commit_data_by_subtask = 1;
}

message LoadCompactedDataReq {
string operator_id = 1;
repeated BackendData backend_data_to_drop = 2; // this data got compressed...
@@ -347,6 +368,7 @@
service WorkerGrpc {
rpc StartExecution(StartExecutionReq) returns (StartExecutionResp);
rpc Checkpoint(CheckpointReq) returns (CheckpointResp);
rpc Commit(CommitReq) returns (CommitResp);
rpc LoadCompactedData(LoadCompactedDataReq) returns (LoadCompactedDataRes);
rpc StopExecution(StopExecutionReq) returns (StopExecutionResp);
rpc JobFinished(JobFinishedReq) returns (JobFinishedResp);
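
So the wire format nests operator id -> table name -> subtask index -> opaque bytes. A small sketch with plain structs standing in for the prost-generated types (field names mirror the proto; the generated derives and exact names may differ):

use std::collections::HashMap;

// Stand-ins mirroring rpc.proto; the real types are generated by prost/tonic.
#[derive(Debug, Default)]
struct TableCommitData {
    commit_data_by_subtask: HashMap<u32, Vec<u8>>,
}

#[derive(Debug, Default)]
struct OperatorCommitData {
    committing_data: HashMap<String, TableCommitData>,
}

#[derive(Debug, Default)]
struct CommitReq {
    epoch: u32,
    committing_data: HashMap<String, OperatorCommitData>,
}

fn main() {
    // One operator committing one table's data produced by subtask 0.
    let req = CommitReq {
        epoch: 7,
        committing_data: HashMap::from([(
            "sink_operator".to_string(),
            OperatorCommitData {
                committing_data: HashMap::from([(
                    "files_to_finish".to_string(),
                    TableCommitData {
                        commit_data_by_subtask: HashMap::from([(0u32, b"manifest".to_vec())]),
                    },
                )]),
            },
        )]),
    };
    println!("{req:?}");
}
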
14 changes: 11 additions & 3 deletions arroyo-rpc/src/lib.rs
@@ -2,6 +2,7 @@ pub mod api_types;
pub mod formats;
pub mod public_ids;

use std::collections::HashMap;
use std::{fs, time::SystemTime};

use crate::api_types::connections::PrimitiveType;
@@ -32,9 +33,16 @@
#[derive(Debug)]
pub enum ControlMessage {
Checkpoint(CheckpointBarrier),
Stop { mode: StopMode },
Commit { epoch: u32 },
LoadCompacted { compacted: CompactionResult },
Stop {
mode: StopMode,
},
Commit {
epoch: u32,
commit_data: HashMap<char, HashMap<u32, Vec<u8>>>,
},
LoadCompacted {
compacted: CompactionResult,
},
NoOp,
}

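
Operator-side, this enriched Commit variant is what the stream-node macro hands to handle_commit: payloads keyed by the single-char table id and then by subtask index. A minimal sketch of consuming it, with a local two-variant stand-in for the enum (the real ControlMessage lives in arroyo-rpc and has more variants and fields):

use std::collections::HashMap;

// Local stand-in; mirrors only the Commit variant's new shape.
#[allow(dead_code)]
enum ControlMessage {
    Commit {
        epoch: u32,
        // table id -> subtask index -> serialized commit payload
        commit_data: HashMap<char, HashMap<u32, Vec<u8>>>,
    },
    NoOp,
}

fn handle(msg: ControlMessage) {
    match msg {
        ControlMessage::Commit { epoch, commit_data } => {
            for (table, by_subtask) in commit_data {
                println!(
                    "epoch {epoch}: committing table '{table}' with data from {} subtask(s)",
                    by_subtask.len()
                );
            }
        }
        ControlMessage::NoOp => {}
    }
}

fn main() {
    let commit_data = HashMap::from([('s', HashMap::from([(0u32, vec![1u8, 2, 3])]))]);
    handle(ControlMessage::Commit { epoch: 3, commit_data });
}
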
39 changes: 37 additions & 2 deletions arroyo-state/src/checkpoint_state.rs
@@ -2,8 +2,8 @@ use crate::committing_state::CommittingState;
use crate::subtask_state::SubtaskState;
use crate::{BackingStore, StateBackend};
use anyhow::{anyhow, bail};
use arroyo_rpc::grpc;
use arroyo_rpc::grpc::api::OperatorCheckpointDetail;
use arroyo_rpc::grpc::{self, OperatorCommitData, TableCommitData};
use arroyo_rpc::grpc::{
api, backend_data, BackendData, CheckpointMetadata, OperatorCheckpointMetadata,
TableDescriptor, TableWriteBehavior, TaskCheckpointCompletedReq, TaskCheckpointEventReq,
@@ -23,6 +23,7 @@ pub struct CheckpointState {
tasks: HashMap<String, BTreeMap<u32, SubtaskState>>,
completed_operators: HashSet<String>,
subtasks_to_commit: HashSet<(String, u32)>,
committing_backend_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,

// Used for the web ui -- eventually should be replaced with some other way of tracking / reporting
// this data
@@ -47,6 +48,7 @@
tasks: HashMap::new(),
completed_operators: HashSet::new(),
subtasks_to_commit: HashSet::new(),
committing_backend_data: HashMap::new(),
operator_details: HashMap::new(),
}
}
@@ -174,6 +176,14 @@ impl CheckpointState {
});
detail.bytes = Some(metadata.bytes);
detail.finish_time = Some(metadata.finish_time);
for (table, committing_data) in &metadata.committing_data {
self.committing_backend_data
.entry(c.operator_id.clone())
.or_default()
.entry(table.to_string())
.or_default()
.insert(metadata.subtask_index, committing_data.clone());
}

// this is for the actual checkpoint management

@@ -281,6 +291,27 @@ impl CheckpointState {
tables: tables.into_values().collect(),
backend_data: backend_data.into_values().collect(),
bytes: size,
commit_data: self
.committing_backend_data
.get(&operator_id)
.map(|commit_data| OperatorCommitData {
committing_data: commit_data
.iter()
.map(|(table_name, subtask_to_commit_data)| {
(
table_name.clone(),
TableCommitData {
commit_data_by_subtask: subtask_to_commit_data
.iter()
.map(|(subtask_index, commit_data)| {
(*subtask_index, commit_data.clone())
})
.collect(),
},
)
})
.collect(),
}),
})
.await;

@@ -307,7 +338,11 @@
}

pub fn committing_state(&self) -> CommittingState {
CommittingState::new(self.checkpoint_id, self.subtasks_to_commit.clone())
CommittingState::new(
self.checkpoint_id,
self.subtasks_to_commit.clone(),
self.committing_backend_data.clone(),
)
}

pub async fn save_state(&self) -> anyhow::Result<()> {
48 changes: 39 additions & 9 deletions arroyo-state/src/committing_state.rs
@@ -1,15 +1,23 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use arroyo_rpc::grpc::{OperatorCommitData, TableCommitData};

pub struct CommittingState {
checkpoint_id: i64,
subtasks_to_commit: HashSet<(String, u32)>,
committing_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,
}

impl CommittingState {
pub fn new(checkpoint_id: i64, subtasks_to_commit: HashSet<(String, u32)>) -> Self {
pub fn new(
checkpoint_id: i64,
subtasks_to_commit: HashSet<(String, u32)>,
committing_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,
) -> Self {
Self {
checkpoint_id,
subtasks_to_commit,
committing_data,
}
}

@@ -25,13 +33,35 @@ impl CommittingState {
pub fn done(&self) -> bool {
self.subtasks_to_commit.is_empty()
}
}

impl From<(i64, HashSet<(String, u32)>)> for CommittingState {
fn from((checkpoint_id, subtasks_to_commit): (i64, HashSet<(String, u32)>)) -> Self {
Self {
checkpoint_id,
subtasks_to_commit,
}
pub fn committing_data(&self) -> HashMap<String, OperatorCommitData> {
let operators_to_commit: HashSet<_> = self
.subtasks_to_commit
.iter()
.map(|(operator_id, _subtask_id)| operator_id.clone())
.collect();
operators_to_commit
.into_iter()
.map(|operator_id| {
let committing_data = self
.committing_data
.get(&operator_id)
.map(|table_map| {
table_map
.iter()
.map(|(table_name, subtask_to_commit_data)| {
(
table_name.clone(),
TableCommitData {
commit_data_by_subtask: subtask_to_commit_data.clone(),
},
)
})
.collect()
})
.unwrap_or_default();
(operator_id, OperatorCommitData { committing_data })
})
.collect()
}
}
8 changes: 8 additions & 0 deletions arroyo-state/src/lib.rs
@@ -202,6 +202,7 @@ pub trait BackingStore {
async fn get_key_values<K: Key, V: Data>(&self, table: char) -> Vec<(K, V)>;

async fn load_compacted(&mut self, compaction: CompactionResult);
async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>);
}

pub struct StateStore<S: BackingStore> {
@@ -416,6 +417,12 @@ impl<S: BackingStore> StateStore<S> {
pub async fn load_compacted(&mut self, compaction: CompactionResult) {
self.backend.load_compacted(compaction).await;
}

pub async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>) {
self.backend
.insert_committing_data(table, committing_data)
.await
}
}

#[cfg(test)]
@@ -558,6 +565,7 @@ mod test {
tables: default_tables(),
backend_data: message.subtask_metadata.backend_data,
bytes: 5,
commit_data: None,
})
.await;

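
The new StateStore::insert_committing_data hook is the operator-facing entry point for staging these bytes during a checkpoint so they later surface in SubtaskCheckpointMetadata.committing_data. A hedged, toy-sized sketch of that staging step — the struct below is a stand-in, not the real StateStore, and the table id and payload are invented:

use std::collections::HashMap;

// Toy stand-in for the commit-data side of StateStore; the real type is generic
// over a BackingStore and does much more.
#[derive(Default)]
struct ToyStateStore {
    committing_data: HashMap<char, Vec<u8>>, // table id -> staged commit payload
}

impl ToyStateStore {
    // Mirrors the new insert_committing_data(table, bytes) entry point.
    fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>) {
        self.committing_data.insert(table, committing_data);
    }
}

fn main() {
    let mut state = ToyStateStore::default();
    // e.g. a file sink staging the manifest of files it wants to move into place at commit time
    state.insert_committing_data('s', b"in-progress-file-manifest".to_vec());
    assert_eq!(state.committing_data.len(), 1);
}
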