diff --git a/arroyo-controller/src/job_controller/mod.rs b/arroyo-controller/src/job_controller/mod.rs index faede1210..0ecbdabc2 100644 --- a/arroyo-controller/src/job_controller/mod.rs +++ b/arroyo-controller/src/job_controller/mod.rs @@ -8,8 +8,9 @@ use crate::types::public::StopMode as SqlStopMode; use anyhow::bail; use arroyo_datastream::Program; use arroyo_rpc::grpc::{ - worker_grpc_client::WorkerGrpcClient, CheckpointReq, JobFinishedReq, LoadCompactedDataReq, - StopExecutionReq, StopMode, TaskCheckpointEventType, + backend_data, worker_grpc_client::WorkerGrpcClient, CheckpointReq, CommitReq, JobFinishedReq, + LoadCompactedDataReq, OperatorCommitData, ParquetStoreData, StopExecutionReq, StopMode, + TableCommitData, TaskCheckpointEventType, }; use arroyo_state::{BackingStore, StateBackend}; use arroyo_types::{to_micros, WorkerId}; @@ -433,6 +434,7 @@ impl RunningJobModel { DbCheckpointState::committing, ) .await?; + let committing_data = committing_state.committing_data(); self.checkpoint_state = Some(CheckpointingOrCommittingState::Committing(committing_state)); info!( @@ -443,12 +445,9 @@ impl RunningJobModel { for worker in self.workers.values_mut() { worker .connect - .checkpoint(Request::new(CheckpointReq { - timestamp: to_micros(SystemTime::now()), - min_epoch: self.min_epoch, + .commit(Request::new(CommitReq { epoch: self.epoch, - then_stop: false, - is_commit: true, + committing_data: committing_data.clone(), })) .await?; } @@ -707,7 +706,7 @@ impl JobController { } pub async fn send_commit_messages(&mut self) -> anyhow::Result<()> { - let Some(CheckpointingOrCommittingState::Committing(_committing)) = + let Some(CheckpointingOrCommittingState::Committing(committing)) = &self.model.checkpoint_state else { bail!("should be committing") @@ -715,13 +714,10 @@ impl JobController { for worker in self.model.workers.values_mut() { worker .connect - .checkpoint(Request::new(CheckpointReq { - timestamp: to_micros(SystemTime::now()), - min_epoch: self.model.min_epoch, + .commit(CommitReq { epoch: self.model.epoch, - then_stop: false, - is_commit: true, - })) + committing_data: committing.committing_data(), + }) .await?; } Ok(()) diff --git a/arroyo-controller/src/states/scheduling.rs b/arroyo-controller/src/states/scheduling.rs index 52982e16a..e490f755e 100644 --- a/arroyo-controller/src/states/scheduling.rs +++ b/arroyo-controller/src/states/scheduling.rs @@ -14,7 +14,9 @@ use tonic::{transport::Channel, Request}; use tracing::{error, info, warn}; use anyhow::anyhow; -use arroyo_state::{parquet::get_storage_env_vars, BackingStore, StateBackend}; +use arroyo_state::{ + committing_state::CommittingState, parquet::get_storage_env_vars, BackingStore, StateBackend, +}; use crate::{ job_controller::JobController, @@ -368,6 +370,7 @@ impl State for Scheduling { metadata.min_epoch = min_epoch; if needs_commits { let mut commit_subtasks = HashSet::new(); + let mut committing_data = HashMap::new(); for operator_id in &metadata.operator_ids { let operator_metadata = StateBackend::load_operator_metadata(&ctx.config.id, operator_id, epoch) @@ -378,6 +381,7 @@ impl State for Scheduling { operator_id, ctx.config.id ); }; + committing_data.insert(operator_id.clone(), operator_metadata.backend_data); if operator_metadata.has_state && operator_metadata .tables @@ -396,7 +400,7 @@ impl State for Scheduling { } } } - committing_state = Some((id, commit_subtasks)); + committing_state = Some(CommittingState::new(id, commit_subtasks, committing_data)); } StateBackend::write_checkpoint_metadata(metadata).await; } diff --git a/arroyo-datastream/src/lib.rs b/arroyo-datastream/src/lib.rs index 7faf73691..c17b5b81d 100644 --- a/arroyo-datastream/src/lib.rs +++ b/arroyo-datastream/src/lib.rs @@ -17,7 +17,7 @@ use arroyo_types::{Data, GlobalKey, JoinType, Key}; use bincode::{Decode, Encode}; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::EdgeRef; -use proc_macro2::Ident; +use proc_macro2::{Ident, TokenStream}; use quote::format_ident; use quote::quote; use serde::{Deserialize, Serialize}; @@ -1365,7 +1365,7 @@ impl Program { let out_t = parse_type(&output.unwrap().weight().value); let strukt = parse_type(&c.operator); - let config = &c.config; + let config: TokenStream = quote!(c.config); quote! { Box::new(#strukt::<#out_k, #out_t>::from_config(#config)) } diff --git a/arroyo-macro/src/lib.rs b/arroyo-macro/src/lib.rs index a58f36c08..2d15b0666 100644 --- a/arroyo-macro/src/lib.rs +++ b/arroyo-macro/src/lib.rs @@ -529,8 +529,8 @@ fn impl_stream_node_type( match control_message { arroyo_rpc::ControlMessage::Checkpoint(_) => tracing::warn!("shouldn't receive checkpoint"), arroyo_rpc::ControlMessage::Stop { mode: _ } => tracing::warn!("shouldn't receive stop"), - arroyo_rpc::ControlMessage::Commit { epoch } => { - self.handle_commit(epoch, &mut ctx).await; + arroyo_rpc::ControlMessage::Commit { epoch, commit_data } => { + self.handle_commit(epoch, commit_data, &mut ctx).await; }, arroyo_rpc::ControlMessage::LoadCompacted { compacted } => { ctx.load_compacted(compacted).await; @@ -802,7 +802,7 @@ fn impl_stream_node_type( if !methods.contains("handle_commit") { defs.push(quote! { - async fn handle_commit(&mut self, epoch: u32, ctx: &mut Context<#out_k, #out_t>) { + async fn handle_commit(&mut self, epoch: u32, commit_data: std::collections::HashMap>>, ctx: &mut Context<#out_k, #out_t>) { tracing::warn!("default handling of commit with epoch {:?}", epoch); } }) diff --git a/arroyo-rpc/proto/rpc.proto b/arroyo-rpc/proto/rpc.proto index e483bd84e..ce6eb8fbf 100644 --- a/arroyo-rpc/proto/rpc.proto +++ b/arroyo-rpc/proto/rpc.proto @@ -230,6 +230,9 @@ message BackendData { oneof backend_data { ParquetStoreData parquet_store = 3; } + // this is commit messages that may need to be distributed + // around all subtasks, such as if subtask 0 is responsible for committing data + optional bytes committing_data = 4; } message OperatorCheckpointMetadata { @@ -315,6 +318,23 @@ message CheckpointReq { message CheckpointResp { } +message CommitReq { + uint32 epoch = 1; + map committing_data = 2; +} + +message CommitResp { +} + +message OperatorCommitData { + // map from table to commit data for that table. + map committing_data = 1; +} + +message TableCommitData { + repeated BackendData backend_data = 1; +} + message LoadCompactedDataReq { string operator_id = 1; repeated BackendData backend_data_to_drop = 2; // this data got compressed... @@ -347,6 +367,7 @@ message JobFinishedResp { service WorkerGrpc { rpc StartExecution(StartExecutionReq) returns (StartExecutionResp); rpc Checkpoint(CheckpointReq) returns (CheckpointResp); + rpc Commit(CommitReq) returns (CommitResp); rpc LoadCompactedData(LoadCompactedDataReq) returns (LoadCompactedDataRes); rpc StopExecution(StopExecutionReq) returns (StopExecutionResp); rpc JobFinished(JobFinishedReq) returns (JobFinishedResp); diff --git a/arroyo-rpc/src/lib.rs b/arroyo-rpc/src/lib.rs index ca4431030..9d6778d7d 100644 --- a/arroyo-rpc/src/lib.rs +++ b/arroyo-rpc/src/lib.rs @@ -2,6 +2,7 @@ pub mod api_types; pub mod formats; pub mod public_ids; +use std::collections::HashMap; use std::{fs, time::SystemTime}; use crate::api_types::connections::PrimitiveType; @@ -32,9 +33,16 @@ pub mod grpc { #[derive(Debug)] pub enum ControlMessage { Checkpoint(CheckpointBarrier), - Stop { mode: StopMode }, - Commit { epoch: u32 }, - LoadCompacted { compacted: CompactionResult }, + Stop { + mode: StopMode, + }, + Commit { + epoch: u32, + commit_data: HashMap>>, + }, + LoadCompacted { + compacted: CompactionResult, + }, NoOp, } diff --git a/arroyo-state/src/checkpoint_state.rs b/arroyo-state/src/checkpoint_state.rs index 08899db6e..7975be3e6 100644 --- a/arroyo-state/src/checkpoint_state.rs +++ b/arroyo-state/src/checkpoint_state.rs @@ -23,6 +23,7 @@ pub struct CheckpointState { tasks: HashMap>, completed_operators: HashSet, subtasks_to_commit: HashSet<(String, u32)>, + committing_backend_data: HashMap>, // Used for the web ui -- eventually should be replaced with some other way of tracking / reporting // this data @@ -47,6 +48,7 @@ impl CheckpointState { tasks: HashMap::new(), completed_operators: HashSet::new(), subtasks_to_commit: HashSet::new(), + committing_backend_data: HashMap::new(), operator_details: HashMap::new(), } } @@ -265,6 +267,46 @@ impl CheckpointState { .filter_map(Self::backend_data_to_key) .collect(); + if let Some(subtask_state) = subtasks.get(&self.epoch) { + let committing_tables = subtask_state + .metadata + .as_ref() + .unwrap() + .tables + .iter() + .filter_map(|table| { + if table.write_behavior() == TableWriteBehavior::CommitWrites { + Some(table.name.clone()) + } else { + None + } + }) + .collect::>(); + let backend_datas: Vec<_> = subtask_state + .metadata + .as_ref() + .unwrap() + .backend_data + .iter() + .filter_map(|backend_data| { + let Some(backend_data::BackendData::ParquetStore(parquet_backend)) = + &backend_data.backend_data + else { + return None; + }; + if committing_tables.contains(&parquet_backend.table) { + Some(backend_data.clone()) + } else { + None + } + }) + .collect(); + if !backend_datas.is_empty() { + self.committing_backend_data + .insert(operator_id.clone(), backend_datas); + } + } + let size = subtasks .values() .fold(0, |size, s| size + s.metadata.as_ref().unwrap().bytes); @@ -307,7 +349,11 @@ impl CheckpointState { } pub fn committing_state(&self) -> CommittingState { - CommittingState::new(self.checkpoint_id, self.subtasks_to_commit.clone()) + CommittingState::new( + self.checkpoint_id, + self.subtasks_to_commit.clone(), + self.committing_backend_data.clone(), + ) } pub async fn save_state(&self) -> anyhow::Result<()> { diff --git a/arroyo-state/src/committing_state.rs b/arroyo-state/src/committing_state.rs index 1439a6c9b..235f7bd06 100644 --- a/arroyo-state/src/committing_state.rs +++ b/arroyo-state/src/committing_state.rs @@ -1,15 +1,23 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; + +use arroyo_rpc::grpc::{backend_data, BackendData, OperatorCommitData, TableCommitData}; pub struct CommittingState { checkpoint_id: i64, subtasks_to_commit: HashSet<(String, u32)>, + committing_data: HashMap>, } impl CommittingState { - pub fn new(checkpoint_id: i64, subtasks_to_commit: HashSet<(String, u32)>) -> Self { + pub fn new( + checkpoint_id: i64, + subtasks_to_commit: HashSet<(String, u32)>, + committing_data: HashMap>, + ) -> Self { Self { checkpoint_id, subtasks_to_commit, + committing_data, } } @@ -25,13 +33,36 @@ impl CommittingState { pub fn done(&self) -> bool { self.subtasks_to_commit.is_empty() } -} -impl From<(i64, HashSet<(String, u32)>)> for CommittingState { - fn from((checkpoint_id, subtasks_to_commit): (i64, HashSet<(String, u32)>)) -> Self { - Self { - checkpoint_id, - subtasks_to_commit, - } + pub fn committing_data(&self) -> HashMap { + self.committing_data + .clone() + .into_iter() + .map(|(operator_id, backend_datas)| { + let mut table_map: HashMap> = HashMap::new(); + for backend_data in backend_datas { + let backend_data::BackendData::ParquetStore(parquet_data) = + &backend_data.backend_data.as_ref().unwrap(); + let table = parquet_data.table.clone(); + table_map.entry(table).or_default().push(backend_data); + } + ( + operator_id, + OperatorCommitData { + committing_data: table_map + .into_iter() + .map(|(table, backing_data)| { + ( + table, + TableCommitData { + backend_data: backing_data, + }, + ) + }) + .collect(), + }, + ) + }) + .collect() } } diff --git a/arroyo-state/src/lib.rs b/arroyo-state/src/lib.rs index c9e638a8c..64fe13bff 100644 --- a/arroyo-state/src/lib.rs +++ b/arroyo-state/src/lib.rs @@ -202,6 +202,7 @@ pub trait BackingStore { async fn get_key_values(&self, table: char) -> Vec<(K, V)>; async fn load_compacted(&mut self, compaction: CompactionResult); + async fn insert_committing_data(&mut self, table: char, committing_data: Vec); } pub struct StateStore { @@ -416,6 +417,12 @@ impl StateStore { pub async fn load_compacted(&mut self, compaction: CompactionResult) { self.backend.load_compacted(compaction).await; } + + pub async fn insert_committing_data(&mut self, table: char, committing_data: Vec) { + self.backend + .insert_committing_data(table, committing_data) + .await + } } #[cfg(test)] diff --git a/arroyo-state/src/parquet.rs b/arroyo-state/src/parquet.rs index 7858a925a..e52012b23 100644 --- a/arroyo-state/src/parquet.rs +++ b/arroyo-state/src/parquet.rs @@ -483,6 +483,14 @@ impl BackingStore for ParquetBackend { async fn load_compacted(&mut self, compaction: CompactionResult) { self.writer.load_compacted_data(compaction).await; } + + async fn insert_committing_data(&mut self, table: char, commit_data: Vec) { + self.writer + .sender + .send(ParquetQueueItem::CommitData { table, commit_data }) + .await + .unwrap(); + } } impl ParquetBackend { @@ -683,6 +691,7 @@ impl ParquetBackend { file.file.clone(), grpc::BackendData { backend_data: Some(BackendData::ParquetStore(file.clone())), + committing_data: None, }, ); } @@ -701,6 +710,7 @@ impl ParquetBackend { if let Some(p) = compact_parquet_store_data { backend_data_to_load.push(grpc::BackendData { backend_data: Some(BackendData::ParquetStore(p)), + committing_data: None, }); } } @@ -1085,6 +1095,7 @@ impl ParquetWriter { enum ParquetQueueItem { Write(ParquetWrite), Checkpoint(ParquetCheckpoint), + CommitData { table: char, commit_data: Vec }, } #[derive(Debug)] @@ -1309,6 +1320,8 @@ impl ParquetFlusher { async fn flush_iteration(&mut self) -> Result { let mut checkpoint_epoch = None; + let mut table_to_commit_data = HashMap::new(); + // accumulate writes in the RecordBatchBuilders until we get a checkpoint while checkpoint_epoch.is_none() { tokio::select! { @@ -1329,6 +1342,9 @@ impl ParquetFlusher { Some(ParquetQueueItem::Checkpoint(epoch)) => { checkpoint_epoch = Some(epoch); }, + Some(ParquetQueueItem::CommitData{table, commit_data}) => { + table_to_commit_data.insert(table, commit_data); + } None => { debug!("Parquet flusher closed"); return Ok(false); @@ -1399,6 +1415,7 @@ impl ParquetFlusher { } checkpoint_backend_data.push(grpc::BackendData { backend_data: Some(BackendData::ParquetStore(file.clone())), + committing_data: table_to_commit_data.remove(&table), }); new_current_files .entry(table) diff --git a/arroyo-worker/src/connectors/filesystem/local.rs b/arroyo-worker/src/connectors/filesystem/local.rs index 80121f36e..4d05dad24 100644 --- a/arroyo-worker/src/connectors/filesystem/local.rs +++ b/arroyo-worker/src/connectors/filesystem/local.rs @@ -191,6 +191,7 @@ impl + Send + 'static> Two &mut self, _task_info: &TaskInfo, pre_commit: Vec, + commit_data: HashMap, ) -> Result<()> { for FilePreCommit { tmp_file, diff --git a/arroyo-worker/src/connectors/filesystem/mod.rs b/arroyo-worker/src/connectors/filesystem/mod.rs index 611602a62..7d0cc9d79 100644 --- a/arroyo-worker/src/connectors/filesystem/mod.rs +++ b/arroyo-worker/src/connectors/filesystem/mod.rs @@ -18,7 +18,7 @@ use futures::{stream::StreamExt, TryStreamExt}; use object_store::{path::Path, MultipartId, UploadPart}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::warn; +use tracing::{info, warn}; use typify::import_types; import_types!(schema = "../connector-schemas/filesystem/table.json"); @@ -1401,7 +1401,12 @@ impl + Send + 'static> &mut self, _task_info: &TaskInfo, pre_commit: Vec, + commit_data: HashMap, ) -> Result<()> { + info!("multipart commit running"); + for (filename, file_to_finish) in commit_data { + warn!("finishing {} with {:?}", filename, file_to_finish); + } self.sender .send(FileSystemMessages::FilesToFinish(pre_commit)) .await?; diff --git a/arroyo-worker/src/connectors/impulse.rs b/arroyo-worker/src/connectors/impulse.rs index 4b94c6a9a..7ab04b650 100644 --- a/arroyo-worker/src/connectors/impulse.rs +++ b/arroyo-worker/src/connectors/impulse.rs @@ -167,7 +167,7 @@ impl ImpulseSourceFunc { } } } - Ok(ControlMessage::Commit { epoch: _ }) => { + Ok(ControlMessage::Commit { .. }) => { unreachable!("sources shouldn't receive commit messages"); } Ok(ControlMessage::LoadCompacted { compacted }) => { diff --git a/arroyo-worker/src/connectors/kafka/sink/mod.rs b/arroyo-worker/src/connectors/kafka/sink/mod.rs index 7d181663f..ba5bd17e1 100644 --- a/arroyo-worker/src/connectors/kafka/sink/mod.rs +++ b/arroyo-worker/src/connectors/kafka/sink/mod.rs @@ -251,7 +251,12 @@ impl KafkaSinkFunc { } } - async fn handle_commit(&mut self, epoch: u32, ctx: &mut crate::engine::Context<(), ()>) { + async fn handle_commit( + &mut self, + epoch: u32, + commit_data: HashMap>>, + ctx: &mut crate::engine::Context<(), ()>, + ) { let ConsistencyMode::ExactlyOnce { next_transaction_index: _, producer_to_complete, @@ -295,8 +300,8 @@ impl KafkaSinkFunc { if !self.is_committing() { return; } - if let Some(ControlMessage::Commit { epoch }) = ctx.control_rx.recv().await { - self.handle_commit(epoch, ctx).await; + if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await { + self.handle_commit(epoch, commit_data, ctx).await; } else { warn!("no commit message received, not committing") } diff --git a/arroyo-worker/src/connectors/kafka/source/mod.rs b/arroyo-worker/src/connectors/kafka/source/mod.rs index 97481753a..57707323a 100644 --- a/arroyo-worker/src/connectors/kafka/source/mod.rs +++ b/arroyo-worker/src/connectors/kafka/source/mod.rs @@ -303,7 +303,7 @@ where } } } - Some(ControlMessage::Commit { epoch: _ }) => { + Some(ControlMessage::Commit { .. }) => { unreachable!("sources shouldn't receive commit messages"); } Some(ControlMessage::LoadCompacted {compacted}) => { diff --git a/arroyo-worker/src/connectors/kinesis/source/mod.rs b/arroyo-worker/src/connectors/kinesis/source/mod.rs index f6af5118b..683267003 100644 --- a/arroyo-worker/src/connectors/kinesis/source/mod.rs +++ b/arroyo-worker/src/connectors/kinesis/source/mod.rs @@ -450,7 +450,7 @@ impl KinesisSourceFunc { } } } - Some(ControlMessage::Commit { epoch: _ }) => { + Some(ControlMessage::Commit { .. }) => { unreachable!("sources shouldn't receive commit messages"); } Some(ControlMessage::LoadCompacted { compacted }) => { @@ -458,7 +458,6 @@ impl KinesisSourceFunc { }, Some(ControlMessage::NoOp ) => {} None => { - } } } diff --git a/arroyo-worker/src/connectors/polling_http.rs b/arroyo-worker/src/connectors/polling_http.rs index 591f883b3..e8b01ab24 100644 --- a/arroyo-worker/src/connectors/polling_http.rs +++ b/arroyo-worker/src/connectors/polling_http.rs @@ -157,7 +157,7 @@ where } } } - ControlMessage::Commit { epoch: _ } => { + ControlMessage::Commit { .. } => { unreachable!("sources shouldn't receive commit messages"); } ControlMessage::LoadCompacted { compacted } => { diff --git a/arroyo-worker/src/connectors/sse.rs b/arroyo-worker/src/connectors/sse.rs index 8099ac29e..445e9f729 100644 --- a/arroyo-worker/src/connectors/sse.rs +++ b/arroyo-worker/src/connectors/sse.rs @@ -136,7 +136,7 @@ where } } } - ControlMessage::Commit { epoch: _ } => { + ControlMessage::Commit { .. } => { unreachable!("sources shouldn't receive commit messages"); } ControlMessage::LoadCompacted { compacted } => { diff --git a/arroyo-worker/src/connectors/two_phase_committer.rs b/arroyo-worker/src/connectors/two_phase_committer.rs index 4e5e1862c..7920c3323 100644 --- a/arroyo-worker/src/connectors/two_phase_committer.rs +++ b/arroyo-worker/src/connectors/two_phase_committer.rs @@ -10,6 +10,7 @@ use arroyo_rpc::{ use arroyo_state::tables::global_keyed_map::GlobalKeyedState; use arroyo_types::{Data, Key, Record, TaskInfo, Watermark}; use async_trait::async_trait; +use bincode::config; use tracing::warn; #[derive(StreamNode)] @@ -52,6 +53,7 @@ pub trait TwoPhaseCommitter: Send + 'static { &mut self, task_info: &TaskInfo, pre_commit: Vec, + operator_data: HashMap, ) -> Result<()>; async fn checkpoint( &mut self, @@ -129,13 +131,25 @@ impl> TwoPhaseCommitterOper } async fn on_close(&mut self, ctx: &mut crate::engine::Context<(), ()>) { - if let Some(ControlMessage::Commit { epoch }) = ctx.control_rx.recv().await { - self.handle_commit(epoch, ctx).await; + if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await { + self.handle_commit(epoch, commit_data, ctx).await; } else { warn!("no commit message received, not committing") } } + fn map_from_serialized_data(serialized_data: Vec>) -> HashMap { + let mut result_map = HashMap::new(); + serialized_data.into_iter().for_each(|map_bytes| { + let map: HashMap = + bincode::decode_from_slice(&map_bytes, config::standard()) + .unwrap() + .0; + result_map.extend(map); + }); + result_map + } + async fn handle_checkpoint( &mut self, checkpoint_barrier: &arroyo_types::CheckpointBarrier, @@ -155,6 +169,12 @@ impl> TwoPhaseCommitterOper ) .await .unwrap(); + let serialized_pre_commits = + bincode::encode_to_vec(&pre_commits, config::standard()).unwrap(); + ctx.state + .insert_committing_data('p', serialized_pre_commits) + .await; + let mut recovery_data_state: GlobalKeyedState = ctx.state.get_global_keyed_state('r').await; recovery_data_state @@ -168,11 +188,23 @@ impl> TwoPhaseCommitterOper pre_commit_state.insert(key, value).await; } } - async fn handle_commit(&mut self, epoch: u32, ctx: &mut crate::engine::Context<(), ()>) { + async fn handle_commit( + &mut self, + epoch: u32, + mut commit_data: HashMap>>, + ctx: &mut crate::engine::Context<(), ()>, + ) { let pre_commits = self.pre_commits.clone(); self.pre_commits.clear(); self.committer - .commit(&ctx.task_info, pre_commits) + .commit( + &ctx.task_info, + pre_commits, + commit_data + .remove(&'p') + .map(|serialized_data| Self::map_from_serialized_data(serialized_data)) + .unwrap_or_default(), + ) .await .expect("committer committed"); let checkpoint_event = arroyo_rpc::ControlResp::CheckpointEvent(CheckpointEvent { diff --git a/arroyo-worker/src/connectors/websocket.rs b/arroyo-worker/src/connectors/websocket.rs index 40acda754..58fd27d1b 100644 --- a/arroyo-worker/src/connectors/websocket.rs +++ b/arroyo-worker/src/connectors/websocket.rs @@ -112,7 +112,7 @@ where } } } - ControlMessage::Commit { epoch: _ } => { + ControlMessage::Commit { .. } => { unreachable!("sources shouldn't receive commit messages"); } ControlMessage::LoadCompacted { compacted } => { diff --git a/arroyo-worker/src/lib.rs b/arroyo-worker/src/lib.rs index e373e7405..d3173425c 100644 --- a/arroyo-worker/src/lib.rs +++ b/arroyo-worker/src/lib.rs @@ -9,11 +9,11 @@ use arrow::datatypes::{DataType, Field, Schema}; use arroyo_rpc::grpc::controller_grpc_client::ControllerGrpcClient; use arroyo_rpc::grpc::worker_grpc_server::{WorkerGrpc, WorkerGrpcServer}; use arroyo_rpc::grpc::{ - CheckpointReq, CheckpointResp, HeartbeatReq, JobFinishedReq, JobFinishedResp, - LoadCompactedDataReq, LoadCompactedDataRes, RegisterWorkerReq, StartExecutionReq, - StartExecutionResp, StopExecutionReq, StopExecutionResp, TaskCheckpointCompletedReq, - TaskCheckpointEventReq, TaskFailedReq, TaskFinishedReq, TaskStartedReq, WorkerErrorReq, - WorkerResources, + CheckpointReq, CheckpointResp, CommitReq, CommitResp, HeartbeatReq, JobFinishedReq, + JobFinishedResp, LoadCompactedDataReq, LoadCompactedDataRes, RegisterWorkerReq, + StartExecutionReq, StartExecutionResp, StopExecutionReq, StopExecutionResp, + TaskCheckpointCompletedReq, TaskCheckpointEventReq, TaskFailedReq, TaskFinishedReq, + TaskStartedReq, WorkerErrorReq, WorkerResources, }; use arroyo_server_common::start_admin_server; use arroyo_types::{ @@ -566,7 +566,10 @@ impl WorkerGrpc for WorkerServer { }; for sender in &senders { sender - .send(ControlMessage::Commit { epoch: req.epoch }) + .send(ControlMessage::Commit { + epoch: req.epoch, + commit_data: HashMap::new(), + }) .await .unwrap(); } @@ -599,6 +602,53 @@ impl WorkerGrpc for WorkerServer { Ok(Response::new(CheckpointResp {})) } + async fn commit(&self, request: Request) -> Result, Status> { + let req = request.into_inner(); + let sender_commit_map_pairs = { + let state_mutex = self.state.lock().unwrap(); + let Some(state) = state_mutex.as_ref() else { + return Err(Status::failed_precondition( + "Worker has not yet started execution", + )); + }; + info!("committing func"); + let mut sender_commit_map_pairs = vec![]; + info!("committing data is {:?}", req.committing_data); + for (operator_id, commit_operator) in req.committing_data { + let nodes = state.operator_controls.get(&operator_id).unwrap().clone(); + let commit_map: HashMap<_, _> = commit_operator + .committing_data + .into_iter() + .map(|(table, backend_data)| { + ( + table.chars().next().unwrap(), + backend_data + .backend_data + .into_iter() + .filter_map(|backend_data| backend_data.committing_data) + .collect(), + ) + }) + .collect(); + sender_commit_map_pairs.push((nodes, commit_map)); + } + sender_commit_map_pairs + }; + for (senders, commit_map) in sender_commit_map_pairs { + for sender in senders { + sender + .send(ControlMessage::Commit { + epoch: req.epoch, + commit_data: commit_map.clone(), + }) + .await + .unwrap(); + } + } + info!("finished sending commits"); + Ok(Response::new(CommitResp {})) + } + async fn load_compacted_data( &self, request: Request,