diff --git a/.gitignore b/.gitignore index d670fbc..781b743 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ __pycache__/ *.py[cod] *$py.class *.egg-info/ + +.vscode \ No newline at end of file diff --git a/example-dagster-pipes-rust-project/example_dagster_pipes_rust_project_tests/test_subprocess.py b/example-dagster-pipes-rust-project/example_dagster_pipes_rust_project_tests/test_subprocess.py new file mode 100644 index 0000000..fcad8da --- /dev/null +++ b/example-dagster-pipes-rust-project/example_dagster_pipes_rust_project_tests/test_subprocess.py @@ -0,0 +1,51 @@ +import shutil + +from dagster import ( + asset, + AssetCheckSpec, + AssetExecutionContext, + file_relative_path, + materialize_to_memory, + MaterializeResult, + PipesSubprocessClient, + PipesEnvContextInjector, +) + + +def test_example_rust_subprocess_asset(): + @asset( + check_specs=[ + AssetCheckSpec( + name="example_rust_subprocess_check", + asset="example_rust_subprocess_asset", + ) + ], + ) + def example_rust_subprocess_asset( + context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient + ) -> MaterializeResult: + """Demonstrates running Rust binary in a subprocess.""" + cmd = [shutil.which("cargo"), "run"] + cwd = file_relative_path(__file__, "../rust_processing_jobs") + return pipes_subprocess_client.run( + command=cmd, + cwd=cwd, + context=context, + ).get_materialize_result() + + result = materialize_to_memory( + assets=[example_rust_subprocess_asset], + resources={ + "pipes_subprocess_client": PipesSubprocessClient( + context_injector=PipesEnvContextInjector(), + ) + }, + ) + assert result.success + assert result.get_asset_materialization_events()[0].asset_key.path == [ + "example_rust_subprocess_asset" + ] + assert ( + result.get_asset_check_evaluations()[0].asset_check_key.name + == "example_rust_subprocess_check" + ) diff --git a/example-dagster-pipes-rust-project/rust_processing_jobs/src/main.rs b/example-dagster-pipes-rust-project/rust_processing_jobs/src/main.rs index 2fb7afb..a02df7e 100644 --- a/example-dagster-pipes-rust-project/rust_processing_jobs/src/main.rs +++ b/example-dagster-pipes-rust-project/rust_processing_jobs/src/main.rs @@ -11,7 +11,7 @@ fn main() -> Result<(), DagsterPipesError> { "example_rust_subprocess_check", true, "example_rust_subprocess_asset", - AssetCheckSeverity::Warn, + &AssetCheckSeverity::Warn, json!({"quality": {"raw_value": 5, "type": "int"}}), ); Ok(())