Commit 0968bbc

message passing now working.

jacksonrnewhouse committed Oct 20, 2023
1 parent 07149a5 commit 0968bbc

Showing 12 changed files with 101 additions and 68 deletions.
5 changes: 2 additions & 3 deletions arroyo-controller/src/job_controller/mod.rs
@@ -8,9 +8,8 @@ use crate::types::public::StopMode as SqlStopMode;
 use anyhow::bail;
 use arroyo_datastream::Program;
 use arroyo_rpc::grpc::{
-    backend_data, worker_grpc_client::WorkerGrpcClient, CheckpointReq, CommitReq, JobFinishedReq,
-    LoadCompactedDataReq, OperatorCommitData, ParquetStoreData, StopExecutionReq, StopMode,
-    TableCommitData, TaskCheckpointEventType,
+    worker_grpc_client::WorkerGrpcClient, CheckpointReq, CommitReq, JobFinishedReq,
+    LoadCompactedDataReq, StopExecutionReq, StopMode, TaskCheckpointEventType,
 };
 use arroyo_state::{BackingStore, StateBackend};
 use arroyo_types::{to_micros, WorkerId};
13 changes: 12 additions & 1 deletion arroyo-controller/src/states/scheduling.rs
@@ -381,7 +381,18 @@ impl State for Scheduling {
                     operator_id, ctx.config.id
                 );
             };
-            committing_data.insert(operator_id.clone(), operator_metadata.backend_data);
+            if let Some(commit_data) = operator_metadata.commit_data {
+                committing_data.insert(
+                    operator_id.clone(),
+                    commit_data
+                        .committing_data
+                        .into_iter()
+                        .map(|(table, commit_data_map)| {
+                            (table, commit_data_map.commit_data_by_subtask)
+                        })
+                        .collect(),
+                );
+            }
             if operator_metadata.has_state
                 && operator_metadata
                     .tables
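
Note: for orientation, a minimal sketch (not code from this commit) of the reshaping this hunk performs when the controller rebuilds its commit map during scheduling. The structs are stand-ins mirroring only the fields of the generated proto types (`OperatorCommitData` / `TableCommitData` in `arroyo_rpc::grpc`) that are visible in the diff:

```rust
use std::collections::HashMap;

// Stand-ins for the generated proto types; only diff-visible fields modeled.
struct TableCommitData {
    commit_data_by_subtask: HashMap<u32, Vec<u8>>,
}
struct OperatorCommitData {
    committing_data: HashMap<String, TableCommitData>,
}

// Flatten the proto nesting into the controller's plain map:
// table name -> subtask index -> serialized pre-commit bytes.
fn flatten(commit_data: OperatorCommitData) -> HashMap<String, HashMap<u32, Vec<u8>>> {
    commit_data
        .committing_data
        .into_iter()
        .map(|(table, table_data)| (table, table_data.commit_data_by_subtask))
        .collect()
}
```
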
2 changes: 1 addition & 1 deletion arroyo-macro/src/lib.rs
@@ -802,7 +802,7 @@ fn impl_stream_node_type(

     if !methods.contains("handle_commit") {
         defs.push(quote! {
-            async fn handle_commit(&mut self, epoch: u32, commit_data: std::collections::HashMap<char, Vec<Vec<u8>>>, ctx: &mut Context<#out_k, #out_t>) {
+            async fn handle_commit(&mut self, epoch: u32, commit_data: std::collections::HashMap<char, std::collections::HashMap<u32, Vec<u8>>>, ctx: &mut Context<#out_k, #out_t>) {
                 tracing::warn!("default handling of commit with epoch {:?}", epoch);
             }
         })
2 changes: 1 addition & 1 deletion arroyo-rpc/src/lib.rs
@@ -38,7 +38,7 @@ pub enum ControlMessage {
     },
     Commit {
         epoch: u32,
-        commit_data: HashMap<char, Vec<Vec<u8>>>,
+        commit_data: HashMap<char, HashMap<u32, Vec<u8>>>,
     },
     LoadCompacted {
         compacted: CompactionResult,
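
Note: the payload of `ControlMessage::Commit` changes shape here: a table's commit data is now keyed by the subtask index that produced it, rather than collected into an anonymous `Vec<Vec<u8>>`. A small self-contained illustration of the new shape (values are illustrative only):

```rust
use std::collections::HashMap;

fn main() {
    // table tag -> subtask index -> serialized pre-commit bytes
    let mut commit_data: HashMap<char, HashMap<u32, Vec<u8>>> = HashMap::new();
    commit_data.entry('p').or_default().insert(0, vec![0xde, 0xad]);
    commit_data.entry('p').or_default().insert(1, vec![0xbe, 0xef]);

    // Unlike the old Vec<Vec<u8>>, the map records which subtask wrote each blob.
    for (subtask, bytes) in &commit_data[&'p'] {
        println!("subtask {subtask}: {} bytes", bytes.len());
    }
}
```
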
27 changes: 17 additions & 10 deletions arroyo-state/src/committing_state.rs
@@ -1,6 +1,6 @@
 use std::collections::{HashMap, HashSet};

-use arroyo_rpc::grpc::{backend_data, BackendData, OperatorCommitData, TableCommitData};
+use arroyo_rpc::grpc::{OperatorCommitData, TableCommitData};

 pub struct CommittingState {
     checkpoint_id: i64,
@@ -35,13 +35,19 @@ impl CommittingState {
     }

     pub fn committing_data(&self) -> HashMap<String, OperatorCommitData> {
-        self.committing_data
+        let operators_to_commit: HashSet<_> = self
+            .subtasks_to_commit
             .iter()
-            .map(|(operator_id, v)| {
-                (
-                    operator_id.clone(),
-                    OperatorCommitData {
-                        committing_data: v
+            .map(|(operator_id, _subtask_id)| operator_id.clone())
+            .collect();
+        operators_to_commit
+            .into_iter()
+            .map(|operator_id| {
+                let committing_data = self
+                    .committing_data
+                    .get(&operator_id)
+                    .map(|table_map| {
+                        table_map
                             .iter()
                             .map(|(table_name, subtask_to_commit_data)| {
                                 (
@@ -51,9 +57,10 @@ impl CommittingState {
                                 },
                             )
                         })
-                        .collect(),
-                },
-            )
+                            .collect()
+                    })
+                    .unwrap_or_default();
+                (operator_id, OperatorCommitData { committing_data })
             })
             .collect()
     }
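
Note: the interleaved rendering above is hard to follow; as a reading aid, here is a condensed sketch of the new `committing_data()` logic on simplified types. The `(operator, subtask)` shape of `subtasks_to_commit` is inferred from the diff, and the grpc wrapper types are replaced with plain nested maps:

```rust
use std::collections::{HashMap, HashSet};

struct CommittingState {
    // (operator_id, subtask_index) pairs that still have to commit (inferred shape).
    subtasks_to_commit: HashSet<(String, u32)>,
    // operator -> table -> subtask -> serialized pre-commit bytes.
    committing_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,
}

impl CommittingState {
    fn committing_data(&self) -> HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>> {
        // Key off subtasks_to_commit rather than committing_data, so an operator
        // that must commit but produced no data still gets an (empty) entry.
        let operators: HashSet<String> = self
            .subtasks_to_commit
            .iter()
            .map(|(op, _subtask)| op.clone())
            .collect();
        operators
            .into_iter()
            .map(|op| {
                let tables = self.committing_data.get(&op).cloned().unwrap_or_default();
                (op, tables)
            })
            .collect()
    }
}
```
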
25 changes: 15 additions & 10 deletions arroyo-state/src/parquet.rs
@@ -484,10 +484,13 @@ impl BackingStore for ParquetBackend {
         self.writer.load_compacted_data(compaction).await;
     }

-    async fn insert_committing_data(&mut self, table: char, commit_data: Vec<u8>) {
+    async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>) {
         self.writer
             .sender
-            .send(ParquetQueueItem::CommitData { table, commit_data })
+            .send(ParquetQueueItem::CommitData {
+                table,
+                data: committing_data,
+            })
             .await
             .unwrap();
     }
@@ -691,7 +694,6 @@ impl ParquetBackend {
                     file.file.clone(),
                     grpc::BackendData {
                         backend_data: Some(BackendData::ParquetStore(file.clone())),
-                        committing_data: None,
                     },
                 );
             }
@@ -710,7 +712,6 @@
             if let Some(p) = compact_parquet_store_data {
                 backend_data_to_load.push(grpc::BackendData {
                     backend_data: Some(BackendData::ParquetStore(p)),
-                    committing_data: None,
                 });
             }
         }
@@ -1027,6 +1028,7 @@ impl ParquetWriter {
                 .map(|table| (table.name.chars().next().unwrap(), table.clone()))
                 .collect(),
             builders: HashMap::new(),
+            commit_data: HashMap::new(),
             current_files,
             load_compacted_tx,
             new_compacted: vec![],
@@ -1095,7 +1097,7 @@
 enum ParquetQueueItem {
     Write(ParquetWrite),
     Checkpoint(ParquetCheckpoint),
-    CommitData { table: char, commit_data: Vec<u8> },
+    CommitData { table: char, data: Vec<u8> },
 }

 #[derive(Debug)]
@@ -1236,6 +1238,7 @@ struct ParquetFlusher {
     task_info: TaskInfo,
     table_descriptors: HashMap<char, TableDescriptor>,
     builders: HashMap<char, RecordBatchBuilder>,
+    commit_data: HashMap<char, Vec<u8>>,
     current_files: HashMap<char, BTreeMap<u32, Vec<ParquetStoreData>>>, // table -> epoch -> file
     load_compacted_tx: Receiver<CompactionResult>,
     new_compacted: Vec<ParquetStoreData>,
@@ -1320,8 +1323,6 @@ impl ParquetFlusher {
     async fn flush_iteration(&mut self) -> Result<bool> {
         let mut checkpoint_epoch = None;

-        let mut table_to_commit_data = HashMap::new();
-
         // accumulate writes in the RecordBatchBuilders until we get a checkpoint
         while checkpoint_epoch.is_none() {
             tokio::select! {
@@ -1342,8 +1343,8 @@
                 Some(ParquetQueueItem::Checkpoint(epoch)) => {
                     checkpoint_epoch = Some(epoch);
                 },
-                Some(ParquetQueueItem::CommitData{table, commit_data}) => {
-                    table_to_commit_data.insert(table, commit_data);
+                Some(ParquetQueueItem::CommitData{table, data}) => {
+                    self.commit_data.insert(table, data);
                 }
                 None => {
                     debug!("Parquet flusher closed");
@@ -1415,7 +1416,6 @@
             }
             checkpoint_backend_data.push(grpc::BackendData {
                 backend_data: Some(BackendData::ParquetStore(file.clone())),
-                committing_data: table_to_commit_data.remove(&table),
             });
             new_current_files
                 .entry(table)
@@ -1457,6 +1457,11 @@
             watermark: cp.watermark.map(to_micros),
             backend_data: checkpoint_backend_data,
             bytes: bytes as u64,
+            committing_data: self
+                .commit_data
+                .drain()
+                .map(|(table, data)| (table.to_string(), data))
+                .collect(),
         };
         self.control_tx
             .send(ControlResp::CheckpointCompleted(CheckpointCompleted {
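
Note: the flusher change is spread over several hunks; the essential move is that commit bytes now live on the flusher (`self.commit_data`) instead of a per-iteration local, and are drained into the checkpoint metadata with the `char` table tag widened to a `String` key. A minimal sketch of that hand-off, with assumed stand-in types:

```rust
use std::collections::HashMap;

struct Flusher {
    // table tag -> serialized commit bytes, buffered across flush iterations
    commit_data: HashMap<char, Vec<u8>>,
}

impl Flusher {
    // Drain the buffer into the checkpoint's string-keyed committing_data map.
    fn take_committing_data(&mut self) -> HashMap<String, Vec<u8>> {
        self.commit_data
            .drain()
            .map(|(table, data)| (table.to_string(), data))
            .collect()
    }
}

fn main() {
    let mut flusher = Flusher {
        commit_data: HashMap::from([('p', vec![1, 2, 3])]),
    };
    let committing = flusher.take_committing_data();
    assert_eq!(committing["p"], vec![1, 2, 3]);
    assert!(flusher.commit_data.is_empty()); // ready for the next epoch
}
```
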
1 change: 0 additions & 1 deletion arroyo-worker/src/connectors/filesystem/local.rs
@@ -191,7 +191,6 @@ impl<K: Key, D: Data + Sync + Serialize, V: LocalWriter<D> + Send + 'static> Two
         &mut self,
         _task_info: &TaskInfo,
         pre_commit: Vec<Self::PreCommit>,
-        commit_data: HashMap<String, Self::PreCommit>,
     ) -> Result<()> {
         for FilePreCommit {
             tmp_file,
5 changes: 2 additions & 3 deletions arroyo-worker/src/connectors/filesystem/mod.rs
@@ -1401,11 +1401,10 @@ impl<K: Key, T: Data + Sync, R: MultiPartWriter<InputType = T> + Send + 'static>
         &mut self,
         _task_info: &TaskInfo,
         pre_commit: Vec<Self::PreCommit>,
-        commit_data: HashMap<String, Self::PreCommit>,
     ) -> Result<()> {
         info!("multipart commit running");
-        for (filename, file_to_finish) in commit_data {
-            warn!("finishing {} with {:?}", filename, file_to_finish);
+        for file_to_finish in &pre_commit {
+            warn!("finishing {:?}", file_to_finish);
         }
         self.sender
             .send(FileSystemMessages::FilesToFinish(pre_commit))
2 changes: 1 addition & 1 deletion arroyo-worker/src/connectors/kafka/sink/mod.rs
@@ -254,7 +254,7 @@ impl<K: Key + Serialize, T: SchemaData + Serialize> KafkaSinkFunc<K, T> {
     async fn handle_commit(
         &mut self,
         epoch: u32,
-        commit_data: HashMap<char, Vec<Vec<u8>>>,
+        commit_data: HashMap<char, HashMap<u32, Vec<u8>>>,
         ctx: &mut crate::engine::Context<(), ()>,
     ) {
         let ConsistencyMode::ExactlyOnce {
1 change: 1 addition & 0 deletions arroyo-worker/src/connectors/kafka/source/test.rs
@@ -252,6 +252,7 @@ async fn test_kafka() {
                 tables: source::tables(),
                 backend_data: checkpoint_completed.subtask_metadata.backend_data,
                 bytes: checkpoint_completed.subtask_metadata.bytes,
+                commit_data: None,
             })
             .await;

80 changes: 48 additions & 32 deletions arroyo-worker/src/connectors/two_phase_committer.rs
@@ -53,14 +53,23 @@ pub trait TwoPhaseCommitter<K: Key, T: Data + Sync>: Send + 'static {
         &mut self,
         task_info: &TaskInfo,
         pre_commit: Vec<Self::PreCommit>,
-        operator_data: HashMap<String, Self::PreCommit>,
     ) -> Result<()>;
     async fn checkpoint(
         &mut self,
         task_info: &TaskInfo,
         watermark: Option<SystemTime>,
         stopping: bool,
     ) -> Result<(Self::DataRecovery, HashMap<String, Self::PreCommit>)>;
+    fn commit_style(&self) -> CommitStyle {
+        CommitStyle::PerSubtask
+    }
 }
+
+pub enum CommitStyle {
+    // Per subtask uses the subtask itself as the committer, writing the pre-commit messages to state for restoration.
+    PerSubtask,
+    // Per operator uses subtask 0 as the committer, passing all PreCommit data through the control system/checkpoint metadata.
+    PerOperator,
+}

 #[process_fn(in_k = K, in_t = T)]
@@ -138,16 +147,12 @@ impl<K: Key, T: Data + Sync, TPC: TwoPhaseCommitter<K, T>> TwoPhaseCommitterOper
         }
     }

-    fn map_from_serialized_data(serialized_data: Vec<Vec<u8>>) -> HashMap<String, TPC::PreCommit> {
-        let mut result_map = HashMap::new();
-        serialized_data.into_iter().for_each(|map_bytes| {
-            let map: HashMap<String, TPC::PreCommit> =
-                bincode::decode_from_slice(&map_bytes, config::standard())
-                    .unwrap()
-                    .0;
-            result_map.extend(map);
-        });
-        result_map
+    fn map_from_serialized_data(serialized_data: Vec<u8>) -> Vec<TPC::PreCommit> {
+        let map: HashMap<String, TPC::PreCommit> =
+            bincode::decode_from_slice(&serialized_data, config::standard())
+                .unwrap()
+                .0;
+        map.into_values().collect()
     }

     async fn handle_checkpoint(
@@ -169,42 +174,53 @@
             )
             .await
             .unwrap();
-        let serialized_pre_commits =
-            bincode::encode_to_vec(&pre_commits, config::standard()).unwrap();
-        ctx.state
-            .insert_committing_data('p', serialized_pre_commits)
-            .await;

         let mut recovery_data_state: GlobalKeyedState<usize, _, _> =
             ctx.state.get_global_keyed_state('r').await;
         recovery_data_state
             .insert(ctx.task_info.task_index, recovery_data)
             .await;
-        let mut pre_commit_state: GlobalKeyedState<String, _, _> =
-            ctx.state.get_global_keyed_state('p').await;
         self.pre_commits.clear();
-        for (key, value) in pre_commits {
-            self.pre_commits.push(value.clone());
-            pre_commit_state.insert(key, value).await;
+        if pre_commits.is_empty() {
+            return;
         }
+        let commit_style = self.committer.commit_style();
+        match commit_style {
+            CommitStyle::PerSubtask => {
+                let mut pre_commit_state: GlobalKeyedState<String, _, _> =
+                    ctx.state.get_global_keyed_state('p').await;
+                for (key, value) in pre_commits {
+                    self.pre_commits.push(value.clone());
+                    pre_commit_state.insert(key, value).await;
+                }
+            }
+            CommitStyle::PerOperator => {
+                let serialized_pre_commits =
+                    bincode::encode_to_vec(&pre_commits, config::standard()).unwrap();
+                ctx.state
+                    .insert_committing_data('p', serialized_pre_commits)
+                    .await;
+            }
+        }
     }
     async fn handle_commit(
         &mut self,
         epoch: u32,
-        mut commit_data: HashMap<char, Vec<Vec<u8>>>,
+        mut commit_data: HashMap<char, HashMap<u32, Vec<u8>>>,
         ctx: &mut crate::engine::Context<(), ()>,
     ) {
-        let pre_commits = self.pre_commits.clone();
-        self.pre_commits.clear();
+        let pre_commits = match self.committer.commit_style() {
+            CommitStyle::PerSubtask => std::mem::take(&mut self.pre_commits),
+            CommitStyle::PerOperator => commit_data
+                .remove(&'p')
+                .unwrap_or_default()
+                .into_values()
+                .flat_map(|serialized_data| Self::map_from_serialized_data(serialized_data))
+                .collect(),
+        };
+
         self.committer
-            .commit(
-                &ctx.task_info,
-                pre_commits,
-                commit_data
-                    .remove(&'p')
-                    .map(|serialized_data| Self::map_from_serialized_data(serialized_data))
-                    .unwrap_or_default(),
-            )
+            .commit(&ctx.task_info, pre_commits)
             .await
             .expect("committer committed");
         let checkpoint_event = arroyo_rpc::ControlResp::CheckpointEvent(CheckpointEvent {
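
Note: the new `CommitStyle` hook is the heart of this file's change. A condensed sketch of how a committer would opt in, using a simplified trait (not the real `TwoPhaseCommitter`, whose associated types and async methods are omitted); the sink names are hypothetical:

```rust
enum CommitStyle {
    // each subtask persists and commits its own pre-commit records
    PerSubtask,
    // subtask 0 commits for the whole operator; pre-commits travel through
    // checkpoint metadata / the control plane instead of per-subtask state
    PerOperator,
}

trait Committer {
    // Defaulting to PerSubtask leaves existing committers unchanged.
    fn commit_style(&self) -> CommitStyle {
        CommitStyle::PerSubtask
    }
}

struct LocalFileSink; // hypothetical
impl Committer for LocalFileSink {} // keeps the per-subtask default

struct OperatorScopedSink; // hypothetical
impl Committer for OperatorScopedSink {
    fn commit_style(&self) -> CommitStyle {
        CommitStyle::PerOperator // subtask 0 commits for the whole operator
    }
}
```
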
6 changes: 1 addition & 5 deletions arroyo-worker/src/lib.rs
@@ -622,11 +622,7 @@ impl WorkerGrpc for WorkerServer {
             .map(|(table, backend_data)| {
                 (
                     table.chars().next().unwrap(),
-                    backend_data
-                        .backend_data
-                        .into_iter()
-                        .filter_map(|backend_data| backend_data.committing_data)
-                        .collect(),
+                    backend_data.commit_data_by_subtask,
                 )
             })
             .collect();
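
Note: this is the receiving end of the controller change above: the worker turns the request's string-keyed table map into the engine's char-keyed commit map. A sketch of the conversion, with a stand-in for the proto type:

```rust
use std::collections::HashMap;

// Stand-in for the proto table data; only the field used here is modeled.
struct TableCommitData {
    commit_data_by_subtask: HashMap<u32, Vec<u8>>,
}

// The proto keys tables by String while the engine uses one-char table tags,
// so the first character becomes the key.
fn to_engine_commit_data(
    committing_data: HashMap<String, TableCommitData>,
) -> HashMap<char, HashMap<u32, Vec<u8>>> {
    committing_data
        .into_iter()
        .map(|(table, data)| (table.chars().next().unwrap(), data.commit_data_by_subtask))
        .collect()
}
```
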
