Skip to content

Commit

Permalink
feat: Circuit Breaker 상태 전파 기능 구현
Browse files Browse the repository at this point in the history
  • Loading branch information
jemin committed Nov 26, 2023
1 parent 0de81f0 commit 9217e47
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 111 deletions.
9 changes: 5 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ allprojects {
repositories {
mavenCentral()
}


}


Expand All @@ -21,17 +23,16 @@ subprojects {
apply plugin: 'io.spring.dependency-management'
apply plugin: 'jacoco'
apply plugin: 'jacoco-report-aggregation'


bootJar.enabled = false // core-api 모듈 이외에는 모두 boot 되지 않는 모듈
jar.enabled = true // jar는 필요함
jar.enabled = true

dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudDependenciesVersion}"
mavenBom "org.springframework.cloud:spring-cloud-dependencies:2022.0.3"
}
}

jacoco {
toolVersion = '0.8.8'
}
Expand Down
7 changes: 1 addition & 6 deletions core/core-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ dependencies {
annotationProcessor "jakarta.annotation:jakarta.annotation-api"
annotationProcessor "jakarta.persistence:jakarta.persistence-api"


/*
현재 Melly 프로젝트의 clients:client-auth 모듈에서 OpenFeign을 위한 Circuit Breaker를 위해 Resilience4j 라이브러리를 implementation으로 사용하고 있습니다.
core-api 모듈에서 client-auth 모듈을 의존하고 있기에 core-api의 compile classpath에 resilence4j가 들어오지는 않지만 runtime에는 사용이 가능합니다.
따라서 core-api 모듈에서는 중복으로 implementation을 하지 않고 compileOnly로 compile classpath에만 의존성을 추가했습니다.
*/
implementation "org.springframework.cloud:spring-cloud-starter-circuitbreaker-resilience4j:3.0.2"
implementation "io.github.resilience4j:resilience4j-all"

// Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package cmc.mellyserver.config.cache;

import static cmc.mellyserver.config.circuitbreaker.CircuitBreakerConstants.*;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.client.circuitbreaker.CircuitBreaker;
import org.springframework.cloud.client.circuitbreaker.CircuitBreakerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
Expand All @@ -19,61 +22,58 @@
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import cmc.mellyserver.common.constants.CacheNames;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.core.registry.EntryAddedEvent;
import io.github.resilience4j.core.registry.EntryRemovedEvent;
import io.github.resilience4j.core.registry.EntryReplacedEvent;
import io.github.resilience4j.core.registry.RegistryEventConsumer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@EnableCaching
@Configuration
public class CacheConfig {

private static final String CACHE_CURCUIT_BREAKER = "cache_curcuit_breaker";
@Value("${spring.redis.cache.host}")
private String host;
@Value("${spring.redis.cache.port}")
private int port;

/*
Redis를 운영 환경에서 사용할때는 Sentinel이나 Cluster 모드를 사용해서 고가용성을 보장할 것이고,
RedisConfiguration으로 RedisSentinelConfiguration이나 RedisClusterConfiguration을 사용할 것입니다.
현재 프로젝트에서는 가용성 보장을 위한 Replication을 하지는 못했기에 싱글 노드 기반의 RedisStandaloneConfiguration을 사용했습니다.
*/
@Bean(name = "redisCacheConnectionFactory")
RedisConnectionFactory redisCacheConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(host);
redisStandaloneConfiguration.setPort(port);

/*
Redis 서버가 다운됐을 시, 쿼리를 보낸 뒤 1초동안 응답이 없으면 Timeout 에러가 발생하도록 구현했습니다
Redis 서버가 다운됐을 시, Redis에 요청을 보낸 뒤 200ms동안 응답이 없으면 Timeout이 발생하도록 구현했습니다.
현재 프로젝트 내 Redis 캐시의 응답 시간이 보통 100ms 아래로 나오는 것을 참고했을때 200ms로 command timeout을 설정한다면,
Redis 서버가 죽지 않은 상황에서 일시적인 지연으로 인해 Timeout이 발생하고, DB 쿼리가 발생하는 상황을 방지할 수 있다고 생각했습니다.
*/
LettuceClientConfiguration lettuceClientConfiguration = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(1))
.commandTimeout(Duration.ofMillis(200L))
.build();

return new LettuceConnectionFactory(redisStandaloneConfiguration, lettuceClientConfiguration);
}

