Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus: Fix link not being attached to a workflow started via a Handler function #1659

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (o *workflowRunOperation[I, O]) Start(
// Prevent the test env client from panicking when we try to use it from a workflow run operation.
ctx = context.WithValue(ctx, internal.IsWorkflowRunOpContextKey, true)

nctx, ok := internal.NexusOperationContextFromGoContext(ctx)
_, ok := internal.NexusOperationContextFromGoContext(ctx)
if !ok {
return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
}
Expand All @@ -221,7 +221,10 @@ func (o *workflowRunOperation[I, O]) Start(
if err != nil {
return nil, err
}
return &nexus.HandlerStartOperationResultAsync{OperationID: handle.ID()}, nil
return &nexus.HandlerStartOperationResultAsync{
OperationID: handle.ID(),
Links: []nexus.Link{handle.Link()},
}, nil
}

wfOpts, err := o.options.GetOptions(ctx, input, options)
Expand All @@ -234,22 +237,9 @@ func (o *workflowRunOperation[I, O]) Start(
return nil, err
}

// Create the link information about the new workflow and return to the caller.
link := &common.Link_WorkflowEvent{
Namespace: nctx.Namespace,
WorkflowId: handle.ID(),
RunId: handle.RunID(),
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
},
},
}
nexusLink := ConvertLinkWorkflowEventToNexusLink(link)

return &nexus.HandlerStartOperationResultAsync{
OperationID: handle.ID(),
Links: []nexus.Link{nexusLink},
Links: []nexus.Link{handle.Link()},
}, nil
}

Expand All @@ -262,11 +252,14 @@ type WorkflowHandle[T any] interface {
ID() string
// ID is the workflow's run ID.
RunID() string
// Link to the WorkflowExecutionStarted event of the workflow represented by this handle.
Link() nexus.Link
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm why does this need to be on the interface? Are users expected to use this method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't but I don't think it hurts.

Another approach would be to put this on the struct implementation and have an unexported method on the interface to ensure users don't try to implement it. This is important since the handler's return value is the interface.

}

type workflowHandle[T any] struct {
id string
runID string
namespace string
id string
runID string
}

func (h workflowHandle[T]) ID() string {
Expand All @@ -277,6 +270,22 @@ func (h workflowHandle[T]) RunID() string {
return h.runID
}

func (h workflowHandle[T]) Link() nexus.Link {
// Create the link information about the new workflow and return to the caller.
link := &common.Link_WorkflowEvent{
Namespace: h.namespace,
WorkflowId: h.ID(),
RunId: h.RunID(),
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
},
},
}
return ConvertLinkWorkflowEventToNexusLink(link)

}

// ExecuteWorkflow starts a workflow run for a [WorkflowRunOperationOptions] Handler, linking the execution chain to a
// Nexus operation (subsequent runs started from continue-as-new and retries).
// Automatically propagates the callback and request ID from the nexus options to the workflow.
Expand Down Expand Up @@ -354,7 +363,8 @@ func ExecuteUntypedWorkflow[R any](
return nil, err
}
return workflowHandle[R]{
id: run.GetID(),
runID: run.GetRunID(),
namespace: nctx.Namespace,
id: run.GetID(),
runID: run.GetRunID(),
}, nil
}
Loading