Skip to content

Commit

Permalink
Timeout query with proper error if worker is down (#5252)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Fixing #1546 by rejecting
Workflow Queries with the following message if no poller has been seen
recently: `"no poller seen for task queue recently, worker may be
down"`.
The check runs 1 second before the query's actual deadline, so the query still waits
for a poller for most of its timeout before this error is returned.

## Why?
<!-- Tell your future self why have you made these changes -->
The previous behavior did not provide useful error messages in cases
where the workers are down.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
unit test

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
None

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No
  • Loading branch information
ShahabT authored Jan 12, 2024
1 parent 872ed94 commit 41c67ba
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 7 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ const (
// MatchingMaxWaitForPollerBeforeFwd in presence of a non-negligible backlog, we resume forwarding tasks if the
// duration since last poll exceeds this threshold.
MatchingMaxWaitForPollerBeforeFwd = "matching.maxWaitForPollerBeforeFwd"
// QueryPollerUnavailableWindow is the lookback window for pollers: a workflow query is rejected shortly before its deadline if no poller has been seen within this window.
QueryPollerUnavailableWindow = "matching.queryPollerUnavailableWindow"

// for matching testing only:

Expand Down
3 changes: 3 additions & 0 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2345,6 +2345,9 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows
}
hResponse, err := wh.historyClient.QueryWorkflow(ctx, req)
if err != nil {
if common.IsContextDeadlineExceededErr(err) {
return nil, serviceerror.NewDeadlineExceeded("query timed out before a worker could process it")
}
return nil, err
}
return hResponse.GetResponse(), nil
Expand Down
14 changes: 9 additions & 5 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type (
GetUserDataLongPollTimeout dynamicconfig.DurationPropertyFn
BacklogNegligibleAge dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters
MaxWaitForPollerBeforeFwd dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters
QueryPollerUnavailableWindow dynamicconfig.DurationPropertyFn

// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters
Expand Down Expand Up @@ -106,10 +107,11 @@ type (

taskQueueConfig struct {
forwarderConfig
SyncMatchWaitDuration func() time.Duration
BacklogNegligibleAge func() time.Duration
MaxWaitForPollerBeforeFwd func() time.Duration
TestDisableSyncMatch func() bool
SyncMatchWaitDuration func() time.Duration
BacklogNegligibleAge func() time.Duration
MaxWaitForPollerBeforeFwd func() time.Duration
QueryPollerUnavailableWindow func() time.Duration
TestDisableSyncMatch func() bool
// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval func() time.Duration
RangeSize int64
Expand Down Expand Up @@ -199,6 +201,7 @@ func NewConfig(
GetUserDataLongPollTimeout: dc.GetDurationProperty(dynamicconfig.MatchingGetUserDataLongPollTimeout, 5*time.Minute-10*time.Second),
BacklogNegligibleAge: dc.GetDurationPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingBacklogNegligibleAge, 24*365*10*time.Hour),
MaxWaitForPollerBeforeFwd: dc.GetDurationPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingMaxWaitForPollerBeforeFwd, 200*time.Millisecond),
QueryPollerUnavailableWindow: dc.GetDurationProperty(dynamicconfig.QueryPollerUnavailableWindow, 20*time.Second),

AdminNamespaceToPartitionDispatchRate: dc.GetFloatPropertyFilteredByNamespace(dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate, 10000),
AdminNamespaceTaskqueueToPartitionDispatchRate: dc.GetFloatPropertyFilteredByTaskQueueInfo(dynamicconfig.AdminMatchingNamespaceTaskqueueToPartitionDispatchRate, 1000),
Expand Down Expand Up @@ -240,7 +243,8 @@ func newTaskQueueConfig(id *taskQueueID, config *Config, namespace namespace.Nam
MaxWaitForPollerBeforeFwd: func() time.Duration {
return config.MaxWaitForPollerBeforeFwd(namespace.String(), taskQueueName, taskType)
},
TestDisableSyncMatch: config.TestDisableSyncMatch,
QueryPollerUnavailableWindow: config.QueryPollerUnavailableWindow,
TestDisableSyncMatch: config.TestDisableSyncMatch,
LoadUserData: func() bool {
return config.LoadUserData(namespace.String(), taskQueueName, taskType)
},
Expand Down
28 changes: 26 additions & 2 deletions service/matching/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TaskMatcher matches a task producer with a task consumer
Expand Down Expand Up @@ -79,7 +81,8 @@ const (

var (
// Sentinel error to redirect while blocked in matcher.
errInterrupted = errors.New("interrupted offer")
errInterrupted = errors.New("interrupted offer")
errNoRecentPoller = status.Error(codes.FailedPrecondition, "no poller seen for task queue recently, worker may be down")
)

// newTaskMatcher returns a task matcher instance. The returned instance can be used by task producers and consumers to
Expand Down Expand Up @@ -230,6 +233,16 @@ func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) (*mat
}

fwdrTokenC := tm.fwdrAddReqTokenC()
var noPollerCtxC <-chan struct{}

if deadline, ok := ctx.Deadline(); ok && fwdrTokenC == nil {
// Reserving 1sec to customize the timeout error if user is querying a workflow
// without having started the workers.
noPollerTimeout := time.Until(deadline) - time.Second
noPollerCtx, cancel := context.WithTimeout(ctx, noPollerTimeout)
noPollerCtxC = noPollerCtx.Done()
defer cancel()
}

for {
select {
Expand All @@ -249,6 +262,13 @@ func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) (*mat
continue
}
return nil, err
case <-noPollerCtxC:
// Only return an error if there has not been a recent poller. Otherwise, let it wait for the remaining
// time, hoping for a match, or ultimately return the default context-deadline-exceeded (CDE) error.
if tm.timeSinceLastPoll() > tm.config.QueryPollerUnavailableWindow() {
return nil, errNoRecentPoller
}
continue
case <-ctx.Done():
return nil, ctx.Err()
}
Expand Down Expand Up @@ -303,7 +323,7 @@ forLoop:
// and they are all stuck in the wrong (root) partition. (Note that since
// frontend balanced the number of pending pollers per partition this only
// becomes an issue when the pollers are fewer than the partitions)
lp := time.Since(time.Unix(0, tm.lastPoller.Load()))
lp := tm.timeSinceLastPoll()
maxWaitForLocalPoller := tm.config.MaxWaitForPollerBeforeFwd()
if lp < maxWaitForLocalPoller {
fwdTokenC = nil
Expand Down Expand Up @@ -621,3 +641,7 @@ func (tm *TaskMatcher) emitForwardedSourceStats(
tm.metricsHandler.Counter(metrics.LocalToLocalMatchPerTaskQueueCounter.Name()).Record(1)
}
}

// timeSinceLastPoll reports how much time has elapsed since the instant recorded in
// tm.lastPoller, which is read as a Unix timestamp expressed in nanoseconds.
func (tm *TaskMatcher) timeSinceLastPoll() time.Duration {
	lastPollNanos := tm.lastPoller.Load()
	lastPollTime := time.Unix(0, lastPollNanos)
	return time.Since(lastPollTime)
}
88 changes: 88 additions & 0 deletions service/matching/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,94 @@ func (t *MatcherTestSuite) TestSyncMatchFailure() {
t.False(syncMatch)
}

// TestQueryNoCurrentPollersButRecentPollers issues a short-lived query poll, then offers a
// query while no poller is waiting, and checks that the offer ends with an error.
//
// NOTE(review): assuming these are testify assertions, Error(err, x) only checks that err is
// non-nil and treats x as a failure-message argument, so the concrete error value is not
// verified here. Confirm what PollForQuery/OfferQuery actually return before tightening
// these to ErrorIs.
func (t *MatcherTestSuite) TestQueryNoCurrentPollersButRecentPollers() {
	// Every PollWorkflowTaskQueue call on the mock client is served by polling the root matcher.
	pollOnRoot := func(pollCtx context.Context, _ *matchingservice.PollWorkflowTaskQueueRequest, _ ...interface{}) {
		_, pollErr := t.rootMatcher.PollForQuery(pollCtx, &pollMetadata{})
		t.Assert().Error(pollErr, context.DeadlineExceeded)
	}
	t.client.EXPECT().
		PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).
		Do(pollOnRoot).
		Return(nil, context.DeadlineExceeded).
		AnyTimes()

	// Issue a poll with a 1ms timeout so that it expires without receiving a task.
	pollCtx, cancelPoll := context.WithTimeout(context.Background(), time.Millisecond)
	_, pollErr := t.matcher.PollForQuery(pollCtx, &pollMetadata{})
	t.Assert().Error(pollErr, context.DeadlineExceeded)
	cancelPoll()

	// Offer a query; the mocked QueryWorkflow hands it to the root matcher and the test
	// expects the generic DeadlineExceeded outcome.
	task := newInternalQueryTask(uuid.New(), &matchingservice.QueryWorkflowRequest{})
	offerOnRoot := func(fwdCtx context.Context, req *matchingservice.QueryWorkflowRequest, _ ...interface{}) {
		task.forwardedFrom = req.GetForwardedSource()
		rootResp, rootErr := t.rootMatcher.OfferQuery(fwdCtx, task)
		t.Nil(rootResp)
		t.Assert().Error(rootErr, context.DeadlineExceeded)
	}
	t.client.EXPECT().
		QueryWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).
		Do(offerOnRoot).
		Return(nil, context.DeadlineExceeded)

	queryCtx, cancelQuery := context.WithTimeout(context.Background(), time.Millisecond*10)
	_, queryErr := t.matcher.OfferQuery(queryCtx, task)
	cancelQuery()
	t.Error(queryErr, context.DeadlineExceeded)
}

// TestQueryNoRecentPoller issues a short-lived query poll, waits 10ms, shrinks
// QueryPollerUnavailableWindow on t.cfg to 5ms, and then offers a query, expecting the offer
// to fail with errNoRecentPoller.
//
// NOTE(review): assuming these are testify assertions, Error(err, x) only checks that err is
// non-nil and treats x as a failure-message argument, so the concrete error value is not
// verified here. Also confirm that t.cfg is the config consulted by the matcher whose
// no-recent-poller check is meant to be exercised.
func (t *MatcherTestSuite) TestQueryNoRecentPoller() {
	// Every PollWorkflowTaskQueue call on the mock client is served by polling the root matcher.
	pollOnRoot := func(pollCtx context.Context, _ *matchingservice.PollWorkflowTaskQueueRequest, _ ...interface{}) {
		_, pollErr := t.rootMatcher.PollForQuery(pollCtx, &pollMetadata{})
		t.Assert().Error(pollErr, context.DeadlineExceeded)
	}
	t.client.EXPECT().
		PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).
		Do(pollOnRoot).
		Return(nil, context.DeadlineExceeded).
		AnyTimes()

	// Issue a poll with a 1ms timeout so that it expires without receiving a task.
	pollCtx, cancelPoll := context.WithTimeout(context.Background(), time.Millisecond)
	_, pollErr := t.matcher.PollForQuery(pollCtx, &pollMetadata{})
	t.Assert().Error(pollErr, context.DeadlineExceeded)
	cancelPoll()

	// Let 10ms pass after the poll so that it falls outside the 5ms window configured below.
	time.Sleep(time.Millisecond * 10)

	// Shrink the poller-unavailable window to 5ms.
	t.cfg.QueryPollerUnavailableWindow = func() time.Duration {
		return time.Millisecond * 5
	}

	// Offer the query and expect errNoRecentPoller.
	task := newInternalQueryTask(uuid.New(), &matchingservice.QueryWorkflowRequest{})
	offerOnRoot := func(fwdCtx context.Context, req *matchingservice.QueryWorkflowRequest, _ ...interface{}) {
		task.forwardedFrom = req.GetForwardedSource()
		rootResp, rootErr := t.rootMatcher.OfferQuery(fwdCtx, task)
		t.Nil(rootResp)
		t.Assert().Error(rootErr, errNoRecentPoller)
	}
	t.client.EXPECT().
		QueryWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).
		Do(offerOnRoot).
		Return(nil, errNoRecentPoller)

	queryCtx, cancelQuery := context.WithTimeout(context.Background(), time.Millisecond*10)
	_, queryErr := t.matcher.OfferQuery(queryCtx, task)
	cancelQuery()
	t.Error(queryErr, errNoRecentPoller)
}

