Skip to content
This repository has been archived by the owner on Dec 12, 2024. It is now read-only.

Commit

Permalink
starts using PipesMetadataValue (#26)
Browse files Browse the repository at this point in the history
* starts using PipesMetadataValue

* simplify construction

* add changelog

* fix broken auto-merge

---------

Co-authored-by: Colton Padden <[email protected]>
  • Loading branch information
marijncv and cmpadden authored Dec 12, 2024
1 parent 76a4d37 commit 70c167a
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 35 deletions.
11 changes: 6 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
## [UNRELEASED]

### Added
- (pull/25) Use `cargo nextest` for GitHub CI jobs
- (pull/22) Added `Default` implementation for `PipesContextData`
- (pull/14) Derived `PartialEq` for all types generated by `quicktype`
- (pull/14) Fixed failing unit tests in `context_loader.rs`
- (pull/14) Renamed `ParamsLoader` and `ContextLoader` traits to `LoadParams` and `LoadContext` respectively
- (pull/20) Added `FileChannel`, `StreamChannel`, `BufferedStreamChannel` implementing `MessageWriterChannel`
- (pull/20) Added a simplified `PipesDefaultMessageWriter` implementing `MessageWriter`
- (pull/20) Defined `MessageWriter` and the associated `MessageWriterChannel` traits
- (pull/14) Derived `PartialEq` for all types generated by `quicktype`
- (pull/14) Renamed `ParamsLoader` and `ContextLoader` traits to `LoadParams` and `LoadContext` respectively
- (pull/14) Fixed failing unit tests in `context_loader.rs`
- (pull/22) Added `Default` implementation for `PipesContextData`
- (pull/25) Use `cargo nextest` for GitHub CI jobs
- (pull/26) Use `PipesMetadataValue` for metadata when reporting asset materializations and asset checks
- (pull/9) Handle errors for `ParamsLoader` and `ContextLoader` implementations

## 0.1.6
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
use dagster_pipes_rust::types::{PipesMetadataValue, RawValue, Type};
use dagster_pipes_rust::{open_dagster_pipes, AssetCheckSeverity, DagsterPipesError};
use serde_json::json;

use std::collections::HashMap;

fn main() -> Result<(), DagsterPipesError> {
let mut context = open_dagster_pipes()?;
// See supported metadata types here:
// https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/pipes/context.py#L133
let metadata = json!({"row_count": {"raw_value": 100, "type": "int"}});
context.report_asset_materialization("example_rust_subprocess_asset", metadata);

let asset_metadata = HashMap::from([(
"row_count".to_string(),
PipesMetadataValue::new(RawValue::Integer(100), Type::Int),
)]);
context.report_asset_materialization("example_rust_subprocess_asset", asset_metadata);

let check_metadata = HashMap::from([(
"quality".to_string(),
PipesMetadataValue {
raw_value: Some(RawValue::Integer(100)),
pipes_metadata_value_type: Some(Type::Int),
},
)]);
context.report_asset_check(
"example_rust_subprocess_check",
true,
"example_rust_subprocess_asset",
&AssetCheckSeverity::Warn,
json!({"quality": {"raw_value": 5, "type": "int"}}),
check_metadata,
);
Ok(())
}
26 changes: 11 additions & 15 deletions jsonschema/pipes/PipesMetadataValue.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,17 @@
]
},
"raw_value": {
"anyOf": [
{ "type": "integer" },
{ "type": "number" },
{ "type": "string" },
{
"type": "object",
"additionalProperties": true
},
{
"type": "array",
"items": {}
},
{ "type": "boolean" },
{ "type": "null" }
]
"type": [
"integer",
"number",
"string",
"object",
"array",
"boolean",
"null"
],
"additionalProperties": true,
"items": {}
}
}
}
243 changes: 235 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod context_loader;
mod params_loader;
mod types;
pub mod types;
mod types_ext;
mod writer;

Expand All @@ -18,11 +18,13 @@ use crate::context_loader::PayloadErrorKind;
use crate::params_loader::EnvVarLoader as PipesEnvVarParamsLoader;
pub use crate::params_loader::LoadParams;
use crate::params_loader::ParamsError;
pub use crate::types::{Method, PipesContextData, PipesMessage};
pub use crate::types::{Method, PipesContextData, PipesMessage, PipesMetadataValue};
use crate::writer::message_writer::get_opened_payload;
use crate::writer::message_writer::DefaultWriter as PipesDefaultMessageWriter;
pub use crate::writer::message_writer::MessageWriter;
pub use crate::writer::message_writer_channel::MessageWriterChannel;
pub use crate::writer::message_writer::{DefaultWriter, MessageWriter};
pub use crate::writer::message_writer_channel::{
DefaultChannel, FileChannel, MessageWriterChannel,
};

