From 9701ef095c0ea36028e8894d9fac7bd7b210f0e8 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 1 Mar 2024 17:13:05 -0800 Subject: [PATCH] Recalculate schedule times from previous action on update (#5381) ## What changed? After an update, the schedule workflow goes back to the previous action/update and recomputes next times from there, instead of from the current time. ## Why? Fixes #4866. Fixes the situation where an update happens in between the nominal time and the jittered time of an action. ## How did you test it? new unit test --- service/worker/scheduler/workflow.go | 94 ++++++++++++++--------- service/worker/scheduler/workflow_test.go | 70 ++++++++++++++++- 2 files changed, 127 insertions(+), 37 deletions(-) diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index 5c40f55b862..176f10f3914 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -74,6 +74,8 @@ const ( InclusiveBackfillStartTime = 4 // do backfill incrementally IncrementalBackfill = 5 + // update from previous action instead of current time + UpdateFromPrevious = 6 ) const ( @@ -206,7 +208,7 @@ var ( AllowZeroSleep: true, ReuseTimer: true, NextTimeCacheV2Size: 14, // see note below - Version: DontTrackOverlapping, // TODO: upgrade to IncrementalBackfill + Version: DontTrackOverlapping, // TODO: upgrade to UpdateFromPrevious } // Note on NextTimeCacheV2Size: This value must be > FutureActionCountForList. Each @@ -290,8 +292,14 @@ func (s *scheduler) run() error { // handle signals after processing time range that just elapsed scheduleChanged := s.processSignals() if scheduleChanged { - // need to calculate sleep again - nextWakeup = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false, nil) + // need to calculate sleep again. note that processSignals may move LastProcessedTime backwards. + nextWakeup = s.processTimeRange( + s.State.LastProcessedTime.AsTime(), + t2, + enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, + false, + nil, + ) } // process backfills if we have any too s.processBackfills() @@ -584,19 +592,28 @@ func (s *scheduler) fillNextTimeCacheV2(start time.Time) { } func (s *scheduler) getNextTime(after time.Time) getNextTimeResult { + // Implementation using a cache to save markers + computation. if s.hasMinVersion(NewCacheAndJitter) { return s.getNextTimeV2(after, after) + } else if s.hasMinVersion(BatchAndCacheTimeQueries) { + return s.getNextTimeV1(after) } - return s.getNextTimeV1(after) + // Run this logic in a SideEffect so that we can fix bugs there without breaking + // existing schedule workflows. + var next getNextTimeResult + panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} { + return s.cspec.getNextTime(s.jitterSeed(), after) + }).Get(&next)) + return next } func (s *scheduler) processTimeRange( - t1, t2 time.Time, + start, end time.Time, overlapPolicy enumspb.ScheduleOverlapPolicy, manual bool, limit *int, ) time.Time { - s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlap-policy", overlapPolicy, "manual", manual) + s.logger.Debug("processTimeRange", "start", start, "end", end, "overlap-policy", overlapPolicy, "manual", manual) if s.cspec == nil { return time.Time{} @@ -612,30 +629,23 @@ func (s *scheduler) processTimeRange( // take an action now. (Don't count as missed catchup window either.) // Skip over entire time range if paused or no actions can be taken if !s.canTakeScheduledAction(manual, false) { - return s.getNextTime(t2).Next + return s.getNextTime(end).Next } } - for { - var next getNextTimeResult - if !s.hasMinVersion(BatchAndCacheTimeQueries) { - // Run this logic in a SideEffect so that we can fix bugs there without breaking - // existing schedule workflows. - panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} { - return s.cspec.getNextTime(s.jitterSeed(), t1) - }).Get(&next)) - } else { - next = s.getNextTime(t1) - } - t1 = next.Next - if t1.IsZero() || t1.After(t2) { - return t1 - } + var next getNextTimeResult + for next = s.getNextTime(start); !(next.Next.IsZero() || next.Next.After(end)); next = s.getNextTime(next.Next) { if !s.hasMinVersion(BatchAndCacheTimeQueries) && !s.canTakeScheduledAction(manual, false) { continue } - if !manual && t2.Sub(t1) > catchupWindow { - s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1) + if !manual && s.Info.UpdateTime.AsTime().After(next.Next) { + // We're reprocessing since the most recent event after an update. Discard actions before + // the update time (which was just set to "now"). This doesn't have to be guarded with + // hasMinVersion because this condition couldn't happen in previous versions. + continue + } + if !manual && end.Sub(next.Next) > catchupWindow { + s.logger.Warn("Schedule missed catchup window", "now", end, "time", next.Next) s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Inc(1) s.Info.MissedCatchupWindow++ continue @@ -643,12 +653,12 @@ func (s *scheduler) processTimeRange( s.addStart(next.Nominal, next.Next, overlapPolicy, manual) if limit != nil { - (*limit)-- - if *limit <= 0 { - return t1 + if (*limit)--; *limit <= 0 { + break } } } + return next.Next } func (s *scheduler) canTakeScheduledAction(manual, decrement bool) bool { @@ -852,6 +862,14 @@ func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) { s.ensureFields() s.compileSpec() + if s.hasMinVersion(UpdateFromPrevious) { + // We need to start re-processing from the last event, so that we catch actions whose + // nominal time is before now but actual time (with jitter) is after now. Logic in + // processTimeRange will discard actions before the UpdateTime. + // Note: get last event time before updating s.Info.UpdateTime, otherwise it'll always be now. + s.State.LastProcessedTime = timestamppb.New(s.getLastEvent()) + } + s.Info.UpdateTime = timestamppb.New(s.now()) s.incSeqNo() } @@ -1365,16 +1383,22 @@ func (s *scheduler) getRetentionExpiration(nextWakeup time.Time) time.Time { return time.Time{} } - var lastActionTime time.Time + // retention starts from the last "event" + return s.getLastEvent().Add(s.tweakables.RetentionTime) +} + +// Returns the time of the last "event" to happen to the schedule. An event here is the +// schedule getting created or updated, or an action. This value is used for calculating the +// retention time (how long an idle schedule lives after becoming idle), and also for +// recalculating times after an update to account for jitter. +func (s *scheduler) getLastEvent() time.Time { + var lastEvent time.Time if len(s.Info.RecentActions) > 0 { - lastActionTime = timestamp.TimeValue(s.Info.RecentActions[len(s.Info.RecentActions)-1].ActualTime) + lastEvent = s.Info.RecentActions[len(s.Info.RecentActions)-1].ActualTime.AsTime() } - - // retention base is max(CreateTime, UpdateTime, and last action time) - retentionBase := lastActionTime - retentionBase = util.MaxTime(retentionBase, timestamp.TimeValue(s.Info.CreateTime)) - retentionBase = util.MaxTime(retentionBase, timestamp.TimeValue(s.Info.UpdateTime)) - return retentionBase.Add(s.tweakables.RetentionTime) + lastEvent = util.MaxTime(lastEvent, s.Info.CreateTime.AsTime()) + lastEvent = util.MaxTime(lastEvent, s.Info.UpdateTime.AsTime()) + return lastEvent } func (s *scheduler) newUUIDString() string { diff --git a/service/worker/scheduler/workflow_test.go b/service/worker/scheduler/workflow_test.go index 4ff7eefea7e..a386182235e 100644 --- a/service/worker/scheduler/workflow_test.go +++ b/service/worker/scheduler/workflow_test.go @@ -212,9 +212,10 @@ func (s *workflowSuite) setupMocksForWorkflows(runs []workflowRun, state *runAcr s.Failf("multiple starts", "for %s at %s (prev %s)", req.Request.WorkflowId, s.now(), prev) } state.started[req.Request.WorkflowId] = s.now() + overhead := time.Duration(100+rand.Intn(100)) * time.Millisecond return &schedspb.StartWorkflowResponse{ RunId: uuid.NewString(), - RealStartTime: timestamppb.New(time.Now()), + RealStartTime: timestamppb.New(s.now().Add(overhead)), }, nil }) // set up short-poll watchers @@ -319,7 +320,7 @@ func (s *workflowSuite) runAcrossContinue( s.Require().NoError(payloads.Decode(canErr.Input, &startArgs)) } // check starts that we actually got - s.Require().Equal(len(runs), len(state.started)) + s.Require().Equalf(len(runs), len(state.started), "started %#v", state.started) for _, run := range runs { actual := state.started[run.id] inRange := !actual.Before(run.start.Add(-run.startTolerance)) && !actual.After(run.start.Add(run.startTolerance)) @@ -1669,6 +1670,71 @@ func (s *workflowSuite) TestUpdateNotRetroactive() { ) } +// Tests that an update between a nominal time and jittered time for a start, that doesn't +// modify that start, will still start it. +func (s *workflowSuite) TestUpdateBetweenNominalAndJitter() { + // TODO: remove once default version is UpdateFromPrevious + prevTweakables := currentTweakablePolicies + currentTweakablePolicies.Version = UpdateFromPrevious + defer func() { currentTweakablePolicies = prevTweakables }() + + spec := &schedpb.ScheduleSpec{ + Interval: []*schedpb.IntervalSpec{{ + Interval: durationpb.New(1 * time.Hour), + }}, + Jitter: durationpb.New(1 * time.Hour), + } + s.runAcrossContinue( + []workflowRun{ + { + id: "myid-2022-06-01T01:00:00Z", + start: time.Date(2022, 6, 1, 1, 49, 22, 594000000, time.UTC), + end: time.Date(2022, 6, 1, 1, 53, 0, 0, time.UTC), + result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }, + { + id: "myid-2022-06-01T02:00:00Z", + start: time.Date(2022, 6, 1, 2, 2, 39, 204000000, time.UTC), + end: time.Date(2022, 6, 1, 2, 11, 0, 0, time.UTC), + result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }, + { + id: "newid-2022-06-01T03:00:00Z", + start: time.Date(2022, 6, 1, 3, 37, 29, 538000000, time.UTC), + end: time.Date(2022, 6, 1, 3, 41, 0, 0, time.UTC), + result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }, + { + id: "newid-2022-06-01T04:00:00Z", + start: time.Date(2022, 6, 1, 4, 23, 34, 755000000, time.UTC), + end: time.Date(2022, 6, 1, 4, 27, 0, 0, time.UTC), + result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }, + }, + []delayedCallback{ + { + // update after nominal time 03:00:00 but before jittered time 03:37:29 + at: time.Date(2022, 6, 1, 3, 22, 10, 0, time.UTC), + f: func() { + s.env.SignalWorkflow(SignalNameUpdate, &schedspb.FullUpdateRequest{ + Schedule: &schedpb.Schedule{ + Spec: spec, + Action: s.defaultAction("newid"), + }, + }) + }, + }, + { + at: time.Date(2022, 6, 1, 5, 0, 0, 0, time.UTC), + finishTest: true, + }, + }, + &schedpb.Schedule{ + Spec: spec, + }, + ) +} + func (s *workflowSuite) TestLimitedActions() { // written using low-level mocks so we can sleep forever