sql: Implement Delta Lake Sink
arroyo-state: added additional message-passing data to the commit loop, so that subtask 0 is responsible for finishing files.
arroyo-worker: introduced CommitStrategy, an enum of PerOperator and PerSubtask, to TwoPhaseCommitter (see the sketch below).
arroyo-connectors: refactored the filesystem connector so that it can be reused by the Delta connector.
jacksonrnewhouse committed Oct 27, 2023
1 parent e85e8d3 commit cae7aad
Showing 28 changed files with 1,006 additions and 204 deletions.
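
The CommitStrategy enum itself does not appear in the hunks shown below; as a reading aid, here is a minimal sketch of the two strategies named in the commit message. How TwoPhaseCommitter stores and consults this value is an assumption, not something visible in this diff.

// Sketch only: the two commit strategies described in the commit message.
// The way TwoPhaseCommitter selects and applies a strategy is an assumption.
pub enum CommitStrategy {
    // One subtask (subtask 0, per the arroyo-state note above) performs the
    // commit on behalf of the whole operator.
    PerOperator,
    // Every subtask performs its own commit independently.
    PerSubtask,
}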
221 changes: 200 additions & 21 deletions Cargo.lock

Large diffs are not rendered by default.

176 changes: 96 additions & 80 deletions arroyo-connectors/src/filesystem.rs
@@ -77,6 +77,18 @@ impl Connector for FileSystemConnector {
table: Self::TableT,
schema: Option<&ConnectionSchema>,
) -> anyhow::Result<crate::Connection> {
// confirm commit style is Direct
if let Some(CommitStyle::Direct) = table
.file_settings
.as_ref()
.ok_or_else(|| anyhow!("no file_settings"))?
.commit_style
{
// ok
} else {
bail!("commit_style must be Direct");
}

let backend_config = BackendConfig::parse_url(&table.write_target.path, true)?;
let is_local = match &backend_config {
BackendConfig::Local { .. } => true,
@@ -137,88 +149,92 @@ impl Connector for FileSystemConnector {
opts: &mut std::collections::HashMap<String, String>,
schema: Option<&ConnectionSchema>,
) -> anyhow::Result<crate::Connection> {
let storage_options: std::collections::HashMap<String, String> = opts
.iter()
.filter(|(k, _)| k.starts_with("storage."))
.map(|(k, v)| (k.trim_start_matches("storage.").to_string(), v.to_string()))
.collect();
opts.retain(|k, _| !k.starts_with("storage."));

let storage_url = pull_opt("path", opts)?;
BackendConfig::parse_url(&storage_url, true)?;

let inactivity_rollover_seconds = pull_option_to_i64("inactivity_rollover_seconds", opts)?;
let max_parts = pull_option_to_i64("max_parts", opts)?;
let rollover_seconds = pull_option_to_i64("rollover_seconds", opts)?;
let target_file_size = pull_option_to_i64("target_file_size", opts)?;
let target_part_size = pull_option_to_i64("target_part_size", opts)?;

let partition_fields: Vec<_> = opts
.remove("partition_fields")
.map(|fields| fields.split(',').map(|f| f.to_string()).collect())
.unwrap_or_default();

let time_partition_pattern = opts.remove("time_partition_pattern");

let partitioning = if time_partition_pattern.is_some() || !partition_fields.is_empty() {
Some(Partitioning {
time_partition_pattern,
partition_fields,
})
} else {
None
};
let table = file_system_table_from_options(opts, schema, CommitStyle::Direct)?;

let file_settings = Some(FileSettings {
inactivity_rollover_seconds,
max_parts,
rollover_seconds,
target_file_size,
target_part_size,
partitioning,
});
self.from_config(None, name, EmptyConfig {}, table, schema)
}
}

let format_settings = match schema
.ok_or(anyhow!("require schema"))?
.format
.as_ref()
.ok_or(anyhow!(
"filesystem sink requires a format, such as json or parquet"
))? {
Format::Parquet(..) => {
let compression = opts
.remove("parquet_compression")
.map(|value| {
Compression::try_from(&value).map_err(|_err| {
anyhow!("{} is not a valid parquet_compression argument", value)
})
pub fn file_system_table_from_options(
opts: &mut std::collections::HashMap<String, String>,
schema: Option<&ConnectionSchema>,
commit_style: CommitStyle,
) -> Result<FileSystemTable> {
let storage_options: std::collections::HashMap<String, String> = opts
.iter()
.filter(|(k, _)| k.starts_with("storage."))
.map(|(k, v)| (k.trim_start_matches("storage.").to_string(), v.to_string()))
.collect();
opts.retain(|k, _| !k.starts_with("storage."));

let storage_url = pull_opt("path", opts)?;
BackendConfig::parse_url(&storage_url, true)?;

let inactivity_rollover_seconds = pull_option_to_i64("inactivity_rollover_seconds", opts)?;
let max_parts = pull_option_to_i64("max_parts", opts)?;
let rollover_seconds = pull_option_to_i64("rollover_seconds", opts)?;
let target_file_size = pull_option_to_i64("target_file_size", opts)?;
let target_part_size = pull_option_to_i64("target_part_size", opts)?;

let partition_fields: Vec<_> = opts
.remove("partition_fields")
.map(|fields| fields.split(',').map(|f| f.to_string()).collect())
.unwrap_or_default();

let time_partition_pattern = opts.remove("time_partition_pattern");

let partitioning = if time_partition_pattern.is_some() || !partition_fields.is_empty() {
Some(Partitioning {
time_partition_pattern,
partition_fields,
})
} else {
None
};

let file_settings = Some(FileSettings {
inactivity_rollover_seconds,
max_parts,
rollover_seconds,
target_file_size,
target_part_size,
partitioning,
commit_style: Some(commit_style),
});

let format_settings = match schema
.ok_or(anyhow!("require schema"))?
.format
.as_ref()
.ok_or(anyhow!(
"filesystem sink requires a format, such as json or parquet"
))? {
Format::Parquet(..) => {
let compression = opts
.remove("parquet_compression")
.map(|value| {
Compression::try_from(&value).map_err(|_err| {
anyhow!("{} is not a valid parquet_compression argument", value)
})
.transpose()?;
let row_batch_size = pull_option_to_i64("parquet_row_batch_size", opts)?;
let row_group_size = pull_option_to_i64("parquet_row_group_size", opts)?;
Some(FormatSettings::Parquet {
compression,
row_batch_size,
row_group_size,
})
}
Format::Json(..) => Some(FormatSettings::Json {}),
other => bail!("Unsupported format: {:?}", other),
};

self.from_config(
None,
name,
EmptyConfig {},
FileSystemTable {
write_target: FolderUrl {
path: storage_url,
storage_options,
},
file_settings,
format_settings,
},
schema,
)
}
.transpose()?;
let row_batch_size = pull_option_to_i64("parquet_row_batch_size", opts)?;
let row_group_size = pull_option_to_i64("parquet_row_group_size", opts)?;
Some(FormatSettings::Parquet {
compression,
row_batch_size,
row_group_size,
})
}
Format::Json(..) => Some(FormatSettings::Json {}),
other => bail!("Unsupported format: {:?}", other),
};
Ok(FileSystemTable {
write_target: FolderUrl {
path: storage_url,
storage_options,
},
file_settings,
format_settings,
})
}
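
The point of extracting file_system_table_from_options is that the new Delta connector can build its table configuration the same way. A rough sketch of that call site follows; CommitStyle::DeltaLake is an assumed variant name (only Direct appears in the visible hunks), and delta.rs itself is not rendered in this view.

// Sketch of the Delta connector's from_opts body, mirroring the filesystem
// connector above. CommitStyle::DeltaLake is an assumption.
let table = file_system_table_from_options(opts, schema, CommitStyle::DeltaLake)?;
self.from_config(None, name, EmptyConfig {}, table, schema)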
2 changes: 2 additions & 0 deletions arroyo-connectors/src/lib.rs
@@ -22,6 +22,7 @@ use websocket::WebsocketConnector;
use self::kafka::KafkaConnector;

pub mod blackhole;
pub mod delta;
pub mod filesystem;
pub mod fluvio;
pub mod impulse;
@@ -36,6 +37,7 @@ pub mod websocket;
pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>> {
let mut m: HashMap<&'static str, Box<dyn ErasedConnector>> = HashMap::new();
m.insert("blackhole", Box::new(BlackholeConnector {}));
m.insert("delta", Box::new(delta::DeltaLakeConnector {}));
m.insert("filesystem", Box::new(filesystem::FileSystemConnector {}));
m.insert("fluvio", Box::new(FluvioConnector {}));
m.insert("impulse", Box::new(ImpulseConnector {}));
23 changes: 9 additions & 14 deletions arroyo-controller/src/job_controller/mod.rs
@@ -8,8 +8,8 @@ use crate::types::public::StopMode as SqlStopMode;
use anyhow::bail;
use arroyo_datastream::Program;
use arroyo_rpc::grpc::{
worker_grpc_client::WorkerGrpcClient, CheckpointReq, JobFinishedReq, LoadCompactedDataReq,
StopExecutionReq, StopMode, TaskCheckpointEventType,
worker_grpc_client::WorkerGrpcClient, CheckpointReq, CommitReq, JobFinishedReq,
LoadCompactedDataReq, StopExecutionReq, StopMode, TaskCheckpointEventType,
};
use arroyo_state::{BackingStore, StateBackend};
use arroyo_types::{to_micros, WorkerId};
@@ -433,6 +433,7 @@ impl RunningJobModel {
DbCheckpointState::committing,
)
.await?;
let committing_data = committing_state.committing_data();
self.checkpoint_state =
Some(CheckpointingOrCommittingState::Committing(committing_state));
info!(
@@ -443,12 +444,9 @@
for worker in self.workers.values_mut() {
worker
.connect
.checkpoint(Request::new(CheckpointReq {
timestamp: to_micros(SystemTime::now()),
min_epoch: self.min_epoch,
.commit(Request::new(CommitReq {
epoch: self.epoch,
then_stop: false,
is_commit: true,
committing_data: committing_data.clone(),
}))
.await?;
}
@@ -707,21 +705,18 @@ impl JobController {
}

pub async fn send_commit_messages(&mut self) -> anyhow::Result<()> {
let Some(CheckpointingOrCommittingState::Committing(_committing)) =
let Some(CheckpointingOrCommittingState::Committing(committing)) =
&self.model.checkpoint_state
else {
bail!("should be committing")
};
for worker in self.model.workers.values_mut() {
worker
.connect
.checkpoint(Request::new(CheckpointReq {
timestamp: to_micros(SystemTime::now()),
min_epoch: self.model.min_epoch,
.commit(CommitReq {
epoch: self.model.epoch,
then_stop: false,
is_commit: true,
}))
committing_data: committing.committing_data(),
})
.await?;
}
Ok(())
19 changes: 17 additions & 2 deletions arroyo-controller/src/states/scheduling.rs
@@ -14,7 +14,9 @@ use tonic::{transport::Channel, Request};
use tracing::{error, info, warn};

use anyhow::anyhow;
use arroyo_state::{parquet::get_storage_env_vars, BackingStore, StateBackend};
use arroyo_state::{
committing_state::CommittingState, parquet::get_storage_env_vars, BackingStore, StateBackend,
};

use crate::{
job_controller::JobController,
@@ -368,6 +370,7 @@ impl State for Scheduling {
metadata.min_epoch = min_epoch;
if needs_commits {
let mut commit_subtasks = HashSet::new();
let mut committing_data = HashMap::new();
for operator_id in &metadata.operator_ids {
let operator_metadata =
StateBackend::load_operator_metadata(&ctx.config.id, operator_id, epoch)
@@ -378,6 +381,18 @@
operator_id, ctx.config.id
);
};
if let Some(commit_data) = operator_metadata.commit_data {
committing_data.insert(
operator_id.clone(),
commit_data
.committing_data
.into_iter()
.map(|(table, commit_data_map)| {
(table, commit_data_map.commit_data_by_subtask)
})
.collect(),
);
}
if operator_metadata.has_state
&& operator_metadata
.tables
Expand All @@ -396,7 +411,7 @@ impl State for Scheduling {
}
}
}
committing_state = Some((id, commit_subtasks));
committing_state = Some(CommittingState::new(id, commit_subtasks, committing_data));
}
StateBackend::write_checkpoint_metadata(metadata).await;
}
6 changes: 3 additions & 3 deletions arroyo-macro/src/lib.rs
@@ -529,8 +529,8 @@ fn impl_stream_node_type(
match control_message {
arroyo_rpc::ControlMessage::Checkpoint(_) => tracing::warn!("shouldn't receive checkpoint"),
arroyo_rpc::ControlMessage::Stop { mode: _ } => tracing::warn!("shouldn't receive stop"),
arroyo_rpc::ControlMessage::Commit { epoch } => {
self.handle_commit(epoch, &mut ctx).await;
arroyo_rpc::ControlMessage::Commit { epoch, commit_data } => {
self.handle_commit(epoch, commit_data, &mut ctx).await;
},
arroyo_rpc::ControlMessage::LoadCompacted { compacted } => {
ctx.load_compacted(compacted).await;
@@ -802,7 +802,7 @@ fn impl_stream_node_type(

if !methods.contains("handle_commit") {
defs.push(quote! {
async fn handle_commit(&mut self, epoch: u32, ctx: &mut Context<#out_k, #out_t>) {
async fn handle_commit(&mut self, epoch: u32, commit_data: std::collections::HashMap<char, std::collections::HashMap<u32, Vec<u8>>>, ctx: &mut Context<#out_k, #out_t>) {
tracing::warn!("default handling of commit with epoch {:?}", epoch);
}
})
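
Operators that actually have something to commit override this default; a rough sketch of such an override, with the body and the output type parameters being illustrative rather than taken from this commit:

// Sketch of an operator overriding handle_commit. Per the generated signature
// above, commit data is keyed by state-table char, then by subtask index.
async fn handle_commit(
    &mut self,
    epoch: u32,
    commit_data: std::collections::HashMap<char, std::collections::HashMap<u32, Vec<u8>>>,
    _ctx: &mut Context<K, T>, // K and T stand in for the operator's output types
) {
    for (table, by_subtask) in commit_data {
        for (subtask, bytes) in by_subtask {
            // e.g. finish the files this subtask recorded at checkpoint time
            tracing::info!(
                "epoch {}: committing {} bytes for table {:?} from subtask {}",
                epoch, bytes.len(), table, subtask
            );
        }
    }
}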
22 changes: 22 additions & 0 deletions arroyo-rpc/proto/rpc.proto
@@ -224,6 +224,9 @@ message SubtaskCheckpointMetadata {
uint64 bytes = 7;

repeated BackendData backend_data = 8;
// this is data that should be sent along with the commit message
// to all subtasks. The key is the table name.
map<string,bytes> committing_data = 9;
}

message BackendData {
@@ -246,6 +249,7 @@ message OperatorCheckpointMetadata {

repeated BackendData backend_data = 10;
uint64 bytes = 11;
OperatorCommitData commit_data = 12;
}

enum TableType {
@@ -315,6 +319,23 @@
message CheckpointResp {
}

message CommitReq {
uint32 epoch = 1;
map<string, OperatorCommitData> committing_data = 2;
}

message CommitResp {
}

message OperatorCommitData {
// map from table name to commit data for that table.
map<string, TableCommitData> committing_data = 1;
}

message TableCommitData {
map<uint32, bytes> commit_data_by_subtask = 1;
}

message LoadCompactedDataReq {
string operator_id = 1;
repeated BackendData backend_data_to_drop = 2; // this data got compressed...
@@ -347,6 +368,7 @@ message JobFinishedResp {
service WorkerGrpc {
rpc StartExecution(StartExecutionReq) returns (StartExecutionResp);
rpc Checkpoint(CheckpointReq) returns (CheckpointResp);
rpc Commit(CommitReq) returns (CommitResp);
rpc LoadCompactedData(LoadCompactedDataReq) returns (LoadCompactedDataRes);
rpc StopExecution(StopExecutionReq) returns (StopExecutionResp);
rpc JobFinished(JobFinishedReq) returns (JobFinishedResp);
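
With default prost codegen, the new messages become plain Rust structs whose map fields are HashMaps; a sketch of building a CommitReq, where the operator id, table name, and payload bytes are placeholders:

// Sketch: constructing the new CommitReq, assuming default prost-generated types.
use std::collections::HashMap;

let req = CommitReq {
    epoch: 7,
    committing_data: HashMap::from([(
        "operator_1".to_string(), // placeholder operator id
        OperatorCommitData {
            committing_data: HashMap::from([(
                "f".to_string(), // placeholder state-table name
                TableCommitData {
                    commit_data_by_subtask: HashMap::from([(0u32, vec![0x01, 0x02])]),
                },
            )]),
        },
    )]),
};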