Skip to content

Commit

Permalink
Update dependencies (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Nov 18, 2023
1 parent 59b18b7 commit 47c6ab9
Show file tree
Hide file tree
Showing 18 changed files with 761 additions and 1,186 deletions.
1,765 changes: 693 additions & 1,072 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions arroyo-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ arroyo-server-common = { path = "../arroyo-server-common" }
arroyo-connectors = { path = "../arroyo-connectors" }
arroyo-sql = { path = "../arroyo-sql" }
arroyo-datastream = { path = "../arroyo-datastream" }
arroyo-state = { path = "../arroyo-state" }

tonic = { workspace = true }
tonic-reflection = { workspace = true }
tonic-web = { workspace = true }
prost = "0.11"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.12"
Expand Down Expand Up @@ -46,8 +43,6 @@ utoipa-swagger-ui = { version = "3", features = ["axum"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

argon2 = "0.5"

# logging
tracing = "0.1"
anyhow = "1.0.70"
Expand All @@ -74,7 +69,6 @@ futures = "0.3"
futures-util = "0.3.28"
time = "0.3"
cornucopia_async = { version = "0.4", features = ["with-serde_json-1"] }
jwt-simple = "0.11.4"
uuid = "1.3.3"
regress = "0.6.0"
apache-avro = "0.16.0"
Expand Down
43 changes: 31 additions & 12 deletions arroyo-api/src/udfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use crate::to_micros;
use arroyo_rpc::api_types::udfs::{GlobalUdf, UdfPost, UdfValidationResult, ValidateUdfPost};
use arroyo_rpc::api_types::GlobalUdfCollection;
use arroyo_rpc::grpc::controller_grpc_client::ControllerGrpcClient;
use arroyo_rpc::grpc::{CheckUdfsReq, CheckUdfsResp};
use arroyo_rpc::grpc::{CheckUdfsReq, UdfCrate};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_sql::{parse_dependencies, ArroyoSchemaProvider};
use arroyo_types::cargo_toml;
use axum::extract::{Path, State};
use axum::Json;
use axum_extra::extract::WithRejection;
Expand Down Expand Up @@ -58,14 +60,14 @@ pub async fn create_udf(
.map_err(log_and_map)?;

// validate udf
let check_udfs_resp =
let (errors, name) =
validate_udf_with_controller(&state.controller_addr, &req.definition).await?;

if check_udfs_resp.errors.len() > 0 {
if errors.len() > 0 {
return Err(bad_request(format!("UDF is invalid.",)));
}

let Some(udf_name) = check_udfs_resp.udf_name else {
let Some(udf_name) = name else {
// this should not be possible
return Err(internal_server_error("UDF name not found"));
};
Expand Down Expand Up @@ -193,7 +195,7 @@ pub async fn delete_udf(
async fn validate_udf_with_controller(
controller_addr: &str,
udf_definition: &str,
) -> Result<CheckUdfsResp, ErrorResp> {
) -> Result<(Vec<String>, Option<String>), ErrorResp> {
let mut controller = match ControllerGrpcClient::connect(controller_addr.to_string()).await {
Ok(controller) => controller,
Err(e) => {
Expand All @@ -202,9 +204,29 @@ async fn validate_udf_with_controller(
}
};

let dependencies = match parse_dependencies(&udf_definition) {
Ok(dependencies) => dependencies,
Err(e) => {
return Ok((vec![e.to_string()], None));
}
};

// use the ArroyoSchemaProvider to do some validation and to get the function name
let function_name = match ArroyoSchemaProvider::new().add_rust_udf(&udf_definition) {
Ok(function_name) => function_name,
Err(e) => return Ok((vec![e.to_string()], None)),
};

// build cargo.toml
let cargo_toml = cargo_toml(&function_name, &dependencies);

let check_udfs_resp = match controller
.check_udfs(CheckUdfsReq {
definition: udf_definition.to_string(),
udf_crate: Some(UdfCrate {
name: function_name.clone(),
definition: udf_definition.to_string(),
cargo_toml,
}),
})
.await
{
Expand All @@ -218,7 +240,7 @@ async fn validate_udf_with_controller(
}
};

Ok(check_udfs_resp)
Ok((check_udfs_resp.errors, Some(function_name)))
}

/// Validate UDFs
Expand All @@ -235,11 +257,8 @@ pub async fn validate_udf(
State(state): State<AppState>,
WithRejection(Json(req), _): WithRejection<Json<ValidateUdfPost>, ApiError>,
) -> Result<Json<UdfValidationResult>, ErrorResp> {
let check_udfs_resp =
let (errors, udf_name) =
validate_udf_with_controller(&state.controller_addr, &req.definition).await?;

Ok(Json(UdfValidationResult {
udf_name: check_udfs_resp.udf_name,
errors: check_udfs_resp.errors,
}))
Ok(Json(UdfValidationResult { udf_name, errors }))
}
10 changes: 5 additions & 5 deletions arroyo-compiler-service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{io, path::PathBuf, str::FromStr, sync::Arc};

use arroyo_rpc::grpc::{
compiler_grpc_server::{CompilerGrpc, CompilerGrpcServer},
CheckUdfsCompilerReq, CheckUdfsCompilerResp, CompileQueryReq, CompileQueryResp, UdfCrate,
CheckUdfsReq, CheckUdfsResp, CompileQueryReq, CompileQueryResp, UdfCrate,
};

use arroyo_server_common::start_admin_server;
Expand Down Expand Up @@ -320,8 +320,8 @@ impl CompilerGrpc for CompileService {

async fn check_udfs(
&self,
request: Request<CheckUdfsCompilerReq>,
) -> Result<Response<CheckUdfsCompilerResp>, Status> {
request: Request<CheckUdfsReq>,
) -> Result<Response<CheckUdfsResp>, Status> {
// only allow one request to be active at a given time
let _guard = self.lock.lock().await;

Expand Down Expand Up @@ -359,7 +359,7 @@ impl CompilerGrpc for CompileService {
);

if output.status.success() {
return Ok(Response::new(CheckUdfsCompilerResp { errors: vec![] }));
return Ok(Response::new(CheckUdfsResp { errors: vec![] }));
}

let stdout = from_utf8(&output.stdout)
Expand Down Expand Up @@ -392,6 +392,6 @@ impl CompilerGrpc for CompileService {

info!("Cargo check on udfs crate found {} errors", errors.len());

return Ok(Response::new(CheckUdfsCompilerResp { errors }));
return Ok(Response::new(CheckUdfsResp { errors }));
}
}
1 change: 0 additions & 1 deletion arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ kafka-sasl = ["rdkafka/sasl", "rdkafka/ssl-vendored"]
arroyo-types = { path = "../arroyo-types" }
arroyo-storage = { path = "../arroyo-storage" }
arroyo-rpc = { path = "../arroyo-rpc" }
arroyo-datastream = { path = "../arroyo-datastream" }

chrono = "0.4"
serde = { version = "1", features = ["derive"] }
Expand Down
1 change: 0 additions & 1 deletion arroyo-controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ arroyo-datastream = { path = "../arroyo-datastream" }
arroyo-rpc = { path = "../arroyo-rpc" }
arroyo-state = { path = "../arroyo-state" }
arroyo-storage = { path = "../arroyo-storage" }
arroyo-sql = { path = "../arroyo-sql" }
arroyo-server-common = { path = "../arroyo-server-common" }

tonic = {workspace = true}
Expand Down
3 changes: 1 addition & 2 deletions arroyo-controller/src/compiler.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::cargo_toml;
use crate::states::fatal;
use anyhow::{anyhow, Result};
use arroyo_datastream::{parse_type, Operator, Program, WasmBehavior};
use arroyo_rpc::grpc::compiler_grpc_client::CompilerGrpcClient;
use arroyo_rpc::grpc::{CompileQueryReq, UdfCrate};
use arroyo_types::REMOTE_COMPILER_ENDPOINT_ENV;
use arroyo_types::{cargo_toml, REMOTE_COMPILER_ENDPOINT_ENV};
use petgraph::Direction;
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
Expand Down
68 changes: 8 additions & 60 deletions arroyo-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@
use arroyo_rpc::grpc::compiler_grpc_client::CompilerGrpcClient;
use arroyo_rpc::grpc::controller_grpc_server::{ControllerGrpc, ControllerGrpcServer};
use arroyo_rpc::grpc::{
CheckUdfsCompilerReq, CheckUdfsReq, CheckUdfsResp, GrpcOutputSubscription, HeartbeatNodeReq,
HeartbeatNodeResp, HeartbeatReq, HeartbeatResp, OutputData, RegisterNodeReq, RegisterNodeResp,
RegisterWorkerReq, RegisterWorkerResp, TaskCheckpointCompletedReq, TaskCheckpointCompletedResp,
TaskFailedReq, TaskFailedResp, TaskFinishedReq, TaskFinishedResp, TaskStartedReq,
TaskStartedResp, WorkerFinishedReq, WorkerFinishedResp,
CheckUdfsReq, CheckUdfsResp, GrpcOutputSubscription, HeartbeatNodeReq, HeartbeatNodeResp,
HeartbeatReq, HeartbeatResp, OutputData, RegisterNodeReq, RegisterNodeResp, RegisterWorkerReq,
RegisterWorkerResp, TaskCheckpointCompletedReq, TaskCheckpointCompletedResp, TaskFailedReq,
TaskFailedResp, TaskFinishedReq, TaskFinishedResp, TaskStartedReq, TaskStartedResp,
WorkerFinishedReq, WorkerFinishedResp,
};
use arroyo_rpc::grpc::{
SinkDataReq, SinkDataResp, TaskCheckpointEventReq, TaskCheckpointEventResp, UdfCrate,
WorkerErrorReq, WorkerErrorRes,
SinkDataReq, SinkDataResp, TaskCheckpointEventReq, TaskCheckpointEventResp, WorkerErrorReq,
WorkerErrorRes,
};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_server_common::log_event;
use arroyo_sql::{parse_dependencies, ArroyoSchemaProvider};
use arroyo_types::{
from_micros, ports, DatabaseConfig, NodeId, WorkerId, REMOTE_COMPILER_ENDPOINT_ENV,
};
Expand Down Expand Up @@ -448,39 +447,12 @@ impl ControllerGrpc for ControllerServer {
})?;

let req = request.into_inner();
let definition = req.definition.clone();

let dependencies = match parse_dependencies(&definition) {
Ok(dependencies) => dependencies,
Err(e) => {
return Ok(udf_error_resp(e));
}
};

// use the ArroyoSchemaProvider to do some validation and to get the function name
let function_name = match ArroyoSchemaProvider::new().add_rust_udf(&definition) {
Ok(function_name) => function_name,
Err(e) => return Ok(udf_error_resp(e)),
};

// build cargo.toml
let cargo_toml = cargo_toml(&function_name, &dependencies);

// send to compiler
let compiler_res = client
.check_udfs(CheckUdfsCompilerReq {
udf_crate: Some(UdfCrate {
name: function_name.clone(),
definition: req.definition,
cargo_toml,
}),
})
.await?
.into_inner();
let compiler_res = client.check_udfs(req).await?.into_inner();

Ok(Response::new(CheckUdfsResp {
errors: compiler_res.errors,
udf_name: Some(function_name),
}))
}
}
Expand Down Expand Up @@ -684,27 +656,3 @@ impl ControllerServer {
Ok(())
}
}

fn cargo_toml(name: &str, dependencies: &str) -> String {
format!(
r#"
[package]
name = "{}"
version = "1.0.0"
edition = "2021"
{}
"#,
name, dependencies
)
}

fn udf_error_resp<E>(e: E) -> Response<CheckUdfsResp>
where
E: core::fmt::Display,
{
Response::new(CheckUdfsResp {
errors: vec![e.to_string()],
udf_name: None,
})
}
3 changes: 0 additions & 3 deletions arroyo-datastream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ edition = "2021"

[dependencies]
arroyo-types = { path = "../arroyo-types" }
arroyo-macro = { path = "../arroyo-macro" }
arroyo-rpc = { path = "../arroyo-rpc" }
dyn-clone = "1.0.11"
petgraph = {version = "0.6", features = ["serde-1"]}
serde = {version = "1", features = ["derive"]}
syn = {version = "2", features = ["full"]}
quote = "1"
proc-macro2 = "1"
bincode = { version = "2.0.0-rc.3", features = ["serde"]}
rand = "0"
toml = "0.7"

tokio = "1"
tonic = {workspace = true}
Expand Down
2 changes: 0 additions & 2 deletions arroyo-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ arroyo-rpc = { path = "../arroyo-rpc" }
arroyo-server-common = { path = "../arroyo-server-common" }

tonic = { workspace = true }
tonic-reflection = { workspace = true }
prost = "0.11"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
fork = "0.1"
rand = "0.8"
local-ip-address = "0.5"
lazy_static = "1.4.0"
Expand Down
1 change: 1 addition & 0 deletions arroyo-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ log = "0.4.20"
tracing = "0.1.40"
async-trait = "0.1.74"
apache-avro = "0.16.0"
regex = "1.10.2"

[build-dependencies]
tonic-build = { workspace = true }
15 changes: 3 additions & 12 deletions arroyo-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ message WorkerErrorReq {
message WorkerErrorRes {
}

message CheckUdfsReq {
string definition = 1;
}

service ControllerGrpc {
rpc RegisterNode(RegisterNodeReq) returns (RegisterNodeResp);
rpc HeartbeatNode(HeartbeatNodeReq) returns (HeartbeatNodeResp);
Expand Down Expand Up @@ -459,20 +455,15 @@ message CompileQueryResp {
string wasm_fns_path = 2;
}

message CheckUdfsResp {
repeated string errors = 1;
optional string udf_name = 2;
}

message CheckUdfsCompilerReq {
message CheckUdfsReq {
UdfCrate udf_crate = 1;
}

message CheckUdfsCompilerResp {
message CheckUdfsResp {
repeated string errors = 1;
}

service CompilerGrpc {
rpc CompileQuery(CompileQueryReq) returns (CompileQueryResp);
rpc CheckUdfs(CheckUdfsCompilerReq) returns (CheckUdfsCompilerResp);
rpc CheckUdfs(CheckUdfsReq) returns (CheckUdfsResp);
}
7 changes: 5 additions & 2 deletions arroyo-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use datafusion_common::DataFusionError;
use prettyplease::unparse;
use regex::Regex;
use std::collections::HashSet;
use std::sync::OnceLock;
use std::time::{Duration, SystemTime};
use std::{collections::HashMap, sync::Arc};
use syn::{parse_file, parse_quote, parse_str, FnArg, Item, ReturnType, Visibility};
Expand Down Expand Up @@ -354,9 +355,11 @@ impl ArroyoSchemaProvider {
}
}

pub fn parse_dependencies(definition: &str) -> Result<String> {
pub fn parse_dependencies(definition: &str) -> anyhow::Result<String> {
// get content of dependencies comment using regex
let re = Regex::new(r"\/\*\n(\[dependencies\]\n[\s\S]*?)\*\/").unwrap();
// Compile the pattern once per process instead of on every call.
// The match must begin with a literal `/*` comment opener. `/` needs no escape in the
// regex crate, whereas `\\*` means "zero or more backslashes" and would let a
// `[dependencies]` section match without any opening comment marker.
static REGEX: OnceLock<Regex> = OnceLock::new();
let re = REGEX.get_or_init(|| {
    Regex::new(r"/\*\n(\[dependencies\]\n[\s\S]*?)\*/")
        .expect("dependencies pattern is a valid constant regex")
});

if re.find_iter(&definition).count() > 1 {
bail!("Only one dependencies definition is allowed in a UDF");
}
Expand Down
4 changes: 0 additions & 4 deletions arroyo-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ edition = "2021"
[dependencies]
arroyo-types = { path = "../arroyo-types" }
arroyo-rpc = { path = "../arroyo-rpc" }
arroyo-metrics = { path = "../arroyo-metrics" }
arroyo-storage = { path = "../arroyo-storage" }

anyhow = "1.0"
backoff = "0.4.0"
tracing = "0.1"
rand = "0.8"
bincode = "2.0.0-rc.3"
Expand All @@ -22,8 +20,6 @@ arrow-array = { workspace = true }
arrow-schema = { workspace = true }
parquet = { workspace = true }
async-trait = "0.1.68"
async-stream = "0.3.4"
ctor = "0.2"
once_cell = "1.17.1"
futures = "0.3"
bytes = "1.4"
Expand Down
Loading

0 comments on commit 47c6ab9

Please sign in to comment.