Skip to content

Commit

Permalink
Add tests for eager workflow dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Jul 18, 2023
1 parent c226220 commit 8d20c42
Show file tree
Hide file tree
Showing 3 changed files with 316 additions and 3 deletions.
116 changes: 116 additions & 0 deletions internal/internal_eager_workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"testing"

"github.com/stretchr/testify/require"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
)

type eagerWorkerMock struct {
tryReserveSlotCallback func() bool
releaseSlotCallback func()
processTaskAsyncCallback func(interface{}, func())
}

func (e *eagerWorkerMock) tryReserveSlot() bool {
return e.tryReserveSlotCallback()
}

func (e *eagerWorkerMock) releaseSlot() {
e.releaseSlotCallback()
}

func (e *eagerWorkerMock) processTaskAsync(task interface{}, callback func()) {
e.processTaskAsyncCallback(task, callback)
}

func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) {
dispatcher := &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
}
dispatcher.registerWorker(&workflowWorker{
executionParameters: workerExecutionParameters{TaskQueue: "bad-task-queue"},
})

request := &workflowservice.StartWorkflowExecutionRequest{
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
}
exec := dispatcher.applyToRequest(request)
require.Nil(t, exec)
require.False(t, request.GetRequestEagerExecution())
}

func TestEagerWorkflowDispatchAvailableWorker(t *testing.T) {
dispatcher := &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
}

availableWorker := &eagerWorkerMock{
tryReserveSlotCallback: func() bool { return true },
}
dispatcher.workersByTaskQueue["task-queue"] = []eagerWorker{
&eagerWorkerMock{
tryReserveSlotCallback: func() bool { return false },
},
&eagerWorkerMock{
tryReserveSlotCallback: func() bool { return false },
},
availableWorker,
}

request := &workflowservice.StartWorkflowExecutionRequest{
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
}
exec := dispatcher.applyToRequest(request)
require.Equal(t, exec.worker, availableWorker)
require.True(t, request.GetRequestEagerExecution())
}

func TestEagerWorkflowExecutor(t *testing.T) {
slotReleased := false
worker := &eagerWorkerMock{
tryReserveSlotCallback: func() bool { return true },
releaseSlotCallback: func() {
slotReleased = true
},
processTaskAsyncCallback: func(task interface{}, callback func()) {
callback()
},
}

exec := &eagerWorkflowExecutor{
worker: worker,
}
exec.handleResponse(&workflowservice.PollWorkflowTaskQueueResponse{})
require.True(t, slotReleased)
require.Panics(t, func() {
exec.release()
})
require.Panics(t, func() {
exec.handleResponse(&workflowservice.PollWorkflowTaskQueueResponse{})
})
}
8 changes: 5 additions & 3 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type (
}
)

// SetRetryLongPollGracePeriod sets the amount of time a long poller retrys on
// SetRetryLongPollGracePeriod sets the amount of time a long poller retries on
// fatal errors before it actually fails. For test use only,
// not safe to call with a running worker.
func SetRetryLongPollGracePeriod(period time.Duration) {
Expand Down Expand Up @@ -333,7 +333,9 @@ func (bw *baseWorker) releaseSlot() {
func (bw *baseWorker) processTaskAsync(task interface{}, callback func()) {
bw.stopWG.Add(1)
go func() {
defer callback()
if callback != nil {
defer callback()
}
bw.processTask(task)
}()
}
Expand All @@ -358,7 +360,7 @@ func (bw *baseWorker) runTaskDispatcher() {
return
}
}
bw.processTaskAsync(task, func() {})
bw.processTaskAsync(task, nil)
}
}
}
Expand Down
195 changes: 195 additions & 0 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,201 @@ func (s *workflowClientTestSuite) TestStartWorkflow() {
s.Equal(createResponse.GetRunId(), resp.GetRunID())
}

func (s *workflowClientTestSuite) TestEagerStartWorkflowNotSupported() {
client, ok := s.client.(*WorkflowClient)
client.capabilities = &workflowservice.GetSystemInfoResponse_Capabilities{
EagerWorkflowStart: false,
}

var processTask bool
var releaseSlot bool
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{
taskqueue: {
&eagerWorkerMock{
tryReserveSlotCallback: func() bool { return true },
releaseSlotCallback: func() {
releaseSlot = true
},
processTaskAsyncCallback: func(task interface{}, callback func()) {
processTask = true
callback()
},
},
},
},
}
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
EnableEagerStart: true,
}
f1 := func(ctx Context, r []byte) string {
panic("this is just a stub")
}

