Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batchUpdate): Pipeline config batch update #4773

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions orca-front50/orca-front50.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
testImplementation(project(":orca-test-groovy"))
testImplementation(project(":orca-pipelinetemplate"))
testImplementation("com.github.ben-manes.caffeine:guava")
testImplementation("org.codehaus.groovy:groovy-json")
dbyron-sf marked this conversation as resolved.
Show resolved Hide resolved
testRuntimeOnly("net.bytebuddy:byte-buddy")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ interface Front50Service {
@POST("/pipelines")
Response savePipeline(@Body Map pipeline, @Query("staleCheck") boolean staleCheck)

@POST("/pipelines/batchUpdate")
Response savePipelines(@Body List<Map<String, Object>> pipelines, @Query("staleCheck") boolean staleCheck)

@PUT("/pipelines/{pipelineId}")
Response updatePipeline(@Path("pipelineId") String pipelineId, @Body Map pipeline)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.retrofit.RetrofitConfiguration
import com.netflix.spinnaker.orca.retrofit.logging.RetrofitSlf4jLog
import groovy.transform.CompileStatic
import okhttp3.OkHttpClient
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.context.properties.EnableConfigurationProperties
Expand All @@ -42,6 +43,8 @@ import retrofit.RequestInterceptor
import retrofit.RestAdapter
import retrofit.converter.JacksonConverter

import java.util.concurrent.TimeUnit

import static retrofit.Endpoints.newFixedEndpoint

@Configuration
Expand Down Expand Up @@ -71,11 +74,17 @@ class Front50Configuration {
}

@Bean
Front50Service front50Service(Endpoint front50Endpoint, ObjectMapper mapper) {
Front50Service front50Service(Endpoint front50Endpoint, ObjectMapper mapper, Front50ConfigurationProperties front50ConfigurationProperties) {
OkHttpClient okHttpClient = clientProvider.getClient(new DefaultServiceEndpoint("front50", front50Endpoint.getUrl()));
okHttpClient = okHttpClient.newBuilder()
.readTimeout(front50ConfigurationProperties.okhttp.readTimeoutMs, TimeUnit.MILLISECONDS)
.writeTimeout(front50ConfigurationProperties.okhttp.writeTimeoutMs, TimeUnit.MILLISECONDS)
.connectTimeout(front50ConfigurationProperties.okhttp.connectTimeoutMs, TimeUnit.MILLISECONDS)
.build();
new RestAdapter.Builder()
.setRequestInterceptor(spinnakerRequestInterceptor)
.setEndpoint(front50Endpoint)
.setClient(new Ok3Client(clientProvider.getClient(new DefaultServiceEndpoint("front50", front50Endpoint.getUrl()))))
.setClient(new Ok3Client(okHttpClient))
.setLogLevel(retrofitLogLevel)
.setLog(new RetrofitSlf4jLog(Front50Service))
.setConverter(new JacksonConverter(mapper))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,15 @@ public class Front50ConfigurationProperties {
* <p>When true: GET /pipelines/triggeredBy/{pipelineId}/{status} When false: GET /pipelines
*/
boolean useTriggeredByEndpoint = true;

OkHttpConfigurationProperties okhttp = new OkHttpConfigurationProperties();

@Data
public static class OkHttpConfigurationProperties {
int readTimeoutMs = 10000;

int writeTimeoutMs = 10000;

int connectTimeoutMs = 10000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.netflix.spinnaker.orca.exceptions.PipelineTemplateValidationException
import com.netflix.spinnaker.orca.api.pipeline.ExecutionPreprocessor
import com.netflix.spinnaker.orca.front50.DependentPipelineStarter
import com.netflix.spinnaker.orca.front50.Front50Service
import com.netflix.spinnaker.orca.front50.config.Front50Configuration
import com.netflix.spinnaker.orca.front50.config.Front50ConfigurationProperties
import com.netflix.spinnaker.orca.listeners.ExecutionListener
import com.netflix.spinnaker.orca.listeners.Persister
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package com.netflix.spinnaker.orca.front50.tasks;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.kork.retrofit.exceptions.SpinnakerHttpException;
import com.netflix.spinnaker.orca.api.pipeline.RetryableTask;
import com.netflix.spinnaker.orca.api.pipeline.TaskResult;
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus;
Expand All @@ -25,6 +28,7 @@
import com.netflix.spinnaker.orca.front50.pipeline.SavePipelineStage;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,7 +40,7 @@
@Component
public class SavePipelineTask implements RetryableTask {

private Logger log = LoggerFactory.getLogger(getClass());
private final Logger log = LoggerFactory.getLogger(getClass());

@Autowired
SavePipelineTask(
Expand All @@ -60,62 +64,90 @@ public TaskResult execute(StageExecution stage) {
"Front50 is not enabled, no way to save pipeline. Fix this by setting front50.enabled: true");
}

if (!stage.getContext().containsKey("pipeline")) {
throw new IllegalArgumentException("pipeline context must be provided");
}
Map<String, Object> pipeline = new HashMap<>();
List<Map<String, Object>> pipelines = new ArrayList<>();

boolean isSavingMultiplePipelines =
(boolean) stage.getContext().getOrDefault("isSavingMultiplePipelines", false);

boolean isBulkSavingPipelines =
(boolean) stage.getContext().getOrDefault("isBulkSavingPipelines", false);

Map<String, Object> pipeline;
if (!(stage.getContext().get("pipeline") instanceof String)) {
pipeline = (Map<String, Object>) stage.getContext().get("pipeline");
boolean staleCheck =
(Boolean) Optional.ofNullable(stage.getContext().get("staleCheck")).orElse(false);

if (isBulkSavingPipelines) {
if (!stage.getContext().containsKey("pipelines")) {
throw new IllegalArgumentException(
"pipelines context must be provided when saving multiple pipelines");
}
pipelines = (List<Map<String, Object>>) stage.decodeBase64("/pipelines", List.class);
log.info(
"Bulk saving the following pipelines: {}",
pipelines.stream().map(p -> p.get("name")).collect(Collectors.toList()));
} else {
pipeline = (Map<String, Object>) stage.decodeBase64("/pipeline", Map.class);
if (!stage.getContext().containsKey("pipeline")) {
throw new IllegalArgumentException(
"pipeline context must be provided when saving a single pipeline");
}
if (!(stage.getContext().get("pipeline") instanceof String)) {
pipeline = (Map<String, Object>) stage.getContext().get("pipeline");
} else {
pipeline = (Map<String, Object>) stage.decodeBase64("/pipeline", Map.class);
}
pipelines.add(pipeline);
log.info("Saving single pipeline {}", pipeline.get("name"));
}

if (!pipeline.containsKey("index")) {
Map<String, Object> existingPipeline = fetchExistingPipeline(pipeline);
if (existingPipeline != null) {
pipeline.put("index", existingPipeline.get("index"));
// Preprocess pipelines before saving
for (Map<String, Object> pipe : pipelines) {
if (!pipe.containsKey("index")) {
Map<String, Object> existingPipeline = fetchExistingPipeline(pipe);
if (existingPipeline != null) {
pipe.put("index", existingPipeline.get("index"));
}
}
}
String serviceAccount = (String) stage.getContext().get("pipeline.serviceAccount");
if (serviceAccount != null) {
updateServiceAccount(pipeline, serviceAccount);
}
final Boolean isSavingMultiplePipelines =
(Boolean)
Optional.ofNullable(stage.getContext().get("isSavingMultiplePipelines")).orElse(false);
final Boolean staleCheck =
(Boolean) Optional.ofNullable(stage.getContext().get("staleCheck")).orElse(false);
if (stage.getContext().get("pipeline.id") != null
&& pipeline.get("id") == null
&& !isSavingMultiplePipelines) {
pipeline.put("id", stage.getContext().get("pipeline.id"));

// We need to tell front50 to regenerate cron trigger id's
pipeline.put("regenerateCronTriggerIds", true);
}
String serviceAccount = (String) stage.getContext().get("pipeline.serviceAccount");
if (serviceAccount != null) {
updateServiceAccount(pipe, serviceAccount);
}

if (stage.getContext().get("pipeline.id") != null
&& pipe.get("id") == null
&& !isSavingMultiplePipelines) {
pipe.put("id", stage.getContext().get("pipeline.id"));

pipelineModelMutators.stream()
.filter(m -> m.supports(pipeline))
.forEach(m -> m.mutate(pipeline));
// We need to tell front50 to regenerate cron trigger id's
pipe.put("regenerateCronTriggerIds", true);
}

pipelineModelMutators.stream().filter(m -> m.supports(pipe)).forEach(m -> m.mutate(pipe));
}

Response response = front50Service.savePipeline(pipeline, staleCheck);
Response response;
if (isBulkSavingPipelines) {
response = front50Service.savePipelines(pipelines, staleCheck);
} else {
response = front50Service.savePipeline(pipeline, staleCheck);
}

Map<String, Object> outputs = new HashMap<>();
outputs.put("notification.type", "savepipeline");
outputs.put("application", pipeline.get("application"));
outputs.put("pipeline.name", pipeline.get("name"));
outputs.put("application", stage.getContext().get("application"));

Map<String, Object> saveResult = new HashMap<>();
try {
Map<String, Object> savedPipeline =
(Map<String, Object>) objectMapper.readValue(response.getBody().in(), Map.class);
outputs.put("pipeline.id", savedPipeline.get("id"));
saveResult = (Map<String, Object>) objectMapper.readValue(response.getBody().in(), Map.class);
} catch (Exception e) {
log.error("Unable to deserialize saved pipeline, reason: ", e.getMessage());
log.error("Unable to deserialize save pipeline(s) result, reason: ", e);
}

if (pipeline.containsKey("id")) {
outputs.put("pipeline.id", pipeline.get("id"));
}
if (isBulkSavingPipelines) {
outputs.put("bulksave", saveResult);
} else {
outputs.put("pipeline.name", pipeline.get("name"));
outputs.put("pipeline.id", saveResult.getOrDefault("id", pipeline.getOrDefault("id", "")));
}

final ExecutionStatus status;
Expand Down Expand Up @@ -161,14 +193,16 @@ private void updateServiceAccount(Map<String, Object> pipeline, String serviceAc
}

private Map<String, Object> fetchExistingPipeline(Map<String, Object> newPipeline) {
String applicationName = (String) newPipeline.get("application");
String newPipelineID = (String) newPipeline.get("id");
if (!StringUtils.isEmpty(newPipelineID)) {
return front50Service.getPipelines(applicationName).stream()
.filter(m -> m.containsKey("id"))
.filter(m -> m.get("id").equals(newPipelineID))
.findFirst()
.orElse(null);
if (StringUtils.isNotEmpty(newPipelineID)) {
try {
return front50Service.getPipeline(newPipelineID);
} catch (SpinnakerHttpException e) {
// Return a null if pipeline with expected id not found
if (e.getResponseCode() == HTTP_NOT_FOUND) {
log.debug("Existing pipeline with id {} not found. Returning null.", newPipelineID);
}
}
}
return null;
}
Expand Down
Loading