From 816e304cc4eb8a0b54b80fb4deb5dcf7336c262d Mon Sep 17 00:00:00 2001 From: Eric Zimanyi Date: Mon, 18 Jun 2018 14:23:17 -0400 Subject: [PATCH] fix(pipelines): Generalize fix for pipeline race condition (#275) --- .../echo/pipelinetriggers/PipelineCache.java | 21 +++------------ .../monitor/BuildEventMonitor.java | 3 ++- .../monitor/DockerEventMonitor.java | 3 ++- .../monitor/GitEventMonitor.java | 3 ++- .../monitor/PubsubEventMonitor.java | 2 +- .../monitor/WebhookEventMonitor.java | 3 ++- .../pipelinetriggers/PipelineCacheSpec.groovy | 5 +++- .../monitor/BuildEventMonitorSpec.groovy | 14 +++++----- .../monitor/DockerEventMonitorSpec.groovy | 22 ++++++++-------- .../monitor/GitEventMonitorSpec.groovy | 26 +++++++++---------- .../monitor/PubsubEventMonitorSpec.groovy | 14 +++++----- .../monitor/WebhookEventMonitorSpec.groovy | 7 ++--- ...issedPipelineTriggerCompensationJob.groovy | 4 +-- .../impl/PipelineConfigsPollingAgent.groovy | 2 +- .../impl/PipelineTriggerConverter.groovy | 2 +- .../PipelineConfigsPollingAgentSpec.groovy | 9 ++++--- .../PipelineTriggerActionConverterSpec.groovy | 3 ++- 17 files changed, 69 insertions(+), 74 deletions(-) diff --git a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/PipelineCache.java b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/PipelineCache.java index a2f0f7f9d..8ceb4c4a7 100644 --- a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/PipelineCache.java +++ b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/PipelineCache.java @@ -53,7 +53,6 @@ public class PipelineCache implements MonitoredPoller { private transient Subscription subscription; private transient SerializedSubject, List> pipelineSubject = ReplaySubject.>createWithSize(1).toSerialized(); - private transient AtomicReference> pipelines = new AtomicReference<>(Collections.emptyList()); @Autowired public PipelineCache(@NonNull Scheduler scheduler, @@ -75,7 +74,7 @@ public void start() { .doOnError(this::onFront50Error) .retry() .map(PipelineCache::decorateTriggers) - .doOnNext(this::cachePipelines) + .doOnNext(this::logRefresh) .subscribe(pipelineSubject); } } @@ -109,26 +108,12 @@ public int getPollingIntervalSeconds() { * * @return An observable emitting the pipelines as of the most recent polling cycle */ - public Observable> getPipelinesAsync() { + public Observable> getPipelines() { return pipelineSubject.take(1); } - /** - * Returns the pipelines as of the most recent polling cycle. If no polling cycles have been - * completed, returns an empty list. - * - * See {@link #getPipelinesAsync()} for an alternate way of getting pipelines that will wait at - * at least one polling cycle before returning a value. - * - * @return The pipelines as of the most recent polling cycle - */ - public List getPipelines() { - return pipelines.get(); - } - - private void cachePipelines(final List pipelines) { + private void logRefresh(final List pipelines) { log.info("Refreshing pipelines"); - this.pipelines.set(pipelines); } private void onFront50Request(final long tick) { diff --git a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/BuildEventMonitor.java b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/BuildEventMonitor.java index 925ac63b6..63c55a2f6 100644 --- a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/BuildEventMonitor.java +++ b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/BuildEventMonitor.java @@ -71,7 +71,8 @@ public void processEvent(Event event) { BuildEvent buildEvent = objectMapper.convertValue(event, BuildEvent.class); Observable.just(buildEvent) .doOnNext(this::onEchoResponse) - .subscribe(triggerEachMatchFrom(pipelineCache.getPipelines())); + .zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new) + .subscribe(triggerEachMatch()); } @Override diff --git a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/DockerEventMonitor.java b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/DockerEventMonitor.java index 6d73e5642..317a53a20 100644 --- a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/DockerEventMonitor.java +++ b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/DockerEventMonitor.java @@ -68,7 +68,8 @@ public void processEvent(Event event) { DockerEvent dockerEvent = objectMapper.convertValue(event, DockerEvent.class); Observable.just(dockerEvent) .doOnNext(this::onEchoResponse) - .subscribe(triggerEachMatchFrom(pipelineCache.getPipelines())); + .zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new) + .subscribe(triggerEachMatch()); } @Override diff --git a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/GitEventMonitor.java b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/GitEventMonitor.java index 39dcf076b..3e948d9fe 100644 --- a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/GitEventMonitor.java +++ b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/GitEventMonitor.java @@ -75,7 +75,8 @@ public void processEvent(Event event) { GitEvent buildEvent = objectMapper.convertValue(event, GitEvent.class); Observable.just(buildEvent) .doOnNext(this::onEchoResponse) - .subscribe(triggerEachMatchFrom(pipelineCache.getPipelines())); + .zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new) + .subscribe(triggerEachMatch()); } @Override diff --git a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/PubsubEventMonitor.java b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/PubsubEventMonitor.java index b56b63070..dffb1aed5 100644 --- a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/PubsubEventMonitor.java +++ b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/PubsubEventMonitor.java @@ -69,7 +69,7 @@ public void processEvent(Event event) { Observable.just(pubsubEvent) .doOnNext(this::onEchoResponse) - .zipWith(pipelineCache.getPipelinesAsync(), TriggerMatchParameters::new) + .zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new) .subscribe(triggerEachMatch()); } diff --git a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/WebhookEventMonitor.java b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/WebhookEventMonitor.java index 70b01bf20..61f414c0a 100644 --- a/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/WebhookEventMonitor.java +++ b/echo-pipelinetriggers/src/main/java/com/netflix/spinnaker/echo/pipelinetriggers/monitor/WebhookEventMonitor.java @@ -74,7 +74,8 @@ public void processEvent(Event event) { Observable.just(webhookEvent) .doOnNext(this::onEchoResponse) - .subscribe(triggerEachMatchFrom(pipelineCache.getPipelines())); + .zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new) + .subscribe(triggerEachMatch()); } @Override diff --git a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/PipelineCacheSpec.groovy b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/PipelineCacheSpec.groovy index 64e46f1e7..2c145973b 100644 --- a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/PipelineCacheSpec.groovy +++ b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/PipelineCacheSpec.groovy @@ -24,6 +24,7 @@ import com.netflix.spinnaker.echo.model.Pipeline import com.netflix.spinnaker.echo.model.Trigger import com.netflix.spinnaker.echo.services.Front50Service import com.netflix.spinnaker.echo.test.RetrofitStubs +import rx.observers.TestSubscriber import rx.schedulers.Schedulers import spock.lang.Shared import spock.lang.Specification @@ -136,15 +137,17 @@ class PipelineCacheSpec extends Specification implements RetrofitStubs { def "keeps polling if Front50 returns an error"() { given: + TestSubscriber> testSubscriber = new TestSubscriber<>(); def pipeline = Pipeline.builder().application('application').name('Pipeline').id('P1').build() pipelineCache.start() when: waitForTicks(3) + pipelineCache.getPipelines().subscribe(testSubscriber) then: front50.getPipelines() >> just([]) >> { throw unavailable() } >> just([pipeline]) - pipelineCache.pipelines == [pipeline] + testSubscriber.assertValue([pipeline]) } def "we can serialize pipelines with triggers that have a parent"() { diff --git a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/BuildEventMonitorSpec.groovy b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/BuildEventMonitorSpec.groovy index c8ad7b055..4f9e1d837 100644 --- a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/BuildEventMonitorSpec.groovy +++ b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/BuildEventMonitorSpec.groovy @@ -5,8 +5,8 @@ import com.netflix.spectator.api.NoopRegistry import com.netflix.spinnaker.echo.model.Event import com.netflix.spinnaker.echo.model.Pipeline import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache -import com.netflix.spinnaker.echo.pipelinetriggers.monitor.BuildEventMonitor import com.netflix.spinnaker.echo.test.RetrofitStubs +import rx.Observable import rx.functions.Action1 import spock.lang.Specification import spock.lang.Subject @@ -27,7 +27,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs { def "triggers pipelines for successful builds for #triggerType"() { given: def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -46,7 +46,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "attaches #triggerType trigger to the pipeline"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -67,7 +67,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs { def "an event can trigger multiple pipelines"() { given: - pipelineCache.getPipelines() >> pipelines + pipelineCache.getPipelines() >> Observable.just(pipelines) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -90,7 +90,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger pipelines for #description builds"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -113,7 +113,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -138,7 +138,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger a pipeline that has an enabled #triggerType trigger with missing #field"() { given: - pipelineCache.getPipelines() >> [badPipeline, goodPipeline] + pipelineCache.getPipelines() >> Observable.just([badPipeline, goodPipeline]) println objectMapper.writeValueAsString(createBuildEventWith(SUCCESS)) when: diff --git a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/DockerEventMonitorSpec.groovy b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/DockerEventMonitorSpec.groovy index db13d16da..d7ac2604c 100644 --- a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/DockerEventMonitorSpec.groovy +++ b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/DockerEventMonitorSpec.groovy @@ -21,9 +21,9 @@ import com.netflix.spectator.api.NoopRegistry import com.netflix.spinnaker.echo.model.Event import com.netflix.spinnaker.echo.model.Pipeline import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache -import com.netflix.spinnaker.echo.pipelinetriggers.monitor.DockerEventMonitor import com.netflix.spinnaker.echo.test.RetrofitStubs import com.netflix.spinnaker.kork.artifacts.model.Artifact +import rx.Observable import rx.functions.Action1 import spock.lang.Specification import spock.lang.Subject @@ -42,7 +42,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { def "triggers pipelines for successful builds for #triggerType"() { given: def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -59,7 +59,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { def "attaches docker trigger to the pipeline"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -87,7 +87,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { def "an event can trigger multiple pipelines"() { given: - pipelineCache.getPipelines() >> pipelines + pipelineCache.getPipelines() >> Observable.just(pipelines) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -110,7 +110,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -130,7 +130,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines for docker"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -152,7 +152,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger a pipeline that has an enabled docker trigger with missing #field"() { given: - pipelineCache.getPipelines() >> [badPipeline, goodPipeline] + pipelineCache.getPipelines() >> Observable.just([badPipeline, goodPipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -175,7 +175,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { def "triggers a pipeline that has an enabled docker trigger with regex"() { given: def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -194,7 +194,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { def "triggers a pipeline that has an enabled docker trigger with empty string for regex"() { given: def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -213,7 +213,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { def "triggers a pipeline that has an enabled docker trigger with only whitespace for regex"() { given: def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -232,7 +232,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs { def "does not trigger a pipeline that has an enabled docker trigger with regex"() { given: def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) diff --git a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/GitEventMonitorSpec.groovy b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/GitEventMonitorSpec.groovy index 04147c759..c2707589b 100644 --- a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/GitEventMonitorSpec.groovy +++ b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/GitEventMonitorSpec.groovy @@ -21,8 +21,8 @@ import com.netflix.spectator.api.NoopRegistry import com.netflix.spinnaker.echo.model.Event import com.netflix.spinnaker.echo.model.Pipeline import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache -import com.netflix.spinnaker.echo.pipelinetriggers.monitor.GitEventMonitor import com.netflix.spinnaker.echo.test.RetrofitStubs +import rx.Observable import rx.functions.Action1 import spock.lang.Specification import spock.lang.Subject @@ -41,7 +41,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { def "triggers pipelines for successful builds for #triggerType"() { given: def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -59,7 +59,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { def "attaches stash trigger to the pipeline"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -79,7 +79,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { def "attaches bitbucket trigger to the pipeline"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -99,7 +99,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { def "an event can trigger multiple pipelines"() { given: - pipelineCache.getPipelines() >> pipelines + pipelineCache.getPipelines() >> Observable.just(pipelines) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -122,7 +122,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -142,7 +142,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines for stash"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -165,7 +165,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines for bitbucket"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -188,7 +188,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger a pipeline that has an enabled stash trigger with missing #field"() { given: - pipelineCache.getPipelines() >> [badPipeline, goodPipeline] + pipelineCache.getPipelines() >> Observable.just([badPipeline, goodPipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -210,7 +210,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger a pipeline that has an enabled bitbucket trigger with missing #field"() { given: - pipelineCache.getPipelines() >> [badPipeline, goodPipeline] + pipelineCache.getPipelines() >> Observable.just([badPipeline, goodPipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -236,7 +236,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { gitEvent.content.branch = eventBranch def trigger = enabledStashTrigger.atBranch(triggerBranch) def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(gitEvent, Event)) @@ -262,7 +262,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { gitEvent.content.branch = eventBranch def trigger = enabledStashTrigger.atBranch(triggerBranch) def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(gitEvent, Event)) @@ -288,7 +288,7 @@ class GitEventMonitorSpec extends Specification implements RetrofitStubs { def trigger = enabledGithubTrigger.atSecret(secret).atBranch("master") def pipeline = createPipelineWith(trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(gitEvent, Event)) diff --git a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/PubsubEventMonitorSpec.groovy b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/PubsubEventMonitorSpec.groovy index dd760d921..2fbed79f7 100644 --- a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/PubsubEventMonitorSpec.groovy +++ b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/PubsubEventMonitorSpec.groovy @@ -83,7 +83,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs { def "triggers pipelines for successful builds for Google pubsub"() { given: def pipeline = createPipelineWith(goodExpectedArtifacts, trigger) - pipelineCache.getPipelinesAsync() >> Observable.just([pipeline]) + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -105,7 +105,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs { def "attaches Google pubsub trigger to the pipeline"() { given: - pipelineCache.getPipelinesAsync() >> Observable.just([pipeline]) + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -125,7 +125,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines for Google pubsub"() { given: - pipelineCache.getPipelinesAsync() >> Observable.just([pipeline]) + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -146,7 +146,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines containing artifacts for Google pubsub"() { given: - pipelineCache.getPipelinesAsync() >> Observable.just([pipeline]) + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -165,7 +165,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger a pipeline that has an enabled pubsub trigger with missing #field"() { given: - pipelineCache.getPipelinesAsync() >> Observable.just([badPipeline, goodPipeline]) + pipelineCache.getPipelines() >> Observable.just([badPipeline, goodPipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -196,7 +196,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs { .build() def pipeline = createPipelineWith(goodExpectedArtifacts, trigger) - pipelineCache.getPipelinesAsync() >> Observable.just([pipeline]) + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: def content = new PubsubEvent.Content() @@ -233,7 +233,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs { .build() def pipeline = createPipelineWith(goodExpectedArtifacts, trigger) - pipelineCache.getPipelinesAsync() >> Observable.just([pipeline]) + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: def content = new PubsubEvent.Content() diff --git a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/WebhookEventMonitorSpec.groovy b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/WebhookEventMonitorSpec.groovy index 7ef8d6a05..b9cb97c33 100644 --- a/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/WebhookEventMonitorSpec.groovy +++ b/echo-pipelinetriggers/src/test/groovy/com/netflix/spinnaker/echo/pipelinetriggers/monitor/WebhookEventMonitorSpec.groovy @@ -21,6 +21,7 @@ import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache import com.netflix.spinnaker.echo.test.RetrofitStubs import com.netflix.spinnaker.kork.artifacts.model.Artifact import com.netflix.spinnaker.kork.artifacts.model.ExpectedArtifact +import rx.Observable import rx.functions.Action1 import spock.lang.Shared import spock.lang.Specification @@ -51,7 +52,7 @@ class WebhookEventMonitorSpec extends Specification implements RetrofitStubs { def 'triggers pipelines for successful builds for webhook'() { given: def pipeline = createPipelineWith(goodExpectedArtifacts, trigger) - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -72,7 +73,7 @@ class WebhookEventMonitorSpec extends Specification implements RetrofitStubs { def 'attaches webhook trigger to the pipeline'() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) @@ -92,7 +93,7 @@ class WebhookEventMonitorSpec extends Specification implements RetrofitStubs { @Unroll def "does not trigger #description pipelines for webhook"() { given: - pipelineCache.getPipelines() >> [pipeline] + pipelineCache.getPipelines() >> Observable.just([pipeline]) when: monitor.processEvent(objectMapper.convertValue(event, Event)) diff --git a/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/MissedPipelineTriggerCompensationJob.groovy b/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/MissedPipelineTriggerCompensationJob.groovy index 73dd5794a..482ab1d2f 100644 --- a/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/MissedPipelineTriggerCompensationJob.groovy +++ b/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/MissedPipelineTriggerCompensationJob.groovy @@ -117,7 +117,7 @@ class MissedPipelineTriggerCompensationJob implements ApplicationListener Observable.just(pipelineCache.pipelines) } + .flatMap { tick -> pipelineCache.getPipelines() } .doOnError { onPipelineCacheError(it) } .retry() .subscribe { List pipelines -> @@ -136,7 +136,7 @@ class MissedPipelineTriggerCompensationJob implements ApplicationListener Observable.just(pipelineCache.pipelines) } + .flatMap { tick -> pipelineCache.getPipelines() } .doOnError { onPipelineCacheError(it) } .retry() .subscribe { List pipelines -> diff --git a/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/PipelineConfigsPollingAgent.groovy b/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/PipelineConfigsPollingAgent.groovy index 7a96adf6b..cdb5f8af1 100644 --- a/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/PipelineConfigsPollingAgent.groovy +++ b/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/PipelineConfigsPollingAgent.groovy @@ -79,7 +79,7 @@ class PipelineConfigsPollingAgent extends AbstractPollingAgent { log.info("Running the pipeline configs polling agent...") // Only interested in pipelines that have triggers and that too scheduled triggers - def pipelines = pipelineCache.getPipelines().findAll { + def pipelines = pipelineCache.getPipelines().toBlocking().first().findAll { it.triggers && it.triggers.any { TRIGGER_TYPE.equalsIgnoreCase(it.type) } } diff --git a/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/PipelineTriggerConverter.groovy b/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/PipelineTriggerConverter.groovy index 1843c5440..7a459afda 100644 --- a/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/PipelineTriggerConverter.groovy +++ b/echo-scheduler/src/main/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/impl/PipelineTriggerConverter.groovy @@ -54,7 +54,7 @@ class PipelineTriggerConverter { triggerBuilder.runAsUser(parameters.runAsUser) } - def existingPipeline = pipelineCache.getPipelines().find { it.id == parameters.id } + def existingPipeline = pipelineCache.getPipelines().toBlocking().first().find { it.id == parameters.id } if (!existingPipeline) { throw new IllegalStateException("No pipeline found (id: ${parameters.id})") } diff --git a/echo-scheduler/src/test/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/PipelineConfigsPollingAgentSpec.groovy b/echo-scheduler/src/test/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/PipelineConfigsPollingAgentSpec.groovy index a6e7fa5b0..29576ced0 100644 --- a/echo-scheduler/src/test/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/PipelineConfigsPollingAgentSpec.groovy +++ b/echo-scheduler/src/test/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/PipelineConfigsPollingAgentSpec.groovy @@ -25,6 +25,7 @@ import com.netflix.spinnaker.echo.model.Pipeline import com.netflix.spinnaker.echo.model.Trigger import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache import com.netflix.spinnaker.echo.scheduler.actions.pipeline.impl.PipelineConfigsPollingAgent +import rx.Observable import spock.lang.Specification import spock.lang.Subject import spock.lang.Unroll @@ -44,7 +45,7 @@ class PipelineConfigsPollingAgentSpec extends Specification { .cronExpression('* 0/30 * * * ? *') .build() Pipeline pipeline = buildPipeline([trigger]) - pipelineCache.getPipelines() >> PipelineCache.decorateTriggers([pipeline]) + pipelineCache.getPipelines() >> Observable.just(PipelineCache.decorateTriggers([pipeline])) actionsOperator.getActionInstances() >> [] when: @@ -70,7 +71,7 @@ class PipelineConfigsPollingAgentSpec extends Specification { Pipeline pipeline = buildPipeline([trigger], pipelineDisabled) def decoratedPipelines = PipelineCache.decorateTriggers([pipeline]) // new id for trigger will be generated here ActionInstance actionInstance = buildScheduledAction(decoratedPipelines[0].triggers[0].id, '* 0/30 * * * ? *', true) - pipelineCache.getPipelines() >> decoratedPipelines + pipelineCache.getPipelines() >> Observable.just(decoratedPipelines) actionsOperator.getActionInstances() >> [actionInstance] when: @@ -95,7 +96,7 @@ class PipelineConfigsPollingAgentSpec extends Specification { given: Pipeline pipeline = buildPipeline([]) ActionInstance actionInstance = buildScheduledAction('t1', '* 0/30 * * * ? *', true) - pipelineCache.getPipelines() >> PipelineCache.decorateTriggers([pipeline]) + pipelineCache.getPipelines() >> Observable.just(PipelineCache.decorateTriggers([pipeline])) actionsOperator.getActionInstances() >> [actionInstance] when: @@ -123,7 +124,7 @@ class PipelineConfigsPollingAgentSpec extends Specification { Pipeline pipeline = buildPipeline([trigger]) def decoratedPipelines = PipelineCache.decorateTriggers([pipeline]) // new id for trigger will be generated here ActionInstance actionInstance = buildScheduledAction(changeTrigger ? trigger.id : decoratedPipelines[0].triggers[0].id, actionCron, actionEnabled) - pipelineCache.getPipelines() >> decoratedPipelines + pipelineCache.getPipelines() >> Observable.just(decoratedPipelines) actionsOperator.getActionInstances() >> [actionInstance] when: diff --git a/echo-scheduler/src/test/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/PipelineTriggerActionConverterSpec.groovy b/echo-scheduler/src/test/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/PipelineTriggerActionConverterSpec.groovy index 18dd01f01..8d3eeb580 100644 --- a/echo-scheduler/src/test/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/PipelineTriggerActionConverterSpec.groovy +++ b/echo-scheduler/src/test/groovy/com/netflix/spinnaker/echo/scheduler/actions/pipeline/PipelineTriggerActionConverterSpec.groovy @@ -23,6 +23,7 @@ import com.netflix.spinnaker.echo.model.Pipeline import com.netflix.spinnaker.echo.model.Trigger import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache import com.netflix.spinnaker.echo.scheduler.actions.pipeline.impl.PipelineTriggerConverter +import rx.Observable import rx.functions.Action1 import spock.lang.Shared import spock.lang.Specification @@ -68,7 +69,7 @@ class PipelineTriggerActionConverterSpec extends Specification { void 'fromParameters() should return an equivalent valid Pipeline instance'() { setup: def pipelineCache = Mock(PipelineCache) { - getPipelines() >> { [pipeline ]} + getPipelines() >> Observable.just([pipeline]) } Map parameters = [