Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand WF context locking to cover WFT responses #1179

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion internal/internal_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,25 @@ type (

// WorkflowTaskHandler represents workflow task handlers.
WorkflowTaskHandler interface {
WorkflowContextManager

// Processes the workflow task
// The response could be:
// - RespondWorkflowTaskCompletedRequest
// - RespondWorkflowTaskFailedRequest
// - RespondQueryTaskCompletedRequest
ProcessWorkflowTask(
task *workflowTask,
ctx *workflowExecutionContextImpl,
f workflowTaskHeartbeatFunc,
) (response interface{}, resetter EventLevelResetter, err error)
) (response interface{}, err error)
}

// WorkflowContextManager supplies the workflow execution context used to
// process a polled workflow task. It is embedded in WorkflowTaskHandler, so
// every WorkflowTaskHandler implementation must also provide this method.
WorkflowContextManager interface {
// GetOrCreateWorkflowContext returns the workflowExecutionContextImpl for
// the given polled task, or an error.
// NOTE(review): the workflowTaskHandlerImpl implementation documents that
// the returned context is locked in all non-error cases; confirm whether
// every implementation is expected to honor that contract.
GetOrCreateWorkflowContext(
task *workflowservice.PollWorkflowTaskQueueResponse,
historyIterator HistoryIterator,
) (*workflowExecutionContextImpl, error)
}

// ActivityTaskHandler represents activity task handlers.
Expand Down
56 changes: 34 additions & 22 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,16 @@ func newWorkflowExecutionContext(
return workflowContext
}

// Lock acquires the lock on this context object by locking w.mutex. Use
// Unlock to release the lock.
func (w *workflowExecutionContextImpl) Lock() {
w.mutex.Lock()
}

func (w *workflowExecutionContextImpl) Unlock(err error) {
// ErrorCleanup clears this context's state and removes it from cache as
// necessary if the supplied error is not nil or if this context's internal
// error field has been set or if the workflow run has completed.
func (w *workflowExecutionContextImpl) ErrorCleanup(err error) {
if err != nil || w.err != nil || w.isWorkflowCompleted ||
(w.wth.cache.MaxWorkflowCacheSize() <= 0 && !w.hasPendingLocalActivityWork()) {
// TODO: in case of closed, it assumes the close command always succeeds. need server side change to return
Expand All @@ -496,7 +501,14 @@ func (w *workflowExecutionContextImpl) Unlock(err error) {
// exited
w.clearState()
}
}

// Unlock performs any necessary error cleanup that might be needed due to
// workflow completion or context errors and then releases the lock on this
// context. This implementation is _not_ idempotent so it should only be called
// once and only if the context lock is held by the calling goroutine.
func (w *workflowExecutionContextImpl) Unlock() {
// Passing nil means cleanup is triggered only by the conditions
// ErrorCleanup evaluates itself (w.err, w.isWorkflowCompleted, and the
// cache-size check). Callers that have an error to report invoke
// ErrorCleanup(err) themselves before calling Unlock.
w.ErrorCleanup(nil)
w.mutex.Unlock()
}

Expand Down Expand Up @@ -631,7 +643,11 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
return newWorkflowExecutionContext(workflowInfo, wth), nil
}

func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
// GetOrCreateWorkflowContext finds an existing cached context object for this
// run ID or creates a new object, adds it to cache, and returns it. In all
// non-error cases the returned context object is in a locked state (i.e.
// WorkflowContext.Lock() has been called).
func (wth *workflowTaskHandlerImpl) GetOrCreateWorkflowContext(
task *workflowservice.PollWorkflowTaskQueueResponse,
historyIterator HistoryIterator,
) (workflowContext *workflowExecutionContextImpl, err error) {
Expand Down Expand Up @@ -681,10 +697,12 @@ func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
wth.cache.removeWorkflowContext(runID)
workflowContext.clearState()
}
workflowContext.Unlock(err)
workflowContext.ErrorCleanup(err)
workflowContext.Unlock()
workflowContext = nil
}
}

