From 4ae8f9da697e3e53b690a32b4ef038a3ff0c9448 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Mon, 23 Oct 2023 17:34:15 -0700 Subject: [PATCH] Use udfs crate for compilation in addition to checking --- arroyo-compiler-service/src/main.rs | 9 ++---- arroyo-controller/src/compiler.rs | 45 ++++++++++++++++++++------- arroyo-rpc/proto/rpc.proto | 1 + arroyo-sql/src/plan_graph.rs | 7 ++--- build_dir/.gitignore | 2 ++ build_dir/pipeline/Cargo.toml | 1 + build_dir/pipeline/src/.gitignore | 1 - build_dir/udfs/Cargo.toml | 8 ----- build_dir/udfs/src/lib.rs | 1 - docker/build_base/pipeline/Cargo.toml | 2 ++ docker/build_base/udfs/Cargo.toml | 8 ----- 11 files changed, 45 insertions(+), 40 deletions(-) delete mode 100644 build_dir/pipeline/src/.gitignore delete mode 100644 build_dir/udfs/src/lib.rs diff --git a/arroyo-compiler-service/src/main.rs b/arroyo-compiler-service/src/main.rs index 913b97a23..959e03851 100644 --- a/arroyo-compiler-service/src/main.rs +++ b/arroyo-compiler-service/src/main.rs @@ -177,12 +177,7 @@ impl CompileService { tokio::fs::write(build_dir.join("wasm-fns/src/lib.rs"), &req.wasm_fns).await?; - // make sure udfs.rs exists - tokio::fs::OpenOptions::new() - .create(true) - .write(true) - .open(build_dir.join("udfs/src/udfs.rs")) - .await?; + tokio::fs::write(build_dir.join("udfs/src/lib.rs"), &req.udfs).await?; let result = self.get_output().await?; @@ -281,7 +276,7 @@ impl CompilerGrpc for CompileService { // write udf to build_dir udfs module let build_dir = &self.build_dir; tokio::fs::write( - build_dir.join("udfs/src/udfs.rs"), + build_dir.join("udfs/src/lib.rs"), &request.into_inner().udfs_rs, ) .await diff --git a/arroyo-controller/src/compiler.rs b/arroyo-controller/src/compiler.rs index 89242167f..1bd8fee4d 100644 --- a/arroyo-controller/src/compiler.rs +++ b/arroyo-controller/src/compiler.rs @@ -59,6 +59,7 @@ impl ProgramCompiler { job_id: self.job_id.clone(), types: self.compile_types().to_string(), pipeline: self.compile_pipeline_main(&self.name, &self.program.get_hash()), + udfs: self.compile_udf_lib(), wasm_fns: self.compile_wasm_lib().to_string(), }; @@ -110,6 +111,7 @@ impl ProgramCompiler { let types = self.compile_types().to_string(); let main = self.compile_pipeline_main(&self.name, &self.program.get_hash()); let wasm = self.compile_wasm_lib().to_string(); + let udfs = self.compile_udf_lib(); let workspace_toml = r#" [workspace] @@ -166,13 +168,13 @@ edition = "2021" [dependencies] types = {{ path = "../types" }} +udfs = {{ path = "../udfs" }} petgraph = "0.6" chrono = "0.4" bincode = "=2.0.0-rc.3" bincode_derive = "=2.0.0-rc.3" serde = "1.0" serde_json = "1.0" -regex = "1" arrow = {{ workspace = true }} parquet = {{ workspace = true }} arrow-array = {{ workspace = true }} @@ -190,6 +192,21 @@ arroyo-worker = {{ path = "{}/arroyo-worker"{}}} ); Self::create_subproject(&dir, "pipeline", &pipeline_toml, "main.rs", main).await?; + let udfs_toml = r#" +[package] +name = "udfs" +version = "1.0.0" +edition = "2021" + +[dependencies] +chrono = "0.4" +serde = "1.0" +serde_json = "1.0" +regex = "1" + "#; + + Self::create_subproject(&dir, "udfs", &udfs_toml, "lib.rs", udfs).await?; + let wasmfns_toml = format!( r#" [package] @@ -320,13 +337,6 @@ wasm-opt = false }; info!("{}", self.program.dot()); - let udfs: Vec = self - .program - .udfs - .iter() - .map(|t| parse_str(t).unwrap()) - .collect(); - let other_defs: Vec = self .program .other_defs @@ -336,7 +346,6 @@ wasm-opt = false let make_graph_function = self.program.make_graph_function(); - // TODO: move udfs to udfs crate prettyplease::unparse(&parse_quote! { #imports @@ -348,12 +357,26 @@ wasm-opt = false arroyo_worker::WorkerServer::new(#name, #hash, graph).start().unwrap(); } - #(#udfs )* - #(#other_defs )* }) } + fn compile_udf_lib(&self) -> String { + let udfs: Vec = self + .program + .udfs + .iter() + .map(|t| parse_str(t).unwrap()) + .collect(); + + prettyplease::unparse(&parse_quote! { + use std::time::{SystemTime, Duration}; + use std::str::FromStr; + + #(#udfs )* + }) + } + fn compile_wasm_lib(&self) -> TokenStream { let wasm_fns: Vec<_> = self .program diff --git a/arroyo-rpc/proto/rpc.proto b/arroyo-rpc/proto/rpc.proto index e483bd84e..ca46e5abd 100644 --- a/arroyo-rpc/proto/rpc.proto +++ b/arroyo-rpc/proto/rpc.proto @@ -423,6 +423,7 @@ message CompileQueryReq { string types = 2; string pipeline = 3; string wasm_fns = 4; + string udfs = 5; } message CompileQueryResp { diff --git a/arroyo-sql/src/plan_graph.rs b/arroyo-sql/src/plan_graph.rs index 232a8a03a..1a92e70b6 100644 --- a/arroyo-sql/src/plan_graph.rs +++ b/arroyo-sql/src/plan_graph.rs @@ -1912,15 +1912,14 @@ pub fn get_program( ); let mut udfs = vec![]; - udfs.push(format!( - "mod udfs {{ use std::time::{{SystemTime, Duration}}; {} }}", + udfs.push( schema_provider .udf_defs .values() .map(|u| u.def.as_str()) .collect::>() - .join("\n\n") - )); + .join("\n\n"), + ); let graph: DiGraph = plan_graph.into(); diff --git a/build_dir/.gitignore b/build_dir/.gitignore index ae9f0d11c..4badd3a21 100644 --- a/build_dir/.gitignore +++ b/build_dir/.gitignore @@ -1,2 +1,4 @@ Cargo.lock wasm-fns/Cargo.lock +udfs/src/lib.rs +pipeline/src/main.rs diff --git a/build_dir/pipeline/Cargo.toml b/build_dir/pipeline/Cargo.toml index e603ce0e7..05e739ec1 100644 --- a/build_dir/pipeline/Cargo.toml +++ b/build_dir/pipeline/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] types = { path = "../types" } +udfs = { path = "../udfs" } petgraph = "0.6" bincode = "=2.0.0-rc.3" bincode_derive = "=2.0.0-rc.3" diff --git a/build_dir/pipeline/src/.gitignore b/build_dir/pipeline/src/.gitignore deleted file mode 100644 index 13bd6e195..000000000 --- a/build_dir/pipeline/src/.gitignore +++ /dev/null @@ -1 +0,0 @@ -main.rs diff --git a/build_dir/udfs/Cargo.toml b/build_dir/udfs/Cargo.toml index cbd0b4bbd..e714ac45c 100644 --- a/build_dir/udfs/Cargo.toml +++ b/build_dir/udfs/Cargo.toml @@ -3,16 +3,8 @@ name = "udfs" version = "1.0.0" edition = "2021" -[lib] -crate-type = ["cdylib"] - [dependencies] -types = { path = "../types" } chrono = "0.4" serde = "1.0" serde_json = "1.0" -arroyo-types = { path = "../../arroyo-types" } regex = "1" - -[package.metadata.wasm-pack.profile.release] -wasm-opt = false diff --git a/build_dir/udfs/src/lib.rs b/build_dir/udfs/src/lib.rs deleted file mode 100644 index 8baeb6a6d..000000000 --- a/build_dir/udfs/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -mod udfs; diff --git a/docker/build_base/pipeline/Cargo.toml b/docker/build_base/pipeline/Cargo.toml index 468c19a73..149887c4d 100644 --- a/docker/build_base/pipeline/Cargo.toml +++ b/docker/build_base/pipeline/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] types = { path = "../types" } +udfs = { path = "../udfs" } petgraph = "0.6" bincode = "=2.0.0-rc.3" bincode_derive = "=2.0.0-rc.3" @@ -16,5 +17,6 @@ arrow = { workspace = true} arrow-array = { workspace = true} arroyo-types = { path = "/opt/arroyo/src/arroyo-types" } arroyo-worker = { path = "/opt/arroyo/src/arroyo-worker", features = ["kafka-sasl"] } + [package.metadata.wasm-pack.profile.release] wasm-opt = false diff --git a/docker/build_base/udfs/Cargo.toml b/docker/build_base/udfs/Cargo.toml index 783f1e83e..e714ac45c 100644 --- a/docker/build_base/udfs/Cargo.toml +++ b/docker/build_base/udfs/Cargo.toml @@ -3,16 +3,8 @@ name = "udfs" version = "1.0.0" edition = "2021" -[lib] -crate-type = ["cdylib"] - [dependencies] -types = { path = "../types" } chrono = "0.4" serde = "1.0" serde_json = "1.0" -arroyo-types = { path = "/opt/arroyo/src/arroyo-types" } regex = "1" - -[package.metadata.wasm-pack.profile.release] -wasm-opt = false