/*
설정 종류
- SerializationFeature.INDENT_OUTPUT : 콘솔에 출력할때 포맷팅해서 나옵니다.
- JavaTimeModule : 해당 모듈을 등록해줘야 Java 8의 date/time을 사용해서 string으로 직렬화 가능합니다.
- SerializationFeature.WRITE_DATES_AS_TIMESTAMP : 해당 속성을 true로 설정하면 Long 타입으로 직렬화됩니다. 현재 Disable로 설정해서 String으로 직렬화되도록 설정했습니다.
- DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES : 역직렬화시 클래스 변수에 매핑되지 않는 값이 있을때 예외를 발생시킬지 체크, 현재는 예외가 발생하지 않도록 false로 설정했습니다.
*/
@Bean
public ObjectMapper objectMapper() {

return new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand All @@ -85,7 +85,6 @@ public ObjectMapper objectMapper() {
- cache value는 여러 타입이 들어올 수 있기에 GenericJackon2JsonRedisSerializer를 사용했습니다.
- User 데이터는 수정이 적을 것으로 예상되어 1시간으로 TTL을 설정했습니다.
- Memory 데이터도 수정이 적을 것으로 예상되어 1시간으로 TTL을 설정했습니다.
- Memory 리스트인 FEED는 수정이 잦을 것으로 예상되어 1분으로 TTL을 설정했습니다.
- Group 데이터도 수정이 적을 것으로 예상되어 1시간으로 TTL을 설정했습니다.
Circuit breaker를 통한 HA 보장
Expand All @@ -94,68 +93,31 @@ public ObjectMapper objectMapper() {
막음으로써 Fail Fast를 통한 Redis Server Recovery를 유도했습니다.
*/
@Bean
public CacheManager redisCacheManager(
@Qualifier("redisCacheConnectionFactory") RedisConnectionFactory connectionFactory) {
public CacheManager customCacheManager(
@Qualifier("redisCacheConnectionFactory") RedisConnectionFactory connectionFactory,
CircuitBreakerFactory circuitBreakerFactory) {

/* Serializer 설정 */
RedisCacheConfiguration defaultConfig = RedisCacheConfiguration.defaultCacheConfig()
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(
new GenericJackson2JsonRedisSerializer(objectMapper())));

Map<String, RedisCacheConfiguration> redisCacheConfigMap = new HashMap<>();
/* Cache TTL 설정 */
Map<String, RedisCacheConfiguration> redisCacheConfigMap = new ConcurrentHashMap<>();
redisCacheConfigMap.put(CacheNames.USER, defaultConfig.entryTtl(Duration.ofHours(1)));
redisCacheConfigMap.put(CacheNames.USER_VOLUME, defaultConfig.entryTtl(Duration.ofHours(1)));
redisCacheConfigMap.put(CacheNames.MEMORY, defaultConfig.entryTtl(Duration.ofHours(1)));
redisCacheConfigMap.put(CacheNames.FEED, defaultConfig.entryTtl(Duration.ofMinutes(1)));
redisCacheConfigMap.put(CacheNames.GROUP, defaultConfig.entryTtl(Duration.ofHours(1)));
redisCacheConfigMap.put(CacheNames.SCRAP, defaultConfig.entryTtl(Duration.ofHours(1)));

RedisCacheManager redisCacheManager = RedisCacheManager.builder(connectionFactory)
.withInitialCacheConfigurations(redisCacheConfigMap)
.build();

CircuitBreakerRegistry circuitBreakerRegistry = configCircuitBreaker();

return new CustomCacheManager(redisCacheManager, circuitBreakerRegistry.circuitBreaker(CACHE_CURCUIT_BREAKER));
}

private CircuitBreakerRegistry configCircuitBreaker() {

CircuitBreakerConfig config = CircuitBreakerConfig
.custom()
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
.slidingWindowSize(5)
.failureRateThreshold(80)
.waitDurationInOpenState(Duration.ofSeconds(20))
.permittedNumberOfCallsInHalfOpenState(4)
.automaticTransitionFromOpenToHalfOpenEnabled(true)
.recordExceptions(CallNotPermittedException.class, QueryTimeoutException.class)
.build();

CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.custom().withCircuitBreakerConfig(config)
.addRegistryEventConsumer(new RegistryEventConsumer<CircuitBreaker>() {

@Override
public void onEntryAddedEvent(EntryAddedEvent<CircuitBreaker> entryAddedEvent) {

CircuitBreaker.EventPublisher eventPublisher = entryAddedEvent.getAddedEntry().getEventPublisher();

eventPublisher.onStateTransition(event -> log.info("onStateTransition {}", event));
eventPublisher.onError(event -> log.error("onError {}", event));
eventPublisher.onSuccess(event -> log.info("onSuccess {}", event));
eventPublisher.onCallNotPermitted(event -> log.info("onCallNotPermitted {}", event));
}

@Override
public void onEntryRemovedEvent(EntryRemovedEvent<CircuitBreaker> entryRemoveEvent) {

}

@Override
public void onEntryReplacedEvent(EntryReplacedEvent<CircuitBreaker> entryReplacedEvent) {
CircuitBreaker circuitBreaker = circuitBreakerFactory.create(CACHE_CIRCUIT);

}
}).build();
return circuitBreakerRegistry;
/* Circuit Breaker 설정 */
return new CustomCacheManager(redisCacheManager, circuitBreaker);
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package cmc.mellyserver.common.constants;
package cmc.mellyserver.config.cache;

public abstract class CacheNames {

public static final String USER = "user";

public static final String USER_VOLUME = "user_volume";

public static final String SCRAP = "scrap";

public static final String GROUP = "group";

public static final String MEMORY = "memory";

public static final String FEED = "feed";
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package cmc.mellyserver.config.cache;

import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.springframework.cache.Cache;
import org.springframework.cloud.client.circuitbreaker.CircuitBreaker;
import org.springframework.dao.QueryTimeoutException;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.decorators.Decorators;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand Down Expand Up @@ -35,17 +34,13 @@ public Object getNativeCache() {

@Override
public ValueWrapper get(Object key) {

Supplier<ValueWrapper> flightsSupplier = () -> (globalCache.get(key));

return Decorators
.ofSupplier(flightsSupplier)
.withCircuitBreaker(circuitBreaker)
.withFallback((e) -> fallback())
.decorate().get();

return circuitBreaker.run(flightsSupplier, (throwable -> fallback()));
}

/*
Cache가 ValueWrapper로 null을 반환하면 캐싱된 데이터가 없다고 판단 후, 실제 로직을 통해 DB 쿼리를 진행합니다.
*/
private ValueWrapper fallback() {
log.error("global cache server down, fallback method start");
return null;
Expand All @@ -63,28 +58,21 @@ public <T> T get(Object key, Callable<T> valueLoader) {

@Override
public void put(Object key, Object value) {
Consumer consumer = (k) -> globalCache.put(key, value);
Decorators
.ofConsumer(consumer)
.withCircuitBreaker(circuitBreaker)
.decorate();

try {
globalCache.put(key, value);
} catch (QueryTimeoutException e) {
log.error(e.getMessage());
}
}

@Override
public void evict(Object key) {
Consumer consumer = (k) -> globalCache.evict(key);
Decorators
.ofConsumer(consumer)
.withCircuitBreaker(circuitBreaker)
.decorate();
globalCache.evict(key);
}

@Override
public void clear() {
Consumer consumer = (k) -> globalCache.clear();
Decorators
.ofConsumer(consumer)
.withCircuitBreaker(circuitBreaker)
.decorate();
globalCache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@

import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cloud.client.circuitbreaker.CircuitBreaker;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CustomCacheManager implements CacheManager {

private final CacheManager delegate;

private final CircuitBreaker circuitBreaker;

public CustomCacheManager(CacheManager delegate, CircuitBreaker circuitBreaker) {
Expand All @@ -22,7 +18,6 @@ public CustomCacheManager(CacheManager delegate, CircuitBreaker circuitBreaker)

@Override
public Cache getCache(String name) {

return new CustomCache(delegate.getCache(name), circuitBreaker);
}

Expand Down
Loading

0 comments on commit 9217e47

Please sign in to comment.