From 12943f2e31672ebd93d586c029693413e1af3e87 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sun, 16 Jul 2023 09:34:26 -0700 Subject: [PATCH] Add eager workflow start --- .../docker/dynamic-config-custom.yaml | 2 + internal/client.go | 13 ++- internal/internal_eager_workflow.go | 100 ++++++++++++++++++ internal/internal_task_handlers.go | 5 + internal/internal_task_pollers.go | 2 + internal/internal_worker.go | 20 ++++ internal/internal_workflow_client.go | 23 +++- test/integration_test.go | 1 + 8 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 internal/internal_eager_workflow.go diff --git a/.github/workflows/docker/dynamic-config-custom.yaml b/.github/workflows/docker/dynamic-config-custom.yaml index 227934f50..b70052061 100644 --- a/.github/workflows/docker/dynamic-config-custom.yaml +++ b/.github/workflows/docker/dynamic-config-custom.yaml @@ -2,4 +2,6 @@ system.forceSearchAttributesCacheRefreshOnRead: - value: true # Dev setup only. Please don't turn this on in production. constraints: {} system.enableActivityEagerExecution: + - value: true +system.enableEagerWorkflowStart: - value: true \ No newline at end of file diff --git a/internal/client.go b/internal/client.go index 9092e48f4..9d616c41a 100644 --- a/internal/client.go +++ b/internal/client.go @@ -596,6 +596,11 @@ type ( // supported when Temporal server is using ElasticSearch). The key and value type must be registered on Temporal server side. // Use GetSearchAttributes API to get valid key and corresponding value type. SearchAttributes map[string]interface{} + + // EnableEagerStart - request eager execution for this workflow, if a local worker is available. + // + // NOTE: Experimental + EnableEagerStart bool } // RetryPolicy defines the retry policy. 
@@ -816,7 +821,13 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien } // Create outbound interceptor by wrapping backwards through chain - client.interceptor = &workflowClientInterceptor{client: client} + client.rootInterceptor = &workflowClientInterceptor{ + client: client, + eagerDispatcher: &eagerWorkflowDispatcher{ + workersByTaskQueue: make(map[string][]*workflowWorker), + }, + } + client.interceptor = client.rootInterceptor for i := len(options.Interceptors) - 1; i >= 0; i-- { client.interceptor = options.Interceptors[i].InterceptClient(client.interceptor) } diff --git a/internal/internal_eager_workflow.go b/internal/internal_eager_workflow.go new file mode 100644 index 000000000..bb3c3487c --- /dev/null +++ b/internal/internal_eager_workflow.go @@ -0,0 +1,100 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "math/rand" + "sync" + "sync/atomic" + + "go.temporal.io/api/workflowservice/v1" +) + +// eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task. Workers are kept per task queue name in workersByTaskQueue, which is only accessed while holding lock. +type eagerWorkflowDispatcher struct { + lock sync.Mutex + workersByTaskQueue map[string][]*workflowWorker +} + +func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) { + e.lock.Lock() + defer e.lock.Unlock() + e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker) +} + +func (e *eagerWorkflowDispatcher) tryGetEagerWorkflowExecutor(options *StartWorkflowOptions) *eagerWorkflowExecutor { + e.lock.Lock() + defer e.lock.Unlock() + // Try every worker that is assigned to the desired task queue, in shuffled order, and return the first executor one of them reserves (nil if none does). + workers := e.workersByTaskQueue[options.TaskQueue] + rand.Shuffle(len(workers), func(i, j int) { workers[i], workers[j] = workers[j], workers[i] }) + for _, worker := range workers { + executor := worker.reserveWorkflowExecutor() + if executor != nil { + return executor + } + } + return nil +} + +// eagerWorkflowExecutor is a worker-scoped executor for an eager workflow task. +type eagerWorkflowExecutor struct { + handledResponse atomic.Bool + worker *workflowWorker +} + +// handleResponse starts processing, on a new goroutine, the eager workflow task returned by a StartWorkflowExecution request. It panics if called more than once on the same executor.
+func (e *eagerWorkflowExecutor) handleResponse(response *workflowservice.PollWorkflowTaskQueueResponse) {
+	if !e.handledResponse.CompareAndSwap(false, true) {
+		panic("eagerWorkflowExecutor trying to handle multiple responses")
+	}
+	// Before starting the goroutine we have to increase the wait group counter
+	// that the poller would have otherwise increased
+	e.worker.worker.stopWG.Add(1)
+	// Asynchronously execute
+	task := &eagerWorkflowTask{
+		task: response,
+	}
+	go func() {
+		// Hand the reserved slot back when done; release() is a no-op here because handledResponse is already set.
+		defer func() {
+			e.worker.worker.pollerRequestCh <- struct{}{}
+		}()
+
+		// Process the task synchronously. We call the processor on the base
+		// worker instead of a higher level so we can get the benefits of metrics,
+		// stop wait group update, etc.
+		e.worker.worker.processTask(task)
+	}()
+}
+
+// release the executor task slot this eagerWorkflowExecutor was holding.
+// If a response has been passed to handleResponse or the slot was already released
+// then do nothing (handleResponse returns the slot itself once the task is processed).
+func (e *eagerWorkflowExecutor) release() {
+	if e.handledResponse.CompareAndSwap(false, true) {
+		// Assume there is room because it is reserved on creation, so we make a blocking send.
+		// The processTask does not do this itself because our task is not *polledTask.
+		e.worker.worker.pollerRequestCh <- struct{}{}
+	}
+}
diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go
index 7b42ef22b..6c9be3b63 100644
--- a/internal/internal_task_handlers.go
+++ b/internal/internal_task_handlers.go
@@ -94,6 +94,11 @@ type (
 		laRetryCh chan *localActivityTask
 	}
 
