Skip to content

Commit

Permalink
Use udfs crate for compilation in addition to checking
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Oct 24, 2023
1 parent d0e6e63 commit 94e5132
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 39 deletions.
9 changes: 2 additions & 7 deletions arroyo-compiler-service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,7 @@ impl CompileService {

tokio::fs::write(build_dir.join("wasm-fns/src/lib.rs"), &req.wasm_fns).await?;

// make sure udfs.rs exists
tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.open(build_dir.join("udfs/src/udfs.rs"))
.await?;
tokio::fs::write(build_dir.join("udfs/src/lib.rs"), &req.udfs).await?;

let result = self.get_output().await?;

Expand Down Expand Up @@ -281,7 +276,7 @@ impl CompilerGrpc for CompileService {
// write udf to build_dir udfs module
let build_dir = &self.build_dir;
tokio::fs::write(
build_dir.join("udfs/src/udfs.rs"),
build_dir.join("udfs/src/lib.rs"),
&request.into_inner().udfs_rs,
)
.await
Expand Down
44 changes: 34 additions & 10 deletions arroyo-controller/src/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl ProgramCompiler {
job_id: self.job_id.clone(),
types: self.compile_types().to_string(),
pipeline: self.compile_pipeline_main(&self.name, &self.program.get_hash()),
udfs: self.compile_udf_lib(),
wasm_fns: self.compile_wasm_lib().to_string(),
};

Expand Down Expand Up @@ -110,6 +111,7 @@ impl ProgramCompiler {
let types = self.compile_types().to_string();
let main = self.compile_pipeline_main(&self.name, &self.program.get_hash());
let wasm = self.compile_wasm_lib().to_string();
let udfs = self.compile_udf_lib();

let workspace_toml = r#"
[workspace]
Expand Down Expand Up @@ -166,13 +168,13 @@ edition = "2021"
[dependencies]
types = {{ path = "../types" }}
udfs = {{ path = "../udfs" }}
petgraph = "0.6"
chrono = "0.4"
bincode = "=2.0.0-rc.3"
bincode_derive = "=2.0.0-rc.3"
serde = "1.0"
serde_json = "1.0"
regex = "1"
arrow = {{ workspace = true }}
parquet = {{ workspace = true }}
arrow-array = {{ workspace = true }}
Expand All @@ -190,6 +192,21 @@ arroyo-worker = {{ path = "{}/arroyo-worker"{}}}
);
Self::create_subproject(&dir, "pipeline", &pipeline_toml, "main.rs", main).await?;

let udfs_toml = r#"
[package]
name = "udfs"
version = "1.0.0"
edition = "2021"
[dependencies]
chrono = "0.4"
serde = "1.0"
serde_json = "1.0"
regex = "1"
"#;

Self::create_subproject(&dir, "udfs", &udfs_toml, "lib.rs", udfs).await?;

let wasmfns_toml = format!(
r#"
[package]
Expand Down Expand Up @@ -320,12 +337,6 @@ wasm-opt = false
};
info!("{}", self.program.dot());

let udfs: Vec<TokenStream> = self
.program
.udfs
.iter()
.map(|t| parse_str(t).unwrap())
.collect();

let other_defs: Vec<TokenStream> = self
.program
Expand All @@ -336,7 +347,6 @@ wasm-opt = false

let make_graph_function = self.program.make_graph_function();

// TODO: move udfs to udfs crate
prettyplease::unparse(&parse_quote! {
#imports

Expand All @@ -348,12 +358,26 @@ wasm-opt = false
arroyo_worker::WorkerServer::new(#name, #hash, graph).start().unwrap();
}

#(#udfs )*

#(#other_defs )*
})
}

/// Renders the Rust source for the generated `udfs` crate's `lib.rs`.
///
/// Every entry in `self.program.udfs` is parsed into a token stream and
/// emitted after a fixed prelude of `use` statements, so the UDF bodies can
/// refer to `SystemTime`, `Duration` and `FromStr` without importing them.
/// The combined file is pretty-printed and returned as a `String`.
///
/// # Panics
///
/// Panics if any UDF definition string cannot be parsed as Rust tokens.
fn compile_udf_lib(&self) -> String {
    let mut udf_items: Vec<TokenStream> = Vec::new();
    for source in self.program.udfs.iter() {
        // Each entry is raw Rust source text; a parse failure aborts here.
        udf_items.push(parse_str(source).unwrap());
    }

    // The prelude imports come first, followed by every UDF item in order.
    let lib_file = parse_quote! {
        use std::time::{SystemTime, Duration};
        use std::str::FromStr;

        #(#udf_items )*
    };

    prettyplease::unparse(&lib_file)
}

fn compile_wasm_lib(&self) -> TokenStream {
let wasm_fns: Vec<_> = self
.program
Expand Down
1 change: 1 addition & 0 deletions arroyo-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ message CompileQueryReq {
string types = 2;
string pipeline = 3;
string wasm_fns = 4;
string udfs = 5;
}

message CompileQueryResp {
Expand Down
6 changes: 2 additions & 4 deletions arroyo-sql/src/plan_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1912,15 +1912,13 @@ pub fn get_program(
);

let mut udfs = vec![];
udfs.push(format!(
"mod udfs {{ use std::time::{{SystemTime, Duration}}; {} }}",
schema_provider
udfs.push(schema_provider
.udf_defs
.values()
.map(|u| u.def.as_str())
.collect::<Vec<_>>()
.join("\n\n")
));
);

let graph: DiGraph<StreamNode, StreamEdge> = plan_graph.into();

Expand Down
2 changes: 2 additions & 0 deletions build_dir/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Cargo.lock
wasm-fns/Cargo.lock
udfs/src/lib.rs
pipeline/src/main.rs
1 change: 1 addition & 0 deletions build_dir/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
types = { path = "../types" }
udfs = { path = "../udfs" }
petgraph = "0.6"
bincode = "=2.0.0-rc.3"
bincode_derive = "=2.0.0-rc.3"
Expand Down
1 change: 0 additions & 1 deletion build_dir/pipeline/src/.gitignore

This file was deleted.

8 changes: 0 additions & 8 deletions build_dir/udfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,8 @@ name = "udfs"
version = "1.0.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
types = { path = "../types" }
chrono = "0.4"
serde = "1.0"
serde_json = "1.0"
arroyo-types = { path = "../../arroyo-types" }
regex = "1"

[package.metadata.wasm-pack.profile.release]
wasm-opt = false
1 change: 0 additions & 1 deletion build_dir/udfs/src/lib.rs

This file was deleted.

2 changes: 2 additions & 0 deletions docker/build_base/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
types = { path = "../types" }
udfs = { path = "../udfs" }
petgraph = "0.6"
bincode = "=2.0.0-rc.3"
bincode_derive = "=2.0.0-rc.3"
Expand All @@ -16,5 +17,6 @@ arrow = { workspace = true}
arrow-array = { workspace = true}
arroyo-types = { path = "/opt/arroyo/src/arroyo-types" }
arroyo-worker = { path = "/opt/arroyo/src/arroyo-worker", features = ["kafka-sasl"] }

[package.metadata.wasm-pack.profile.release]
wasm-opt = false
8 changes: 0 additions & 8 deletions docker/build_base/udfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,8 @@ name = "udfs"
version = "1.0.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
types = { path = "../types" }
chrono = "0.4"
serde = "1.0"
serde_json = "1.0"
arroyo-types = { path = "/opt/arroyo/src/arroyo-types" }
regex = "1"

[package.metadata.wasm-pack.profile.release]
wasm-opt = false

0 comments on commit 94e5132

Please sign in to comment.