Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to DataFusion 43/Arrow 53.2 #790

Merged
merged 10 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,947 changes: 1,141 additions & 806 deletions Cargo.lock

Large diffs are not rendered by default.

81 changes: 47 additions & 34 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,37 @@ members = [
resolver = "2"

[workspace.dependencies]
tonic = { version = "0.11", features = ["zstd"] }
tonic-build = { version = "0.11" }
tonic-web = { version = "0.11" }
tonic-reflection = { version = "0.11" }
arrow = { version = "=52.1.0" }
arrow-ord = { version = "=52.1.0" }
arrow-array = { version = "=52.1.0" }
arrow-schema = { version = "=52.1.0" }
arrow-json = { version = "=52.1.0" }
object_store = { version = "0.10" }
parquet = { version = "=52.1.0" }
tonic = { version = "0.13", features = ["zstd"] }
tonic-build = { version = "0.12" }
tonic-web = { version = "0.12" }
tonic-reflection = { version = "0.12" }
tower = { version = "0.5" }
tower-http = {version = "0.6"}
axum = { version = "0.7" }
utoipa = { version = "4" }

arrow = { version = "=53.2.0" }
arrow-ord = { version = "=53.2.0" }
arrow-array = { version = "=53.2.0" }
arrow-schema = { version = "=53.2.0" }
arrow-json = { version = "=53.2.0" }
object_store = { version = "0.11" }
parquet = { version = "=53.2.0" }
ahash = { version = "=0.8.7" }
datafusion = { version = "40.0.0" }
datafusion-common = { version = "40.0.0" }
datafusion-proto = { version = "40.0.0" }
datafusion-functions = { version = "40.0.0", features = ["crypto_expressions"] }
datafusion-functions-json = { version = "0.40.0" }
deltalake = { version = "0.18.2" }
datafusion = { version = "43.0.0" }
datafusion-common = { version = "43.0.0" }
datafusion-proto = { version = "43.0.0" }
datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] }
datafusion-functions-window = {version = "43.0.0"}
datafusion-functions-json = { version = "0.43.0" }
deltalake = { version = "0.22.0" }
cornucopia = { version = "0.9.0" }
cornucopia_async = {version = "0.6.0"}
deadpool-postgres = "0.12"
prost = { version = "0.12", features = ["no-recursion-limit"] }
prost-reflect = "0.12.0"
prost-build = {version = "0.12" }
prost-types = "0.12"
deadpool-postgres = "0.14"
prost = { version = "0.13", features = ["no-recursion-limit"] }
prost-reflect = "0.14.0"
prost-build = {version = "0.13" }
prost-types = "0.13"
aws-config = "1.5.6"
reqwest = "0.12"

Expand All @@ -68,19 +74,26 @@ split-debuginfo = "unpacked"


[patch.crates-io]
deltalake = { git = 'https://github.com/delta-io/delta-rs', rev = 'e75a0b49b40f35ed361444bbea0e5720f359d732' }
deltalake = { git = 'https://github.com/delta-io/delta-rs', rev = '25ce38956e25722ba7a6cbc0f5a7dba6b3361554' }
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '52.1.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '52.1.0/json'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.10.2/arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '53.2.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '53.2.0/json'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}

datafusion-functions-json = {git = 'https://github.com/ArroyoSystems/datafusion-functions-json', branch = 'datafusion_43'}

object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.11.1/arroyo' }

cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }

# Patch tonic to a git revision until tonic 0.13 is released; needed to pick up the tower 0.5 upgrade
tonic = { git = "https://github.com/hyperium/tonic", rev = "b80428ba7dfcc607dfaac07022c360159ee1dc79" }
25 changes: 12 additions & 13 deletions crates/arroyo-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ tonic = { workspace = true }
tonic-reflection = { workspace = true }
tonic-web = { workspace = true }
prost = {workspace = true}
prost-reflect = "0.12.0"
prost-reflect = {workspace = true}
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.12"
tower = "0.4"
tower = {workspace = true}
rand = "0.8"
rand_chacha = "0.3"
async-trait = "0.1"
Expand All @@ -40,14 +40,14 @@ arrow-schema = {workspace = true, features = ["serde"]}
bincode = { version = "2.0.0-rc.3", features = ["serde"]}
petgraph = {version = "0.6", features = ["serde-1"]}

http = "0.2"
tower-http = {version = "0.4", features = ["trace", "fs", "cors", "validate-request", "auth", "compression-zstd"]}
axum = {version = "0.6.20", features = ["headers", "tokio", "macros"]}
axum-extra = "0.7.4"
h2 = "0.3.26"
http = "1"
tower-http = { workspace = true, features = ["trace", "fs", "cors", "validate-request", "auth", "compression-zstd"]}
axum = {workspace = true, features = ["tokio", "macros"]}
axum-extra = { version = "0.9", features = ["typed-header"] }
h2 = "0.4"
thiserror = "1.0.40"
utoipa = "4"
utoipa-swagger-ui = { version = "4", features = ["axum"] }
utoipa = { workspace = true }
utoipa-swagger-ui = { version = "7", features = ["axum"] }

serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand All @@ -63,7 +63,6 @@ typify = "0.0.13"
schemars = "0.8"

# metric querying
prometheus-http-query = "0.6.5"
reqwest = {workspace = true}
base64 = '0.21'

Expand All @@ -87,16 +86,16 @@ time = "0.3"
cornucopia_async = { workspace = true, features = ["with-serde_json-1"]}
jwt-simple = "0.11.4"
uuid = "1.3.3"
regress = "0.6.0"
regress = "0.10"
apache-avro = "0.16.0"
toml = "0.8"
rust-embed = { version = "6.8.1", features = ["axum"] }
rust-embed = { version = "8", features = ["axum"] }
mime_guess = "2.0.4"

[build-dependencies]
cornucopia = { workspace = true }
postgres = "0.19.5"
arroyo-types = { path = "../arroyo-types" }
utoipa = "3"
utoipa = { workspace = true }
rusqlite = "0.31.0"
refinery = { version = "0.8.14", features = ["rusqlite"] }
5 changes: 3 additions & 2 deletions crates/arroyo-api/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{rest_utils::ErrorResp, AuthData, OrgMetadata};
use axum::headers::authorization::{Authorization, Bearer};
use axum::TypedHeader;
use axum_extra::headers::authorization::Bearer;
use axum_extra::headers::Authorization;
use axum_extra::TypedHeader;
use cornucopia_async::Database;

pub(crate) async fn authenticate(
Expand Down
15 changes: 7 additions & 8 deletions crates/arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use axum::Json;
use cornucopia_async::DatabaseSource;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use std::net::{SocketAddr, TcpListener};
use std::net::SocketAddr;
use time::OffsetDateTime;
use tokio::net::TcpListener;
use tonic::transport::Channel;
use tower_http::compression::predicate::NotForContentType;
use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate};
Expand Down Expand Up @@ -121,10 +122,10 @@ pub async fn compiler_service() -> Result<CompilerGrpcClient<Channel>, ErrorResp
})
}

