Skip to content

Commit

Permalink
Add dynamic output feature to support step runtime to define outputs …
Browse files Browse the repository at this point in the history
…within the business logic. (#86)
  • Loading branch information
jun-he authored Feb 7, 2025
1 parent be2101a commit fd3b451
Show file tree
Hide file tree
Showing 15 changed files with 731 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
@JsonSubTypes.Type(name = "SUBWORKFLOW", value = SubworkflowArtifact.class),
@JsonSubTypes.Type(name = "FOREACH", value = ForeachArtifact.class),
@JsonSubTypes.Type(name = "TITUS", value = TitusArtifact.class),
@JsonSubTypes.Type(name = "NOTEBOOK", value = NotebookArtifact.class)
@JsonSubTypes.Type(name = "NOTEBOOK", value = NotebookArtifact.class),
@JsonSubTypes.Type(name = "DYNAMIC_OUTPUT", value = DynamicOutputArtifact.class)
})
public interface Artifact {
/** Get artifact type info. */
Expand All @@ -45,7 +46,9 @@ enum Type {
/** titus artifact. */
TITUS(Constants.MAESTRO_PREFIX + "titus"),
/** notebook artifact. */
NOTEBOOK(Constants.MAESTRO_PREFIX + "notebook");
NOTEBOOK(Constants.MAESTRO_PREFIX + "notebook"),
/** dynamic output artifact. */
DYNAMIC_OUTPUT(Constants.MAESTRO_PREFIX + "dynamic_output");

private final String key;

Expand Down Expand Up @@ -102,4 +105,14 @@ default TitusArtifact asTitus() {
default NotebookArtifact asNotebook() {
throw new MaestroInternalError("Artifact type [%s] cannot be used as NOTEBOOK", getType());
}

/**
* get DynamicOutput type artifact.
*
* @return concrete artifact object.
*/
default DynamicOutputArtifact asDynamicOutput() {
throw new MaestroInternalError(
"Artifact type [%s] cannot be used as DYNAMIC_OUTPUT", getType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.netflix.maestro.models.artifact;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import com.netflix.maestro.annotations.Nullable;
import com.netflix.maestro.models.definition.StepOutputsDefinition;
import com.netflix.maestro.models.parameter.MapParameter;
import com.netflix.maestro.models.timeline.TimelineLogEvent;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
import lombok.Data;

/** Signals artifact to store dynamic signals generated at step runtime. */
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder(
value = {"outputs", "info"},
alphabetic = true)
@Data
public class DynamicOutputArtifact implements Artifact {
@Valid private Map<StepOutputsDefinition.StepOutputType, List<MapParameter>> outputs;
@Nullable private TimelineLogEvent info;

@JsonIgnore
@Override
public DynamicOutputArtifact asDynamicOutput() {
return this;
}

@Override
public Type getType() {
return Type.DYNAMIC_OUTPUT;
}

@JsonIgnore
public @Nullable List<MapParameter> getOutputSignals() {
if (outputs != null) {
return outputs.get(StepOutputsDefinition.StepOutputType.SIGNAL);
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.netflix.maestro.models.artifact;

import static org.junit.Assert.assertEquals;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.netflix.maestro.MaestroBaseTest;
import com.netflix.maestro.models.definition.StepOutputsDefinition;
import java.util.Map;
import lombok.Data;
import org.junit.Test;

public class DynamicOutputArtifactTest extends MaestroBaseTest {
@Data
private static class Artifacts {
@JsonProperty("artifacts")
private Map<String, Artifact> artifacts;
}

@Test
public void testRoundTripSerde() throws Exception {
DynamicOutputArtifact request =
loadObject(
"fixtures/artifact/sample-dynamic-output-artifact.json", DynamicOutputArtifact.class);
assertEquals(
request, MAPPER.readValue(MAPPER.writeValueAsString(request), DynamicOutputArtifact.class));
}

@Test
public void testDynamicOutputArtifact() throws Exception {
Artifacts artifactMap = loadObject("fixtures/artifact/sample-artifacts.json", Artifacts.class);
DynamicOutputArtifact dynamicOutputArtifact =
artifactMap.getArtifacts().get(Artifact.Type.DYNAMIC_OUTPUT.key()).asDynamicOutput();
assertEquals(1, dynamicOutputArtifact.getOutputs().size());
assertEquals(
1,
dynamicOutputArtifact.getOutputs().get(StepOutputsDefinition.StepOutputType.SIGNAL).size());
assertEquals(
"demo_table",
dynamicOutputArtifact.getOutputSignals().getFirst().getEvaluatedResult().get("name"));
assertEquals(
1536787990L,
dynamicOutputArtifact.getOutputSignals().getFirst().getEvaluatedResult().get("timestamp"));
assertEquals("sample info log", dynamicOutputArtifact.getInfo().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,37 @@
}
},
"type": "FOREACH"
},
"maestro_dynamic_output": {
"outputs": {
"SIGNAL": [
{
"value": {
"name": {
"value": "demo_table",
"type": "STRING"
},
"timestamp": {
"value": 1536787990,
"type": "LONG"
}
},
"type": "MAP",
"evaluated_result": {
"name": "demo_table",
"timestamp": 1536787990
},
"evaluated_time": 1625871404000
}
]
},
"info": {
"timestamp": 1609272999666,
"type": "LOG",
"level": "INFO",
"message": "sample info log"
},
"type": "DYNAMIC_OUTPUT"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"outputs": {
"SIGNAL": [
{
"value": {
"name": {
"value": "demo_table",
"type": "STRING"
},
"timestamp": {
"value": 1536787990,
"type": "LONG"
}
},
"type": "MAP",
"evaluated_result": {
"name": "demo_table",
"timestamp": 1536787990
},
"evaluated_time": 1625871404000
}
]
},
"info": {
"timestamp": 1609272999666,
"type": "LOG",
"level": "INFO",
"message": "sample info log"
},
"type": "DYNAMIC_OUTPUT"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
{
"step_id": "job1",
"step_name": "step1",
"step_instance_id": 123,
"step_attempt_id": 1,
"step_instance_uuid": "bar",
"type": "subworkflow",
"step_run_params": {
"foo": {
"value": "bar",
"type": "STRING",
"mode": "mutable"
}
},
"params": {
"param1": {
"value": "foo",
"type": "STRING",
"evaluated_result": "foo",
"evaluated_time": 1608171805392
}
},
"step_retry": {
"error_retries": 0,
"error_retry_limit": 2,
"platform_retries": 0,
"platform_retry_limit": 10,
"manual_retries": 0,
"retryable": true
},
"synced": true,
"runtime_state": {
"status": "FINISHING",
"end_time": 1608171805401,
"modify_time": 1608171805401
},
"timeline": [
{
"timestamp": 1609272999666,
"type": "LOG",
"level": "INFO",
"message": "hello"
}
],
"pending_records": [
{
"event_time": 1608171805401,
"new_status": "FINISHING",
"old_status": "RUNNING"
}
],
"artifacts": {
"maestro_subworkflow": {
"subworkflow_id": "test-subwf",
"subworkflow_version_id": 1,
"subworkflow_instance_id": 1,
"subworkflow_run_id": 1,
"subworkflow_uuid": "foo-bar",
"subworkflow_overview": {
"step_overview": {
"SUCCEEDED": 1
},
"total_step_count": 1
},
"type": "SUBWORKFLOW"
},
"maestro_dynamic_output": {
"outputs": {
"SIGNAL": [
{
"value": {
"name": {
"value": "table_1",
"type": "STRING"
},
"timestamp": {
"value": 1536787990,
"type": "LONG"
},
"is_iceberg": {
"value": true,
"type": "BOOLEAN"
},
"nested_map": {
"type": "MAP",
"value": {
"nested_string_map": {
"type": "STRING_MAP",
"value": {
"foo": "bar"
}
},
"nested_string_array": {
"type": "STRING_ARRAY",
"value": [
"a",
"b",
"c"
]
},
"nested_long_array": {
"type": "LONG_ARRAY",
"value": [
1,
2,
3
]
},
"nested_double_array": {
"type": "DOUBLE_ARRAY",
"value": [
1.1,
2.2,
3.3
]
},
"nested_boolean_array": {
"type": "BOOLEAN_ARRAY",
"value": [
true,
false,
true
]
}
}
}
},
"type": "MAP",
"evaluated_result": {
"name": "table_1",
"timestamp": 1536787990,
"is_iceberg": true,
"nested_map": {
"nested_string_map": {
"foo": "bar"
},
"nested_string_array": [
"a",
"b",
"c"
],
"nested_long_array": [
1,
2,
3
],
"nested_double_array": [
1.1,
2.2,
3.3
],
"nested_boolean_array": [
true,
false,
true
]
}
},
"evaluated_time": 1625871404000
},
{
"value": {
"name": {
"value": "table_2",
"type": "STRING"
},
"timestamp": {
"value": 1536787990,
"type": "LONG"
}
},
"type": "MAP",
"evaluated_result": {
"name": "table_2",
"timestamp": 1536787990
},
"evaluated_time": 1625871404000
}
]
},
"type": "DYNAMIC_OUTPUT"
}
},
"dependencies": {
}
}
Loading

0 comments on commit fd3b451

Please sign in to comment.