diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index 0c726a42e4f..39edbc6b1d6 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -92,6 +92,10 @@ type ( watchingWorkflowId string watchingFuture workflow.Future + // Signal requests + pendingPatch *schedpb.SchedulePatch + pendingUpdate *schedspb.FullUpdateRequest + uuidBatch []string } @@ -170,11 +174,12 @@ func (s *scheduler) run() error { s.Info.CreateTime = s.State.LastProcessedTime } - // A schedule may be created with an initial Patch, e.g. start one immediately. Handle that now. - s.processPatch(s.InitialPatch) + // A schedule may be created with an initial Patch, e.g. start one immediately. Put that in + // the state so it takes effect below. + s.pendingPatch = s.InitialPatch s.InitialPatch = nil - for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0; iters-- { + for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0 || s.pendingUpdate != nil || s.pendingPatch != nil; iters-- { t1 := timestamp.TimeValue(s.State.LastProcessedTime) t2 := s.now() if t2.Before(t1) { @@ -189,6 +194,13 @@ func (s *scheduler) run() error { false, ) s.State.LastProcessedTime = timestamp.TimePtr(t2) + // handle signals after processing time range that just elapsed + scheduleChanged := s.processSignals() + if scheduleChanged { + // need to calculate sleep again + nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false) + } + // try starting workflows in the buffer for s.processBuffer() { } s.updateMemoAndSearchAttributes() @@ -266,11 +278,7 @@ func (s *scheduler) now() time.Time { } func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) { - s.logger.Debug("processPatch", "patch", patch) - - if patch == nil { - return - } + s.logger.Info("Schedule patch", "patch", patch.String()) if trigger := patch.TriggerImmediately; trigger != nil { now := s.now() @@ -367,10 +375,14 @@ func (s *scheduler) sleep(nextSleep time.Duration) { sel := workflow.NewSelector(s.ctx) upCh := workflow.GetSignalChannel(s.ctx, SignalNameUpdate) - sel.AddReceive(upCh, s.handleUpdateSignal) + sel.AddReceive(upCh, func(ch workflow.ReceiveChannel, _ bool) { + ch.Receive(s.ctx, &s.pendingUpdate) + }) reqCh := workflow.GetSignalChannel(s.ctx, SignalNamePatch) - sel.AddReceive(reqCh, s.handlePatchSignal) + sel.AddReceive(reqCh, func(ch workflow.ReceiveChannel, _ bool) { + ch.Receive(s.ctx, &s.pendingPatch) + }) refreshCh := workflow.GetSignalChannel(s.ctx, SignalNameRefresh) sel.AddReceive(refreshCh, s.handleRefreshSignal) @@ -450,9 +462,7 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future) { s.logger.Info("started workflow finished", "workflow", id, "status", res.Status, "pause-after-failure", pauseOnFailure) } -func (s *scheduler) handleUpdateSignal(ch workflow.ReceiveChannel, _ bool) { - var req schedspb.FullUpdateRequest - ch.Receive(s.ctx, &req) +func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) { if err := s.checkConflict(req.ConflictToken); err != nil { s.logger.Warn("Update conflicted with concurrent change") return @@ -473,13 +483,6 @@ func (s *scheduler) handleUpdateSignal(ch workflow.ReceiveChannel, _ bool) { s.incSeqNo() } -func (s *scheduler) handlePatchSignal(ch workflow.ReceiveChannel, _ bool) { - var patch schedpb.SchedulePatch - ch.Receive(s.ctx, &patch) - s.logger.Info("Schedule patch", "patch", patch.String()) - s.processPatch(&patch) -} - func (s *scheduler) handleRefreshSignal(ch workflow.ReceiveChannel, _ bool) { ch.Receive(s.ctx, nil) s.logger.Debug("got refresh signal") @@ -488,6 +491,20 @@ func (s *scheduler) handleRefreshSignal(ch workflow.ReceiveChannel, _ bool) { s.State.NeedRefresh = true } +func (s *scheduler) processSignals() bool { + scheduleChanged := false + if s.pendingPatch != nil { + s.processPatch(s.pendingPatch) + s.pendingPatch = nil + } + if s.pendingUpdate != nil { + s.processUpdate(s.pendingUpdate) + s.pendingUpdate = nil + scheduleChanged = true + } + return scheduleChanged +} + func (s *scheduler) getFutureActionTimes(n int) []*time.Time { if s.cspec == nil { return nil diff --git a/service/worker/scheduler/workflow_test.go b/service/worker/scheduler/workflow_test.go index 11b940f84ba..4457f60e4a1 100644 --- a/service/worker/scheduler/workflow_test.go +++ b/service/worker/scheduler/workflow_test.go @@ -285,7 +285,7 @@ func (s *workflowSuite) runAcrossContinue( s.True(s.env.IsWorkflowCompleted()) result := s.env.GetWorkflowError() var canErr *workflow.ContinueAsNewError - s.True(errors.As(result, &canErr)) + s.Require().True(errors.As(result, &canErr)) s.env.AssertExpectations(s.T()) @@ -299,9 +299,9 @@ func (s *workflowSuite) runAcrossContinue( s.NoError(payloads.Decode(canErr.Input, &startArgs)) } // check starts that we actually got - s.Equal(len(runs), len(gotRuns)) + s.Require().Equal(len(runs), len(gotRuns)) for _, run := range runs { - s.True(run.start.Equal(gotRuns[run.id])) + s.Truef(run.start.Equal(gotRuns[run.id]), "%v != %v", run.start, gotRuns[run.id]) } } } @@ -1246,6 +1246,56 @@ func (s *workflowSuite) TestUpdate() { ) } +func (s *workflowSuite) TestUpdateNotRetroactive() { + s.runAcrossContinue( + []workflowRun{ + { + id: "myid-2022-06-01T01:00:00Z", + start: time.Date(2022, 6, 1, 1, 0, 0, 0, time.UTC), + end: time.Date(2022, 6, 1, 1, 0, 30, 0, time.UTC), + result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }, + { + id: "newid-2022-06-01T01:07:20Z", + start: time.Date(2022, 6, 1, 1, 7, 20, 0, time.UTC), + end: time.Date(2022, 6, 1, 1, 7, 30, 0, time.UTC), + result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }, + { + id: "newid-2022-06-01T01:07:40Z", + start: time.Date(2022, 6, 1, 1, 7, 40, 0, time.UTC), + end: time.Date(2022, 6, 1, 1, 7, 50, 0, time.UTC), + result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }, + }, + []delayedCallback{ + { + at: time.Date(2022, 6, 1, 1, 7, 10, 0, time.UTC), + f: func() { + s.env.SignalWorkflow(SignalNameUpdate, &schedspb.FullUpdateRequest{ + Schedule: &schedpb.Schedule{ + Spec: &schedpb.ScheduleSpec{ + Interval: []*schedpb.IntervalSpec{{ + Interval: timestamp.DurationPtr(20 * time.Second), + }}, + }, + Action: s.defaultAction("newid"), + }, + }) + }, + }, + }, + &schedpb.Schedule{ + Spec: &schedpb.ScheduleSpec{ + Interval: []*schedpb.IntervalSpec{{ + Interval: timestamp.DurationPtr(1 * time.Hour), + }}, + }, + }, + 5, + ) +} + func (s *workflowSuite) TestLimitedActions() { s.runAcrossContinue( []workflowRun{