+	// eagerWorkflowTask represents a workflow task sent from an eager workflow executor
+	eagerWorkflowTask struct {
+		task *workflowservice.PollWorkflowTaskQueueResponse
+	}
+
 	// activityTask wraps a activity task.
activityTask struct { task *workflowservice.PollActivityTaskQueueResponse diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 0f67eae84..fb77ab1a3 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -313,6 +313,8 @@ func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error { switch task := task.(type) { case *workflowTask: return wtp.processWorkflowTask(task) + case *eagerWorkflowTask: + return wtp.processWorkflowTask(wtp.toWorkflowTask(task.task)) default: panic("unknown task type.") } diff --git a/internal/internal_worker.go b/internal/internal_worker.go index e785d1ece..0f496cb37 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -376,6 +376,25 @@ func (ww *workflowWorker) Stop() { ww.worker.Stop() } +// reserveWorkflowExecutor tries to reserve a spot on this worker for an eager workflow task and returns an executor bound to the worker, or nil if the worker is not started, is stopping, or has no poller request entry available. +func (ww *workflowWorker) reserveWorkflowExecutor() *eagerWorkflowExecutor { + if !ww.worker.isWorkerStarted || ww.worker.isStop() { + return nil + } + // Reserve a spot for our request via a non-blocking attempt to take a poller + // request entry which essentially reserves a spot + select { + case <-ww.worker.pollerRequestCh: + default: + return nil + } + + // The spot is reserved, so return an executor that wraps this worker + return &eagerWorkflowExecutor{ + worker: ww, + } +} + func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, overrides *workerOverrides, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker { if params.Identity == "" { params.Identity = getWorkerIdentity(params.TaskQueue) @@ -1559,6 +1578,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke } else { workflowWorker = newWorkflowWorker(client.workflowService, workerParams, nil, registry) } + client.rootInterceptor.eagerDispatcher.registerWorker(workflowWorker) } // activity types. 
diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 8affbbf23..305e2473d 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -88,6 +88,7 @@ type ( failureConverter converter.FailureConverter contextPropagators []ContextPropagator workerInterceptors []WorkerInterceptor + rootInterceptor *workflowClientInterceptor interceptor ClientOutboundInterceptor excludeInternalFromRetry *uberatomic.Bool capabilities *workflowservice.GetSystemInfoResponse_Capabilities @@ -1453,7 +1454,17 @@ func serializeSearchAttributes(input map[string]interface{}) (*commonpb.SearchAt return &commonpb.SearchAttributes{IndexedFields: attr}, nil } -type workflowClientInterceptor struct{ client *WorkflowClient } +type workflowClientInterceptor struct { + client *WorkflowClient + eagerDispatcher *eagerWorkflowDispatcher +} + +func (w *workflowClientInterceptor) tryGetEagerWorkflowExecutor(options *StartWorkflowOptions) *eagerWorkflowExecutor { + if !options.EnableEagerStart || !w.client.capabilities.GetEagerWorkflowStart() { + return nil + } + return w.eagerDispatcher.tryGetEagerWorkflowExecutor(options) +} func (w *workflowClientInterceptor) ExecuteWorkflow( ctx context.Context, @@ -1496,6 +1507,10 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( return nil, err } + eagerExecutor := w.tryGetEagerWorkflowExecutor(in.Options) + if eagerExecutor != nil { + defer eagerExecutor.release() + } // run propagators to extract information about tracing and other stuff, store in headers field startRequest := &workflowservice.StartWorkflowExecutionRequest{ Namespace: w.client.namespace, @@ -1513,6 +1528,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( CronSchedule: in.Options.CronSchedule, Memo: memo, SearchAttributes: searchAttr, + RequestEagerExecution: eagerExecutor != nil, Header: header, } @@ -1524,7 +1540,10 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( defer cancel() response, err = 
w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest) - + eagerWorkflowTask := response.GetEagerWorkflowTask() + if eagerWorkflowTask != nil { + eagerExecutor.handleResponse(eagerWorkflowTask) + } // Allow already-started error var runID string if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted { diff --git a/test/integration_test.go b/test/integration_test.go index 4741d5cce..eea6af770 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -3912,6 +3912,7 @@ func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWo WorkflowExecutionTimeout: 15 * time.Second, WorkflowTaskTimeout: time.Second, WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + EnableEagerStart: true, } if wfID == CronWorkflowID { wfOptions.CronSchedule = "@every 1s"