Skip to content

Commit

Permalink
Add eager workflow start
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Jul 16, 2023
1 parent e1d76b7 commit 12943f2
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ system.forceSearchAttributesCacheRefreshOnRead:
- value: true # Dev setup only. Please don't turn this on in production.
constraints: {}
system.enableActivityEagerExecution:
- value: true
system.enableEagerWorkflowStart:
- value: true
13 changes: 12 additions & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ type (
// supported when Temporal server is using ElasticSearch). The key and value type must be registered on Temporal server side.
// Use GetSearchAttributes API to get valid key and corresponding value type.
SearchAttributes map[string]interface{}

// EnableEagerStart - request eager execution for this workflow, if a local worker is available.
//
// NOTE: Experimental
EnableEagerStart bool
}

// RetryPolicy defines the retry policy.
Expand Down Expand Up @@ -816,7 +821,13 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
}

// Create outbound interceptor by wrapping backwards through chain
client.interceptor = &workflowClientInterceptor{client: client}
client.rootInterceptor = &workflowClientInterceptor{
client: client,
eagerDispatcher: &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]*workflowWorker),
},
}
client.interceptor = client.rootInterceptor
for i := len(options.Interceptors) - 1; i >= 0; i-- {
client.interceptor = options.Interceptors[i].InterceptClient(client.interceptor)
}
Expand Down
100 changes: 100 additions & 0 deletions internal/internal_eager_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"math/rand"
"sync"
"sync/atomic"

"go.temporal.io/api/workflowservice/v1"
)

// eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task.
type eagerWorkflowDispatcher struct {
// lock is held by the dispatcher's methods while they read or modify workersByTaskQueue.
lock sync.Mutex
// workersByTaskQueue maps a task queue name to the workflow workers registered for that queue.
workersByTaskQueue map[string][]*workflowWorker
}

// registerWorker records worker as a candidate for eager workflow tasks on the
// task queue named in its execution parameters.
func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) {
	e.lock.Lock()
	defer e.lock.Unlock()

	taskQueue := worker.executionParameters.TaskQueue
	registered := e.workersByTaskQueue[taskQueue]
	e.workersByTaskQueue[taskQueue] = append(registered, worker)
}

// tryGetEagerWorkflowExecutor looks for a worker registered on options.TaskQueue
// that can reserve a slot for an eager workflow task. The registered workers are
// shuffled in place and tried in that order; the first successful reservation is
// returned. It returns nil when no worker can reserve a slot.
func (e *eagerWorkflowDispatcher) tryGetEagerWorkflowExecutor(options *StartWorkflowOptions) *eagerWorkflowExecutor {
	e.lock.Lock()
	defer e.lock.Unlock()

	candidates := e.workersByTaskQueue[options.TaskQueue]
	swap := func(a, b int) {
		candidates[a], candidates[b] = candidates[b], candidates[a]
	}
	rand.Shuffle(len(candidates), swap)

	for idx := range candidates {
		if reserved := candidates[idx].reserveWorkflowExecutor(); reserved != nil {
			return reserved
		}
	}
	return nil
}

// eagerWorkflowExecutor is a worker-scoped executor for an eager workflow task.
type eagerWorkflowExecutor struct {
// handledResponse becomes true once a response has been handled or the slot has been released;
// it is flipped with CompareAndSwap so only one of the two can happen.
handledResponse atomic.Bool
// worker is the workflow worker this executor was created for.
worker *workflowWorker
}

// handleResponse of an eager workflow task from a StartWorkflowExecution request.
//
// The task is processed asynchronously on the worker this executor belongs to,
// and the reserved task slot is returned to the worker once processing ends.
// It panics if this executor has already handled a response or has already
// released its slot.
func (e *eagerWorkflowExecutor) handleResponse(response *workflowservice.PollWorkflowTaskQueueResponse) {
	if !e.handledResponse.CompareAndSwap(false, true) {
		panic("eagerWorkflowExecutor trying to handle multiple responses")
	}
	// Before starting the goroutine we have to increase the wait group counter
	// that the poller would have otherwise increased
	e.worker.worker.stopWG.Add(1)
	// Asynchronously execute
	task := &eagerWorkflowTask{
		task: response,
	}
	go func() {
		// Give the reserved slot back when processing completes. This sends
		// directly instead of calling release(): handledResponse was already
		// set to true above, so release() would do nothing and the slot would
		// never be returned. processTask does not return it either because the
		// task is not a *polledTask. There is room in the channel because the
		// slot was reserved on creation, so a blocking send is fine.
		defer func() {
			e.worker.worker.pollerRequestCh <- struct{}{}
		}()

		// Process the task synchronously. We call the processor on the base
		// worker instead of a higher level so we can get the benefits of metrics,
		// stop wait group update, etc.
		e.worker.worker.processTask(task)
	}()
}

// release gives back the task slot held by this eagerWorkflowExecutor.
// It is a no-op when a response has already been handled or when the slot
// has already been released.
func (e *eagerWorkflowExecutor) release() {
	if !e.handledResponse.CompareAndSwap(false, true) {
		return
	}
	// The slot was reserved on creation, so there is room in the channel and a
	// blocking send is safe. processTask does not return the slot itself because
	// our task is not a *polledTask.
	e.worker.worker.pollerRequestCh <- struct{}{}
}
5 changes: 5 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ type (
laRetryCh chan *localActivityTask
}