// TestQueryNoPollerAtAll offers a query without issuing any poll beforehand in this test and
// verifies that both the root matcher (invoked through the mocked QueryWorkflow call) and
// t.matcher report errNoRecentPoller.
func (t *MatcherTestSuite) TestQueryNoPollerAtAll() {
	task := newInternalQueryTask(uuid.New(), &matchingservice.QueryWorkflowRequest{})

	t.client.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).Do(
		func(ctx context.Context, req *matchingservice.QueryWorkflowRequest, arg2 ...interface{}) {
			task.forwardedFrom = req.GetForwardedSource()
			resp, err := t.rootMatcher.OfferQuery(ctx, task)
			t.Nil(resp)
			// ErrorIs rather than Error: Error(err, x) only asserts err != nil and treats x as
			// a failure-message argument, so it would pass for any error whatsoever.
			t.Assert().ErrorIs(err, errNoRecentPoller)
		},
	).Return(nil, errNoRecentPoller)

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
	_, err := t.matcher.OfferQuery(ctx, task)
	cancel()
	// Pin the specific sentinel instead of merely checking that some error occurred.
	t.ErrorIs(err, errNoRecentPoller)
}

func (t *MatcherTestSuite) TestQueryLocalSyncMatch() {
// force disable remote forwarding
<-t.fwdr.AddReqTokenC()
Expand Down

0 comments on commit 41c67ba

Please sign in to comment.