Skip to content

Commit

Permalink
fix(pipelines): Generalize fix for pipeline race condition (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
ezimanyi authored Jun 18, 2018
1 parent 87177e7 commit 816e304
Show file tree
Hide file tree
Showing 17 changed files with 69 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public class PipelineCache implements MonitoredPoller {
private transient Subscription subscription;

private transient SerializedSubject<List<Pipeline>, List<Pipeline>> pipelineSubject = ReplaySubject.<List<Pipeline>>createWithSize(1).toSerialized();
private transient AtomicReference<List<Pipeline>> pipelines = new AtomicReference<>(Collections.emptyList());

@Autowired
public PipelineCache(@NonNull Scheduler scheduler,
Expand All @@ -75,7 +74,7 @@ public void start() {
.doOnError(this::onFront50Error)
.retry()
.map(PipelineCache::decorateTriggers)
.doOnNext(this::cachePipelines)
.doOnNext(this::logRefresh)
.subscribe(pipelineSubject);
}
}
Expand Down Expand Up @@ -109,26 +108,12 @@ public int getPollingIntervalSeconds() {
*
* @return An observable emitting the pipelines as of the most recent polling cycle
*/
public Observable<List<Pipeline>> getPipelinesAsync() {
public Observable<List<Pipeline>> getPipelines() {
return pipelineSubject.take(1);
}

/**
* Returns the pipelines as of the most recent polling cycle. If no polling cycles have been
* completed, returns an empty list.
*
* See {@link #getPipelinesAsync()} for an alternate way of getting pipelines that will wait at
* at least one polling cycle before returning a value.
*
* @return The pipelines as of the most recent polling cycle
*/
public List<Pipeline> getPipelines() {
return pipelines.get();
}

private void cachePipelines(final List<Pipeline> pipelines) {
private void logRefresh(final List<Pipeline> pipelines) {
log.info("Refreshing pipelines");
this.pipelines.set(pipelines);
}

private void onFront50Request(final long tick) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void processEvent(Event event) {
BuildEvent buildEvent = objectMapper.convertValue(event, BuildEvent.class);
Observable.just(buildEvent)
.doOnNext(this::onEchoResponse)
.subscribe(triggerEachMatchFrom(pipelineCache.getPipelines()));
.zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new)
.subscribe(triggerEachMatch());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void processEvent(Event event) {
DockerEvent dockerEvent = objectMapper.convertValue(event, DockerEvent.class);
Observable.just(dockerEvent)
.doOnNext(this::onEchoResponse)
.subscribe(triggerEachMatchFrom(pipelineCache.getPipelines()));
.zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new)
.subscribe(triggerEachMatch());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void processEvent(Event event) {
GitEvent buildEvent = objectMapper.convertValue(event, GitEvent.class);
Observable.just(buildEvent)
.doOnNext(this::onEchoResponse)
.subscribe(triggerEachMatchFrom(pipelineCache.getPipelines()));
.zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new)
.subscribe(triggerEachMatch());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void processEvent(Event event) {

Observable.just(pubsubEvent)
.doOnNext(this::onEchoResponse)
.zipWith(pipelineCache.getPipelinesAsync(), TriggerMatchParameters::new)
.zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new)
.subscribe(triggerEachMatch());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public void processEvent(Event event) {

Observable.just(webhookEvent)
.doOnNext(this::onEchoResponse)
.subscribe(triggerEachMatchFrom(pipelineCache.getPipelines()));
.zipWith(pipelineCache.getPipelines(), TriggerMatchParameters::new)
.subscribe(triggerEachMatch());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.netflix.spinnaker.echo.model.Pipeline
import com.netflix.spinnaker.echo.model.Trigger
import com.netflix.spinnaker.echo.services.Front50Service
import com.netflix.spinnaker.echo.test.RetrofitStubs
import rx.observers.TestSubscriber
import rx.schedulers.Schedulers
import spock.lang.Shared
import spock.lang.Specification
Expand Down Expand Up @@ -136,15 +137,17 @@ class PipelineCacheSpec extends Specification implements RetrofitStubs {

def "keeps polling if Front50 returns an error"() {
given:
TestSubscriber<List<Pipeline>> testSubscriber = new TestSubscriber<>();
def pipeline = Pipeline.builder().application('application').name('Pipeline').id('P1').build()
pipelineCache.start()

when:
waitForTicks(3)
pipelineCache.getPipelines().subscribe(testSubscriber)

then:
front50.getPipelines() >> just([]) >> { throw unavailable() } >> just([pipeline])
pipelineCache.pipelines == [pipeline]
testSubscriber.assertValue([pipeline])
}

def "we can serialize pipelines with triggers that have a parent"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.echo.model.Event
import com.netflix.spinnaker.echo.model.Pipeline
import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache
import com.netflix.spinnaker.echo.pipelinetriggers.monitor.BuildEventMonitor
import com.netflix.spinnaker.echo.test.RetrofitStubs
import rx.Observable
import rx.functions.Action1
import spock.lang.Specification
import spock.lang.Subject
Expand All @@ -27,7 +27,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs {
def "triggers pipelines for successful builds for #triggerType"() {
given:
def pipeline = createPipelineWith(trigger)
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -46,7 +46,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs {
@Unroll
def "attaches #triggerType trigger to the pipeline"() {
given:
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -67,7 +67,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs {

def "an event can trigger multiple pipelines"() {
given:
pipelineCache.getPipelines() >> pipelines
pipelineCache.getPipelines() >> Observable.just(pipelines)

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -90,7 +90,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs {
@Unroll
def "does not trigger pipelines for #description builds"() {
given:
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -113,7 +113,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs {
@Unroll
def "does not trigger #description pipelines"() {
given:
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -138,7 +138,7 @@ class BuildEventMonitorSpec extends Specification implements RetrofitStubs {
@Unroll
def "does not trigger a pipeline that has an enabled #triggerType trigger with missing #field"() {
given:
pipelineCache.getPipelines() >> [badPipeline, goodPipeline]
pipelineCache.getPipelines() >> Observable.just([badPipeline, goodPipeline])
println objectMapper.writeValueAsString(createBuildEventWith(SUCCESS))

when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.echo.model.Event
import com.netflix.spinnaker.echo.model.Pipeline
import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache
import com.netflix.spinnaker.echo.pipelinetriggers.monitor.DockerEventMonitor
import com.netflix.spinnaker.echo.test.RetrofitStubs
import com.netflix.spinnaker.kork.artifacts.model.Artifact
import rx.Observable
import rx.functions.Action1
import spock.lang.Specification
import spock.lang.Subject
Expand All @@ -42,7 +42,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {
def "triggers pipelines for successful builds for #triggerType"() {
given:
def pipeline = createPipelineWith(trigger)
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -59,7 +59,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {

def "attaches docker trigger to the pipeline"() {
given:
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand Down Expand Up @@ -87,7 +87,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {

def "an event can trigger multiple pipelines"() {
given:
pipelineCache.getPipelines() >> pipelines
pipelineCache.getPipelines() >> Observable.just(pipelines)

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -110,7 +110,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {
@Unroll
def "does not trigger #description pipelines"() {
given:
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -130,7 +130,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {
@Unroll
def "does not trigger #description pipelines for docker"() {
given:
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -152,7 +152,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {
@Unroll
def "does not trigger a pipeline that has an enabled docker trigger with missing #field"() {
given:
pipelineCache.getPipelines() >> [badPipeline, goodPipeline]
pipelineCache.getPipelines() >> Observable.just([badPipeline, goodPipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -175,7 +175,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {
def "triggers a pipeline that has an enabled docker trigger with regex"() {
given:
def pipeline = createPipelineWith(trigger)
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -194,7 +194,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {
def "triggers a pipeline that has an enabled docker trigger with empty string for regex"() {
given:
def pipeline = createPipelineWith(trigger)
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -213,7 +213,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {
def "triggers a pipeline that has an enabled docker trigger with only whitespace for regex"() {
given:
def pipeline = createPipelineWith(trigger)
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand All @@ -232,7 +232,7 @@ class DockerEventMonitorSpec extends Specification implements RetrofitStubs {
def "does not trigger a pipeline that has an enabled docker trigger with regex"() {
given:
def pipeline = createPipelineWith(trigger)
pipelineCache.getPipelines() >> [pipeline]
pipelineCache.getPipelines() >> Observable.just([pipeline])

when:
monitor.processEvent(objectMapper.convertValue(event, Event))
Expand Down
Loading

0 comments on commit 816e304

Please sign in to comment.