From c25aa88cc8a3168b6687aac4f3355629d5ce531f Mon Sep 17 00:00:00 2001
From: Quinn Klassen
Date: Mon, 17 Jul 2023 16:05:32 -0700
Subject: [PATCH] Refactor to avoid raw access of baseWorker

---
 internal/client.go                       |  2 +-
 internal/internal_eager.go               | 35 ++++++++++++++++++++
 internal/internal_eager_activity.go      | 42 +++++++-----------------
 internal/internal_eager_activity_test.go | 26 ++++++++++-----
 internal/internal_eager_workflow.go      | 42 ++++++++++--------------
 internal/internal_worker.go              | 25 +++-----------
 internal/internal_worker_base.go         | 33 +++++++++++++++++--
 internal/internal_workflow_client.go     | 22 +++++--------
 8 files changed, 126 insertions(+), 101 deletions(-)
 create mode 100644 internal/internal_eager.go

diff --git a/internal/client.go b/internal/client.go
index 9d616c41a..bb5f09b1a 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -824,7 +824,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
 	client.rootInterceptor = &workflowClientInterceptor{
 		client: client,
 		eagerDispatcher: &eagerWorkflowDispatcher{
-			workersByTaskQueue: make(map[string][]*workflowWorker),
+			workersByTaskQueue: make(map[string][]eagerWorker),
 		},
 	}
 	client.interceptor = client.rootInterceptor
diff --git a/internal/internal_eager.go b/internal/internal_eager.go
new file mode 100644
index 000000000..eff55c72a
--- /dev/null
+++ b/internal/internal_eager.go
@@ -0,0 +1,35 @@
+// The MIT License
+//
+// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package internal
+
+// eagerWorker is the minimal worker interface needed for eager activities and workflows.
+type eagerWorker interface {
+	// tryReserveSlot tries to reserve a task slot on the worker without blocking.
+	// The caller is expected to release the slot with releaseSlot.
+	tryReserveSlot() bool
+	// releaseSlot releases a task slot acquired by tryReserveSlot.
+	releaseSlot()
+	// processTaskAsync processes a new task on the worker asynchronously and
+	// calls callback once complete.
+	processTaskAsync(task interface{}, callback func())
+}
diff --git a/internal/internal_eager_activity.go b/internal/internal_eager_activity.go
index 79b0a5857..c0a7536f8 100644
--- a/internal/internal_eager_activity.go
+++ b/internal/internal_eager_activity.go
@@ -34,7 +34,7 @@ import (
 type eagerActivityExecutor struct {
 	eagerActivityExecutorOptions
 
-	activityWorker *activityWorker
+	activityWorker eagerWorker
 	heldSlotCount  int
 	countLock      sync.Mutex
 }
@@ -97,11 +97,8 @@ func (e *eagerActivityExecutor) reserveOnePendingSlot() bool {
 		// No more room
 		return false
 	}
-	// Reserve a spot for our request via a non-blocking attempt to take a poller
-	// request entry which essentially reserves a spot
-	select {
-	case <-e.activityWorker.worker.pollerRequestCh:
-	default:
+	// Reserve a spot for our request via a non-blocking attempt
+	if !e.activityWorker.tryReserveSlot() {
 		return false
 	}
 
@@ -131,35 +128,20 @@ func (e *eagerActivityExecutor) handleResponse(
 
 	// Put every unfulfilled slot back on the poller channel
 	for i := 0; i < unfulfilledSlots; i++ {
-		// Like other parts that push onto this channel, we assume there is room
-		// because we took it, so we do a blocking send
-		e.activityWorker.worker.pollerRequestCh <- struct{}{}
+		e.activityWorker.releaseSlot()
 	}
 
 	// Start each activity asynchronously
 	for _, activity := range resp.GetActivityTasks() {
-		// Before starting the goroutine we have to increase the wait group counter
-		// that the poller would have otherwise increased
-		e.activityWorker.worker.stopWG.Add(1)
 		// Asynchronously execute
 		task := &activityTask{activity}
-		go func() {
-			// Mark completed when complete
-			defer func() {
-				// Like other sends to this channel, we assume there is room because we
-				// reserved it, so we make a blocking send. The processTask does not do
-				// this itself because our task is *activityTask, not *polledTask.
-				e.activityWorker.worker.pollerRequestCh <- struct{}{}
-				// Decrement executing count
-				e.countLock.Lock()
-				e.heldSlotCount--
-				e.countLock.Unlock()
-			}()
-
-			// Process the task synchronously. We call the processor on the base
-			// worker instead of a higher level so we can get the benefits of metrics,
-			// stop wait group update, etc.
-			e.activityWorker.worker.processTask(task)
-		}()
+		e.activityWorker.processTaskAsync(task, func() {
+			// processTaskAsync does not release the slot itself because our task is *activityTask, not *polledTask.
+			e.activityWorker.releaseSlot()
+			// Decrement executing count
+			e.countLock.Lock()
+			e.heldSlotCount--
+			e.countLock.Unlock()
+		})
 	}
 }
diff --git a/internal/internal_eager_activity_test.go b/internal/internal_eager_activity_test.go
index aad9720c9..dd3ad1393 100644
--- a/internal/internal_eager_activity_test.go
+++ b/internal/internal_eager_activity_test.go
@@ -38,7 +38,7 @@ import (
 func TestEagerActivityDisabled(t *testing.T) {
 	exec := newEagerActivityExecutor(eagerActivityExecutorOptions{disabled: true, taskQueue: "task-queue1"})
 	exec.activityWorker = newActivityWorker(nil,
-		workerExecutionParameters{TaskQueue: "task-queue1"}, nil, newRegistry(), nil)
+		workerExecutionParameters{TaskQueue: "task-queue1"}, nil, newRegistry(), nil).worker
 
 	// Turns requests to false when disabled
 	var req workflowservice.RespondWorkflowTaskCompletedRequest
@@ -59,11 +59,13 @@ func TestEagerActivityNoActivityWorker(t *testing.T) {
 func TestEagerActivityWrongTaskQueue(t *testing.T) {
 	exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1"})
-	exec.activityWorker = newActivityWorker(nil,
-		workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil)
+	activityWorker := newActivityWorker(nil, workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil)
+	activityWorker.worker.isWorkerStarted = true
+
+	exec.activityWorker = activityWorker.worker
 
 	// Fill up the poller request channel
 	for i := 0; i < 10; i++ {
-		exec.activityWorker.worker.pollerRequestCh <- struct{}{}
+		activityWorker.worker.pollerRequestCh <- struct{}{}
 	}
 
 	// Turns requests to false when wrong task queue
@@ -77,11 +79,14 @@ func TestEagerActivityMaxPerTask(t *testing.T) {
 func TestEagerActivityMaxPerTask(t *testing.T) {
 	exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1"})
-	exec.activityWorker = newActivityWorker(nil,
+	activityWorker := newActivityWorker(nil,
 		workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil)
+	activityWorker.worker.isWorkerStarted = true
+
+	exec.activityWorker = activityWorker.worker
 
 	// Fill up the poller request channel
 	for i := 0; i < 10; i++ {
-		exec.activityWorker.worker.pollerRequestCh <- struct{}{}
+		activityWorker.worker.pollerRequestCh <- struct{}{}
 	}
 
 	// Add 8, but it limits to only the first 3
@@ -99,16 +104,19 @@ func TestEagerActivityCounts(t *testing.T) {
 	// We'll create an eager activity executor with 3 max eager concurrent and 5
 	// max concurrent
 	exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1", maxConcurrent: 3})
-	exec.activityWorker = newActivityWorker(nil,
+	activityWorker := newActivityWorker(nil,
 		workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 5}, nil, newRegistry(), nil)
+	activityWorker.worker.isWorkerStarted = true
+
+	exec.activityWorker = activityWorker.worker
 	// Fill up the poller request channel
-	slotsCh := exec.activityWorker.worker.pollerRequestCh
+	slotsCh := activityWorker.worker.pollerRequestCh
 	for i := 0; i < 5; i++ {
 		slotsCh <- struct{}{}
 	}
 	// Replace task processor
 	taskProcessor := newWaitingTaskProcessor()
-	exec.activityWorker.worker.options.taskWorker = taskProcessor
+	activityWorker.worker.options.taskWorker = taskProcessor
 
 	// Request 2 commands on wrong task queue then 5 commands on proper task queue
 	// but have 2nd request disabled
diff --git a/internal/internal_eager_workflow.go b/internal/internal_eager_workflow.go
index bb3c3487c..0725f657d 100644
--- a/internal/internal_eager_workflow.go
+++ b/internal/internal_eager_workflow.go
@@ -33,25 +33,29 @@ import (
 // eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task.
 type eagerWorkflowDispatcher struct {
 	lock               sync.Mutex
-	workersByTaskQueue map[string][]*workflowWorker
+	workersByTaskQueue map[string][]eagerWorker
 }
 
+// registerWorker registers a worker that can be used for eager workflow dispatch.
 func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
-	e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker)
+	e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker)
 }
 
-func (e *eagerWorkflowDispatcher) tryGetEagerWorkflowExecutor(options *StartWorkflowOptions) *eagerWorkflowExecutor {
+// applyToRequest updates the request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use.
+func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartWorkflowExecutionRequest) *eagerWorkflowExecutor {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 	// Try every worker that is assigned to the desired task queue.
-	workers := e.workersByTaskQueue[options.TaskQueue]
+	workers := e.workersByTaskQueue[request.GetTaskQueue().Name]
 	rand.Shuffle(len(workers), func(i, j int) { workers[i], workers[j] = workers[j], workers[i] })
 	for _, worker := range workers {
-		executor := worker.reserveWorkflowExecutor()
-		if executor != nil {
-			return executor
+		if worker.tryReserveSlot() {
+			request.RequestEagerExecution = true
+			return &eagerWorkflowExecutor{
+				worker: worker,
+			}
 		}
 	}
 	return nil
@@ -60,7 +64,7 @@ func (e *eagerWorkflowDispatcher) tryGetEagerWorkflowExecutor(options *StartWork
 // eagerWorkflowExecutor is a worker-scoped executor for an eager workflow task.
 type eagerWorkflowExecutor struct {
 	handledResponse atomic.Bool
-	worker          *workflowWorker
+	worker          eagerWorker
 }
 
 // handleResponse of an eager workflow task from a StartWorkflowExecution request.
@@ -68,24 +72,14 @@ func (e *eagerWorkflowExecutor) handleResponse(response *workflowservice.PollWor
 	if !e.handledResponse.CompareAndSwap(false, true) {
 		panic("eagerWorkflowExecutor trying to handle multiple responses")
 	}
-	// Before starting the goroutine we have to increase the wait group counter
-	// that the poller would have otherwise increased
-	e.worker.worker.stopWG.Add(1)
-	// Asynchronously execute
+	// Asynchronously execute the task
 	task := &eagerWorkflowTask{
 		task: response,
 	}
-	go func() {
-		// Mark completed when complete
-		defer func() {
-			e.release()
-		}()
-
-		// Process the task synchronously. We call the processor on the base
-		// worker instead of a higher level so we can get the benefits of metrics,
-		// stop wait group update, etc.
-		e.worker.worker.processTask(task)
-	}()
+	e.worker.processTaskAsync(task, func() {
+		// processTaskAsync does not release the slot itself because our task is *eagerWorkflowTask, not *polledTask.
+		e.release()
+	})
 }
 
 // release the executor task slot this eagerWorkflowExecutor was holding.
@@ -95,6 +89,6 @@ func (e *eagerWorkflowExecutor) release() {
 	if e.handledResponse.CompareAndSwap(false, true) {
 		// Assume there is room because it is reserved on creation, so we make a blocking send.
 		// The processTask does not do this itself because our task is not *polledTask.
-		e.worker.worker.pollerRequestCh <- struct{}{}
+		e.worker.releaseSlot()
 	}
 }
diff --git a/internal/internal_worker.go b/internal/internal_worker.go
index 0f496cb37..30e5ba0ba 100644
--- a/internal/internal_worker.go
+++ b/internal/internal_worker.go
@@ -376,25 +376,6 @@ func (ww *workflowWorker) Stop() {
 	ww.worker.Stop()
 }
 
-// reserveWorkflowExecutor
-func (ww *workflowWorker) reserveWorkflowExecutor() *eagerWorkflowExecutor {
-	if !ww.worker.isWorkerStarted || ww.worker.isStop() {
-		return nil
-	}
-	// Reserve a spot for our request via a non-blocking attempt to take a poller
-	// request entry which essentially reserves a spot
-	select {
-	case <-ww.worker.pollerRequestCh:
-	default:
-		return nil
-	}
-
-	// We can request so return the worker
-	return &eagerWorkflowExecutor{
-		worker: ww,
-	}
-}
-
 func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, overrides *workerOverrides, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker {
 	if params.Identity == "" {
 		params.Identity = getWorkerIdentity(params.TaskQueue)
@@ -1578,7 +1559,9 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
 		} else {
 			workflowWorker = newWorkflowWorker(client.workflowService, workerParams, nil, registry)
 		}
-		client.rootInterceptor.eagerDispatcher.registerWorker(workflowWorker)
+		if client.rootInterceptor != nil {
+			client.rootInterceptor.eagerDispatcher.registerWorker(workflowWorker)
+		}
 	}
 
 	// activity types.
@@ -1586,7 +1569,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
 	if !options.LocalActivityWorkerOnly {
 		activityWorker = newActivityWorker(client.workflowService, workerParams, nil, registry, nil)
 		// Set the activity worker on the eager executor
-		workerParams.eagerActivityExecutor.activityWorker = activityWorker
+		workerParams.eagerActivityExecutor.activityWorker = activityWorker.worker
 	}
 
 	var sessionWorker *sessionWorker
diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index 00f6ec7f3..1ccf99cc0 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -310,6 +310,34 @@ func (bw *baseWorker) runPoller() {
 	}
 }
 
+func (bw *baseWorker) tryReserveSlot() bool {
+	if !bw.isWorkerStarted || bw.isStop() {
+		return false
+	}
+	// Reserve an executor slot via a non-blocking attempt to take a poller
+	// request entry, which essentially reserves a slot
+	select {
+	case <-bw.pollerRequestCh:
+	default:
+		return false
+	}
+	return true
+}
+
+func (bw *baseWorker) releaseSlot() {
+	// Like other sends to this channel, we assume there is room because we
+	// reserved it, so we make a blocking send.
+	bw.pollerRequestCh <- struct{}{}
+}
+
+func (bw *baseWorker) processTaskAsync(task interface{}, callback func()) {
+	bw.stopWG.Add(1)
+	go func() {
+		defer callback()
+		bw.processTask(task)
+	}()
+}
+
 func (bw *baseWorker) runTaskDispatcher() {
 	defer bw.stopWG.Done()
 
@@ -323,15 +351,14 @@ func (bw *baseWorker) runTaskDispatcher() {
 		case <-bw.stopCh:
 			return
 		case task := <-bw.taskQueueCh:
-			// for non-polled-task (local activity result as task), we don't need to rate limit
+			// for non-polled-task (local activity result as task or eager task), we don't need to rate limit
 			_, isPolledTask := task.(*polledTask)
 			if isPolledTask && bw.taskLimiter.Wait(bw.limiterContext) != nil {
 				if bw.isStop() {
 					return
 				}
 			}
-			bw.stopWG.Add(1)
-			go bw.processTask(task)
+			bw.processTaskAsync(task, func() {})
 		}
 	}
 }
diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go
index 305e2473d..ef1310a62 100644
--- a/internal/internal_workflow_client.go
+++ b/internal/internal_workflow_client.go
@@ -1459,13 +1459,6 @@ type workflowClientInterceptor struct {
 	eagerDispatcher *eagerWorkflowDispatcher
 }
 
-func (w *workflowClientInterceptor) tryGetEagerWorkflowExecutor(options *StartWorkflowOptions) *eagerWorkflowExecutor {
-	if !options.EnableEagerStart || !w.client.capabilities.GetEagerWorkflowStart() {
-		return nil
-	}
-	return w.eagerDispatcher.tryGetEagerWorkflowExecutor(options)
-}
-
 func (w *workflowClientInterceptor) ExecuteWorkflow(
 	ctx context.Context,
 	in *ClientExecuteWorkflowInput,
@@ -1507,10 +1500,6 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
 		return nil, err
 	}
 
-	eagerExecutor := w.tryGetEagerWorkflowExecutor(in.Options)
-	if eagerExecutor != nil {
-		defer eagerExecutor.release()
-	}
 	// run propagators to extract information about tracing and other stuff, store in headers field
 	startRequest := &workflowservice.StartWorkflowExecutionRequest{
 		Namespace:                w.client.namespace,
@@ -1528,10 +1517,17 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
 		CronSchedule:             in.Options.CronSchedule,
 		Memo:                     memo,
 		SearchAttributes:         searchAttr,
-		RequestEagerExecution:    eagerExecutor != nil,
 		Header:                   header,
 	}
 
+	var eagerExecutor *eagerWorkflowExecutor
+	if in.Options.EnableEagerStart && w.client.capabilities.GetEagerWorkflowStart() && w.eagerDispatcher != nil {
+		eagerExecutor = w.eagerDispatcher.applyToRequest(startRequest)
+		if eagerExecutor != nil {
+			defer eagerExecutor.release()
+		}
+	}
+
 	var response *workflowservice.StartWorkflowExecutionResponse
 
 	grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler(
@@ -1541,7 +1537,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
 	response, err = w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest)
 
 	eagerWorkflowTask := response.GetEagerWorkflowTask()
-	if eagerWorkflowTask != nil {
+	if eagerWorkflowTask != nil && eagerExecutor != nil {
 		eagerExecutor.handleResponse(eagerWorkflowTask)
 	}
 	// Allow already-started error
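
The sketch below is illustrative only and is not part of the patch. Under assumed names (toySlotWorker, newToySlotWorker), it shows how the eagerWorker contract introduced in internal/internal_eager.go fits together: slots are modeled as tokens in a buffered channel, mirroring the way baseWorker's tryReserveSlot and releaseSlot use pollerRequestCh above.

```go
package main

import (
	"fmt"
	"sync"
)

// eagerWorker matches the interface added in internal/internal_eager.go.
type eagerWorker interface {
	tryReserveSlot() bool
	releaseSlot()
	processTaskAsync(task interface{}, callback func())
}

// toySlotWorker is a hypothetical stand-in for baseWorker: slots are tokens
// in a buffered channel, mirroring the pollerRequestCh semantics.
type toySlotWorker struct {
	slots  chan struct{}
	stopWG sync.WaitGroup
}

var _ eagerWorker = (*toySlotWorker)(nil)

func newToySlotWorker(concurrency int) *toySlotWorker {
	w := &toySlotWorker{slots: make(chan struct{}, concurrency)}
	for i := 0; i < concurrency; i++ {
		w.slots <- struct{}{} // pre-fill: one token per available slot
	}
	return w
}

// tryReserveSlot takes a token without blocking; success means the caller
// holds a slot and must eventually return it via releaseSlot.
func (w *toySlotWorker) tryReserveSlot() bool {
	select {
	case <-w.slots:
		return true
	default:
		return false
	}
}

// releaseSlot returns a held token; the send cannot block because the
// token was removed by tryReserveSlot.
func (w *toySlotWorker) releaseSlot() {
	w.slots <- struct{}{}
}

// processTaskAsync runs the task on a goroutine and invokes callback when done.
func (w *toySlotWorker) processTaskAsync(task interface{}, callback func()) {
	w.stopWG.Add(1)
	go func() {
		defer w.stopWG.Done()
		defer callback()
		fmt.Println("processing", task)
	}()
}

func main() {
	w := newToySlotWorker(2)
	if w.tryReserveSlot() {
		// Release the slot only after the task finishes, as the eager
		// activity and workflow executors do in this patch.
		w.processTaskAsync("eager-task", w.releaseSlot)
	}
	w.stopWG.Wait()
}
```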
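A second illustrative sketch (also not part of the patch, using hypothetical toySlot/toyExecutor types) of the guard ExecuteWorkflow now relies on: release() is always deferred, handleResponse runs only when the server actually returns an eager workflow task, and the atomic.Bool CompareAndSwap keeps the two paths from both returning the reserved slot.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toySlot and toyExecutor are hypothetical types that model only the
// CompareAndSwap guard used by eagerWorkflowExecutor in this patch.
type toySlot struct{ releases atomic.Int32 }

func (s *toySlot) releaseSlot() { s.releases.Add(1) }

type toyExecutor struct {
	handledResponse atomic.Bool
	slot            *toySlot
}

// handleResponse claims the reserved slot for task processing. In the patch
// the slot is given back later, from the processTaskAsync callback; here we
// only model the claim itself.
func (e *toyExecutor) handleResponse(task string) {
	if !e.handledResponse.CompareAndSwap(false, true) {
		panic("toyExecutor trying to handle multiple responses")
	}
	fmt.Println("processing eager task:", task)
}

// release returns the slot only if no response was handled, so a deferred
// release never double-counts a slot that handleResponse already claimed.
func (e *toyExecutor) release() {
	if e.handledResponse.CompareAndSwap(false, true) {
		e.slot.releaseSlot()
	}
}

func main() {
	slot := &toySlot{}

	// Case 1: the server returned an eager task; the deferred release is a no-op.
	withTask := &toyExecutor{slot: slot}
	func() {
		defer withTask.release() // mirrors `defer eagerExecutor.release()` in ExecuteWorkflow
		withTask.handleResponse("task-one")
	}()

	// Case 2: no eager task came back; the deferred release returns the slot.
	withoutTask := &toyExecutor{slot: slot}
	func() {
		defer withoutTask.release()
	}()

	fmt.Println("slots returned by release():", slot.releases.Load()) // 1
}
```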