Skip to content
This repository has been archived by the owner on Dec 12, 2024. It is now read-only.

Add Default implementation for PipesContextData #22

Merged
merged 7 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Added
- (pull/25) Use `cargo nextest` for GitHub CI jobs
- (pull/22) Added `Default` implementation for `PipesContextData`
- (pull/20) Added `FileChannel`, `StreamChannel`, `BufferedStreamChannel` implementing `MessageWriterChannel`
- (pull/20) Added a simplified `PipesDefaultMessageWriter` implementing `MessageWriter`
- (pull/20) Defined `MessageWriter` and the associated `MessageWriterChannel` traits
Expand Down
28 changes: 5 additions & 23 deletions src/context_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,12 @@ mod tests {
default_context_loader.load_context(params.clone()).unwrap(),
PipesContextData {
asset_keys: Some(vec!["asset1".to_string(), "asset2".to_string()]),
code_version_by_asset_key: None,
extras: Some(HashMap::from([(
"key".to_string(),
Some(Value::String("value".to_string()))
)])),
job_name: None,
partition_key: None,
partition_key_range: None,
partition_time_window: None,
provenance_by_asset_key: None,
retry_number: 0,
run_id: "012345".to_string(),
..PipesContextData::default()
}
);
}
Expand Down Expand Up @@ -157,18 +151,12 @@ mod tests {
default_context_loader.load_context(params.clone()).unwrap(),
PipesContextData {
asset_keys: Some(vec!["asset1".to_string(), "asset2".to_string()]),
code_version_by_asset_key: None,
extras: Some(HashMap::from([(
"key".to_string(),
Some(Value::String("value".to_string()))
)])),
job_name: None,
partition_key: None,
partition_key_range: None,
partition_time_window: None,
provenance_by_asset_key: None,
retry_number: 0,
run_id: "012345".to_string(),
..PipesContextData::default()
}
);
}
Expand All @@ -182,9 +170,9 @@ mod tests {
r#"
{
"asset_keys": ["asset_from_path"],
"run_id": "id_from_path",
"extras": {"key_from_path": "value_from_path"},
"retry_number": 0
"retry_number": 0,
"run_id": "id_from_path"
}"#
.as_bytes(),
)
Expand All @@ -196,18 +184,12 @@ mod tests {
default_context_loader.load_context(params.clone()).unwrap(),
PipesContextData {
asset_keys: Some(vec!["asset_from_path".to_string()]),
code_version_by_asset_key: None,
extras: Some(HashMap::from([(
"key_from_path".to_string(),
Some(Value::String("value_from_path".to_string()))
)])),
job_name: None,
partition_key: None,
partition_key_range: None,
partition_time_window: None,
provenance_by_asset_key: None,
retry_number: 0,
run_id: "id_from_path".to_string(),
..PipesContextData::default()
}
);
}
Expand Down
4 changes: 2 additions & 2 deletions src/params_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const DAGSTER_PIPES_CONTEXT_ENV_VAR: &str = "DAGSTER_PIPES_CONTEXT";
const DAGSTER_PIPES_MESSAGES_ENV_VAR: &str = "DAGSTER_PIPES_MESSAGES";

/// Load params passed from the orchestration process by the context injector and
/// message reader. These params are used to respectively bootstrap the
/// [`PipesContextLoader`] and [`MessageWriter`](crate::MessageWriter).
/// message reader. These params are used to respectively bootstrap
/// [`LoadContext`](crate::LoadContext) and [`PipesMessageWriter`].
pub trait LoadParams {
/// Whether or not this process has been provided with provided with information
/// to create a `PipesContext` or should instead return a mock.
Expand Down
25 changes: 24 additions & 1 deletion src/types_ext.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,29 @@
use std::collections::HashMap;
//! Module containing extension implementations for the types auto-generated by `quicktype` in `types.rs`.
//! The implementations are included here because:
//! 1. `quicktype` does not support these additional traits
//! 2. Manual changes made in `types.rs` will be overwritten by future calls to `quicktype.sh`.
#![allow(clippy::derivable_impls)]
use crate::PipesContextData;

use crate::{Method, PipesMessage};
use std::collections::HashMap;

impl Default for PipesContextData {
fn default() -> Self {
Self {
asset_keys: None,
code_version_by_asset_key: None,
extras: None,
job_name: None,
partition_key: None,
partition_key_range: None,
partition_time_window: None,
provenance_by_asset_key: None,
retry_number: 0,
run_id: String::new(),
}
}
}

impl PipesMessage {
pub fn new(method: Method, params: Option<HashMap<String, Option<serde_json::Value>>>) -> Self {
Expand Down
Loading