#[derive(Serialize)]
#[serde(rename_all = "UPPERCASE")]
Expand All @@ -31,6 +33,15 @@ pub enum AssetCheckSeverity {
Error,
}

impl PipesMetadataValue {
    /// Creates a metadata value with both fields populated.
    ///
    /// `raw_value` and `pipes_metadata_value_type` are each wrapped in `Some`
    /// before being stored, so callers do not have to spell out the `Option`
    /// layer of the struct's fields themselves.
    pub fn new(raw_value: types::RawValue, pipes_metadata_value_type: types::Type) -> Self {
        let raw_value = Some(raw_value);
        let pipes_metadata_value_type = Some(pipes_metadata_value_type);

        Self {
            raw_value,
            pipes_metadata_value_type,
        }
    }
}

// partial translation of
// https://github.com/dagster-io/dagster/blob/258d9ca0db/python_modules/dagster-pipes/dagster_pipes/__init__.py#L859-L871
#[derive(Debug)]
Expand Down Expand Up @@ -66,10 +77,14 @@ where
}
}

pub fn report_asset_materialization(&mut self, asset_key: &str, metadata: serde_json::Value) {
pub fn report_asset_materialization(
&mut self,
asset_key: &str,
metadata: HashMap<String, PipesMetadataValue>,
) {
let params: HashMap<String, Option<serde_json::Value>> = HashMap::from([
("asset_key".to_string(), Some(json!(asset_key))),
("metadata".to_string(), Some(metadata)),
("metadata".to_string(), Some(json!(metadata))),
("data_version".to_string(), None), // TODO - support data versions
]);

Expand All @@ -83,14 +98,14 @@ where
passed: bool,
asset_key: &str,
severity: &AssetCheckSeverity,
metadata: serde_json::Value,
metadata: HashMap<String, PipesMetadataValue>,
) {
let params: HashMap<String, Option<serde_json::Value>> = HashMap::from([
("asset_key".to_string(), Some(json!(asset_key))),
("check_name".to_string(), Some(json!(check_name))),
("passed".to_string(), Some(json!(passed))),
("severity".to_string(), Some(json!(severity))),
("metadata".to_string(), Some(metadata)),
("metadata".to_string(), Some(json!(metadata))),
]);

let msg = PipesMessage::new(Method::ReportAssetCheck, Some(params));
Expand Down Expand Up @@ -129,3 +144,215 @@ pub fn open_dagster_pipes() -> Result<PipesContext<PipesDefaultMessageWriter>, D
&message_writer,
))
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs;
    use tempfile::NamedTempFile;

    use super::*;

    /// Reports an asset materialization whose metadata holds one entry per
    /// `types::Type` variant used below (plus an entry with both fields unset)
    /// through a file-backed channel, then compares the message read back from
    /// that file with the expected `PipesMessage`.
    #[test]
    fn test_write_pipes_metadata() {
        // Shorthand for the entries whose raw value is a plain string.
        let string_entry = |key: &str, value: &str, kind: types::Type| {
            (
                key.to_string(),
                PipesMetadataValue::new(types::RawValue::String(value.to_string()), kind),
            )
        };

        let asset_metadata = HashMap::from([
            string_entry("text", "hello", types::Type::Text),
            string_entry("url", "http://someurl.com", types::Type::Url),
            string_entry("path", "file://some/path", types::Type::Path),
            string_entry("notebook", "notebook", types::Type::Notebook),
            (
                "json_object".to_string(),
                PipesMetadataValue::new(
                    types::RawValue::AnythingMap(HashMap::from([(
                        "key".to_string(),
                        Some(json!("value")),
                    )])),
                    types::Type::Json,
                ),
            ),
            (
                "json_array".to_string(),
                PipesMetadataValue::new(
                    types::RawValue::AnythingArray(vec![Some(json!({"key": "value"}))]),
                    types::Type::Json,
                ),
            ),
            string_entry("md", "## markdown", types::Type::Md),
            string_entry("dagster_run", "1234", types::Type::DagsterRun),
            string_entry("asset", "some_asset", types::Type::Asset),
            string_entry("job", "some_job", types::Type::Job),
            string_entry(
                "timestamp",
                "2012-04-23T18:25:43.511Z",
                types::Type::Timestamp,
            ),
            (
                "int".to_string(),
                PipesMetadataValue::new(types::RawValue::Integer(100), types::Type::Int),
            ),
            (
                "float".to_string(),
                PipesMetadataValue::new(types::RawValue::Double(100.0), types::Type::Float),
            ),
            (
                "bool".to_string(),
                PipesMetadataValue::new(types::RawValue::Bool(true), types::Type::Bool),
            ),
            // Built by hand: `new` always wraps its arguments in `Some`.
            (
                "none".to_string(),
                PipesMetadataValue {
                    raw_value: None,
                    pipes_metadata_value_type: None,
                },
            ),
        ]);

        // The temp file is the sink the context writes its message to.
        let file = NamedTempFile::new().unwrap();
        let path = file.path().to_str().unwrap().to_string();

        let data = PipesContextData {
            asset_keys: Some(vec!["asset1".to_string()]),
            code_version_by_asset_key: None,
            extras: None,
            job_name: None,
            partition_key: None,
            partition_key_range: None,
            partition_time_window: None,
            provenance_by_asset_key: None,
            retry_number: 0,
            run_id: "012345".to_string(),
        };
        let message_channel = DefaultChannel::File(FileChannel::new(path.into()));
        let mut context: PipesContext<DefaultWriter> = PipesContext {
            message_channel,
            data,
        };

        context.report_asset_materialization("asset1", asset_metadata);

        // Read the file back and parse its whole content as one message.
        let written_text = fs::read_to_string(file.path()).unwrap();
        let written: PipesMessage = serde_json::from_str(&written_text).unwrap();

        let expected_metadata = json!({
            "text": {
                "raw_value": "hello",
                "type": "text"
            },
            "url": {
                "raw_value": "http://someurl.com",
                "type": "url"
            },
            "path": {
                "raw_value": "file://some/path",
                "type": "path"
            },
            "notebook": {
                "raw_value": "notebook",
                "type": "notebook"
            },
            "json_object": {
                "raw_value": {"key": "value"},
                "type": "json"
            },
            "json_array": {
                "raw_value": [{"key": "value"}],
                "type": "json"
            },
            "md": {
                "raw_value": "## markdown",
                "type": "md"
            },
            "dagster_run": {
                "raw_value": "1234",
                "type": "dagster_run"
            },
            "asset": {
                "raw_value": "some_asset",
                "type": "asset"
            },
            "job": {
                "raw_value": "some_job",
                "type": "job"
            },
            "timestamp": {
                "raw_value": "2012-04-23T18:25:43.511Z",
                "type": "timestamp"
            },
            "int": {
                "raw_value": 100,
                "type": "int"
            },
            "float": {
                "raw_value": 100.0,
                "type": "float"
            },
            "bool": {
                "raw_value": true,
                "type": "bool"
            },
            "none": {
                "raw_value": null,
                "type": null
            }
        });

        let expected_params: HashMap<String, Option<serde_json::Value>> = HashMap::from([
            ("asset_key".to_string(), Some(json!("asset1"))),
            ("metadata".to_string(), Some(expected_metadata)),
            ("data_version".to_string(), None),
        ]);

        let expected = PipesMessage {
            dagster_pipes_version: "0.1".to_string(),
            method: Method::ReportAssetMaterialization,
            params: Some(expected_params),
        };

        assert_eq!(written, expected);
    }
}
2 changes: 2 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,7 @@ pub enum RawValue {

Double(f64),

Integer(i64),

String(String),
}
2 changes: 1 addition & 1 deletion src/writer/message_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl MessageWriter for DefaultWriter {
const BUFFERED_STDIO_KEY: &str = "buffered_stdio";
const STDERR: &str = "stderr";
const STDOUT: &str = "stdout";
const INCLUDE_STDIO_IN_MESSAGES_KEY: &str = "include_stdio_in_messages";
//const INCLUDE_STDIO_IN_MESSAGES_KEY: &str = "include_stdio_in_messages";

match (
params.get(FILE_PATH_KEY),
Expand Down

0 comments on commit 70c167a

Please sign in to comment.