Skip to content

Commit

Permalink
Fix the issues of XDS flow control
Browse files Browse the repository at this point in the history
Signed-off-by: hanbingleixue <[email protected]>
  • Loading branch information
hanbingleixue committed Feb 5, 2025
1 parent 91e61f3 commit 1e080c7
Show file tree
Hide file tree
Showing 14 changed files with 266 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -139,11 +140,20 @@ private static XdsInstanceCircuitBreakers parseInstanceCircuitBreakers(Cluster c
xdsInstanceCircuitBreakers.setConsecutiveGatewayFailure(outlierDetection.getConsecutiveGatewayFailure()
.getValue());
xdsInstanceCircuitBreakers.setConsecutive5xxFailure(outlierDetection.getConsecutive5Xx().getValue());
long interval = java.time.Duration.ofSeconds(outlierDetection.getInterval().getSeconds()).toMillis();
xdsInstanceCircuitBreakers.setInterval(interval);
long ejectionTime = java.time.Duration.ofSeconds(outlierDetection.getBaseEjectionTime().getSeconds())
.toMillis();
xdsInstanceCircuitBreakers.setBaseEjectionTime(ejectionTime);
long interval = outlierDetection.getInterval().getSeconds();
if (interval != 0) {
xdsInstanceCircuitBreakers.setInterval(Duration.ofSeconds(interval).toMillis());
} else {
interval = Duration.ofNanos(outlierDetection.getInterval().getNanos()).toMillis();
xdsInstanceCircuitBreakers.setInterval(interval);
}
long ejectionTime = outlierDetection.getBaseEjectionTime().getSeconds();
if (ejectionTime == 0) {
xdsInstanceCircuitBreakers.setBaseEjectionTime(Duration.ofSeconds(ejectionTime).toMillis());
} else {
ejectionTime = Duration.ofNanos(outlierDetection.getBaseEjectionTime().getNanos()).toMillis();
xdsInstanceCircuitBreakers.setBaseEjectionTime(ejectionTime);
}
xdsInstanceCircuitBreakers.setMaxEjectionPercent(outlierDetection.getMaxEjectionPercent().getValue());
xdsInstanceCircuitBreakers.setFailurePercentageMinimumHosts(outlierDetection.getFailurePercentageMinimumHosts()
.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,13 @@ private static XdsRetryPolicy parseRetryPolicy(RetryPolicy retryPolicy) {
xdsRetryPolicy.setRetryConditions(Arrays.asList(retryPolicy.getRetryOn().split(CommonConstant.COMMA)));
}
xdsRetryPolicy.setMaxAttempts(retryPolicy.getNumRetries().getValue());
long perTryTimeout = Duration.ofSeconds(retryPolicy.getPerTryTimeout().getSeconds()).toMillis();
long perTryTimeout = retryPolicy.getPerTryTimeout().getSeconds();
if (perTryTimeout != 0) {
xdsRetryPolicy.setPerTryTimeout(Duration.ofSeconds(perTryTimeout).toMillis());
} else {
perTryTimeout = Duration.ofNanos(retryPolicy.getPerTryTimeout().getNanos()).toMillis();
xdsRetryPolicy.setPerTryTimeout(perTryTimeout);
}
xdsRetryPolicy.setPerTryTimeout(perTryTimeout);
return xdsRetryPolicy;
}
Expand Down Expand Up @@ -307,8 +313,13 @@ private static XdsAbort parseAbort(FaultAbort faultAbort) {

private static XdsDelay parseDelay(FaultDelay faultDelay) {
XdsDelay xdsDelay = new XdsDelay();
long fixedDelay = Duration.ofSeconds(faultDelay.getFixedDelay().getSeconds()).toMillis();
xdsDelay.setFixedDelay(fixedDelay);
long fixedDelay = faultDelay.getFixedDelay().getSeconds();
if (fixedDelay != 0) {
xdsDelay.setFixedDelay(Duration.ofSeconds(fixedDelay).toMillis());
} else {
fixedDelay = Duration.ofNanos(faultDelay.getFixedDelay().getNanos()).toMillis();
xdsDelay.setFixedDelay(fixedDelay);
}
io.sermant.core.service.xds.entity.FractionalPercent fractionalPercent =
new io.sermant.core.service.xds.entity.FractionalPercent();
fractionalPercent.setNumerator(faultDelay.getPercentage().getNumerator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ public class CommonConst {
*/
public static final String DEFAULT_CONTENT_TYPE = "text/plain";

/**
* Minimum response code for a successful request
*/
public static final int MIN_SUCCESS_STATUS_CODE = 200;

/**
* Maximum response code for a successful request
*/
public static final int MAX_SUCCESS_STATUS_CODE = 399;

private CommonConst() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.config.CommonConst;
import io.sermant.flowcontrol.common.config.FlowControlConfig;
import io.sermant.flowcontrol.common.support.ReflectMethodCacheSupport;
import io.sermant.flowcontrol.common.xds.retry.ExceptionRetryConditionType;
import io.sermant.flowcontrol.common.xds.retry.ResultRetryConditionType;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;
import io.sermant.flowcontrol.common.xds.retry.RetryConditionType;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -89,11 +92,12 @@ public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) {
return false;
}
String statusCode = statusCodeOptional.get();
if (conditions.contains(statusCode)) {
return true;
if (isSuccess(statusCode)) {
return false;
}
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
Optional<RetryCondition> retryConditionOptional = ResultRetryConditionType
.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
Expand All @@ -110,7 +114,8 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
return false;
}
for (String conditionName : retryPolicy.getRetryConditions()) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
Optional<RetryCondition> retryConditionOptional = ExceptionRetryConditionType
.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
Expand All @@ -121,6 +126,20 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
return false;
}

/**
* Determine if the request is successful
*
* @param statusCode status code
* @return if the request is successful,true : success false: failure
*/
public static boolean isSuccess(String statusCode) {
if (StringUtils.isEmpty(statusCode)) {
return false;
}
int code = Integer.parseInt(statusCode);
return code >= CommonConst.MIN_SUCCESS_STATUS_CODE && code <= CommonConst.MAX_SUCCESS_STATUS_CODE;
}

/**
* implemented by subclasses, if subclass implement {@link #isNeedRetry(Set, Object)}, no need to implement the get
* code method
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol.common.xds.retry;

import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.condition.ConnectErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetBeforeRequestErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ServerExceptionCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificHeaderNameErrorRetryCondition;

import java.util.Optional;

/**
* Retry Condition Manager
*
* @author zhp
* @since 2024-11-29
*/
public enum ExceptionRetryConditionType {
/**
* The type of conditional judgment for server errors
*/
SERVER_ERROR("5xx", new ServerExceptionCondition()),

/**
* The type of conditional judgment for reset errors
*/
RESET_ERROR("reset", new ResetErrorCondition()),

/**
* The type of conditional judgment for resetting errors before request
*/
RESET_BEFORE_REQUEST_ERROR("reset-before-request", new ResetBeforeRequestErrorCondition()),

/**
* The type of conditional judgment for connect errors
*/
CONNECT_ERROR("connect-failure", new ConnectErrorRetryCondition()),

/**
* The type of conditional judgment for Specify response headers
*/
SPECIFIC_HEADER_NAME_ERROR("retriable-headers", new SpecificHeaderNameErrorRetryCondition());

/**
* the name of retry condition
*/
private final String conditionName;

/**
* the instance of implements class for retry condition
*/
private final RetryCondition retryCondition;

ExceptionRetryConditionType(String conditionName, RetryCondition retryCondition) {
this.conditionName = conditionName;
this.retryCondition = retryCondition;
}

public String getConditionName() {
return conditionName;
}

public RetryCondition getRetryCondition() {
return retryCondition;
}

/**
* get the instance of implements class by condition name
*
* @param conditionName condition name
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionByName(String conditionName) {
for (ExceptionRetryConditionType retryConditionType : ExceptionRetryConditionType.values()) {
if (StringUtils.equals(retryConditionType.getConditionName(), conditionName)) {
return Optional.of(retryConditionType.getRetryCondition());
}
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.condition.ClientErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ConnectErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.GatewayErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetBeforeRequestErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ServerErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificHeaderNameErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificStatusCodeErrorRetryCondition;
Expand All @@ -34,7 +31,7 @@
* @author zhp
* @since 2024-11-29
*/
public enum RetryConditionType {
public enum ResultRetryConditionType {
/**
* The type of conditional judgment for server errors
*/
Expand All @@ -50,21 +47,6 @@ public enum RetryConditionType {
*/
GATEWAY_ERROR("gateway-error", new GatewayErrorCondition()),

/**
* The type of conditional judgment for reset errors
*/
RESET_ERROR("reset", new ResetErrorCondition()),

/**
* The type of conditional judgment for resetting errors before request
*/
RESET_BEFORE_REQUEST_ERROR("reset-before-request", new ResetBeforeRequestErrorCondition()),

/**
* The type of conditional judgment for connect errors
*/
CONNECT_ERROR("connect-failure", new ConnectErrorRetryCondition()),

/**
* The type of conditional judgment for Specify response code
*/
Expand All @@ -85,7 +67,7 @@ public enum RetryConditionType {
*/
private final RetryCondition retryCondition;

RetryConditionType(String conditionName, RetryCondition retryCondition) {
ResultRetryConditionType(String conditionName, RetryCondition retryCondition) {
this.conditionName = conditionName;
this.retryCondition = retryCondition;
}
Expand All @@ -105,7 +87,7 @@ public RetryCondition getRetryCondition() {
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionByName(String conditionName) {
for (RetryConditionType retryConditionType : RetryConditionType.values()) {
for (ResultRetryConditionType retryConditionType : ResultRetryConditionType.values()) {
if (StringUtils.equals(retryConditionType.getConditionName(), conditionName)) {
return Optional.of(retryConditionType.getRetryCondition());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.sermant.flowcontrol.common.util.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;

import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
Expand Down Expand Up @@ -51,8 +50,9 @@ public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object re
}

private boolean isConnectErrorException(Throwable ex) {
if (ex instanceof InterruptedIOException && StringUtils.contains(ex.getMessage(), "timeout")) {
return true;
if ((ex instanceof SocketTimeoutException || ex instanceof TimeoutException)
&& !StringUtils.isEmpty(ex.getMessage()) && ex.getMessage().contains("Read timed out")) {
return false;
}
return ex instanceof SocketTimeoutException || ex instanceof ConnectException || ex instanceof TimeoutException
|| ex instanceof NoRouteToHostException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import io.sermant.flowcontrol.common.handler.retry.Retry;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;

import java.net.SocketException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Locale;
import java.util.concurrent.TimeoutException;

/**
* Retry condition check, determine if the current error is a connect reset error, and trigger a retry if
Expand All @@ -34,18 +36,25 @@
public class ResetErrorCondition implements RetryCondition {
@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
if (ex == null) {
return false;
}
Throwable realException = ex;
if (ex instanceof InvokerWrapperException) {
InvokerWrapperException invokerWrapperException = (InvokerWrapperException) ex;
if (invokerWrapperException.getRealException() != null) {
realException = invokerWrapperException.getRealException();
}
}
if (realException instanceof SocketTimeoutException && !StringUtils.isEmpty(ex.getMessage())
&& realException.getMessage().contains("Read timed out")) {
String message = realException.getMessage();
if (StringUtils.isEmpty(message)) {
return false;
}
if ((ex instanceof SocketTimeoutException || ex instanceof TimeoutException)
&& message.contains("Read timed out")) {
return true;
}
return realException instanceof SocketException && !StringUtils.isEmpty(realException.getMessage())
&& (realException.getMessage().contains("reset") || ex.getMessage().contains("disconnection"));
return realException instanceof IOException
&& (message.contains("Connection reset") || message.toLowerCase(Locale.ROOT).contains("broken pipe"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ServerErrorCondition implements RetryCondition {
@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
if (StringUtils.isEmpty(statusCode)) {
return true;
return false;
}
int code = Integer.parseInt(statusCode);
return code >= MIN_5XX_FAILURE && code <= MAX_5XX_FAILURE;
Expand Down
Loading

0 comments on commit 1e080c7

Please sign in to comment.