diff --git a/build.gradle b/build.gradle index 988e68a74..22be8d7d8 100644 --- a/build.gradle +++ b/build.gradle @@ -36,7 +36,7 @@ allprojects { apply plugin: 'groovy' ext { - spinnakerDependenciesVersion = project.hasProperty('spinnakerDependenciesVersion') ? project.property('spinnakerDependenciesVersion') : '0.157.3' + spinnakerDependenciesVersion = project.hasProperty('spinnakerDependenciesVersion') ? project.property('spinnakerDependenciesVersion') : '0.157.9' } def checkLocalVersions = [spinnakerDependenciesVersion: spinnakerDependenciesVersion] diff --git a/fiat-roles/fiat-roles.gradle b/fiat-roles/fiat-roles.gradle index 816718973..dfe5796bc 100644 --- a/fiat-roles/fiat-roles.gradle +++ b/fiat-roles/fiat-roles.gradle @@ -24,8 +24,8 @@ dependencies { compile spinnaker.dependency("bootActuator") compile spinnaker.dependency("bootWeb") compile spinnaker.dependency("korkHystrix") - // TODO(ttomsu): expose this in spinnaker-dependencies - compile "com.netflix.spinnaker.clouddriver:cats-redis:1.643.0" + compile spinnaker.dependency("korkJedis") + compile spinnaker.dependency("kork") compile "redis.clients:jedis:${spinnaker.version('jedis')}" compile "com.google.api-client:google-api-client:1.21.0" diff --git a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/CatsSchedulerConfig.java b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/CatsSchedulerConfig.java deleted file mode 100644 index 61af34b48..000000000 --- a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/CatsSchedulerConfig.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2017 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License") - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.spinnaker.fiat.config; - -import com.netflix.spinnaker.cats.agent.Agent; -import com.netflix.spinnaker.cats.agent.ExecutionInstrumentation; -import com.netflix.spinnaker.cats.redis.JedisSource; -import com.netflix.spinnaker.cats.redis.cluster.AgentIntervalProvider; -import com.netflix.spinnaker.cats.redis.cluster.ClusteredAgentScheduler; -import com.netflix.spinnaker.cats.redis.cluster.DefaultAgentIntervalProvider; -import com.netflix.spinnaker.cats.redis.cluster.DefaultNodeIdentity; -import com.netflix.spinnaker.cats.redis.cluster.DefaultNodeStatusProvider; -import com.netflix.spinnaker.cats.redis.cluster.NodeIdentity; -import com.netflix.spinnaker.cats.redis.cluster.NodeStatusProvider; -import com.netflix.spinnaker.fiat.roles.UserRolesSyncer; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Component; - -import javax.annotation.PreDestroy; -import java.util.concurrent.TimeUnit; - -@Slf4j -@Configuration -@ConditionalOnExpression("${fiat.writeMode.enabled:true}") -public class CatsSchedulerConfig { - - @Autowired - UserRolesSyncer userRolesSyncer; - - ClusteredAgentScheduler scheduler; - - @Value("${fiat.writeMode.syncDelayMs:600000}") - String syncDelayMs; - - @Value("${fiat.writeMode.syncDelayTimeoutMs:30000}") - String syncDelayTimeoutMs; - - @Bean - NodeIdentity nodeIdentity() { - return new DefaultNodeIdentity(); - } - - @Bean - AgentIntervalProvider agentIntervalProvider() { - Long pollInterval = Long.parseLong(syncDelayMs); - Long timeout = Long.parseLong(syncDelayTimeoutMs); - return new DefaultAgentIntervalProvider(pollInterval, pollInterval + timeout); - } - - @Bean - NodeStatusProvider nodeStatusProvider() { - return new DefaultNodeStatusProvider(); - } - - @Bean - ClusteredAgentScheduler clusteredAgentScheduler(JedisSource jedisSource, - NodeIdentity nodeIdentity, - AgentIntervalProvider intervalProvider, - NodeStatusProvider nodeStatusProvider, - ExecutionInstrumentation executionInstrumentation) { - scheduler = new ClusteredAgentScheduler(jedisSource, nodeIdentity, intervalProvider, nodeStatusProvider); - scheduler.schedule(userRolesSyncer, userRolesSyncer.getAgentExecution(null), executionInstrumentation); - return scheduler; - } - - @PreDestroy - void cleanup() { - scheduler.unschedule(userRolesSyncer); - } - - @Slf4j - @Component - static class LoggingInstrumentation implements ExecutionInstrumentation { - - @Override - public void executionStarted(Agent agent) { - log.debug("{}:{} starting", agent.getProviderName(), agent.getAgentType()); - } - - @Override - public void executionCompleted(Agent agent, long durationMs) { - log.info("{}:{} completed in {}s", agent.getProviderName(), agent.getAgentType(), TimeUnit.MILLISECONDS.toSeconds(durationMs)); - } - - @Override - public void executionFailed(Agent agent, Throwable cause) { - log.warn(agent.getAgentType() + ":" + agent.getAgentType() + " failed", cause); - } - } -} diff --git a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/LockConfig.java b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/LockConfig.java new file mode 100644 index 000000000..b5d7a8afb --- /dev/null +++ b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/LockConfig.java @@ -0,0 +1,52 @@ +/* + * Copyright 2018 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.fiat.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.spectator.api.Registry; +import com.netflix.spinnaker.kork.jedis.RedisClientDelegate; +import com.netflix.spinnaker.kork.jedis.lock.RedisLockManager; +import com.netflix.spinnaker.kork.lock.LockManager; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.time.Clock; +import java.util.Optional; + +@Configuration +public class LockConfig { + @Bean + Clock clock() { + return Clock.systemDefaultZone(); + } + + @Bean + LockManager redisLockManager(Clock clock, + Registry registry, + ObjectMapper mapper, + RedisClientDelegate redisClientDelegate) { + return new RedisLockManager( + null, // will fall back to running node name + clock, + registry, + mapper, + redisClientDelegate, + Optional.empty(), + Optional.empty() + ); + } +} diff --git a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/RedisConfig.java b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/RedisConfig.java index c15c98788..1be2c1c20 100644 --- a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/RedisConfig.java +++ b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/config/RedisConfig.java @@ -1,7 +1,7 @@ package com.netflix.spinnaker.fiat.config; -import com.netflix.spinnaker.cats.redis.JedisPoolSource; -import com.netflix.spinnaker.cats.redis.JedisSource; +import com.netflix.spinnaker.kork.jedis.JedisClientDelegate; +import com.netflix.spinnaker.kork.jedis.RedisClientDelegate; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.reflect.FieldUtils; @@ -14,9 +14,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.Protocol; +import redis.clients.jedis.*; import java.lang.reflect.Field; import java.net.URI; @@ -25,12 +23,6 @@ @Configuration @ConditionalOnProperty("redis.connection") public class RedisConfig { - - @Bean - public JedisSource jedisSource(JedisPool jedisPool) { - return new JedisPoolSource(jedisPool); - } - @Bean @ConfigurationProperties("redis") public GenericObjectPoolConfig redisPoolConfig() { @@ -48,6 +40,11 @@ public JedisPool jedisPool(@Value("${redis.connection:redis://localhost:6379}") return createPool(redisPoolConfig, connection, timeout); } + @Bean + RedisClientDelegate redisClientDelegate(JedisPool jedisPool) { + return new JedisClientDelegate(jedisPool); + } + private static JedisPool createPool(GenericObjectPoolConfig redisPoolConfig, String connection, int timeout) { diff --git a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/permissions/RedisPermissionsRepository.java b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/permissions/RedisPermissionsRepository.java index 5d4233596..ee8b4b2ce 100644 --- a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/permissions/RedisPermissionsRepository.java +++ b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/permissions/RedisPermissionsRepository.java @@ -22,21 +22,18 @@ import com.google.common.collect.ArrayTable; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; -import com.netflix.spinnaker.cats.redis.JedisSource; import com.netflix.spinnaker.fiat.config.UnrestrictedResourceConfig; import com.netflix.spinnaker.fiat.model.UserPermission; import com.netflix.spinnaker.fiat.model.resources.Resource; import com.netflix.spinnaker.fiat.model.resources.ResourceType; import com.netflix.spinnaker.fiat.model.resources.Role; +import com.netflix.spinnaker.kork.jedis.RedisClientDelegate; import lombok.NonNull; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; import redis.clients.jedis.Response; import java.util.HashMap; @@ -62,7 +59,6 @@ * will likely not be exactly what you get out. That's because of "unrestricted" resources, which * are added to the returned UserPermission. */ -// TODO(ttomsu): Add RedisCacheOptions from Clouddriver. @Component @Slf4j public class RedisPermissionsRepository implements PermissionsRepository { @@ -74,17 +70,19 @@ public class RedisPermissionsRepository implements PermissionsRepository { private static final String UNRESTRICTED = UnrestrictedResourceConfig.UNRESTRICTED_USERNAME; - @Value("${fiat.redis.prefix:spinnaker:fiat}") - @Setter - private String prefix; + private final ObjectMapper objectMapper; + private final RedisClientDelegate redisClientDelegate; - @Autowired - @Setter - private ObjectMapper objectMapper; + private final String prefix; @Autowired - @Setter - private JedisSource jedisSource; + public RedisPermissionsRepository(ObjectMapper objectMapper, + RedisClientDelegate redisClientDelegate, + @Value("${fiat.redis.prefix:spinnaker:fiat}") String prefix) { + this.objectMapper = objectMapper; + this.redisClientDelegate = redisClientDelegate; + this.prefix = prefix; + } @Override public RedisPermissionsRepository put(@NonNull UserPermission permission) { @@ -103,33 +101,31 @@ public RedisPermissionsRepository put(@NonNull UserPermission permission) { } }); - try (Jedis jedis = jedisSource.getJedis()) { - Pipeline deleteOldValuesPipeline = jedis.pipelined(); - Pipeline insertNewValuesPipeline = jedis.pipelined(); + try { + redisClientDelegate.withMultiKeyPipeline(pipeline -> { + String userId = permission.getId(); + pipeline.sadd(allUsersKey(), userId); - String userId = permission.getId(); - insertNewValuesPipeline.sadd(allUsersKey(), userId); - - if (permission.isAdmin()) { - insertNewValuesPipeline.sadd(adminKey(), userId); - } else { - deleteOldValuesPipeline.srem(adminKey(), userId); - } + if (permission.isAdmin()) { + pipeline.sadd(adminKey(), userId); + } else { + pipeline.srem(adminKey(), userId); + } - permission.getRoles().forEach(role -> insertNewValuesPipeline.sadd(roleKey(role), userId)); + permission.getRoles().forEach(role -> pipeline.sadd(roleKey(role), userId)); - for (ResourceType r : ResourceType.values()) { - String userResourceKey = userKey(userId, r); + for (ResourceType r : ResourceType.values()) { + String userResourceKey = userKey(userId, r); - deleteOldValuesPipeline.del(userResourceKey); + pipeline.del(userResourceKey); - Map redisValue = resourceTypeToRedisValue.get(r); - if (redisValue != null && !redisValue.isEmpty()) { - insertNewValuesPipeline.hmset(userResourceKey, redisValue); + Map redisValue = resourceTypeToRedisValue.get(r); + if (redisValue != null && !redisValue.isEmpty()) { + pipeline.hmset(userResourceKey, redisValue); + } } - } - deleteOldValuesPipeline.sync(); - insertNewValuesPipeline.sync(); + pipeline.sync(); + }); } catch (Exception e) { log.error("Storage exception writing " + permission.getId() + " entry.", e); } @@ -138,28 +134,29 @@ public RedisPermissionsRepository put(@NonNull UserPermission permission) { @Override public Optional get(@NonNull String id) { - try (Jedis jedis = jedisSource.getJedis()) { - RawUserPermission userResponseMap = new RawUserPermission(); - RawUserPermission unrestrictedResponseMap = new RawUserPermission(); - - Pipeline p = jedis.pipelined(); - Response isUserInRepo = p.sismember(allUsersKey(), id); - for (ResourceType r : ResourceType.values()) { - Response> resourceMap = p.hgetAll(userKey(id, r)); - userResponseMap.put(r, resourceMap); - Response> unrestrictedMap = p.hgetAll(unrestrictedUserKey(r)); - unrestrictedResponseMap.put(r, unrestrictedMap); - } - Response admin = p.sismember(adminKey(), id); - p.sync(); + try { + return redisClientDelegate.withMultiKeyPipeline(p -> { + RawUserPermission userResponseMap = new RawUserPermission(); + RawUserPermission unrestrictedResponseMap = new RawUserPermission(); - if (!isUserInRepo.get()) { - return Optional.empty(); - } + Response isUserInRepo = p.sismember(allUsersKey(), id); + for (ResourceType r : ResourceType.values()) { + Response> resourceMap = p.hgetAll(userKey(id, r)); + userResponseMap.put(r, resourceMap); + Response> unrestrictedMap = p.hgetAll(unrestrictedUserKey(r)); + unrestrictedResponseMap.put(r, unrestrictedMap); + } + Response admin = p.sismember(adminKey(), id); + p.sync(); + + if (!isUserInRepo.get()) { + return Optional.empty(); + } - userResponseMap.isAdmin = admin.get(); - UserPermission unrestrictedUser = getUserPermission(UNRESTRICTED, unrestrictedResponseMap); - return Optional.of(getUserPermission(id, userResponseMap).merge(unrestrictedUser)); + userResponseMap.isAdmin = admin.get(); + UserPermission unrestrictedUser = getUserPermission(UNRESTRICTED, unrestrictedResponseMap); + return Optional.of(getUserPermission(id, userResponseMap).merge(unrestrictedUser)); + }); } catch (Exception e) { log.error("Storage exception reading " + id + " entry.", e); } @@ -202,8 +199,7 @@ public Map getAllByRoles(List anyRoles) { return new HashMap<>(); } - try (Jedis jedis = jedisSource.getJedis()) { - Pipeline p = jedis.pipelined(); + return redisClientDelegate.withMultiKeyPipeline(p -> { List>> responses = anyRoles .stream() .map(role -> p.smembers(roleKey(role))) @@ -235,7 +231,7 @@ public Map getAllByRoles(List anyRoles) { }) .collect(Collectors.toMap(UserPermission::getId, permission -> permission.merge(unrestrictedUser))); - } + }); } private UserPermission getUserPermission(String userId, RawUserPermission raw) { @@ -254,33 +250,34 @@ private UserPermission getUserPermission(String userId, RawUserPermission raw) { } private Table>> getAllFromRedis() { - Set allUserIds; - try (Jedis jedis = jedisSource.getJedis()) { - allUserIds = jedis.smembers(allUsersKey()); + try { + return redisClientDelegate.withCommandsClient(jedis -> { + return getAllFromRedis(jedis.smembers(allUsersKey())); + }); } catch (Exception e) { log.error("Storage exception reading all entries.", e); return null; } - - return getAllFromRedis(allUserIds); } private Table>> getAllFromRedis(Set userIds) { if (userIds.size() == 0) { return HashBasedTable.create(); } - try (Jedis jedis = jedisSource.getJedis()) { - Table>> responseTable = - ArrayTable.create(userIds, new ArrayIterator<>(ResourceType.values())); - Pipeline p = jedis.pipelined(); - for (String userId : userIds) { - for (ResourceType r : ResourceType.values()) { - responseTable.put(userId, r, p.hgetAll(userKey(userId, r))); + try { + return redisClientDelegate.withMultiKeyPipeline(p -> { + Table>> responseTable = + ArrayTable.create(userIds, new ArrayIterator<>(ResourceType.values())); + + for (String userId : userIds) { + for (ResourceType r : ResourceType.values()) { + responseTable.put(userId, r, p.hgetAll(userKey(userId, r))); + } } - } - p.sync(); - return responseTable; + p.sync(); + return responseTable; + }); } catch (Exception e) { log.error("Storage exception reading all entries.", e); } @@ -289,30 +286,32 @@ private Table>> getAllFromRed @Override public void remove(@NonNull String id) { - try (Jedis jedis = jedisSource.getJedis()) { - Map userRolesById = jedis.hgetAll(userKey(id, ResourceType.ROLE)); - - Pipeline p = jedis.pipelined(); - - p.srem(allUsersKey(), id); - for (String roleName : userRolesById.keySet()) { - p.srem(roleKey(roleName), id); - } + try { + redisClientDelegate.withCommandsClient(jedis -> { + Map userRolesById = jedis.hgetAll(userKey(id, ResourceType.ROLE)); + + redisClientDelegate.withMultiKeyPipeline(p -> { + p.srem(allUsersKey(), id); + for (String roleName : userRolesById.keySet()) { + p.srem(roleKey(roleName), id); + } - for (ResourceType r : ResourceType.values()) { - p.del(userKey(id, r)); - } - p.srem(adminKey(), id); - p.sync(); + for (ResourceType r : ResourceType.values()) { + p.del(userKey(id, r)); + } + p.srem(adminKey(), id); + p.sync(); + }); + }); } catch (Exception e) { log.error("Storage exception reading " + id + " entry.", e); } } private Set getAllAdmins() { - try (Jedis jedis = jedisSource.getJedis()) { + return redisClientDelegate.withCommandsClient(jedis -> { return jedis.smembers(adminKey()); - } + }); } private String allUsersKey() { diff --git a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/roles/UserRolesSyncer.java b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/roles/UserRolesSyncer.java index e9a639293..ee01c21e3 100644 --- a/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/roles/UserRolesSyncer.java +++ b/fiat-roles/src/main/java/com/netflix/spinnaker/fiat/roles/UserRolesSyncer.java @@ -17,7 +17,6 @@ package com.netflix.spinnaker.fiat.roles; import com.diffplug.common.base.Functions; -import com.netflix.spinnaker.cats.agent.RunnableAgent; import com.netflix.spinnaker.fiat.config.ResourceProvidersHealthIndicator; import com.netflix.spinnaker.fiat.config.UnrestrictedResourceConfig; import com.netflix.spinnaker.fiat.model.UserPermission; @@ -29,8 +28,7 @@ import com.netflix.spinnaker.fiat.permissions.PermissionsResolver; import com.netflix.spinnaker.fiat.providers.ProviderException; import com.netflix.spinnaker.fiat.providers.ResourceProvider; -import lombok.Getter; -import lombok.Setter; +import com.netflix.spinnaker.kork.lock.LockManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -41,6 +39,7 @@ import org.springframework.util.backoff.BackOffExecution; import org.springframework.util.backoff.FixedBackOff; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,41 +48,49 @@ @Slf4j @Component @ConditionalOnExpression("${fiat.writeMode.enabled:true}") -public class UserRolesSyncer implements RunnableAgent { - - @Getter - private final String agentType = "UserRoleSyncer"; - @Getter - private final String providerName = "Fiat"; - - @Autowired - @Setter - private PermissionsRepository permissionsRepository; - - @Autowired - @Setter - private PermissionsResolver permissionsResolver; - - @Autowired(required = false) - @Setter - private ResourceProvider serviceAccountProvider; +public class UserRolesSyncer { + private final LockManager lockManager; + private final PermissionsRepository permissionsRepository; + private final PermissionsResolver permissionsResolver; + private final ResourceProvider serviceAccountProvider; + private final ResourceProvidersHealthIndicator healthIndicator; + + private final long retryIntervalMs; + private final long syncDelayMs; + private final long syncFailureDelayMs; + private final long syncDelayTimeoutMs; @Autowired - @Setter - private ResourceProvidersHealthIndicator healthIndicator; - - @Value("${fiat.writeMode.retryIntervalMs:10000}") - @Setter - private long retryIntervalMs = 10000; - - @Value("${fiat.writeMode.syncDelayTimeoutMs:30000}") - @Setter - private long syncDelayTimeoutMs = 30000; - + public UserRolesSyncer(LockManager lockManager, + PermissionsRepository permissionsRepository, + PermissionsResolver permissionsResolver, + ResourceProvider serviceAccountProvider, + ResourceProvidersHealthIndicator healthIndicator, + @Value("${fiat.writeMode.retryIntervalMs:10000}") long retryIntervalMs, + @Value("${fiat.writeMode.syncDelayMs:600000}") long syncDelayMs, + @Value("${fiat.writeMode.syncFailureDelayMs:600000}") long syncFailureDelayMs, + @Value("${fiat.writeMode.syncDelayTimeoutMs:30000}") long syncDelayTimeoutMs) { + this.lockManager = lockManager; + this.permissionsRepository = permissionsRepository; + this.permissionsResolver = permissionsResolver; + this.serviceAccountProvider = serviceAccountProvider; + this.healthIndicator = healthIndicator; + + this.retryIntervalMs = retryIntervalMs; + this.syncDelayMs = syncDelayMs; + this.syncFailureDelayMs = syncFailureDelayMs; + this.syncDelayTimeoutMs = syncDelayTimeoutMs; + } + @Scheduled(fixedDelay = 30000L) + public void schedule() { + LockManager.LockOptions lockOptions = new LockManager.LockOptions() + .withLockName("Fiat.UserRolesSyncer".toLowerCase()) + .withMaximumLockDuration(Duration.ofMillis(syncDelayMs + syncDelayTimeoutMs)) + .withSuccessInterval(Duration.ofMillis(syncDelayMs)) + .withFailureInterval(Duration.ofMillis(syncFailureDelayMs)); - public void run() { - syncAndReturn(); + lockManager.acquireLock(lockOptions, this::syncAndReturn); } public long syncAndReturn() { @@ -97,7 +104,7 @@ public long syncAndReturn() { if (!isServerHealthy()) { log.warn("Server is currently UNHEALTHY. User permission role synchronization and " + - "resolution may not complete until this server becomes healthy again."); + "resolution may not complete until this server becomes healthy again."); } while (true) { @@ -114,7 +121,7 @@ public long syncAndReturn() { } return updateUserPermissions(combo); - } catch (ProviderException|PermissionResolutionException ex) { + } catch (ProviderException | PermissionResolutionException ex) { Status status = healthIndicator.health().getStatus(); long waitTime = backOffExec.nextBackOff(); if (waitTime == BackOffExecution.STOP || System.currentTimeMillis() > timeout) { @@ -173,16 +180,16 @@ public long updateUserPermissions(Map permissionsById) { .map(permission -> new ExternalUser() .setId(permission.getId()) .setExternalRoles(permission.getRoles() - .stream() - .filter(role -> role.getSource() == Role.Source.EXTERNAL) - .collect(Collectors.toList()))) + .stream() + .filter(role -> role.getSource() == Role.Source.EXTERNAL) + .collect(Collectors.toList()))) .collect(Collectors.toList()); long count = permissionsResolver.resolve(extUsers) - .values() - .stream() - .map(permission -> permissionsRepository.put(permission)) - .count(); + .values() + .stream() + .map(permission -> permissionsRepository.put(permission)) + .count(); log.info("Synced {} non-anonymous user roles.", count); return count; } diff --git a/fiat-roles/src/test/groovy/com/netflix/spinnaker/fiat/permissions/RedisPermissionsRepositorySpec.groovy b/fiat-roles/src/test/groovy/com/netflix/spinnaker/fiat/permissions/RedisPermissionsRepositorySpec.groovy index 9cd062ad6..390458fb0 100644 --- a/fiat-roles/src/test/groovy/com/netflix/spinnaker/fiat/permissions/RedisPermissionsRepositorySpec.groovy +++ b/fiat-roles/src/test/groovy/com/netflix/spinnaker/fiat/permissions/RedisPermissionsRepositorySpec.groovy @@ -18,7 +18,6 @@ package com.netflix.spinnaker.fiat.permissions import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.databind.ObjectMapper -import com.netflix.spinnaker.cats.redis.JedisSource import com.netflix.spinnaker.fiat.config.UnrestrictedResourceConfig import com.netflix.spinnaker.fiat.model.UserPermission import com.netflix.spinnaker.fiat.model.resources.Account @@ -26,7 +25,9 @@ import com.netflix.spinnaker.fiat.model.resources.Application import com.netflix.spinnaker.fiat.model.resources.Role import com.netflix.spinnaker.fiat.model.resources.ServiceAccount import com.netflix.spinnaker.kork.jedis.EmbeddedRedis +import com.netflix.spinnaker.kork.jedis.JedisClientDelegate import redis.clients.jedis.Jedis +import redis.clients.jedis.JedisPool import spock.lang.AutoCleanup import spock.lang.Shared import spock.lang.Specification @@ -58,16 +59,11 @@ class RedisPermissionsRepositorySpec extends Specification { } def setup() { - JedisSource js = new JedisSource() { - @Override - Jedis getJedis() { - return embeddedRedis.jedis - } - } - repo = new RedisPermissionsRepository() - .setPrefix(prefix) - .setObjectMapper(objectMapper) - .setJedisSource(js) + repo = new RedisPermissionsRepository( + objectMapper, + new JedisClientDelegate(embeddedRedis.pool as JedisPool), + prefix + ) } def cleanup() { diff --git a/fiat-roles/src/test/groovy/com/netflix/spinnaker/fiat/roles/UserRolesSyncerSpec.groovy b/fiat-roles/src/test/groovy/com/netflix/spinnaker/fiat/roles/UserRolesSyncerSpec.groovy index e34d3205d..11d9992fc 100644 --- a/fiat-roles/src/test/groovy/com/netflix/spinnaker/fiat/roles/UserRolesSyncerSpec.groovy +++ b/fiat-roles/src/test/groovy/com/netflix/spinnaker/fiat/roles/UserRolesSyncerSpec.groovy @@ -18,7 +18,6 @@ package com.netflix.spinnaker.fiat.roles import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.databind.ObjectMapper -import com.netflix.spinnaker.cats.redis.JedisSource import com.netflix.spinnaker.fiat.config.ResourceProvidersHealthIndicator import com.netflix.spinnaker.fiat.config.UnrestrictedResourceConfig import com.netflix.spinnaker.fiat.model.UserPermission @@ -29,13 +28,19 @@ import com.netflix.spinnaker.fiat.permissions.PermissionsResolver import com.netflix.spinnaker.fiat.permissions.RedisPermissionsRepository import com.netflix.spinnaker.fiat.providers.ResourceProvider import com.netflix.spinnaker.kork.jedis.EmbeddedRedis +import com.netflix.spinnaker.kork.jedis.JedisClientDelegate +import com.netflix.spinnaker.kork.lock.LockManager import org.springframework.boot.actuate.health.Health import redis.clients.jedis.Jedis +import redis.clients.jedis.JedisPool import spock.lang.AutoCleanup import spock.lang.Shared import spock.lang.Specification import spock.lang.Subject +import javax.annotation.Nonnull +import java.util.concurrent.Callable + class UserRolesSyncerSpec extends Specification { private static final String UNRESTRICTED = UnrestrictedResourceConfig.UNRESTRICTED_USERNAME; @@ -60,15 +65,11 @@ class UserRolesSyncerSpec extends Specification { } def setup() { - JedisSource js = new JedisSource() { - @Override - Jedis getJedis() { - return embeddedRedis.jedis - } - } - repo = new RedisPermissionsRepository() - .setObjectMapper(objectMapper) - .setJedisSource(js) + repo = new RedisPermissionsRepository( + objectMapper, + new JedisClientDelegate(embeddedRedis.pool as JedisPool), + "unittests" + ) } def cleanup() { @@ -106,11 +107,25 @@ class UserRolesSyncerSpec extends Specification { } def permissionsResolver = Mock(PermissionsResolver) - @Subject syncer = new UserRolesSyncer() - .setPermissionsRepository(repo) - .setPermissionsResolver(permissionsResolver) - .setServiceAccountProvider(serviceAccountProvider) - .setHealthIndicator(new AlwaysUpHealthIndicator()) + + def lockManager = Mock(LockManager) { + _ * acquireLock() >> { LockManager.LockOptions lockOptions, Callable onLockAcquiredCallback -> + onLockAcquiredCallback.call() + } + } + + @Subject + def syncer = new UserRolesSyncer( + lockManager, + repo, + permissionsResolver, + serviceAccountProvider, + new AlwaysUpHealthIndicator(), + 1, + 1, + 1, + 1 + ) expect: repo.getAllById() == [ diff --git a/fiat-web/src/test/groovy/com/netflix/spinnaker/config/EmbeddedRedisConfig.groovy b/fiat-web/src/test/groovy/com/netflix/spinnaker/config/EmbeddedRedisConfig.groovy index bac8876eb..e11c29f53 100644 --- a/fiat-web/src/test/groovy/com/netflix/spinnaker/config/EmbeddedRedisConfig.groovy +++ b/fiat-web/src/test/groovy/com/netflix/spinnaker/config/EmbeddedRedisConfig.groovy @@ -17,8 +17,9 @@ package com.netflix.spinnaker.config -import com.netflix.spinnaker.cats.redis.JedisSource import com.netflix.spinnaker.kork.jedis.EmbeddedRedis +import com.netflix.spinnaker.kork.jedis.JedisClientDelegate +import com.netflix.spinnaker.kork.jedis.RedisClientDelegate import org.springframework.beans.factory.config.ConfigurableBeanFactory import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -45,9 +46,7 @@ class EmbeddedRedisConfig { } @Bean - JedisSource jedisSource() { - return { - jedisPool().getResource() - } + RedisClientDelegate redisClientDelegate(EmbeddedRedis embeddedRedis) { + return new JedisClientDelegate(embeddedRedis.pool) } }