Skip to content

Commit

Permalink
feat(scheduler): Enable compensation job to run regularly, not just o…
Browse files Browse the repository at this point in the history
…n startup (#255)
  • Loading branch information
robzienert authored Apr 20, 2018
1 parent 97641bb commit 26a8697
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.quartz.CronExpression
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.actuate.metrics.CounterService
import org.springframework.boot.actuate.metrics.GaugeService
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.ApplicationListener
import org.springframework.context.event.ContextRefreshedEvent
Expand All @@ -36,75 +37,108 @@ import rx.Scheduler
import rx.Subscription

import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.ZoneId
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit

import static net.logstash.logback.argument.StructuredArguments.*

import static com.netflix.spinnaker.echo.model.Trigger.Type.CRON
import static net.logstash.logback.argument.StructuredArguments.kv
/**
* Finds and executes all pipeline triggers that should have run in the last configured time window during startup.
* This job will wait until the {@link com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache} has run prior to
* finding any missed triggers.
*
* If enabled (`scheduler.compensationJob.enableRecurring`, default on), after the initial startup compesation job
* has been performed, a recurring job will be started at a less aggressive poll cycle to ensure lost triggers are
* re-scheduled.
*/
@ConditionalOnExpression('${scheduler.enabled:false} && ${scheduler.compensationJob.enabled:false}')
@Component
@Slf4j
class MissedPipelineTriggerCompensationJob implements ApplicationListener<ContextRefreshedEvent> {

final static int POLLING_INTERVAL_SECONDS = 5
final static Duration STARTUP_POLL_INTERVAL = Duration.ofSeconds(5)
final static Duration RECURRING_POLL_INTERVAL = Duration.ofMinutes(15)

final Scheduler scheduler
final PipelineCache pipelineCache
final OrcaService orcaService
final CounterService counter
final GaugeService gauge
final PipelineInitiator pipelineInitiator
final boolean enableRecurring
final DateContext dateContext

transient Subscription subscription
Subscription startupSubscription
Subscription recurringSubscription

@Autowired
MissedPipelineTriggerCompensationJob(Scheduler scheduler,
PipelineCache pipelineCache,
OrcaService orcaService,
PipelineInitiator pipelineInitiator,
CounterService counter,
GaugeService gauge,
@Value('${scheduler.compensationJob.windowMs:1800000}') long compensationWindowMs,
@Value('${scheduler.cron.timezone:America/Los_Angeles}') String timeZoneId) {
this(scheduler, pipelineCache, orcaService, pipelineInitiator, counter, compensationWindowMs, timeZoneId, null)
@Value('${scheduler.cron.timezone:America/Los_Angeles}') String timeZoneId,
@Value('${scheduler.compensationJob.enableRecurring:true}') boolean enableRecurring) {
this(scheduler, pipelineCache, orcaService, pipelineInitiator, counter, gauge, compensationWindowMs, timeZoneId, enableRecurring, null)
}

MissedPipelineTriggerCompensationJob(Scheduler scheduler,
PipelineCache pipelineCache,
OrcaService orcaService,
PipelineInitiator pipelineInitiator,
CounterService counter,
GaugeService gauge,
@Value('${scheduler.compensationJob.windowMs:1800000}') long compensationWindowMs,
@Value('${scheduler.cron.timezone:America/Los_Angeles}') String timeZoneId,
@Value('${scheduler.compensationJob.enableRecurring:true}') boolean enableRecurring,
DateContext dateContext) {
this.scheduler = scheduler
this.pipelineCache = pipelineCache
this.orcaService = orcaService
this.pipelineInitiator = pipelineInitiator
this.counter = counter
this.gauge = gauge
this.enableRecurring = enableRecurring
this.dateContext = dateContext ?: DateContext.fromCompensationWindow(timeZoneId, compensationWindowMs)
}

@Override
void onApplicationEvent(ContextRefreshedEvent event) {
if (subscription == null) {
subscription = Observable.interval(POLLING_INTERVAL_SECONDS, TimeUnit.SECONDS, scheduler)
if (startupSubscription == null) {
startupSubscription = Observable.interval(STARTUP_POLL_INTERVAL.toMillis(), TimeUnit.MILLISECONDS, scheduler)
.doOnNext { onPipelineCacheAwait(it) }
.flatMap { tick -> Observable.just(pipelineCache.pipelines) }
.doOnError { onPipelineCacheError(it) }
.retry()
.subscribe { List<Pipeline> pipelines ->
if (pipelines.isEmpty()) {
return
}
triggerMissedExecutions(pipelines)
startupSubscription.unsubscribe()

scheduleRecurringCompensation()
}
}
}

void scheduleRecurringCompensation() {
if (enableRecurring && recurringSubscription == null) {
recurringSubscription = Observable.interval(RECURRING_POLL_INTERVAL.toMinutes(), TimeUnit.MINUTES, scheduler)
.doOnNext { onPipelineCacheAwait(it) }
.flatMap { tick -> Observable.just(pipelineCache.pipelines) }
.doOnError { onPipelineCacheError(it) }
.retry()
.subscribe { List<Pipeline> pipelines ->
if (pipelines.size() == 0) {
if (pipelines.isEmpty()) {
return
}
triggerMissedExecutions(pipelines)
subscription.unsubscribe()
}
}
}
Expand Down Expand Up @@ -138,7 +172,7 @@ class MissedPipelineTriggerCompensationJob implements ApplicationListener<Contex
it.startTime != null ? new Date(it.startTime) : null
}

if (executions.size() == 0) {
if (executions.isEmpty()) {
return
}

Expand All @@ -150,10 +184,13 @@ class MissedPipelineTriggerCompensationJob implements ApplicationListener<Contex
def expr = new CronExpression(trigger.cronExpression)
expr.timeZone = dateContext.timeZone

int misses = 0
if (missedExecution(expr, lastExecution, dateContext.triggerWindowFloor, dateContext.now)) {
log.info('Triggering missed execution on pipeline {} {}', kv('application', pipeline.application), kv('pipelineConfigId', pipeline.id))
pipelineInitiator.call(pipeline)
misses++
}
gauge.submit("triggers.cronMisfires", misses)
}
}

Expand All @@ -166,7 +203,7 @@ class MissedPipelineTriggerCompensationJob implements ApplicationListener<Contex
(List<Trigger>) pipelines
.collect { it.triggers }
.flatten()
.findAll { Trigger it -> it && it.enabled && it.type == Trigger.Type.CRON.toString() }
.findAll { Trigger it -> it && it.enabled && it.type == CRON.toString() }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.netflix.spinnaker.echo.pipelinetriggers.orca.PipelineInitiator
import com.netflix.spinnaker.echo.scheduler.actions.pipeline.impl.MissedPipelineTriggerCompensationJob
import org.quartz.CronExpression
import org.springframework.boot.actuate.metrics.CounterService
import org.springframework.boot.actuate.metrics.GaugeService
import rx.schedulers.Schedulers
import spock.lang.Specification
import spock.lang.Unroll
Expand All @@ -39,6 +40,7 @@ class MissedPipelineTriggerCompensationJobSpec extends Specification {
def orcaService = Mock(OrcaService)
def pipelineInitiator = Mock(PipelineInitiator)
def counterService = Stub(CounterService)
def gaugeService = Mock(GaugeService)

def 'should trigger pipelines for all missed executions'() {
given:
Expand All @@ -64,7 +66,7 @@ class MissedPipelineTriggerCompensationJobSpec extends Specification {
triggerWindowFloor: getDateOffset(0),
now: getDateOffset(50)
)
def compensationJob = new MissedPipelineTriggerCompensationJob(scheduler, pipelineCache, orcaService, pipelineInitiator, counterService, 30000, 'America/Los_Angeles', dateContext)
def compensationJob = new MissedPipelineTriggerCompensationJob(scheduler, pipelineCache, orcaService, pipelineInitiator, counterService, gaugeService, 30000, 'America/Los_Angeles', true, dateContext)

when:
compensationJob.triggerMissedExecutions(pipelines)
Expand All @@ -81,6 +83,7 @@ class MissedPipelineTriggerCompensationJobSpec extends Specification {
])
}
1 * pipelineInitiator.call((Pipeline) pipelines[0])
1 * gaugeService.submit(_, _)
0 * _
}

Expand All @@ -101,7 +104,7 @@ class MissedPipelineTriggerCompensationJobSpec extends Specification {
triggerWindowFloor: getDateOffset(0),
now: getDateOffset(0)
)
def compensationJob = new MissedPipelineTriggerCompensationJob(scheduler, pipelineCache, orcaService, pipelineInitiator, counterService, 30000, 'America/Los_Angeles', dateContext)
def compensationJob = new MissedPipelineTriggerCompensationJob(scheduler, pipelineCache, orcaService, pipelineInitiator, counterService, gaugeService, 30000, 'America/Los_Angeles', true, dateContext)

when:
compensationJob.triggerMissedExecutions(pipelines)
Expand All @@ -113,6 +116,7 @@ class MissedPipelineTriggerCompensationJobSpec extends Specification {
])
}
0 * pipelineInitiator.call(_)
1 * gaugeService.submit(_, _)
0 * _
}

Expand Down

0 comments on commit 26a8697

Please sign in to comment.