Skip to content

Commit

Permalink
Recalculate schedule times from previous action on update (#5381)
Browse files Browse the repository at this point in the history
## What changed?
After an update, the schedule workflow goes back to the previous
action/update and recomputes next times from there, instead of from the
current time.

## Why?
Fixes #4866. Fixes the situation where an update happens in between the
nominal time and the jittered time of an action.

## How did you test it?
new unit test
  • Loading branch information
dnr authored and yycptt committed Apr 26, 2024
1 parent dd4323a commit 9701ef0
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 37 deletions.
94 changes: 59 additions & 35 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ const (
InclusiveBackfillStartTime = 4
// do backfill incrementally
IncrementalBackfill = 5
// update from previous action instead of current time
UpdateFromPrevious = 6
)

const (
Expand Down Expand Up @@ -206,7 +208,7 @@ var (
AllowZeroSleep: true,
ReuseTimer: true,
NextTimeCacheV2Size: 14, // see note below
Version: DontTrackOverlapping, // TODO: upgrade to IncrementalBackfill
Version: DontTrackOverlapping, // TODO: upgrade to UpdateFromPrevious
}

// Note on NextTimeCacheV2Size: This value must be > FutureActionCountForList. Each
Expand Down Expand Up @@ -290,8 +292,14 @@ func (s *scheduler) run() error {
// handle signals after processing time range that just elapsed
scheduleChanged := s.processSignals()
if scheduleChanged {
// need to calculate sleep again
nextWakeup = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false, nil)
// need to calculate sleep again. note that processSignals may move LastProcessedTime backwards.
nextWakeup = s.processTimeRange(
s.State.LastProcessedTime.AsTime(),
t2,
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
false,
nil,
)
}
// process backfills if we have any too
s.processBackfills()
Expand Down Expand Up @@ -584,19 +592,28 @@ func (s *scheduler) fillNextTimeCacheV2(start time.Time) {
}

func (s *scheduler) getNextTime(after time.Time) getNextTimeResult {
// Implementation using a cache to save markers + computation.
if s.hasMinVersion(NewCacheAndJitter) {
return s.getNextTimeV2(after, after)
} else if s.hasMinVersion(BatchAndCacheTimeQueries) {
return s.getNextTimeV1(after)
}
return s.getNextTimeV1(after)
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
var next getNextTimeResult
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(s.jitterSeed(), after)
}).Get(&next))
return next
}

func (s *scheduler) processTimeRange(
t1, t2 time.Time,
start, end time.Time,
overlapPolicy enumspb.ScheduleOverlapPolicy,
manual bool,
limit *int,
) time.Time {
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlap-policy", overlapPolicy, "manual", manual)
s.logger.Debug("processTimeRange", "start", start, "end", end, "overlap-policy", overlapPolicy, "manual", manual)

if s.cspec == nil {
return time.Time{}
Expand All @@ -612,43 +629,36 @@ func (s *scheduler) processTimeRange(
// take an action now. (Don't count as missed catchup window either.)
// Skip over entire time range if paused or no actions can be taken
if !s.canTakeScheduledAction(manual, false) {
return s.getNextTime(t2).Next
return s.getNextTime(end).Next
}
}

for {
var next getNextTimeResult
if !s.hasMinVersion(BatchAndCacheTimeQueries) {
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(s.jitterSeed(), t1)
}).Get(&next))
} else {
next = s.getNextTime(t1)
}
t1 = next.Next
if t1.IsZero() || t1.After(t2) {
return t1
}
var next getNextTimeResult
for next = s.getNextTime(start); !(next.Next.IsZero() || next.Next.After(end)); next = s.getNextTime(next.Next) {
if !s.hasMinVersion(BatchAndCacheTimeQueries) && !s.canTakeScheduledAction(manual, false) {
continue
}
if !manual && t2.Sub(t1) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
if !manual && s.Info.UpdateTime.AsTime().After(next.Next) {
// We're reprocessing since the most recent event after an update. Discard actions before
// the update time (which was just set to "now"). This doesn't have to be guarded with
// hasMinVersion because this condition couldn't happen in previous versions.
continue
}
if !manual && end.Sub(next.Next) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", end, "time", next.Next)
s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Inc(1)
s.Info.MissedCatchupWindow++
continue
}
s.addStart(next.Nominal, next.Next, overlapPolicy, manual)

if limit != nil {
(*limit)--
if *limit <= 0 {
return t1
if (*limit)--; *limit <= 0 {
break
}
}
}
return next.Next
}

func (s *scheduler) canTakeScheduledAction(manual, decrement bool) bool {
Expand Down Expand Up @@ -852,6 +862,14 @@ func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {
s.ensureFields()
s.compileSpec()

if s.hasMinVersion(UpdateFromPrevious) {
// We need to start re-processing from the last event, so that we catch actions whose
// nominal time is before now but actual time (with jitter) is after now. Logic in
// processTimeRange will discard actions before the UpdateTime.
// Note: get last event time before updating s.Info.UpdateTime, otherwise it'll always be now.
s.State.LastProcessedTime = timestamppb.New(s.getLastEvent())
}

s.Info.UpdateTime = timestamppb.New(s.now())
s.incSeqNo()
}
Expand Down Expand Up @@ -1365,16 +1383,22 @@ func (s *scheduler) getRetentionExpiration(nextWakeup time.Time) time.Time {
return time.Time{}
}

var lastActionTime time.Time
// retention starts from the last "event"
return s.getLastEvent().Add(s.tweakables.RetentionTime)
}

