Skip to content

Commit

Permalink
Refactor to avoid raw access of baseWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Jul 17, 2023
1 parent 12943f2 commit c25aa88
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 101 deletions.
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
client.rootInterceptor = &workflowClientInterceptor{
client: client,
eagerDispatcher: &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]*workflowWorker),
workersByTaskQueue: make(map[string][]eagerWorker),
},
}
client.interceptor = client.rootInterceptor
Expand Down
35 changes: 35 additions & 0 deletions internal/internal_eager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

// eagerWorker is the minimal worker interface needed for eager activities and workflows
type eagerWorker interface {
// tryReserveSlot tries to reserver a task slot on the worker without blocking
// caller is expected to release the slot with releaseSlot
tryReserveSlot() bool
// releaseSlot release a task slot acquired by tryReserveSlot
releaseSlot()
// processTaskAsync process a new task on the worker asynchronously and
// call callback once complete
processTaskAsync(task interface{}, callback func())
}
42 changes: 12 additions & 30 deletions internal/internal_eager_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
type eagerActivityExecutor struct {
eagerActivityExecutorOptions

activityWorker *activityWorker
activityWorker eagerWorker
heldSlotCount int
countLock sync.Mutex
}
Expand Down Expand Up @@ -97,11 +97,8 @@ func (e *eagerActivityExecutor) reserveOnePendingSlot() bool {
// No more room
return false
}
// Reserve a spot for our request via a non-blocking attempt to take a poller
// request entry which essentially reserves a spot
select {
case <-e.activityWorker.worker.pollerRequestCh:
default:
// Reserve a spot for our request via a non-blocking attempt
if !e.activityWorker.tryReserveSlot() {
return false
}

Expand Down Expand Up @@ -131,35 +128,20 @@ func (e *eagerActivityExecutor) handleResponse(

// Put every unfulfilled slot back on the poller channel
for i := 0; i < unfulfilledSlots; i++ {
// Like other parts that push onto this channel, we assume there is room
// because we took it, so we do a blocking send
e.activityWorker.worker.pollerRequestCh <- struct{}{}
e.activityWorker.releaseSlot()
}

// Start each activity asynchronously
for _, activity := range resp.GetActivityTasks() {
// Before starting the goroutine we have to increase the wait group counter
// that the poller would have otherwise increased
e.activityWorker.worker.stopWG.Add(1)
// Asynchronously execute
task := &activityTask{activity}
go func() {
// Mark completed when complete
defer func() {
// Like other sends to this channel, we assume there is room because we
// reserved it, so we make a blocking send. The processTask does not do
// this itself because our task is *activityTask, not *polledTask.
e.activityWorker.worker.pollerRequestCh <- struct{}{}
// Decrement executing count
e.countLock.Lock()
e.heldSlotCount--
e.countLock.Unlock()
}()

// Process the task synchronously. We call the processor on the base
// worker instead of a higher level so we can get the benefits of metrics,
// stop wait group update, etc.
e.activityWorker.worker.processTask(task)
}()
e.activityWorker.processTaskAsync(task, func() {
// The processTaskAsync does not do this itself because our task is *activityTask, not *polledTask.
e.activityWorker.releaseSlot()
// Decrement executing count
e.countLock.Lock()
e.heldSlotCount--
e.countLock.Unlock()
})
}
}
26 changes: 17 additions & 9 deletions internal/internal_eager_activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
func TestEagerActivityDisabled(t *testing.T) {
exec := newEagerActivityExecutor(eagerActivityExecutorOptions{disabled: true, taskQueue: "task-queue1"})
exec.activityWorker = newActivityWorker(nil,
workerExecutionParameters{TaskQueue: "task-queue1"}, nil, newRegistry(), nil)
workerExecutionParameters{TaskQueue: "task-queue1"}, nil, newRegistry(), nil).worker

// Turns requests to false when disabled
var req workflowservice.RespondWorkflowTaskCompletedRequest
Expand All @@ -59,11 +59,13 @@ func TestEagerActivityNoActivityWorker(t *testing.T) {

func TestEagerActivityWrongTaskQueue(t *testing.T) {
exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1"})
exec.activityWorker = newActivityWorker(nil,
workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil)
activityWorker := newActivityWorker(nil, workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil)
activityWorker.worker.isWorkerStarted = true

exec.activityWorker = activityWorker.worker
// Fill up the poller request channel
for i := 0; i < 10; i++ {
exec.activityWorker.worker.pollerRequestCh <- struct{}{}
activityWorker.worker.pollerRequestCh <- struct{}{}
}

// Turns requests to false when wrong task queue
Expand All @@ -77,11 +79,14 @@ func TestEagerActivityWrongTaskQueue(t *testing.T) {

func TestEagerActivityMaxPerTask(t *testing.T) {
exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1"})
exec.activityWorker = newActivityWorker(nil,
activityWorker := newActivityWorker(nil,
workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil)
activityWorker.worker.isWorkerStarted = true

exec.activityWorker = activityWorker.worker
// Fill up the poller request channel
for i := 0; i < 10; i++ {
exec.activityWorker.worker.pollerRequestCh <- struct{}{}
activityWorker.worker.pollerRequestCh <- struct{}{}
}

// Add 8, but it limits to only the first 3
Expand All @@ -99,16 +104,19 @@ func TestEagerActivityCounts(t *testing.T) {
// We'll create an eager activity executor with 3 max eager concurrent and 5
// max concurrent
exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1", maxConcurrent: 3})
exec.activityWorker = newActivityWorker(nil,
activityWorker := newActivityWorker(nil,
workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 5}, nil, newRegistry(), nil)
activityWorker.worker.isWorkerStarted = true

exec.activityWorker = activityWorker.worker
// Fill up the poller request channel
slotsCh := exec.activityWorker.worker.pollerRequestCh
slotsCh := activityWorker.worker.pollerRequestCh
for i := 0; i < 5; i++ {
slotsCh <- struct{}{}
}
// Replace task processor
taskProcessor := newWaitingTaskProcessor()
exec.activityWorker.worker.options.taskWorker = taskProcessor
activityWorker.worker.options.taskWorker = taskProcessor

// Request 2 commands on wrong task queue then 5 commands on proper task queue
// but have 2nd request disabled
Expand Down
42 changes: 18 additions & 24 deletions internal/internal_eager_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,29 @@ import (
// eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task.
type eagerWorkflowDispatcher struct {
lock sync.Mutex
workersByTaskQueue map[string][]*workflowWorker
workersByTaskQueue map[string][]eagerWorker
}

// registerWorker registers a worker that can be used for eager workflow dispatch
func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) {
e.lock.Lock()
defer e.lock.Unlock()
e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker)
e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker)
}

func (e *eagerWorkflowDispatcher) tryGetEagerWorkflowExecutor(options *StartWorkflowOptions) *eagerWorkflowExecutor {
// applyToRequest updates request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use
func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartWorkflowExecutionRequest) *eagerWorkflowExecutor {
e.lock.Lock()
defer e.lock.Unlock()
// Try every worker that is assigned to the desired task queue.
workers := e.workersByTaskQueue[options.TaskQueue]
workers := e.workersByTaskQueue[request.GetTaskQueue().Name]
rand.Shuffle(len(workers), func(i, j int) { workers[i], workers[j] = workers[j], workers[i] })
for _, worker := range workers {
executor := worker.reserveWorkflowExecutor()
if executor != nil {
return executor
if worker.tryReserveSlot() {
request.RequestEagerExecution = true
return &eagerWorkflowExecutor{
worker: worker,
}
}
}
return nil
Expand All @@ -60,32 +64,22 @@ func (e *eagerWorkflowDispatcher) tryGetEagerWorkflowExecutor(options *StartWork
// eagerWorkflowExecutor is a worker-scoped executor for an eager workflow task.
type eagerWorkflowExecutor struct {
handledResponse atomic.Bool
worker *workflowWorker
worker eagerWorker
}

// handleResponse of an eager workflow task from a StartWorkflowExecution request.
func (e *eagerWorkflowExecutor) handleResponse(response *workflowservice.PollWorkflowTaskQueueResponse) {
if !e.handledResponse.CompareAndSwap(false, true) {
panic("eagerWorkflowExecutor trying to handle multiple responses")
}
// Before starting the goroutine we have to increase the wait group counter
// that the poller would have otherwise increased
e.worker.worker.stopWG.Add(1)
// Asynchronously execute
// Asynchronously execute the task
task := &eagerWorkflowTask{
task: response,
}
go func() {
// Mark completed when complete
defer func() {
e.release()
}()

// Process the task synchronously. We call the processor on the base
// worker instead of a higher level so we can get the benefits of metrics,
// stop wait group update, etc.
e.worker.worker.processTask(task)
}()
e.worker.processTaskAsync(task, func() {
// The processTaskAsync does not do this itself because our task is *eagerWorkflowTask, not *polledTask.
e.release()
})
}

// release the executor task slot this eagerWorkflowExecutor was holding.
Expand All @@ -95,6 +89,6 @@ func (e *eagerWorkflowExecutor) release() {
if e.handledResponse.CompareAndSwap(false, true) {
// Assume there is room because it is reserved on creation, so we make a blocking send.
// The processTask does not do this itself because our task is not *polledTask.
e.worker.worker.pollerRequestCh <- struct{}{}
e.worker.releaseSlot()
}
}
25 changes: 4 additions & 21 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,25 +376,6 @@ func (ww *workflowWorker) Stop() {
ww.worker.Stop()
}

// reserveWorkflowExecutor
func (ww *workflowWorker) reserveWorkflowExecutor() *eagerWorkflowExecutor {
if !ww.worker.isWorkerStarted || ww.worker.isStop() {
return nil
}
// Reserve a spot for our request via a non-blocking attempt to take a poller
// request entry which essentially reserves a spot
select {
case <-ww.worker.pollerRequestCh:
default:
return nil
}

// We can request so return the worker
return &eagerWorkflowExecutor{
worker: ww,
}
}

func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, overrides *workerOverrides, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker {
if params.Identity == "" {
params.Identity = getWorkerIdentity(params.TaskQueue)
Expand Down Expand Up @@ -1578,15 +1559,17 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
} else {
workflowWorker = newWorkflowWorker(client.workflowService, workerParams, nil, registry)
}
client.rootInterceptor.eagerDispatcher.registerWorker(workflowWorker)
if client.rootInterceptor != nil {
client.rootInterceptor.eagerDispatcher.registerWorker(workflowWorker)
}
}

// activity types.
var activityWorker *activityWorker
if !options.LocalActivityWorkerOnly {
activityWorker = newActivityWorker(client.workflowService, workerParams, nil, registry, nil)
// Set the activity worker on the eager executor
workerParams.eagerActivityExecutor.activityWorker = activityWorker
workerParams.eagerActivityExecutor.activityWorker = activityWorker.worker
}

var sessionWorker *sessionWorker
Expand Down
33 changes: 30 additions & 3 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,34 @@ func (bw *baseWorker) runPoller() {
}
}

func (bw *baseWorker) tryReserveSlot() bool {
if !bw.isWorkerStarted || bw.isStop() {
return false
}
// Reserve a executor slot via a non-blocking attempt to take a poller
// request entry which essentially reserves a slot
select {
case <-bw.pollerRequestCh:
default:
return false
}
return true
}

func (bw *baseWorker) releaseSlot() {
// Like other sends to this channel, we assume there is room because we
// reserved it, so we make a blocking send.
bw.pollerRequestCh <- struct{}{}
}

func (bw *baseWorker) processTaskAsync(task interface{}, callback func()) {
bw.stopWG.Add(1)
go func() {
defer callback()
bw.processTask(task)
}()
}

func (bw *baseWorker) runTaskDispatcher() {
defer bw.stopWG.Done()

Expand All @@ -323,15 +351,14 @@ func (bw *baseWorker) runTaskDispatcher() {
case <-bw.stopCh:
return
case task := <-bw.taskQueueCh:
// for non-polled-task (local activity result as task), we don't need to rate limit
// for non-polled-task (local activity result as task or eager task), we don't need to rate limit
_, isPolledTask := task.(*polledTask)
if isPolledTask && bw.taskLimiter.Wait(bw.limiterContext) != nil {
if bw.isStop() {
return
}
}
bw.stopWG.Add(1)
go bw.processTask(task)
bw.processTaskAsync(task, func() {})
}
}
}
Expand Down
Loading

0 comments on commit c25aa88

Please sign in to comment.