// If the workflow was not cached or the cache was stale.
if workflowContext == nil {
if !isFullHistory {
Expand All @@ -711,7 +729,8 @@ func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(

err = workflowContext.resetStateIfDestroyed(task, historyIterator)
if err != nil {
workflowContext.Unlock(err)
workflowContext.ErrorCleanup(err)
workflowContext.Unlock()
}

return
Expand Down Expand Up @@ -756,10 +775,11 @@ func (w *workflowExecutionContextImpl) resetStateIfDestroyed(task *workflowservi
// ProcessWorkflowTask processes all the events of the workflow task.
func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
workflowTask *workflowTask,
workflowContext *workflowExecutionContextImpl,
heartbeatFunc workflowTaskHeartbeatFunc,
) (completeRequest interface{}, resetter EventLevelResetter, errRet error) {
) (completeRequest interface{}, errRet error) {
if workflowTask == nil || workflowTask.task == nil {
return nil, nil, errors.New("nil workflow task provided")
return nil, errors.New("nil workflow task provided")
}
task := workflowTask.task
if task.History == nil || len(task.History.Events) == 0 {
Expand All @@ -768,11 +788,11 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
}
}
if task.Query == nil && len(task.History.Events) == 0 {
return nil, nil, errors.New("nil or empty history")
return nil, errors.New("nil or empty history")
}

if task.Query != nil && len(task.Queries) != 0 {
return nil, nil, errors.New("invalid query workflow task")
return nil, errors.New("invalid query workflow task")
}

runID := task.WorkflowExecution.GetRunId()
Expand All @@ -786,19 +806,14 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
tagPreviousStartedEventID, task.GetPreviousStartedEventId())
})

workflowContext, err := wth.getOrCreateWorkflowContext(task, workflowTask.historyIterator)
if err != nil {
return nil, nil, err
}

defer func() {
workflowContext.Unlock(errRet)
}()

var response interface{}
var (
response interface{}
err error
heartbeatTimer *time.Timer
)

var heartbeatTimer *time.Timer
defer func() {
workflowContext.ErrorCleanup(errRet)
if heartbeatTimer != nil {
heartbeatTimer.Stop()
}
Expand Down Expand Up @@ -882,7 +897,6 @@ processWorkflowLoop:
}
errRet = err
completeRequest = response
resetter = workflowContext.SetPreviousStartedEventID
return
}

Expand Down Expand Up @@ -1250,8 +1264,6 @@ func (w *workflowExecutionContextImpl) SetCurrentTask(task *workflowservice.Poll
}

func (w *workflowExecutionContextImpl) SetPreviousStartedEventID(eventID int64) {
w.mutex.Lock() // This call can race against the cache eviction thread - see clearState
defer w.mutex.Unlock()
w.previousStartedEventID = eventID
}

Expand Down
14 changes: 11 additions & 3 deletions internal/internal_task_handlers_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,19 @@ type sampleWorkflowTaskHandler struct{}

func (wth sampleWorkflowTaskHandler) ProcessWorkflowTask(
workflowTask *workflowTask,
_ *workflowExecutionContextImpl,
_ workflowTaskHeartbeatFunc,
) (interface{}, EventLevelResetter, error) {
) (interface{}, error) {
return &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: workflowTask.task.TaskToken,
}, nil, nil
}, nil
}

// GetOrCreateWorkflowContext is a stub that ignores its arguments and returns
// a nil context with a nil error. WorkflowTaskHandler embeds
// WorkflowContextManager, so presumably this exists only so that
// sampleWorkflowTaskHandler satisfies that interface in tests.
func (wth sampleWorkflowTaskHandler) GetOrCreateWorkflowContext(
task *workflowservice.PollWorkflowTaskQueueResponse,
historyIterator HistoryIterator,
) (*workflowExecutionContextImpl, error) {
return nil, nil
}

func newSampleWorkflowTaskHandler() *sampleWorkflowTaskHandler {
Expand Down Expand Up @@ -115,7 +123,7 @@ func (s *PollLayerInterfacesTestSuite) TestProcessWorkflowTaskInterface() {

// Process task and respond to the service.
taskHandler := newSampleWorkflowTaskHandler()
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: response}, nil)
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: response}, nil, nil)
completionRequest := request.(*workflowservice.RespondWorkflowTaskCompletedRequest)
s.NoError(err)

Expand Down
Loading
Loading