// eagerWorkflowTask represents a workflow task sent from an eager workflow executor.
// It wraps the workflow task response so the task can be told apart by type
// from other task kinds.
eagerWorkflowTask struct {
// task is the wrapped workflow task response.
task *workflowservice.PollWorkflowTaskQueueResponse
}

// activityTask wraps a activity task.
activityTask struct {
task *workflowservice.PollActivityTaskQueueResponse
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error {
switch task := task.(type) {
case *workflowTask:
return wtp.processWorkflowTask(task)
case *eagerWorkflowTask:
return wtp.processWorkflowTask(wtp.toWorkflowTask(task.task))
default:
panic("unknown task type.")
}
Expand Down
20 changes: 20 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,25 @@ func (ww *workflowWorker) Stop() {
ww.worker.Stop()
}

// reserveWorkflowExecutor tries to reserve a task slot on this worker for an
// eager workflow task. It returns nil when the worker has not been started, has
// been stopped, or has no poller request entry available right now. Otherwise it
// returns an executor bound to this worker.
func (ww *workflowWorker) reserveWorkflowExecutor() *eagerWorkflowExecutor {
	if !ww.worker.isWorkerStarted {
		return nil
	}
	if ww.worker.isStop() {
		return nil
	}

	// Taking a poller request entry without blocking is what reserves the spot;
	// if none is immediately available the worker has no spare capacity.
	select {
	case <-ww.worker.pollerRequestCh:
		return &eagerWorkflowExecutor{worker: ww}
	default:
		return nil
	}
}

func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, overrides *workerOverrides, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker {
if params.Identity == "" {
params.Identity = getWorkerIdentity(params.TaskQueue)
Expand Down Expand Up @@ -1559,6 +1578,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
} else {
workflowWorker = newWorkflowWorker(client.workflowService, workerParams, nil, registry)
}
client.rootInterceptor.eagerDispatcher.registerWorker(workflowWorker)
}

// activity types.
Expand Down
23 changes: 21 additions & 2 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type (
failureConverter converter.FailureConverter
contextPropagators []ContextPropagator
workerInterceptors []WorkerInterceptor
rootInterceptor *workflowClientInterceptor
interceptor ClientOutboundInterceptor
excludeInternalFromRetry *uberatomic.Bool
capabilities *workflowservice.GetSystemInfoResponse_Capabilities
Expand Down Expand Up @@ -1453,7 +1454,17 @@ func serializeSearchAttributes(input map[string]interface{}) (*commonpb.SearchAt
return &commonpb.SearchAttributes{IndexedFields: attr}, nil
}

type workflowClientInterceptor struct{ client *WorkflowClient }
// workflowClientInterceptor holds the workflow client it acts on together with
// the dispatcher it uses to find a local worker for eager workflow start.
type workflowClientInterceptor struct {
// client is the workflow client whose service, namespace and capabilities are used.
client *WorkflowClient
// eagerDispatcher finds a registered worker able to run an eager workflow task.
eagerDispatcher *eagerWorkflowDispatcher
}

// tryGetEagerWorkflowExecutor returns an executor for an eager workflow task
// when the start options request eager start and the client's capabilities
// report eager workflow start. It returns nil when either condition is false
// or when the dispatcher finds no available worker.
func (w *workflowClientInterceptor) tryGetEagerWorkflowExecutor(options *StartWorkflowOptions) *eagerWorkflowExecutor {
	if options.EnableEagerStart && w.client.capabilities.GetEagerWorkflowStart() {
		return w.eagerDispatcher.tryGetEagerWorkflowExecutor(options)
	}
	return nil
}

func (w *workflowClientInterceptor) ExecuteWorkflow(
ctx context.Context,
Expand Down Expand Up @@ -1496,6 +1507,10 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
return nil, err
}

eagerExecutor := w.tryGetEagerWorkflowExecutor(in.Options)
if eagerExecutor != nil {
defer eagerExecutor.release()
}
// run propagators to extract information about tracing and other stuff, store in headers field
startRequest := &workflowservice.StartWorkflowExecutionRequest{
Namespace: w.client.namespace,
Expand All @@ -1513,6 +1528,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
CronSchedule: in.Options.CronSchedule,
Memo: memo,
SearchAttributes: searchAttr,
RequestEagerExecution: eagerExecutor != nil,
Header: header,
}

Expand All @@ -1524,7 +1540,10 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
defer cancel()

response, err = w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest)

eagerWorkflowTask := response.GetEagerWorkflowTask()
if eagerWorkflowTask != nil {
eagerExecutor.handleResponse(eagerWorkflowTask)
}
// Allow already-started error
var runID string
if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted {
Expand Down
1 change: 1 addition & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3912,6 +3912,7 @@ func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWo
WorkflowExecutionTimeout: 15 * time.Second,
WorkflowTaskTimeout: time.Second,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
EnableEagerStart: true,
}
if wfID == CronWorkflowID {
wfOptions.CronSchedule = "@every 1s"
Expand Down

0 comments on commit 12943f2

Please sign in to comment.