createResponse := &workflowservice.StartWorkflowExecutionResponse{
RunId: runID,
EagerWorkflowTask: &workflowservice.PollWorkflowTaskQueueResponse{},
}
s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil)

resp, err := client.ExecuteWorkflow(context.Background(), options, f1, []byte("test"))
s.Equal(converter.GetDefaultDataConverter(), client.dataConverter)
s.Nil(err)
s.Equal(createResponse.GetRunId(), resp.GetRunID())
s.False(processTask)
s.False(releaseSlot)
}

func (s *workflowClientTestSuite) TestEagerStartWorkflowNoWorker() {
client, ok := s.client.(*WorkflowClient)
client.capabilities = &workflowservice.GetSystemInfoResponse_Capabilities{
EagerWorkflowStart: false,
}

var processTask bool
var releaseSlot bool
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{
taskqueue: {
&eagerWorkerMock{
tryReserveSlotCallback: func() bool { return false },
releaseSlotCallback: func() {
releaseSlot = true
},
processTaskAsyncCallback: func(task interface{}, callback func()) {
processTask = true
callback()
},
},
},
},
}
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
EnableEagerStart: true,
}
f1 := func(ctx Context, r []byte) string {
panic("this is just a stub")
}

createResponse := &workflowservice.StartWorkflowExecutionResponse{
RunId: runID,
EagerWorkflowTask: &workflowservice.PollWorkflowTaskQueueResponse{},
}
s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil)

resp, err := client.ExecuteWorkflow(context.Background(), options, f1, []byte("test"))
s.Equal(converter.GetDefaultDataConverter(), client.dataConverter)
s.Nil(err)
s.Equal(createResponse.GetRunId(), resp.GetRunID())
s.False(processTask)
s.False(releaseSlot)
}

func (s *workflowClientTestSuite) TestEagerStartWorkflow() {
client, ok := s.client.(*WorkflowClient)
client.capabilities = &workflowservice.GetSystemInfoResponse_Capabilities{
EagerWorkflowStart: true,
}

var processTask bool
var releaseSlot bool
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{
taskqueue: {
&eagerWorkerMock{
tryReserveSlotCallback: func() bool { return true },
releaseSlotCallback: func() {
releaseSlot = true
},
processTaskAsyncCallback: func(task interface{}, callback func()) {
processTask = true
callback()
},
},
},
},
}
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
EnableEagerStart: true,
}
f1 := func(ctx Context, r []byte) string {
panic("this is just a stub")
}

createResponse := &workflowservice.StartWorkflowExecutionResponse{
RunId: runID,
EagerWorkflowTask: &workflowservice.PollWorkflowTaskQueueResponse{},
}
s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil)

resp, err := client.ExecuteWorkflow(context.Background(), options, f1, []byte("test"))
s.Equal(converter.GetDefaultDataConverter(), client.dataConverter)
s.Nil(err)
s.Equal(createResponse.GetRunId(), resp.GetRunID())
s.True(processTask)
s.True(releaseSlot)
}

func (s *workflowClientTestSuite) TestEagerStartWorkflowStartRequestFail() {
client, ok := s.client.(*WorkflowClient)
client.capabilities = &workflowservice.GetSystemInfoResponse_Capabilities{
EagerWorkflowStart: true,
}

var processTask bool
var releaseSlot bool
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{
taskqueue: {
&eagerWorkerMock{
tryReserveSlotCallback: func() bool { return true },
releaseSlotCallback: func() {
releaseSlot = true
},
processTaskAsyncCallback: func(task interface{}, callback func()) {
processTask = true
callback()
},
},
},
},
}
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
EnableEagerStart: true,
}
f1 := func(ctx Context, r []byte) string {
panic("this is just a stub")
}

s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed request"))

resp, err := client.ExecuteWorkflow(context.Background(), options, f1, []byte("test"))
s.Nil(resp)
s.Error(err)
s.False(processTask)
s.True(releaseSlot)
}

func (s *workflowClientTestSuite) TestExecuteWorkflowWithDataConverter() {
dc := iconverter.NewTestDataConverter()
s.client = NewServiceClient(s.service, nil, ClientOptions{DataConverter: dc})
Expand Down

0 comments on commit 8d20c42

Please sign in to comment.