Skip to content

Commit

Permalink
feat(pipelines executions/orca): Support for bulk saving pipelines
Browse files Browse the repository at this point in the history
(cherry picked from commit ab8e74a)
  • Loading branch information
Arifullah Pattan authored and Richard Timpson committed Aug 13, 2024
1 parent 463b214 commit 1a9743f
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ interface Front50Service {
@POST("/pipelines")
Response savePipeline(@Body Map pipeline, @Query("staleCheck") boolean staleCheck)

@POST("/pipelines/bulksave")
Response savePipelineList(@Body List<Map<String, Object>> pipelineList, @Query("staleCheck") boolean staleCheck)
@POST("/pipelines/batchUpdate")
Response savePipelines(@Body List<Map<String, Object>> pipelines, @Query("staleCheck") boolean staleCheck)

@PUT("/pipelines/{pipelineId}")
Response updatePipeline(@Path("pipelineId") String pipelineId, @Body Map pipeline)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.spinnaker.orca.front50.tasks;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.orca.api.pipeline.RetryableTask;
import com.netflix.spinnaker.orca.api.pipeline.TaskResult;
Expand All @@ -25,18 +27,20 @@
import com.netflix.spinnaker.orca.front50.pipeline.SavePipelineStage;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import retrofit.RetrofitError;
import retrofit.client.Response;

@Component
public class SavePipelineTask implements RetryableTask {

private Logger log = LoggerFactory.getLogger(getClass());
private final Logger log = LoggerFactory.getLogger(getClass());

@Autowired
SavePipelineTask(
Expand All @@ -60,80 +64,90 @@ public TaskResult execute(StageExecution stage) {
"Front50 is not enabled, no way to save pipeline. Fix this by setting front50.enabled: true");
}

if (!stage.getContext().containsKey("pipeline")) {
throw new IllegalArgumentException("pipeline context must be provided");
}
Map<String, Object> pipeline = new HashMap<>();
List<Map<String, Object>> pipelines = new ArrayList<>();

boolean isSavingMultiplePipelines =
(boolean) stage.getContext().getOrDefault("isSavingMultiplePipelines", false);

boolean isBulkSavingPipelines =
(boolean) stage.getContext().getOrDefault("isBulkSavingPipelines", false);

boolean staleCheck =
(Boolean) Optional.ofNullable(stage.getContext().get("staleCheck")).orElse(false);

Map<String, Object> pipeline = null;
List<Map<String, Object>> pipelineList = new ArrayList<>();
Boolean staleCheck = false;
Boolean isSavingMultiplePipelines = false;
boolean bulksave = false;
if (!(stage.getContext().get("pipeline") instanceof String)) {
pipeline = (Map<String, Object>) stage.getContext().get("pipeline");
} else if (stage.getContext().containsKey("bulksave")
&& (boolean) stage.getContext().get("bulksave")) {
pipelineList = (List) stage.decodeBase64("/pipeline", List.class);
bulksave = true;
if (isBulkSavingPipelines) {
if (!stage.getContext().containsKey("pipelines")) {
throw new IllegalArgumentException(
"pipelines context must be provided when saving multiple pipelines");
}
pipelines = (List<Map<String, Object>>) stage.decodeBase64("/pipelines", List.class);
log.info(
"Bulk saving the following pipelines: {}",
pipelines.stream().map(p -> p.get("name")).collect(Collectors.toList()));
} else {
pipeline = (Map<String, Object>) stage.decodeBase64("/pipeline", Map.class);
pipelineList.add(pipeline);
if (!stage.getContext().containsKey("pipeline")) {
throw new IllegalArgumentException(
"pipeline context must be provided when saving a single pipeline");
}
if (!(stage.getContext().get("pipeline") instanceof String)) {
pipeline = (Map<String, Object>) stage.getContext().get("pipeline");
} else {
pipeline = (Map<String, Object>) stage.decodeBase64("/pipeline", Map.class);
}
pipelines.add(pipeline);
log.info("Saving single pipeline {}", pipeline.get("name"));
}
for (Map<String, Object> obj : pipelineList) {
pipeline = obj;
if (!pipeline.containsKey("index")) {
Map<String, Object> existingPipeline = fetchExistingPipeline(pipeline);

// Preprocess pipelines before saving
for (Map<String, Object> pipe : pipelines) {
if (!pipe.containsKey("index")) {
Map<String, Object> existingPipeline = fetchExistingPipeline(pipe);
if (existingPipeline != null) {
pipeline.put("index", existingPipeline.get("index"));
pipe.put("index", existingPipeline.get("index"));
}
}

String serviceAccount = (String) stage.getContext().get("pipeline.serviceAccount");
if (serviceAccount != null) {
updateServiceAccount(pipeline, serviceAccount);
updateServiceAccount(pipe, serviceAccount);
}
isSavingMultiplePipelines =
(Boolean)
Optional.ofNullable(stage.getContext().get("isSavingMultiplePipelines"))
.orElse(false);
staleCheck =
(Boolean) Optional.ofNullable(stage.getContext().get("staleCheck")).orElse(false);

if (stage.getContext().get("pipeline.id") != null
&& pipeline.get("id") == null
&& pipe.get("id") == null
&& !isSavingMultiplePipelines) {
pipeline.put("id", stage.getContext().get("pipeline.id"));
pipe.put("id", stage.getContext().get("pipeline.id"));

// We need to tell front50 to regenerate cron trigger id's
pipeline.put("regenerateCronTriggerIds", true);
pipe.put("regenerateCronTriggerIds", true);
}

Map<String, Object> finalPipeline = pipeline;
Map<String, Object> finalPipeline1 = pipeline;
pipelineModelMutators.stream()
.filter(m -> m.supports(finalPipeline))
.forEach(m -> m.mutate(finalPipeline1));
pipelineModelMutators.stream().filter(m -> m.supports(pipe)).forEach(m -> m.mutate(pipe));
}
Response response = null;
if (bulksave) {
response = front50Service.savePipelineList(pipelineList, false);

Response response;
if (isBulkSavingPipelines) {
response = front50Service.savePipelines(pipelines, staleCheck);
} else {
response = front50Service.savePipeline(pipeline, staleCheck);
}

Map<String, Object> outputs = new HashMap<>();
outputs.put("notification.type", "savepipeline");
outputs.put("application", pipeline.get("application"));
outputs.put("pipeline.name", pipeline.get("name"));
outputs.put("application", stage.getContext().get("application"));

Map<String, Object> saveResult = new HashMap<>();
try {
Map<String, Object> savedPipeline =
(Map<String, Object>) objectMapper.readValue(response.getBody().in(), Map.class);
outputs.put("bulksave", savedPipeline);
saveResult = (Map<String, Object>) objectMapper.readValue(response.getBody().in(), Map.class);
} catch (Exception e) {
log.error("Unable to deserialize saved pipeline, reason: ", e.getMessage());
log.error("Unable to deserialize save pipeline(s) result, reason: ", e);
}

if (pipeline.containsKey("id")) {
outputs.put("pipeline.id", pipeline.get("id"));
}
if (isBulkSavingPipelines) {
outputs.put("bulksave", saveResult);
} else {
outputs.put("pipeline.name", pipeline.get("name"));
outputs.put("pipeline.id", saveResult.getOrDefault("id", pipeline.getOrDefault("id", "")));
}

final ExecutionStatus status;
Expand Down Expand Up @@ -179,14 +193,16 @@ private void updateServiceAccount(Map<String, Object> pipeline, String serviceAc
}

private Map<String, Object> fetchExistingPipeline(Map<String, Object> newPipeline) {
String applicationName = (String) newPipeline.get("application");
String newPipelineID = (String) newPipeline.get("id");
if (!StringUtils.isEmpty(newPipelineID)) {
return front50Service.getPipelines(applicationName).stream()
.filter(m -> m.containsKey("id"))
.filter(m -> m.get("id").equals(newPipelineID))
.findFirst()
.orElse(null);
if (StringUtils.isNotEmpty(newPipelineID)) {
try {
return front50Service.getPipeline(newPipelineID);
} catch (RetrofitError e) {
// Return a null if pipeline with expected id not found
if (e.getResponse() != null && e.getResponse().getStatus() == HTTP_NOT_FOUND) {
log.debug("Existing pipeline with id {} not found. Returning null.", newPipelineID);
}
}
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ package com.netflix.spinnaker.orca.front50.tasks

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.collect.ImmutableMap
import com.netflix.spinnaker.orca.api.pipeline.TaskResult
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType
import com.netflix.spinnaker.orca.front50.Front50Service
import com.netflix.spinnaker.orca.front50.PipelineModelMutator
import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl
import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl
import groovy.json.JsonOutput
import retrofit.client.Response
import retrofit.mime.TypedString
import spock.lang.Specification
import spock.lang.Subject

Expand All @@ -42,10 +46,12 @@ class SavePipelineTaskSpec extends Specification {
def pipeline = [
application: 'orca',
name: 'my pipeline',
id: 'my id',
stages: []
]
def stage = new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "whatever", [
pipeline: Base64.encoder.encodeToString(objectMapper.writeValueAsString(pipeline).bytes)
pipeline: Base64.encoder.encodeToString(objectMapper.writeValueAsString(pipeline).bytes),
application: 'orca'
])

when:
Expand All @@ -61,7 +67,8 @@ class SavePipelineTaskSpec extends Specification {
result.context == ImmutableMap.copyOf([
'notification.type': 'savepipeline',
'application': 'orca',
'pipeline.name': 'my pipeline'
'pipeline.name': 'my pipeline',
'pipeline.id': 'my id'
])
}

Expand All @@ -84,7 +91,7 @@ class SavePipelineTaskSpec extends Specification {
]

when:
front50Service.getPipelines(_) >> [existingPipeline]
front50Service.getPipeline(_ as String) >> existingPipeline
front50Service.savePipeline(_, _) >> { Map<String, Object> newPipeline, Boolean staleCheck ->
receivedIndex = newPipeline.get("index")
new Response('http://front50', 200, 'OK', [], null)
Expand Down Expand Up @@ -216,4 +223,123 @@ class SavePipelineTaskSpec extends Specification {
then:
result.status == ExecutionStatus.FAILED_CONTINUE
}

def "should save multiple pipelines"() {
given:
def pipelines = [
[
application: 'test_app1',
name: 'pipeline1',
id: "id1",
index: 1
],
[
application: 'test_app1',
name: 'pipeline2',
id: "id2",
index: 2
],
[
application: 'test_app2',
name: 'pipeline1',
id: "id3",
index: 1
],
[
application: 'test_ap2',
name: 'pipeline2',
id: "id4",
index: 2
]
]
def stage = new StageExecutionImpl(
new PipelineExecutionImpl(ExecutionType.ORCHESTRATION, "bulk_save_app"),
"savePipeline",
[
application: "bulk_save_app",
pipelines: Base64.encoder.encodeToString(objectMapper.writeValueAsString(pipelines).bytes),
isBulkSavingPipelines: true
]
)

when:
def result = task.execute(stage)

then:
1 * front50Service.savePipelines(pipelines, _) >> {
new Response('http://front50',
200,
'OK',
[],
new TypedString(new JsonOutput().toJson(
[
successful_pipelines_count: 4,
successful_pipelines: ["pipeline1", "pipeline2", "pipeline3", "pipeline4"],
failed_pipelines_count: 0,
failed_pipelines: []
]
))
)
}
result == TaskResult.builder(ExecutionStatus.SUCCEEDED)
.context([
"notification.type": "savepipeline",
application: "bulk_save_app",
bulksave: [
successful_pipelines_count: 4,
successful_pipelines: ["pipeline1", "pipeline2", "pipeline3", "pipeline4"],
failed_pipelines_count: 0,
failed_pipelines: []
]])
.build()
}

def "should fail save multiple pipelines if no pipelines provided"() {
given:
def stage = new StageExecutionImpl(
new PipelineExecutionImpl(ExecutionType.ORCHESTRATION, "bulk_save_app"),
"savePipeline",
[
application : "bulk_save_app",
isBulkSavingPipelines: true
]
)

when:
task.execute(stage)

then:
def error = thrown(IllegalArgumentException)
error.getMessage() == "pipelines context must be provided when saving multiple pipelines"
}

def "should fail task when front 50 save pipelines call fails"() {
given:
def pipelines = [
[
application: 'test_app1',
name: 'pipeline1',
id: "id1",
index: 1
]
]
def stage = new StageExecutionImpl(
new PipelineExecutionImpl(ExecutionType.ORCHESTRATION, "bulk_save_app"),
"savePipeline",
[
application : "bulk_save_app",
isBulkSavingPipelines: true,
pipelines: Base64.encoder.encodeToString(objectMapper.writeValueAsString(pipelines).bytes)
]
)

when:
def result = task.execute(stage)

then:
1 * front50Service.savePipelines(pipelines, _) >> {
new Response('http://front50', 500, 'OK', [], null)
}
result.status == ExecutionStatus.TERMINAL
}
}

0 comments on commit 1a9743f

Please sign in to comment.