progress
jacksonrnewhouse committed Oct 20, 2023
1 parent 08e43b8 commit 48ed8ae
Showing 21 changed files with 277 additions and 55 deletions.
24 changes: 10 additions & 14 deletions arroyo-controller/src/job_controller/mod.rs
@@ -8,8 +8,9 @@ use crate::types::public::StopMode as SqlStopMode;
use anyhow::bail;
use arroyo_datastream::Program;
use arroyo_rpc::grpc::{
worker_grpc_client::WorkerGrpcClient, CheckpointReq, JobFinishedReq, LoadCompactedDataReq,
StopExecutionReq, StopMode, TaskCheckpointEventType,
backend_data, worker_grpc_client::WorkerGrpcClient, CheckpointReq, CommitReq, JobFinishedReq,
LoadCompactedDataReq, OperatorCommitData, ParquetStoreData, StopExecutionReq, StopMode,
TableCommitData, TaskCheckpointEventType,
};
use arroyo_state::{BackingStore, StateBackend};
use arroyo_types::{to_micros, WorkerId};
@@ -433,6 +434,7 @@ impl RunningJobModel
DbCheckpointState::committing,
)
.await?;
let committing_data = committing_state.committing_data();
self.checkpoint_state =
Some(CheckpointingOrCommittingState::Committing(committing_state));
info!(
@@ -443,12 +445,9 @@
for worker in self.workers.values_mut() {
worker
.connect
.checkpoint(Request::new(CheckpointReq {
timestamp: to_micros(SystemTime::now()),
min_epoch: self.min_epoch,
.commit(Request::new(CommitReq {
epoch: self.epoch,
then_stop: false,
is_commit: true,
committing_data: committing_data.clone(),
}))
.await?;
}
@@ -707,21 +706,18 @@ impl JobController
}

pub async fn send_commit_messages(&mut self) -> anyhow::Result<()> {
let Some(CheckpointingOrCommittingState::Committing(_committing)) =
let Some(CheckpointingOrCommittingState::Committing(committing)) =
&self.model.checkpoint_state
else {
bail!("should be committing")
};
for worker in self.model.workers.values_mut() {
worker
.connect
.checkpoint(Request::new(CheckpointReq {
timestamp: to_micros(SystemTime::now()),
min_epoch: self.model.min_epoch,
.commit(CommitReq {
epoch: self.model.epoch,
then_stop: false,
is_commit: true,
}))
committing_data: committing.committing_data(),
})
.await?;
}
Ok(())
8 changes: 6 additions & 2 deletions arroyo-controller/src/states/scheduling.rs
@@ -14,7 +14,9 @@ use tonic::{transport::Channel, Request};
use tracing::{error, info, warn};

use anyhow::anyhow;
use arroyo_state::{parquet::get_storage_env_vars, BackingStore, StateBackend};
use arroyo_state::{
committing_state::CommittingState, parquet::get_storage_env_vars, BackingStore, StateBackend,
};

use crate::{
job_controller::JobController,
@@ -368,6 +370,7 @@ impl State for Scheduling
metadata.min_epoch = min_epoch;
if needs_commits {
let mut commit_subtasks = HashSet::new();
let mut committing_data = HashMap::new();
for operator_id in &metadata.operator_ids {
let operator_metadata =
StateBackend::load_operator_metadata(&ctx.config.id, operator_id, epoch)
@@ -378,6 +381,7 @@
operator_id, ctx.config.id
);
};
committing_data.insert(operator_id.clone(), operator_metadata.backend_data);
if operator_metadata.has_state
&& operator_metadata
.tables
@@ -396,7 +400,7 @@
}
}
}
committing_state = Some((id, commit_subtasks));
committing_state = Some(CommittingState::new(id, commit_subtasks, committing_data));
}
StateBackend::write_checkpoint_metadata(metadata).await;
}
4 changes: 2 additions & 2 deletions arroyo-datastream/src/lib.rs
@@ -17,7 +17,7 @@ use arroyo_types::{Data, GlobalKey, JoinType, Key};
use bincode::{Decode, Encode};
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::EdgeRef;
use proc_macro2::Ident;
use proc_macro2::{Ident, TokenStream};
use quote::format_ident;
use quote::quote;
use serde::{Deserialize, Serialize};
@@ -1365,7 +1365,7 @@ impl Program
let out_t = parse_type(&output.unwrap().weight().value);

let strukt = parse_type(&c.operator);
let config = &c.config;
let config: TokenStream = quote!(c.config);
quote! {
Box::new(#strukt::<#out_k, #out_t>::from_config(#config))
}
6 changes: 3 additions & 3 deletions arroyo-macro/src/lib.rs
@@ -529,8 +529,8 @@ fn impl_stream_node_type(
match control_message {
arroyo_rpc::ControlMessage::Checkpoint(_) => tracing::warn!("shouldn't receive checkpoint"),
arroyo_rpc::ControlMessage::Stop { mode: _ } => tracing::warn!("shouldn't receive stop"),
arroyo_rpc::ControlMessage::Commit { epoch } => {
self.handle_commit(epoch, &mut ctx).await;
arroyo_rpc::ControlMessage::Commit { epoch, commit_data } => {
self.handle_commit(epoch, commit_data, &mut ctx).await;
},
arroyo_rpc::ControlMessage::LoadCompacted { compacted } => {
ctx.load_compacted(compacted).await;
@@ -802,7 +802,7 @@ fn impl_stream_node_type(

if !methods.contains("handle_commit") {
defs.push(quote! {
async fn handle_commit(&mut self, epoch: u32, ctx: &mut Context<#out_k, #out_t>) {
async fn handle_commit(&mut self, epoch: u32, commit_data: std::collections::HashMap<char, Vec<Vec<u8>>>, ctx: &mut Context<#out_k, #out_t>) {
tracing::warn!("default handling of commit with epoch {:?}", epoch);
}
})
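The macro now generates a default handle_commit that only logs a warning, so operators that actually have something to commit (for example, two-phase-commit sinks) are expected to override it and consume the per-table commit data. Below is a minimal, self-contained sketch of such an override; the TwoPhaseSink type, its table 's', and the commit logic are hypothetical illustrations, not Arroyo's generated trait method, which also takes a Context<K, T>.

use std::collections::HashMap;

// Hypothetical sink with one committing table 's'; illustrates how the new
// `commit_data` argument is keyed by table and carries raw pre-commit bytes.
struct TwoPhaseSink {
    committed_epochs: Vec<u32>,
}

impl TwoPhaseSink {
    async fn handle_commit(&mut self, epoch: u32, commit_data: HashMap<char, Vec<Vec<u8>>>) {
        // Pull the pre-commit records written into table 's' at checkpoint time.
        let Some(records) = commit_data.get(&'s') else {
            println!("no commit data for epoch {epoch}");
            return;
        };
        for record in records {
            // A real sink would finish its two-phase commit here, e.g. flush a
            // staged transaction identified by the bytes in `record`.
            println!("committing {} bytes for epoch {epoch}", record.len());
        }
        self.committed_epochs.push(epoch);
    }
}

// tokio is used only to drive the async sketch.
#[tokio::main]
async fn main() {
    let mut sink = TwoPhaseSink { committed_epochs: vec![] };
    let data = HashMap::from([('s', vec![b"txn-42".to_vec()])]);
    sink.handle_commit(7, data).await;
}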
21 changes: 21 additions & 0 deletions arroyo-rpc/proto/rpc.proto
@@ -230,6 +230,9 @@ message BackendData {
oneof backend_data {
ParquetStoreData parquet_store = 3;
}
// commit messages that may need to be distributed
// across all subtasks, e.g. when subtask 0 is responsible for committing data
optional bytes committing_data = 4;
}

message OperatorCheckpointMetadata {
@@ -315,6 +318,23 @@
message CheckpointResp {
}

message CommitReq {
uint32 epoch = 1;
map<string, OperatorCommitData> committing_data = 2;
}

message CommitResp {
}

message OperatorCommitData {
// map from table to commit data for that table.
map<string, TableCommitData> committing_data = 1;
}

message TableCommitData {
repeated BackendData backend_data = 1;
}

message LoadCompactedDataReq {
string operator_id = 1;
repeated BackendData backend_data_to_drop = 2; // this data got compressed...
@@ -347,6 +367,7 @@ message JobFinishedResp {
service WorkerGrpc {
rpc StartExecution(StartExecutionReq) returns (StartExecutionResp);
rpc Checkpoint(CheckpointReq) returns (CheckpointResp);
rpc Commit(CommitReq) returns (CommitResp);
rpc LoadCompactedData(LoadCompactedDataReq) returns (LoadCompactedDataRes);
rpc StopExecution(StopExecutionReq) returns (StopExecutionResp);
rpc JobFinished(JobFinishedReq) returns (JobFinishedResp);
14 changes: 11 additions & 3 deletions arroyo-rpc/src/lib.rs
@@ -2,6 +2,7 @@ pub mod api_types;
pub mod formats;
pub mod public_ids;

use std::collections::HashMap;
use std::{fs, time::SystemTime};

use crate::api_types::connections::PrimitiveType;
@@ -32,9 +33,16 @@ pub mod grpc {
#[derive(Debug)]
pub enum ControlMessage {
Checkpoint(CheckpointBarrier),
Stop { mode: StopMode },
Commit { epoch: u32 },
LoadCompacted { compacted: CompactionResult },
Stop {
mode: StopMode,
},
Commit {
epoch: u32,
commit_data: HashMap<char, Vec<Vec<u8>>>,
},
LoadCompacted {
compacted: CompactionResult,
},
NoOp,
}

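On the wire, CommitReq keys commit data by operator ID and then by table name (map<string, OperatorCommitData>, each holding map<string, TableCommitData>), while the in-process ControlMessage::Commit hands each operator a HashMap<char, Vec<Vec<u8>>> keyed by its single-character table. The worker-side translation between the two is not among the files shown here; the sketch below is one plausible shape for it, a rough assumption rather than the actual implementation. It uses simplified stand-in structs in place of the prost-generated arroyo_rpc::grpc types, assumes the char key is the first character of the table name, and assumes the payload comes from each BackendData's new committing_data bytes.

use std::collections::HashMap;

// Simplified stand-ins for the prost-generated messages (field shapes assumed).
#[derive(Clone)]
struct BackendData {
    committing_data: Option<Vec<u8>>, // the new `optional bytes committing_data = 4`
}

#[derive(Clone)]
struct TableCommitData {
    backend_data: Vec<BackendData>,
}

#[derive(Clone)]
struct OperatorCommitData {
    committing_data: HashMap<String, TableCommitData>, // table name -> data
}

// Assumed helper: flatten one operator's commit data into the map that
// ControlMessage::Commit carries.
fn flatten_for_operator(data: &OperatorCommitData) -> HashMap<char, Vec<Vec<u8>>> {
    let mut out: HashMap<char, Vec<Vec<u8>>> = HashMap::new();
    for (table, table_data) in &data.committing_data {
        let Some(table_char) = table.chars().next() else { continue };
        let entries: Vec<Vec<u8>> = table_data
            .backend_data
            .iter()
            .filter_map(|bd| bd.committing_data.clone())
            .collect();
        out.entry(table_char).or_default().extend(entries);
    }
    out
}

fn main() {
    let data = OperatorCommitData {
        committing_data: HashMap::from([(
            "s".to_string(),
            TableCommitData {
                backend_data: vec![BackendData {
                    committing_data: Some(b"txn-42".to_vec()),
                }],
            },
        )]),
    };
    let flat = flatten_for_operator(&data);
    assert_eq!(flat[&'s'], vec![b"txn-42".to_vec()]);
}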
48 changes: 47 additions & 1 deletion arroyo-state/src/checkpoint_state.rs
@@ -23,6 +23,7 @@ pub struct CheckpointState {
tasks: HashMap<String, BTreeMap<u32, SubtaskState>>,
completed_operators: HashSet<String>,
subtasks_to_commit: HashSet<(String, u32)>,
committing_backend_data: HashMap<String, Vec<BackendData>>,

// Used for the web ui -- eventually should be replaced with some other way of tracking / reporting
// this data
@@ -47,6 +48,7 @@ impl CheckpointState {
tasks: HashMap::new(),
completed_operators: HashSet::new(),
subtasks_to_commit: HashSet::new(),
committing_backend_data: HashMap::new(),
operator_details: HashMap::new(),
}
}
@@ -265,6 +267,46 @@ impl CheckpointState {
.filter_map(Self::backend_data_to_key)
.collect();

if let Some(subtask_state) = subtasks.get(&self.epoch) {
let committing_tables = subtask_state
.metadata
.as_ref()
.unwrap()
.tables
.iter()
.filter_map(|table| {
if table.write_behavior() == TableWriteBehavior::CommitWrites {
Some(table.name.clone())
} else {
None
}
})
.collect::<HashSet<_>>();
let backend_datas: Vec<_> = subtask_state
.metadata
.as_ref()
.unwrap()
.backend_data
.iter()
.filter_map(|backend_data| {
let Some(backend_data::BackendData::ParquetStore(parquet_backend)) =
&backend_data.backend_data
else {
return None;
};
if committing_tables.contains(&parquet_backend.table) {
Some(backend_data.clone())
} else {
None
}
})
.collect();
if !backend_datas.is_empty() {
self.committing_backend_data
.insert(operator_id.clone(), backend_datas);
}
}

let size = subtasks
.values()
.fold(0, |size, s| size + s.metadata.as_ref().unwrap().bytes);
@@ -307,7 +349,11 @@
}

pub fn committing_state(&self) -> CommittingState {
CommittingState::new(self.checkpoint_id, self.subtasks_to_commit.clone())
CommittingState::new(
self.checkpoint_id,
self.subtasks_to_commit.clone(),
self.committing_backend_data.clone(),
)
}

pub async fn save_state(&self) -> anyhow::Result<()> {
49 changes: 40 additions & 9 deletions arroyo-state/src/committing_state.rs
@@ -1,15 +1,23 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use arroyo_rpc::grpc::{backend_data, BackendData, OperatorCommitData, TableCommitData};

pub struct CommittingState {
checkpoint_id: i64,
subtasks_to_commit: HashSet<(String, u32)>,
committing_data: HashMap<String, Vec<BackendData>>,
}

impl CommittingState {
pub fn new(checkpoint_id: i64, subtasks_to_commit: HashSet<(String, u32)>) -> Self {
pub fn new(
checkpoint_id: i64,
subtasks_to_commit: HashSet<(String, u32)>,
committing_data: HashMap<String, Vec<BackendData>>,
) -> Self {
Self {
checkpoint_id,
subtasks_to_commit,
committing_data,
}
}

@@ -25,13 +33,36 @@ impl CommittingState {
pub fn done(&self) -> bool {
self.subtasks_to_commit.is_empty()
}
}

impl From<(i64, HashSet<(String, u32)>)> for CommittingState {
fn from((checkpoint_id, subtasks_to_commit): (i64, HashSet<(String, u32)>)) -> Self {
Self {
checkpoint_id,
subtasks_to_commit,
}
pub fn committing_data(&self) -> HashMap<String, OperatorCommitData> {
self.committing_data
.clone()
.into_iter()
.map(|(operator_id, backend_datas)| {
let mut table_map: HashMap<String, Vec<_>> = HashMap::new();
for backend_data in backend_datas {
let backend_data::BackendData::ParquetStore(parquet_data) =
&backend_data.backend_data.as_ref().unwrap();
let table = parquet_data.table.clone();
table_map.entry(table).or_default().push(backend_data);
}
(
operator_id,
OperatorCommitData {
committing_data: table_map
.into_iter()
.map(|(table, backing_data)| {
(
table,
TableCommitData {
backend_data: backing_data,
},
)
})
.collect(),
},
)
})
.collect()
}
}
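As a worked illustration (operator and table names hypothetical): an operator "op_1" whose committing table "s" produced two parquet files during the checkpoint comes out of committing_data() as one entry, "op_1" -> OperatorCommitData { committing_data: { "s" -> TableCommitData { backend_data: [file_1, file_2] } } }. Grouping by table here hands each operator its commit payload already partitioned the way the per-table commit data is later consumed.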
7 changes: 7 additions & 0 deletions arroyo-state/src/lib.rs
@@ -202,6 +202,7 @@ pub trait BackingStore {
async fn get_key_values<K: Key, V: Data>(&self, table: char) -> Vec<(K, V)>;

async fn load_compacted(&mut self, compaction: CompactionResult);
async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>);
}

pub struct StateStore<S: BackingStore> {
@@ -416,6 +417,12 @@ impl<S: BackingStore> StateStore<S> {
pub async fn load_compacted(&mut self, compaction: CompactionResult) {
self.backend.load_compacted(compaction).await;
}

pub async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>) {
self.backend
.insert_committing_data(table, committing_data)
.await
}
}

#[cfg(test)]
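insert_committing_data is the write side of this flow: during checkpointing an operator can stash the bytes that will later need to be committed (for example, a staged transaction ID) under its committing table, and those bytes presumably end up in the checkpoint's new BackendData.committing_data field before coming back in the Commit request. A small sketch of the call pattern; FakeStateStore and encode_pre_commit are stand-ins invented for illustration, not Arroyo APIs.

use std::collections::HashMap;

// Stand-in for StateStore<S>, modeling only the new method; the real store
// forwards to its BackingStore implementation.
#[derive(Default)]
struct FakeStateStore {
    committing_data: HashMap<char, Vec<u8>>,
}

impl FakeStateStore {
    async fn insert_committing_data(&mut self, table: char, committing_data: Vec<u8>) {
        self.committing_data.insert(table, committing_data);
    }
}

// Hypothetical pre-commit record a two-phase sink might stage at checkpoint time.
fn encode_pre_commit(transaction_id: u64) -> Vec<u8> {
    transaction_id.to_le_bytes().to_vec()
}

// tokio is used only to drive the async sketch.
#[tokio::main]
async fn main() {
    let mut store = FakeStateStore::default();
    // At checkpoint time, record which transaction must be finalized once the
    // controller later sends the Commit RPC for this epoch.
    store.insert_committing_data('s', encode_pre_commit(42)).await;
    assert_eq!(store.committing_data[&'s'], 42u64.to_le_bytes().to_vec());
}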