Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use udfs crate for compilation in addition to checking #382

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions arroyo-compiler-service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,7 @@ impl CompileService {

tokio::fs::write(build_dir.join("wasm-fns/src/lib.rs"), &req.wasm_fns).await?;

// make sure udfs.rs exists
tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.open(build_dir.join("udfs/src/udfs.rs"))
.await?;
tokio::fs::write(build_dir.join("udfs/src/lib.rs"), &req.udfs).await?;

let result = self.get_output().await?;

Expand Down Expand Up @@ -281,7 +276,7 @@ impl CompilerGrpc for CompileService {
// write udf to build_dir udfs module
let build_dir = &self.build_dir;
tokio::fs::write(
build_dir.join("udfs/src/udfs.rs"),
build_dir.join("udfs/src/lib.rs"),
&request.into_inner().udfs_rs,
)
.await
Expand Down
45 changes: 34 additions & 11 deletions arroyo-controller/src/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl ProgramCompiler {
job_id: self.job_id.clone(),
types: self.compile_types().to_string(),
pipeline: self.compile_pipeline_main(&self.name, &self.program.get_hash()),
udfs: self.compile_udf_lib(),
wasm_fns: self.compile_wasm_lib().to_string(),
};

Expand Down Expand Up @@ -110,6 +111,7 @@ impl ProgramCompiler {
let types = self.compile_types().to_string();
let main = self.compile_pipeline_main(&self.name, &self.program.get_hash());
let wasm = self.compile_wasm_lib().to_string();
let udfs = self.compile_udf_lib();

let workspace_toml = r#"
[workspace]
Expand Down Expand Up @@ -166,13 +168,13 @@ edition = "2021"

[dependencies]
types = {{ path = "../types" }}
udfs = {{ path = "../udfs" }}
petgraph = "0.6"
chrono = "0.4"
bincode = "=2.0.0-rc.3"
bincode_derive = "=2.0.0-rc.3"
serde = "1.0"
serde_json = "1.0"
regex = "1"
arrow = {{ workspace = true }}
parquet = {{ workspace = true }}
arrow-array = {{ workspace = true }}
Expand All @@ -190,6 +192,21 @@ arroyo-worker = {{ path = "{}/arroyo-worker"{}}}
);
Self::create_subproject(&dir, "pipeline", &pipeline_toml, "main.rs", main).await?;

let udfs_toml = r#"
[package]
name = "udfs"
version = "1.0.0"
edition = "2021"

[dependencies]
chrono = "0.4"
serde = "1.0"
serde_json = "1.0"
regex = "1"
"#;

Self::create_subproject(&dir, "udfs", &udfs_toml, "lib.rs", udfs).await?;

let wasmfns_toml = format!(
r#"
[package]
Expand Down Expand Up @@ -320,13 +337,6 @@ wasm-opt = false
};
info!("{}", self.program.dot());

let udfs: Vec<TokenStream> = self
.program
.udfs
.iter()
.map(|t| parse_str(t).unwrap())
.collect();

let other_defs: Vec<TokenStream> = self
.program
.other_defs
Expand All @@ -336,7 +346,6 @@ wasm-opt = false

let make_graph_function = self.program.make_graph_function();

// TODO: move udfs to udfs crate
prettyplease::unparse(&parse_quote! {
#imports

Expand All @@ -348,12 +357,26 @@ wasm-opt = false
arroyo_worker::WorkerServer::new(#name, #hash, graph).start().unwrap();
}

#(#udfs )*

#(#other_defs )*
})
}

/// Renders the source text used as `lib.rs` of the generated `udfs` crate.
///
/// Each entry of `self.program.udfs` is parsed into a token stream and
/// emitted, in order, after a fixed prelude of `std` imports. The assembled
/// file is then pretty-printed with `prettyplease`.
///
/// # Panics
///
/// Panics if any entry of `self.program.udfs` cannot be parsed into a
/// token stream.
fn compile_udf_lib(&self) -> String {
    // Parse every UDF definition up front so that a malformed definition
    // aborts before any output is assembled.
    let mut udf_items: Vec<TokenStream> = Vec::new();
    for udf_source in self.program.udfs.iter() {
        udf_items.push(parse_str(udf_source).unwrap());
    }

    prettyplease::unparse(&parse_quote! {
        use std::time::{SystemTime, Duration};
        use std::str::FromStr;

        #(#udf_items )*
    })
}

fn compile_wasm_lib(&self) -> TokenStream {
let wasm_fns: Vec<_> = self
.program
Expand Down
1 change: 1 addition & 0 deletions arroyo-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ message CompileQueryReq {
string types = 2;
string pipeline = 3;
string wasm_fns = 4;
string udfs = 5;
}

message CompileQueryResp {
Expand Down
4 changes: 3 additions & 1 deletion arroyo-sql-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ fn get_pipeline_module(

#function

#(#udfs)*
mod udfs {
#(#udfs)*
}

#(#other_defs)*
}
Expand Down
7 changes: 3 additions & 4 deletions arroyo-sql/src/plan_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1912,15 +1912,14 @@ pub fn get_program(
);

let mut udfs = vec![];
udfs.push(format!(
"mod udfs {{ use std::time::{{SystemTime, Duration}}; {} }}",
udfs.push(
schema_provider
.udf_defs
.values()
.map(|u| u.def.as_str())
.collect::<Vec<_>>()
.join("\n\n")
));
.join("\n\n"),
);

let graph: DiGraph<StreamNode, StreamEdge> = plan_graph.into();

Expand Down
2 changes: 2 additions & 0 deletions build_dir/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Cargo.lock
wasm-fns/Cargo.lock
udfs/src/lib.rs
pipeline/src/main.rs
1 change: 1 addition & 0 deletions build_dir/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
types = { path = "../types" }
udfs = { path = "../udfs" }
petgraph = "0.6"
bincode = "=2.0.0-rc.3"
bincode_derive = "=2.0.0-rc.3"
Expand Down
1 change: 0 additions & 1 deletion build_dir/pipeline/src/.gitignore

This file was deleted.

Empty file.
8 changes: 0 additions & 8 deletions build_dir/udfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,8 @@ name = "udfs"
version = "1.0.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
types = { path = "../types" }
chrono = "0.4"
serde = "1.0"
serde_json = "1.0"
arroyo-types = { path = "../../arroyo-types" }
regex = "1"

[package.metadata.wasm-pack.profile.release]
wasm-opt = false
Empty file added build_dir/udfs/src/.placeholder
Empty file.
1 change: 0 additions & 1 deletion build_dir/udfs/src/lib.rs

This file was deleted.

2 changes: 2 additions & 0 deletions docker/build_base/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
types = { path = "../types" }
udfs = { path = "../udfs" }
petgraph = "0.6"
bincode = "=2.0.0-rc.3"
bincode_derive = "=2.0.0-rc.3"
Expand All @@ -16,5 +17,6 @@ arrow = { workspace = true}
arrow-array = { workspace = true}
arroyo-types = { path = "/opt/arroyo/src/arroyo-types" }
arroyo-worker = { path = "/opt/arroyo/src/arroyo-worker", features = ["kafka-sasl"] }

[package.metadata.wasm-pack.profile.release]
wasm-opt = false
8 changes: 0 additions & 8 deletions docker/build_base/udfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,8 @@ name = "udfs"
version = "1.0.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
types = { path = "../types" }
chrono = "0.4"
serde = "1.0"
serde_json = "1.0"
arroyo-types = { path = "/opt/arroyo/src/arroyo-types" }
regex = "1"

[package.metadata.wasm-pack.profile.release]
wasm-opt = false
Loading