From 88e19cd45f36e41df270c4a6ab8e0de484328763 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Wed, 2 Oct 2024 15:04:58 -0700 Subject: [PATCH 1/2] Error for unused operation --- internal/client.go | 5 +++- internal/internal_workflow_client_test.go | 29 +++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/internal/client.go b/internal/client.go index 2e6745cae..454e5b641 100644 --- a/internal/client.go +++ b/internal/client.go @@ -746,7 +746,7 @@ type ( // workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. callbacks []*commonpb.Callback // links. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. - links []*commonpb.Link + links []*commonpb.Link } // WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start. @@ -1082,6 +1082,9 @@ func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUp case <-op.doneCh: return op.handle, op.err case <-ctx.Done(): + if !op.executed.Load() { + return nil, fmt.Errorf("%v: %v", ctx.Err(), fmt.Errorf("operation was not executed")) + } return nil, ctx.Err() } } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index df3b644fc..270b27203 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1027,6 +1027,35 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() { s.NoError(err) } +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_OperationNotExecuted() { + s.workflowServiceClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.StartWorkflowExecutionResponse{ + RunId: runID, + }, nil) + + updOp := NewUpdateWithStartWorkflowOperation( + UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }) + + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + _, err := s.workflowClient.ExecuteWorkflow( + ctxWithTimeout, + StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskqueue, + // WithStartOperation is not specified! + }, workflowType, + ) + require.NoError(s.T(), err) + + _, err = updOp.Get(ctxWithTimeout) + require.EqualError(s.T(), err, "context deadline exceeded: operation was not executed") +} + func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() { tests := []struct { name string From 7e831a976271d0255b7c7a1133043ca5d1eef192 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Wed, 2 Oct 2024 17:16:41 -0700 Subject: [PATCH 2/2] %w --- internal/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/client.go b/internal/client.go index 454e5b641..98fc53e2a 100644 --- a/internal/client.go +++ b/internal/client.go @@ -1083,7 +1083,7 @@ func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUp return op.handle, op.err case <-ctx.Done(): if !op.executed.Load() { - return nil, fmt.Errorf("%v: %v", ctx.Err(), fmt.Errorf("operation was not executed")) + return nil, fmt.Errorf("%w: %w", ctx.Err(), fmt.Errorf("operation was not executed")) } return nil, ctx.Err() }