pub fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> anyhow::Result<u16> {
pub async fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> anyhow::Result<u16> {
let config = config();
let addr = SocketAddr::new(config.api.bind_address, config.api.http_port);
let listener = TcpListener::bind(addr)?;
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;

let app = rest::create_rest_app(database, &config.controller_endpoint()).layer(
Expand All @@ -137,11 +138,9 @@ pub fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> anyhow::R
);

info!("Starting API server on {:?}", local_addr);
guard.into_spawn_task(wrap_start(
"api",
local_addr,
axum::Server::from_tcp(listener)?.serve(app.into_make_service()),
));
guard.into_spawn_task(wrap_start("api", local_addr, async {
axum::serve(listener, app.into_make_service()).await
}));

Ok(local_addr.port())
}
Expand Down
6 changes: 4 additions & 2 deletions crates/arroyo-api/src/rest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use arroyo_server_common::log_event;
use axum::extract::rejection::JsonRejection;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::{Json, TypedHeader};
use axum::Json;
use axum_extra::headers::authorization::Bearer;
use axum_extra::headers::Authorization;
use axum_extra::TypedHeader;
use serde_json::json;
use tracing::{error, warn};

use axum::headers::authorization::{Authorization, Bearer};
use cornucopia_async::{DatabaseSource, DbError};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ anyhow = "1.0.71"
tracing = "0.1.37"
regress = "0.10.0"
futures = "0.3.28"
axum = {version = "0.6.12"}
axum = {workspace = true}
rand = "0.8.5"
base64 = "0.13.1"
bytes = "1.5.0"
Expand Down
9 changes: 3 additions & 6 deletions crates/arroyo-connectors/src/filesystem/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@ use arroyo_storage::StorageProvider;
use async_trait::async_trait;
use bincode::{Decode, Encode};
use chrono::{DateTime, Utc};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::concat;
use datafusion::{
common::{Column, Result as DFResult},
execution::{
context::{SessionConfig, SessionState},
runtime_env::RuntimeEnv,
},
logical_expr::{
expr::ScalarFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility,
},
Expand Down Expand Up @@ -210,8 +207,8 @@ fn partition_string_for_fields_and_time(

fn compile_expression(expr: &Expr, schema: ArroyoSchemaRef) -> Result<Arc<dyn PhysicalExpr>> {
let physical_planner = DefaultPhysicalPlanner::default();
let session_state =
SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()));
let session_state = SessionStateBuilder::new().build();

let plan = physical_planner.create_physical_expr(
expr,
&(schema.schema.as_ref().clone()).try_into()?,
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/kinesis/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arroyo_operator::operator::ArrowOperator;
use arroyo_rpc::retry;
use arroyo_types::CheckpointBarrier;
use async_trait::async_trait;
use aws_config::{from_env, Region};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::PutRecordsRequestEntry;
use aws_sdk_kinesis::Client as KinesisClient;
Expand All @@ -33,7 +33,7 @@ impl ArrowOperator for KinesisSinkFunc {
}

async fn on_start(&mut self, _ctx: &mut ArrowContext) {
let mut loader = from_env();
let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28());
if let Some(region) = &self.aws_region {
loader = loader.region(Region::new(region.clone()));
}
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/kinesis/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use arroyo_state::global_table_config;
use arroyo_state::tables::global_keyed_map::GlobalKeyedView;
use arroyo_types::{from_nanos, UserError};
use async_trait::async_trait;
use aws_config::{from_env, Region};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_kinesis::error::SdkError;
use aws_sdk_kinesis::operation::get_records::GetRecordsOutput;
use aws_sdk_kinesis::operation::get_shard_iterator::builders::GetShardIteratorFluentBuilder;
Expand Down Expand Up @@ -337,7 +337,7 @@ impl KinesisSourceFunc {
}

async fn init_client(&mut self) {
let mut loader = from_env();
let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28());
if let Some(region) = &self.aws_region {
loader = loader.region(Region::new(region.clone()));
}
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ serde = "1"
anyhow = "1.0.70"

# Kubernetes
kube = { version = "0.91", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.22.0", features = ["v1_30"] }
kube = { version = "0.96", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.23.0", features = ["v1_30"] }
serde_yaml = {version = "0.9"}
shlex = "1.3"

Expand All @@ -64,7 +64,7 @@ regex = "1.7.3"
reqwest = { workspace = true, features = ["json"] }
uuid = "1.3.3"
async-stream = "0.3.5"
base64 = "0.21.5"
base64 = "0.22"
rusqlite = { version = "0.31.0", features = ["serde_json", "time"] }

[build-dependencies]
Expand Down
11 changes: 6 additions & 5 deletions crates/arroyo-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,10 @@ impl ControllerServer {
}

pub async fn start(self, guard: ShutdownGuard) -> anyhow::Result<u16> {
let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(arroyo_rpc::grpc::API_FILE_DESCRIPTOR_SET)
.build()
.unwrap();
// let reflection = tonic_reflection::server::Builder::configure()
// .register_encoded_file_descriptor_set(arroyo_rpc::grpc::API_FILE_DESCRIPTOR_SET)
// .build_v1()
// .unwrap();

let addr = SocketAddr::new(
config().controller.bind_address,
Expand All @@ -655,7 +655,8 @@ impl ControllerServer {
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd),
)
.add_service(reflection)
// TODO: re-enable the reflection service once tonic 0.13 is released
//.add_service(reflection)
.serve_with_incoming(TcpListenerStream::new(listener)),
));

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-datastream/src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Debug for LogicalNode {

pub type LogicalGraph = DiGraph<LogicalNode, LogicalEdge>;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd)]
pub struct DylibUdfConfig {
pub dylib_path: String,
pub arg_types: Vec<DataType>,
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-formats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ arroyo-rpc = { path = "../arroyo-rpc" }
apache-avro = "0.16.0"
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"
utoipa = "4"
utoipa = { workspace = true }
arrow = { workspace = true }
arrow-schema = { workspace = true }
arrow-array = { workspace = true}
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-openapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ progenitor = { version = '0.8' }
serde_json = "1.0"
syn = "1.0"

utoipa = "4"
utoipa = {workspace = true}
arroyo-api = { path = "../arroyo-api" }
2 changes: 1 addition & 1 deletion crates/arroyo-operator/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl Display for AsDisplayable<'_> {
write!(f, "{:?}", d)
}
AsDisplayable::Plan(p) => {
write!(f, "`{}`", displayable(*p).indent(false))
write!(f, "```\n{}\n```", displayable(*p).indent(false))
}
AsDisplayable::Schema(s) => {
for field in s.fields() {
Expand Down
3 changes: 2 additions & 1 deletion crates/arroyo-planner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ arroyo-udf-python = { path = "../arroyo-udf/arroyo-udf-python" }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-functions-json = { workspace = true }

prost = {workspace = true}
Expand All @@ -42,7 +43,7 @@ syn = {version = "2", features = ["full", "parsing", "extra-traits"]}
tracing = "0.1.37"
tracing-subscriber = "0.3"

serde_json_path = "0.6.3"
serde_json_path = "0.7"
apache-avro = "0.16.0"
prettyplease = "0.2.4"
unicase = "2.7.0"
Expand Down
Loading
Loading