Skip to content

Commit

Permalink
Jun/distributed flow (#83)
Browse files Browse the repository at this point in the history
* Improve maestro engine to spread the load based on the workflow instance identity.

It helps to handle super large workflow (e.g. millions of jobs over foreach).

It also enables a lightweight approach to identify the group ids during wake up.

* Implement REST based FlowOperation and improve flow engine to support wake up all tasks.
  • Loading branch information
jun-he authored Feb 4, 2025
1 parent 0f21e98 commit 99d8099
Show file tree
Hide file tree
Showing 47 changed files with 1,033 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ private Constants() {}
public static final String WORKFLOW_RUNTIME_SUMMARY_FIELD =
MAESTRO_PREFIX + "workflow_runtime_summary";

/** Maestro step definition field name. */
public static final String STEP_DEFINITION_FIELD = MAESTRO_PREFIX + "step_definition";

/** Maestro step instance runtime summary field name. */
public static final String STEP_RUNTIME_SUMMARY_FIELD = MAESTRO_PREFIX + "step_runtime_summary";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"correlation_id",
"step_instance_id",
"workflow_version_id",
"group_id",
"max_group_num",
"owner",
"definition",
"tags",
Expand Down Expand Up @@ -96,7 +96,8 @@ public class StepInstance {
@Min(1)
private long workflowVersionId; // version id of baseline workflow

@NotNull private Long groupId; // the group id to group a set of root workflows together
@Min(1)
private long maxGroupNum; // used to decide the group id for this specific instance tree

// required owner from workflow instance properties snapshot.
@Valid @NotNull private User owner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"workflow_uuid",
"correlation_id",
"workflow_version_id",
"group_id",
"max_group_num",
"internal_id",
"execution_id",
"run_config",
Expand Down Expand Up @@ -88,7 +88,8 @@ public class WorkflowInstance {
@Min(1)
private long workflowVersionId; // version id of baseline workflow

private Long groupId; // the group id to group a set of root workflows together
@Min(1)
private long maxGroupNum; // used to decide the group id for this specific instance tree

private String executionId; // internal execution id to identify this workflow run

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.netflix.maestro.annotations.Nullable;
import com.netflix.maestro.models.Constants;
import com.netflix.maestro.models.definition.Workflow;
import com.netflix.maestro.models.instance.WorkflowInstance;
import com.netflix.maestro.models.trigger.TriggerUuids;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
Expand Down Expand Up @@ -120,4 +121,24 @@ private static String encodeBase62(long value, boolean isOrdered) {
}
return sb.toString();
}

/**
* Return the group id for given workflow instance. It should be deterministically and evenly
* distributed.
*/
public static long deriveGroupId(WorkflowInstance instance) {
return deriveGroupId(instance.getIdentity(), instance.getMaxGroupNum());
}

public static long deriveGroupId(String groupingKey, long maxGroupNum) {
if (maxGroupNum <= 0) { // default groupId to 0 for backward compatibility
return 0;
}
long ret = groupingKey.hashCode() % maxGroupNum;
if (ret < 0) {
return ret + maxGroupNum;
} else {
return ret;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.maestro.utils;

import com.netflix.maestro.models.definition.Workflow;
import com.netflix.maestro.models.instance.WorkflowInstance;
import com.netflix.maestro.models.trigger.CronTimeTrigger;
import com.netflix.maestro.models.trigger.SignalTrigger;
import com.netflix.maestro.models.trigger.TriggerUuids;
Expand Down Expand Up @@ -71,4 +72,20 @@ public void testLargeRangeKey() {
Assert.assertEquals("BAzL8n0Y58m7", IdHelper.rangeKey(Long.MAX_VALUE));
Assert.assertTrue(IdHelper.rangeKey(1000000L).compareTo(IdHelper.rangeKey(Long.MAX_VALUE)) < 0);
}

@Test
public void testDeriveGroupId() {
WorkflowInstance instance = new WorkflowInstance();
instance.setWorkflowId("sample-dag-test-1");
instance.setWorkflowInstanceId(12);
instance.setWorkflowRunId(2);
Assert.assertEquals(0, IdHelper.deriveGroupId(instance));

instance.setMaxGroupNum(10);
Assert.assertEquals(5, IdHelper.deriveGroupId(instance));

Assert.assertEquals(0, IdHelper.deriveGroupId("test-key", 3));
Assert.assertTrue("negative-test".hashCode() < 0);
Assert.assertEquals(2, IdHelper.deriveGroupId("negative-test", 3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"correlation_id": "sample-dag-test-3-1-1",
"step_instance_id": 1,
"workflow_version_id": 1,
"max_group_num": 8,
"owner": "tester",
"definition": {
"step": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"workflow_id": "sample-dag-test-3",
"internal_id": 12345,
"group_id": 2,
"max_group_num": 3,
"workflow_instance_id": 1,
"workflow_run_id": 1,
"workflow_uuid": "8a0bd56f-745f-4a2c-b81b-1b2f89127e73",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"workflow_id": "sample-dag-test-3",
"workflow_instance_id": 1,
"workflow_run_id": 1,
"group_id": 3,
"max_group_num": 3,
"step_id": "job1",
"step_attempt_id": 1,
"step_status": "RUNNING",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"workflow_name": "Test workflow 01",
"workflow_instance_id": 1,
"workflow_run_id": 1,
"max_group_num": 5,
"initiator": {"type": "MANUAL"},
"criticality": "CRITICAL",
"run_policy": "START_FRESH_NEW_RUN",
Expand Down Expand Up @@ -456,5 +457,22 @@
"job.2"
]
}
},
"step_map": {
"job1": {"step": {
"id": "job1", "type": "Sleep", "params": {
"sleep_seconds": {"value": 15, "type": "LONG"},
"cpu": {"value": 2, "type": "DOUBLE"},
"memory": {"value": "10.229999999999", "type": "DOUBLE"},
"monitor": {"value": 1.00000000000001, "type": "DOUBLE"}
}}},
"job.2": {"step": {
"id": "job.2", "type": "NoOp", "params": {
"param1": {"value": [15, 16], "type": "DOUBLE_ARRAY"},
"param2": {"value": ["12.3", "45"], "type": "DOUBLE_ARRAY"},
"param3": {"value": ["10.229999999999", 1.00000000000001], "type": "DOUBLE_ARRAY"},
"param4": {"expression": "return new double[]{1.2, 3.45};", "type": "DOUBLE_ARRAY"},
"param5": {"expression": "return new double[]{1.229999999999, 3.00000000000001};", "type": "DOUBLE_ARRAY"}
}}}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ public void terminate(
stepInstance.setWorkflowInstanceId(summary.getWorkflowInstanceId());
stepInstance.setWorkflowRunId(summary.getWorkflowRunId());
stepInstance.setStepId(stepId);
stepInstance.setGroupId(summary.getGroupId());
stepInstance.setMaxGroupNum(summary.getMaxGroupNum());
StepAction stepAction = StepAction.createTerminate(action, stepInstance, user, reason, false);

upsertActions(
Expand Down Expand Up @@ -668,7 +668,7 @@ public int terminate(
stepInstanceAction, stepInstance, user, reason, true);
return toJson(stepAction);
})
.collect(Collectors.toList());
.toList();

// batch upsert them into DB.
String workflowIdentity = instance.getIdentity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class RunRequest {

@Nullable private final List<Tag> runtimeTags;
@Nullable private final String correlationId;
@Nullable private final Long groupId;
@Nullable private final Long maxGroupNum;
@Nullable private final Long instanceStepConcurrency; // null means unset and disabled
@Nullable private final Map<String, ParamDefinition> runParams;
@Nullable private final Map<String, Map<String, ParamDefinition>> stepRunParams;
Expand All @@ -71,7 +71,7 @@ public static RestartConfig.RestartNode getCurrentNode(RestartConfig config) {
Checks.checkTrue(
config != null && !ObjectHelper.isCollectionEmptyOrNull(config.getRestartPath()),
"Cannot get restart info in empty restart configuration");
return config.getRestartPath().get(config.getRestartPath().size() - 1);
return config.getRestartPath().getLast();
}

/** Static util method to extract the second to the last node from the restart path. */
Expand Down Expand Up @@ -121,7 +121,7 @@ public void updateForDownstreamIfNeeded(String currentStepId, WorkflowInstance t
if (restartConfig != null) {
// still along restart path and not reach the downstream
if (currentStepId.equals(getRestartStepId()) && restartConfig.getRestartPath().size() > 1) {
restartConfig.getRestartPath().remove(restartConfig.getRestartPath().size() - 1);
restartConfig.getRestartPath().removeLast();
validateIdentity(toRestart);
if (restartConfig.getRestartPath().size() == 1) {
this.currentPolicy = restartConfig.getRestartPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.maestro.models.Constants;
import com.netflix.maestro.models.Defaults;
import com.netflix.maestro.models.definition.Criticality;
import com.netflix.maestro.models.definition.Step;
import com.netflix.maestro.models.definition.StepTransition;
import com.netflix.maestro.models.definition.Tag;
import com.netflix.maestro.models.definition.TagList;
Expand Down Expand Up @@ -55,7 +56,7 @@
"workflow_instance_id",
"workflow_run_id",
"correlation_id",
"group_id",
"max_group_num",
"creation_time",
"workflow_uuid",
"run_policy",
Expand All @@ -65,7 +66,7 @@
"params",
"tags",
"runtime_dag",
"dag",
"step_map",
"criticality",
"instance_step_concurrency"
},
Expand All @@ -82,7 +83,7 @@ public class WorkflowSummary {
private long workflowInstanceId;
private long workflowRunId;
private String correlationId;
private Long groupId;
private long maxGroupNum;
private Long creationTime;
@NotNull private String workflowUuid;

Expand All @@ -106,7 +107,7 @@ public void setParams(Map<String, Parameter> input) {
@Valid @TagListConstraint private TagList tags;

private Map<String, StepTransition> runtimeDag; // actual dag used.
private Map<String, StepTransition> dag; // full dag.
private Map<String, Step> stepMap; // all steps in the full dag.

@Nullable private Criticality criticality;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void addExtraTasksAndInput(Flow flow, Task task) {
}

if (!summary.isFreshRun() && summary.getRunPolicy() != RunPolicy.RESTART_FROM_BEGINNING) {
Set<String> stepIds = new HashSet<>(summary.getDag().keySet());
Set<String> stepIds = new HashSet<>(summary.getStepMap().keySet());
flow.getFlowDef().getTasks().stream()
.flatMap(tasks -> tasks.stream().map(TaskDef::taskReferenceName))
.forEach(stepIds::remove);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@

import com.netflix.maestro.engine.transformation.WorkflowTranslator;
import com.netflix.maestro.engine.utils.WorkflowHelper;
import com.netflix.maestro.flow.engine.FlowExecutor;
import com.netflix.maestro.flow.runtime.FlowOperation;
import com.netflix.maestro.models.Constants;
import com.netflix.maestro.models.instance.WorkflowInstance;
import com.netflix.maestro.utils.IdHelper;
import java.util.Collections;
import lombok.AllArgsConstructor;

/** Workflow runner to run a maestro workflow in the internal flow engine. */
@AllArgsConstructor
public class WorkflowRunner {
private final FlowExecutor flowExecutor;
private final FlowOperation flowOperation;
private final WorkflowTranslator translator;
private final WorkflowHelper workflowHelper;

Expand All @@ -34,8 +35,8 @@ public class WorkflowRunner {
* @return UUID of the internal flow instance
*/
public String start(WorkflowInstance instance) {
return flowExecutor.startFlow(
instance.getGroupId(),
return flowOperation.startFlow(
IdHelper.deriveGroupId(instance),
instance.getWorkflowUuid(),
instance.getIdentity(),
translator.translate(instance),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"workflow_id",
"workflow_instance_id",
"workflow_run_id",
"group_id",
"max_group_num",
"step_id",
"step_attempt_id",
"step_status",
Expand All @@ -55,7 +55,7 @@ public class StepInstanceWakeUpEvent implements MaestroJobEvent {
@Min(1)
private long workflowRunId;

private Long groupId; // internal flow group id to identify the event destination
private long maxGroupNum; // used to decide the group id to identify the event destination

@Nullable private String stepId;
@Nullable private String stepAttemptId;
Expand Down Expand Up @@ -93,7 +93,7 @@ public static StepInstanceWakeUpEvent create(StepInstance stepInstance, StepActi
event.stepId = action.getStepId();
event.stepAction = action.getAction();
event.entityType = EntityType.STEP;
event.groupId = stepInstance.getGroupId();
event.maxGroupNum = stepInstance.getMaxGroupNum();

if (stepInstance.getStepAttemptId() > 0) {
event.stepAttemptId = String.valueOf(stepInstance.getStepAttemptId());
Expand Down Expand Up @@ -123,7 +123,7 @@ public static StepInstanceWakeUpEvent create(
event.workflowId = instance.getWorkflowId();
event.workflowInstanceId = instance.getWorkflowInstanceId();
event.workflowRunId = instance.getWorkflowRunId();
event.groupId = instance.getGroupId();
event.maxGroupNum = instance.getMaxGroupNum();
event.workflowAction = action;
event.entityType = EntityType.WORKFLOW;
return event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.maestro.exceptions.MaestroRetryableError;
import com.netflix.maestro.flow.dao.MaestroFlowDao;
import com.netflix.maestro.models.instance.WorkflowInstance;
import com.netflix.maestro.utils.IdHelper;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void process(Supplier<RunWorkflowInstancesJobEvent> runWorkflowInstancesJ
if (instance.getStatus() == WorkflowInstance.Status.CREATED) {
if (instanceRunUuid.getUuid().equals(instance.getWorkflowUuid())) {
if (!flowDao.existFlowWithSameKeys(
instance.getGroupId(), instance.getWorkflowUuid())) {
IdHelper.deriveGroupId(instance), instance.getWorkflowUuid())) {
String executionId = runWorkflowInstance(instance);
LOG.info(
"Run a workflow instance {} with an internal execution_id [{}]",
Expand Down
Loading

0 comments on commit 99d8099

Please sign in to comment.