Skip to content

Commit

Permalink
re-add delta
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Nov 22, 2024
1 parent 4bd43ca commit f00c451
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 135 deletions.
391 changes: 312 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

29 changes: 11 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ datafusion-proto = { version = "43.0.0" }
datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] }
datafusion-functions-window = {version = "43.0.0"}
datafusion-functions-json = { version = "0.43.0" }
deltalake = { version = "0.18.2" }
deltalake = { version = "0.22.0" }
cornucopia = { version = "0.9.0" }
cornucopia_async = {version = "0.6.0"}
deadpool-postgres = "0.14"
Expand All @@ -74,26 +74,19 @@ split-debuginfo = "unpacked"


[patch.crates-io]
deltalake = { git = 'https://github.com/delta-io/delta-rs', rev = 'e75a0b49b40f35ed361444bbea0e5720f359d732' }
deltalake = { git = 'https://github.com/delta-io/delta-rs', rev = '25ce38956e25722ba7a6cbc0f5a7dba6b3361554' }
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '53.2.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '53.2.0/json'}
#datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
#datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
#datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
#datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
#datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
#datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
#datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
#datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion = { path = "../arrow-datafusion/datafusion/core" }
datafusion-common = { path = "../arrow-datafusion/datafusion/common" }
datafusion-execution = { path = "../arrow-datafusion/datafusion/execution" }
datafusion-expr = { path = "../arrow-datafusion/datafusion/expr" }
datafusion-physical-expr = { path = "../arrow-datafusion/datafusion/physical-expr" }
datafusion-physical-plan = { path = "../arrow-datafusion/datafusion/physical-plan" }
datafusion-proto = { path = "../arrow-datafusion/datafusion/proto" }
datafusion-functions-window = { path = "../arrow-datafusion/datafusion/functions-window" }
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}

datafusion-functions-json = {git = 'https://github.com/ArroyoSystems/datafusion-functions-json', branch = 'datafusion_43'}

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ uuid = { version = "1.7.0", features = ["v4"] }
# Filesystem
parquet = { workspace = true, features = ["async"]}
object_store = { workspace = true }
# deltalake = { workspace = true, features = ["s3"] }
deltalake = { workspace = true, features = ["s3"] }
async-compression = { version = "0.4.3", features = ["tokio", "zstd", "gzip"] }

# MQTT
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// pub mod delta;
pub mod delta;
mod sink;
mod source;

Expand Down
28 changes: 14 additions & 14 deletions crates/arroyo-connectors/src/filesystem/sink/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::filesystem::{sink::two_phase_committer::TwoPhaseCommitter, FileSettin
use anyhow::{bail, Result};

use super::{
add_suffix_prefix, get_partitioner_from_file_settings, parquet::batches_by_partition,
add_suffix_prefix, delta, get_partitioner_from_file_settings, parquet::batches_by_partition,
two_phase_committer::TwoPhaseCommitterOperator, CommitState, CommitStyle, FileNaming,
FileSystemTable, FilenameStrategy, FinishedFile, MultiPartWriterStats, RollingPolicy,
TableType,
Expand Down Expand Up @@ -313,19 +313,19 @@ impl<V: LocalWriter + Send + 'static> TwoPhaseCommitter for LocalFileSystemWrite
}
if let CommitState::DeltaLake { last_version } = self.commit_state {
let storage_provider = Arc::new(StorageProvider::for_url("/").await?);
// if let Some(version) = delta::commit_files_to_delta(
// &finished_files,
// &object_store::path::Path::parse(&self.final_dir)?,
// &storage_provider,
// last_version,
// Arc::new(self.schema.as_ref().unwrap().schema_without_timestamp()),
// )
// .await?
// {
// self.commit_state = CommitState::DeltaLake {
// last_version: version,
// };
// }
if let Some(version) = delta::commit_files_to_delta(
&finished_files,
&object_store::path::Path::parse(&self.final_dir)?,
&storage_provider,
last_version,
Arc::new(self.schema.as_ref().unwrap().schema_without_timestamp()),
)
.await?
{
self.commit_state = CommitState::DeltaLake {
last_version: version,
};
}
}
Ok(())
}
Expand Down
32 changes: 14 additions & 18 deletions crates/arroyo-connectors/src/filesystem/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::concat;
use datafusion::{
common::{Column, Result as DFResult},
execution::{
context::{SessionConfig, SessionState},
runtime_env::RuntimeEnv,
},
logical_expr::{
expr::ScalarFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility,
},
Expand All @@ -45,7 +41,7 @@ use uuid::Uuid;

use arroyo_types::*;
pub mod arrow;
//mod delta;
mod delta;
pub mod json;
pub mod local;
pub mod parquet;
Expand Down Expand Up @@ -931,19 +927,19 @@ where
}
}
if let CommitState::DeltaLake { last_version } = self.commit_state {
// if let Some(new_version) = delta::commit_files_to_delta(
// &finished_files,
// &self.path,
// &self.object_store,
// last_version,
// Arc::new(self.schema.schema_without_timestamp()),
// )
// .await?
// {
// self.commit_state = CommitState::DeltaLake {
// last_version: new_version,
// };
// }
if let Some(new_version) = delta::commit_files_to_delta(
&finished_files,
&self.path,
&self.object_store,
last_version,
Arc::new(self.schema.schema_without_timestamp()),
)
.await?
{
self.commit_state = CommitState::DeltaLake {
last_version: new_version,
};
}
}
let finished_message = CheckpointData::Finished {
max_file_index: self.max_file_index,
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::confluent::ConfluentConnector;
// use crate::filesystem::delta::DeltaLakeConnector;
use crate::filesystem::delta::DeltaLakeConnector;
use crate::filesystem::FileSystemConnector;
use crate::kinesis::KinesisConnector;
use crate::mqtt::MqttConnector;
Expand Down Expand Up @@ -57,7 +57,7 @@ pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>> {
let connectors: Vec<Box<dyn ErasedConnector>> = vec![
Box::new(BlackholeConnector {}),
Box::new(ConfluentConnector {}),
//Box::new(DeltaLakeConnector {}),
Box::new(DeltaLakeConnector {}),
Box::new(FileSystemConnector {}),
Box::new(FluvioConnector {}),
Box::new(ImpulseConnector {}),
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-udf/arroyo-udf-host/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fn test_optional_arg() {
data.append_option(Some(vec![4, 5]));

let result = sync_udf
.invoke(&[ColumnarValue::Array(Arc::new(data.finish()))])
.invoke_batch(&[ColumnarValue::Array(Arc::new(data.finish()))], 1)
.unwrap();

let ColumnarValue::Array(a) = result else {
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-udf/arroyo-udf-python/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def my_add(x: int, y: float) -> float:
]))),
];

let result = udf.invoke(&data).unwrap();
let result = udf.invoke_batch(&data, data.len()).unwrap();
if let ColumnarValue::Array(a) = result {
let a = a
.as_any()
Expand Down

0 comments on commit f00c451

Please sign in to comment.