// Returns the time of the last "event" to happen to the schedule. An event here is the
// schedule getting created or updated, or an action. This value is used for calculating the
// retention time (how long an idle schedule lives after becoming idle), and also for
// recalculating times after an update to account for jitter.
func (s *scheduler) getLastEvent() time.Time {
var lastEvent time.Time
if len(s.Info.RecentActions) > 0 {
lastActionTime = timestamp.TimeValue(s.Info.RecentActions[len(s.Info.RecentActions)-1].ActualTime)
lastEvent = s.Info.RecentActions[len(s.Info.RecentActions)-1].ActualTime.AsTime()
}

// retention base is max(CreateTime, UpdateTime, and last action time)
retentionBase := lastActionTime
retentionBase = util.MaxTime(retentionBase, timestamp.TimeValue(s.Info.CreateTime))
retentionBase = util.MaxTime(retentionBase, timestamp.TimeValue(s.Info.UpdateTime))
return retentionBase.Add(s.tweakables.RetentionTime)
lastEvent = util.MaxTime(lastEvent, s.Info.CreateTime.AsTime())
lastEvent = util.MaxTime(lastEvent, s.Info.UpdateTime.AsTime())
return lastEvent
}

func (s *scheduler) newUUIDString() string {
Expand Down
70 changes: 68 additions & 2 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,10 @@ func (s *workflowSuite) setupMocksForWorkflows(runs []workflowRun, state *runAcr
s.Failf("multiple starts", "for %s at %s (prev %s)", req.Request.WorkflowId, s.now(), prev)
}
state.started[req.Request.WorkflowId] = s.now()
overhead := time.Duration(100+rand.Intn(100)) * time.Millisecond
return &schedspb.StartWorkflowResponse{
RunId: uuid.NewString(),
RealStartTime: timestamppb.New(time.Now()),
RealStartTime: timestamppb.New(s.now().Add(overhead)),
}, nil
})
// set up short-poll watchers
Expand Down Expand Up @@ -319,7 +320,7 @@ func (s *workflowSuite) runAcrossContinue(
s.Require().NoError(payloads.Decode(canErr.Input, &startArgs))
}
// check starts that we actually got
s.Require().Equal(len(runs), len(state.started))
s.Require().Equalf(len(runs), len(state.started), "started %#v", state.started)
for _, run := range runs {
actual := state.started[run.id]
inRange := !actual.Before(run.start.Add(-run.startTolerance)) && !actual.After(run.start.Add(run.startTolerance))
Expand Down Expand Up @@ -1669,6 +1670,71 @@ func (s *workflowSuite) TestUpdateNotRetroactive() {
)
}

// Tests that an update between a nominal time and jittered time for a start, that doesn't
// modify that start, will still start it.
func (s *workflowSuite) TestUpdateBetweenNominalAndJitter() {
// TODO: remove once default version is UpdateFromPrevious
prevTweakables := currentTweakablePolicies
currentTweakablePolicies.Version = UpdateFromPrevious
defer func() { currentTweakablePolicies = prevTweakables }()

spec := &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: durationpb.New(1 * time.Hour),
}},
Jitter: durationpb.New(1 * time.Hour),
}
s.runAcrossContinue(
[]workflowRun{
{
id: "myid-2022-06-01T01:00:00Z",
start: time.Date(2022, 6, 1, 1, 49, 22, 594000000, time.UTC),
end: time.Date(2022, 6, 1, 1, 53, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "myid-2022-06-01T02:00:00Z",
start: time.Date(2022, 6, 1, 2, 2, 39, 204000000, time.UTC),
end: time.Date(2022, 6, 1, 2, 11, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "newid-2022-06-01T03:00:00Z",
start: time.Date(2022, 6, 1, 3, 37, 29, 538000000, time.UTC),
end: time.Date(2022, 6, 1, 3, 41, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "newid-2022-06-01T04:00:00Z",
start: time.Date(2022, 6, 1, 4, 23, 34, 755000000, time.UTC),
end: time.Date(2022, 6, 1, 4, 27, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
},
[]delayedCallback{
{
// update after nominal time 03:00:00 but before jittered time 03:37:29
at: time.Date(2022, 6, 1, 3, 22, 10, 0, time.UTC),
f: func() {
s.env.SignalWorkflow(SignalNameUpdate, &schedspb.FullUpdateRequest{
Schedule: &schedpb.Schedule{
Spec: spec,
Action: s.defaultAction("newid"),
},
})
},
},
{
at: time.Date(2022, 6, 1, 5, 0, 0, 0, time.UTC),
finishTest: true,
},
},
&schedpb.Schedule{
Spec: spec,
},
)
}

func (s *workflowSuite) TestLimitedActions() {
// written using low-level mocks so we can sleep forever

Expand Down

0 comments on commit 9701ef0

Please sign in to comment.