diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 00a9fc747e..dddcd0e7c5 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -169,7 +169,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState() currentArrayNodePhase := arrayNodeState.Phase - incrementTaskPhaseVersion := false + taskPhaseVersion := arrayNodeState.TaskPhaseVersion eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder()) switch currentArrayNodePhase { @@ -246,7 +246,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu messageCollector := errorcollector.NewErrorMessageCollector() for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) - taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) // do not process nodes in terminal state if isTerminalNodePhase(nodePhase) { @@ -284,11 +283,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } arrayNodeState.SubNodeRetryAttempts.SetItem(i, uint64(subNodeStatus.GetAttempts())) arrayNodeState.SubNodeSystemFailures.SetItem(i, uint64(subNodeStatus.GetSystemFailures())) - - // increment task phase version if subNode phase or task phase changed - if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase { - incrementTaskPhaseVersion = true - } } // process phases of subNodes to determine overall `ArrayNode` phase @@ -435,15 +429,17 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu taskPhase = idlcore.TaskExecution_FAILED } - // if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise - // increment it if we detect any changes in subNode state. + // need to increment taskPhaseVersion if arrayNodeState.Phase does not change, otherwise + // reset to 0. by incrementing this always we report an event and ensure processing + // every time the ArrayNode is evaluated. if this overhead becomes too large, we will need + // to revisit and only increment when any subNode state changes. if currentArrayNodePhase != arrayNodeState.Phase { arrayNodeState.TaskPhaseVersion = 0 - } else if incrementTaskPhaseVersion { - arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1 + } else { + arrayNodeState.TaskPhaseVersion = taskPhaseVersion + 1 } - if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil { + if err := eventRecorder.finalize(ctx, nCtx, taskPhase, taskPhaseVersion, a.eventConfig); err != nil { logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) return handler.UnknownTransition, err } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 23062a8cb3..8e96ee9645 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -753,7 +753,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "BindingResolutionFailure", err.Error(), nil), nil } - if nodeInputs != nil && len(nodeInputs.Literals) > 0 { + if nodeInputs != nil { inputsFile := v1alpha1.GetInputsFile(dataDir) if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil { c.metrics.InputsWriteFailure.Inc(ctx) diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index 913a517a0f..a37a4cdf6b 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -35,9 +35,6 @@ type cachedRawStore struct { // Head gets metadata about the reference. This should generally be a lightweight operation. func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) { - ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head") - defer span.End() - key := []byte(reference) if oRaw, err := s.cache.Get(key); err == nil { s.metrics.CacheHit.Inc()