Add eager workflow start #1164
if !e.handledResponse.CompareAndSwap(false, true) {
	panic("eagerWorkflowExecutor trying to handle multiple responses")
}
What is the situation you are trying to protect from here?
a caller accidentally reusing eagerWorkflowExecutor
I think we should be allowed to trust callers on non-exported things, i.e. ourselves, here
I don't
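To illustrate the guard being discussed, here is a minimal, self-contained sketch (the `eagerExecutor` type and `handleResponse` method are hypothetical stand-ins, not the PR's actual code) of how a `CompareAndSwap` on an `atomic.Bool` turns accidental reuse into a loud panic instead of a silent double-dispatch:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// eagerExecutor is a hypothetical stand-in for eagerWorkflowExecutor:
// it may handle exactly one response.
type eagerExecutor struct {
	handledResponse atomic.Bool
}

func (e *eagerExecutor) handleResponse() {
	// CompareAndSwap(false, true) succeeds only for the first caller;
	// any later caller trips the panic instead of racing.
	if !e.handledResponse.CompareAndSwap(false, true) {
		panic("eagerExecutor trying to handle multiple responses")
	}
	fmt.Println("response handled")
}

func main() {
	e := &eagerExecutor{}
	e.handleResponse() // first call succeeds

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("second call panicked:", r)
		}
	}()
	e.handleResponse() // second call panics
}
```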
internal/internal_workflow_client.go
var eagerExecutor *eagerWorkflowExecutor
if in.Options.EnableEagerStart && w.client.capabilities.GetEagerWorkflowStart() && w.eagerDispatcher != nil {
	eagerExecutor = w.eagerDispatcher.applyToRequest(startRequest)
	if eagerExecutor != nil {
		defer eagerExecutor.release()
	}
}
I am now wondering if there is value in a separate dispatcher and executor concept, since this is so straightforward and done on only this call. I wonder if you can just store an eagerWorkflowWorkers sync.Map on WorkflowClient and change this to:

var eagerWorkerInUse eagerWorker
if in.Options.EnableEagerStart && w.client.capabilities.GetEagerWorkflowStart() {
	eagerWorkerIface, _ := w.client.eagerWorkflowWorkers.Load(request.GetTaskQueue().GetName())
	eagerWorkerInUse, _ = eagerWorkerIface.(eagerWorker)
	request.RequestEagerExecution = eagerWorkerInUse != nil && eagerWorkerInUse.tryReserveSlot()
}

And after the response:

if request.RequestEagerExecution {
	if task := response.GetEagerWorkflowTask(); task != nil {
		eagerWorkerInUse.processTaskAsync(&eagerWorkflowTask{task}, eagerWorkerInUse.releaseSlot)
	} else {
		eagerWorkerInUse.releaseSlot()
	}
}
I prefer the abstraction of a separate dispatcher and executor
You took a few lines and turned them into a big file and multiple new types :-( But it's all subjective, so technically there's no right way I suppose, though I do find inlining simpler in this case.
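For context on the inlined approach being proposed, here is a runnable sketch under assumed, simplified types: `slotWorker` and its channel-based semaphore are hypothetical, standing in for a real worker's slot accounting, and the `sync.Map` keyed by task queue name plays the role of the suggested `eagerWorkflowWorkers` field on the client:

```go
package main

import (
	"fmt"
	"sync"
)

// eagerWorker is a minimal stand-in for the interface the review
// suggests: a worker that can reserve a task slot ahead of time.
type eagerWorker interface {
	tryReserveSlot() bool
	releaseSlot()
}

// slotWorker is a hypothetical implementation using a buffered
// channel as a semaphore over available task slots.
type slotWorker struct{ slots chan struct{} }

func newSlotWorker(n int) *slotWorker {
	return &slotWorker{slots: make(chan struct{}, n)}
}

func (w *slotWorker) tryReserveSlot() bool {
	select {
	case w.slots <- struct{}{}: // a slot was free; it is now reserved
		return true
	default: // all slots taken; do not block
		return false
	}
}

func (w *slotWorker) releaseSlot() { <-w.slots }

func main() {
	// Client-side registry keyed by task queue name; the sync.Map
	// replaces the dispatcher/executor indirection entirely.
	var eagerWorkflowWorkers sync.Map
	eagerWorkflowWorkers.Store("my-task-queue", newSlotWorker(1))

	// At request time: look up the worker for the queue and try to
	// reserve a slot; request eager execution only if both succeed.
	iface, _ := eagerWorkflowWorkers.Load("my-task-queue")
	worker, _ := iface.(eagerWorker)
	requestEager := worker != nil && worker.tryReserveSlot()
	fmt.Println("request eager:", requestEager)

	// After the response: process the eager task if one came back,
	// otherwise release the reserved slot (as in this sketch).
	if requestEager {
		worker.releaseSlot()
	}
}
```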
internal/internal_eager_workflow.go
// applyToRequest updates request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use
func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartWorkflowExecutionRequest) *eagerWorkflowExecutor {
	e.lock.Lock()
	defer e.lock.Unlock()
A sync.Map may be a better choice here, and even if not, you should use a RWMutex and only RLock here, and you should only lock map access, not the slot reservation IMO. (Though as mentioned in another comment, I think a simple eagerWorkflowWorkers sync.Map on the client itself is probably plenty and the dispatcher/executor is unnecessary.)
I don't believe it is safe to only lock the map access and not the slot reservation, since the map could be modified while iterating. Originally I thought I might need some limits like eager activity dispatch has, so a mutex made more sense, but I don't think they are needed, so I'm fine to use a sync.Map.
Slot reservation should be per worker and concurrency-safe from a caller's POV. Only the choice of worker, I think, needs to be synchronized.
I am just saying I don't think you can safely move the mutex in a way that would result in less code being under the critical section. Regardless, it doesn't matter, because we are going to use a sync.Map.
Actually, on further review I think a sync.Map here is a bad choice, because we lose any type safety and insertion would not be safe. So I'll switch to an RWMutex.
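As a minimal sketch of that RWMutex approach (the `dispatcher` type and its placeholder value type are hypothetical, not the PR's actual code): writers take the exclusive lock, readers take the shared lock, and the map keeps its concrete value type rather than `sync.Map`'s `interface{}`:

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher guards a plain map with a sync.RWMutex, keeping the
// type safety a sync.Map's interface{} values would give up.
type dispatcher struct {
	mu      sync.RWMutex
	workers map[string][]string // task queue -> worker names (placeholder value type)
}

func (d *dispatcher) register(taskQueue, worker string) {
	d.mu.Lock() // writers take the exclusive lock
	defer d.mu.Unlock()
	d.workers[taskQueue] = append(d.workers[taskQueue], worker)
}

func (d *dispatcher) lookup(taskQueue string) []string {
	d.mu.RLock() // readers may proceed concurrently
	defer d.mu.RUnlock()
	// copy so callers don't share backing storage with later writers
	return append([]string(nil), d.workers[taskQueue]...)
}

func main() {
	d := &dispatcher{workers: map[string][]string{}}
	d.register("q", "w1")
	fmt.Println(d.lookup("q"))
}
```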
FWIW, it's OK to lose type safety on sync.Map in this case; it's for internal use. Nobody would ever use sync.Map if they were subject to such requirements even on non-exported fields.
Works for me. Left a couple of optional comments.
@@ -310,6 +310,34 @@ func (bw *baseWorker) runPoller() {
	}
}

func (bw *baseWorker) tryReserveSlot() bool {
	if !bw.isWorkerStarted || bw.isStop() {
A bit scared of accessing this field in a non-thread-safe way, but probably ok since only mutated on start/stop
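One standard way to make that kind of field safe is an atomic flag. This is a hypothetical sketch, not the SDK's actual fix: a `worker` type whose started flag is read from arbitrary goroutines (as in `tryReserveSlot`) but only written on start/stop, where `atomic.Bool` makes the cross-goroutine access race-free without a mutex:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker is a hypothetical sketch: isStarted is read from arbitrary
// goroutines but only written on Start/Stop, so an atomic.Bool keeps
// those accesses race-free without taking a lock.
type worker struct {
	isStarted atomic.Bool
}

func (w *worker) Start() { w.isStarted.Store(true) }
func (w *worker) Stop()  { w.isStarted.Store(false) }

// tryReserveSlot refuses work unless the worker is running.
func (w *worker) tryReserveSlot() bool {
	return w.isStarted.Load()
}

func main() {
	w := &worker{}
	var wg sync.WaitGroup

	// Start/stop and reserve from different goroutines; all shared
	// access goes through atomics, so the race detector stays quiet.
	wg.Add(2)
	go func() { defer wg.Done(); w.Start(); w.Stop() }()
	go func() { defer wg.Done(); _ = w.tryReserveSlot() }()
	wg.Wait()

	w.Start()
	fmt.Println(w.tryReserveSlot())
}
```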
@cretz @Quinn-With-Two-Ns Yes, I've found a race here in the roadrunner-temporal tests, in the one where we start and then stop Temporal workers from different goroutines, protected by a mutex on our side. Here is the stack trace of the race:
@rustatian can you link the roadrunner-temporal test failure? I'll look into it