Skip to content

Commit

Permalink
initial refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse committed Oct 20, 2023
1 parent 48ed8ae commit 07149a5
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 67 deletions.
4 changes: 2 additions & 2 deletions arroyo-datastream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use arroyo_types::{Data, GlobalKey, JoinType, Key};
use bincode::{Decode, Encode};
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::EdgeRef;
use proc_macro2::{Ident, TokenStream};
use proc_macro2::Ident;
use quote::format_ident;
use quote::quote;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -1365,7 +1365,7 @@ impl Program {
let out_t = parse_type(&output.unwrap().weight().value);

let strukt = parse_type(&c.operator);
let config: TokenStream = quote!(c.config);
let config = &c.config;
quote! {
Box::new(#strukt::<#out_k, #out_t>::from_config(#config))
}
Expand Down
11 changes: 6 additions & 5 deletions arroyo-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,15 @@ message SubtaskCheckpointMetadata {
uint64 bytes = 7;

repeated BackendData backend_data = 8;
// this is data that should be sent along with the commit message
// to all subtask. The key is the table name.
map<string,bytes> committing_data = 9;
}

message BackendData {
oneof backend_data {
ParquetStoreData parquet_store = 3;
}
// this is commit messages that may need to be distributed
// around all subtasks, such as if subtask 0 is responsible for committing data
optional bytes committing_data = 4;
}

message OperatorCheckpointMetadata {
Expand All @@ -249,6 +249,7 @@ message OperatorCheckpointMetadata {

repeated BackendData backend_data = 10;
uint64 bytes = 11;
OperatorCommitData commit_data = 12;
}

enum TableType {
Expand Down Expand Up @@ -327,12 +328,12 @@ message CommitResp {
}

message OperatorCommitData {
// map from table to commit data for that table.
// map from table name to commit data for that table.
map<string, TableCommitData> committing_data = 1;
}

message TableCommitData {
repeated BackendData backend_data = 1;
map<uint32, bytes> commit_data_by_subtask = 1;
}

message LoadCompactedDataReq {
Expand Down
73 changes: 31 additions & 42 deletions arroyo-state/src/checkpoint_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::committing_state::CommittingState;
use crate::subtask_state::SubtaskState;
use crate::{BackingStore, StateBackend};
use anyhow::{anyhow, bail};
use arroyo_rpc::grpc;
use arroyo_rpc::grpc::api::OperatorCheckpointDetail;
use arroyo_rpc::grpc::{self, OperatorCommitData, TableCommitData};
use arroyo_rpc::grpc::{
api, backend_data, BackendData, CheckpointMetadata, OperatorCheckpointMetadata,
TableDescriptor, TableWriteBehavior, TaskCheckpointCompletedReq, TaskCheckpointEventReq,
Expand All @@ -23,7 +23,7 @@ pub struct CheckpointState {
tasks: HashMap<String, BTreeMap<u32, SubtaskState>>,
completed_operators: HashSet<String>,
subtasks_to_commit: HashSet<(String, u32)>,
committing_backend_data: HashMap<String, Vec<BackendData>>,
committing_backend_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,

// Used for the web ui -- eventually should be replaced with some other way of tracking / reporting
// this data
Expand Down Expand Up @@ -176,6 +176,14 @@ impl CheckpointState {
});
detail.bytes = Some(metadata.bytes);
detail.finish_time = Some(metadata.finish_time);
for (table, committing_data) in &metadata.committing_data {
self.committing_backend_data
.entry(c.operator_id.clone())
.or_default()
.entry(table.to_string())
.or_default()
.insert(metadata.subtask_index, committing_data.clone());
}

// this is for the actual checkpoint management

Expand Down Expand Up @@ -267,46 +275,6 @@ impl CheckpointState {
.filter_map(Self::backend_data_to_key)
.collect();

if let Some(subtask_state) = subtasks.get(&self.epoch) {
let committing_tables = subtask_state
.metadata
.as_ref()
.unwrap()
.tables
.iter()
.filter_map(|table| {
if table.write_behavior() == TableWriteBehavior::CommitWrites {
Some(table.name.clone())
} else {
None
}
})
.collect::<HashSet<_>>();
let backend_datas: Vec<_> = subtask_state
.metadata
.as_ref()
.unwrap()
.backend_data
.iter()
.filter_map(|backend_data| {
let Some(backend_data::BackendData::ParquetStore(parquet_backend)) =
&backend_data.backend_data
else {
return None;
};
if committing_tables.contains(&parquet_backend.table) {
Some(backend_data.clone())
} else {
None
}
})
.collect();
if !backend_datas.is_empty() {
self.committing_backend_data
.insert(operator_id.clone(), backend_datas);
}
}

let size = subtasks
.values()
.fold(0, |size, s| size + s.metadata.as_ref().unwrap().bytes);
Expand All @@ -323,6 +291,27 @@ impl CheckpointState {
tables: tables.into_values().collect(),
backend_data: backend_data.into_values().collect(),
bytes: size,
commit_data: self
.committing_backend_data
.get(&operator_id)
.map(|commit_data| OperatorCommitData {
committing_data: commit_data
.iter()
.map(|(table_name, subtask_to_commit_data)| {
(
table_name.clone(),
TableCommitData {
commit_data_by_subtask: subtask_to_commit_data
.iter()
.map(|(subtask_index, commit_data)| {
(*subtask_index, commit_data.clone())
})
.collect(),
},
)
})
.collect(),
}),
})
.await;

Expand Down
28 changes: 10 additions & 18 deletions arroyo-state/src/committing_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use arroyo_rpc::grpc::{backend_data, BackendData, OperatorCommitData, TableCommi
pub struct CommittingState {
checkpoint_id: i64,
subtasks_to_commit: HashSet<(String, u32)>,
committing_data: HashMap<String, Vec<BackendData>>,
committing_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,
}

impl CommittingState {
pub fn new(
checkpoint_id: i64,
subtasks_to_commit: HashSet<(String, u32)>,
committing_data: HashMap<String, Vec<BackendData>>,
committing_data: HashMap<String, HashMap<String, HashMap<u32, Vec<u8>>>>,
) -> Self {
Self {
checkpoint_id,
Expand All @@ -36,26 +36,18 @@ impl CommittingState {

pub fn committing_data(&self) -> HashMap<String, OperatorCommitData> {
self.committing_data
.clone()
.into_iter()
.map(|(operator_id, backend_datas)| {
let mut table_map: HashMap<String, Vec<_>> = HashMap::new();
for backend_data in backend_datas {
let backend_data::BackendData::ParquetStore(parquet_data) =
&backend_data.backend_data.as_ref().unwrap();
let table = parquet_data.table.clone();
table_map.entry(table).or_default().push(backend_data);
}
.iter()
.map(|(operator_id, v)| {
(
operator_id,
operator_id.clone(),
OperatorCommitData {
committing_data: table_map
.into_iter()
.map(|(table, backing_data)| {
committing_data: v
.iter()
.map(|(table_name, subtask_to_commit_data)| {
(
table,
table_name.clone(),
TableCommitData {
backend_data: backing_data,
commit_data_by_subtask: subtask_to_commit_data.clone(),
},
)
})
Expand Down
1 change: 1 addition & 0 deletions arroyo-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ mod test {
tables: default_tables(),
backend_data: message.subtask_metadata.backend_data,
bytes: 5,
commit_data: None,
})
.await;

Expand Down

0 comments on commit 07149a5

Please sign in to comment.