Skip to content

Commit

Permalink
Revert old behavior with infinite retries by default
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Jan 27, 2025
1 parent 03291a9 commit 960c518
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 16 deletions.
20 changes: 18 additions & 2 deletions topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,30 @@

import org.slf4j.Logger;

import tech.ydb.common.retry.ExponentialBackoffRetry;
import tech.ydb.common.retry.RetryConfig;
import tech.ydb.common.retry.RetryPolicy;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;

/**
* @author Nikolay Perfilov
*/
public abstract class GrpcStreamRetrier {
public static final RetryConfig RETRY_ALL = new RetryConfig() {
@Override
public RetryPolicy isStatusRetryable(StatusCode code) {
return RETRY_ALL_POLICY;
}

@Override
public RetryPolicy isThrowableRetryable(Throwable th) {
return RETRY_ALL_POLICY;
}
};

private static final RetryPolicy RETRY_ALL_POLICY = new ExponentialBackoffRetry(256, 7);

private static final int ID_LENGTH = 6;
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
.toCharArray();
Expand Down Expand Up @@ -97,7 +113,7 @@ protected CompletableFuture<Void> shutdownImpl(String reason) {
protected void onSessionClosed(Status status, Throwable th) {
logger.info("[{}] onSessionClosed called", id);

RetryPolicy retryPolicy = null;
RetryPolicy retryPolicy;
if (th != null) {
logger.error("[{}] Exception in {} stream session: ", id, getStreamName(), th);
retryPolicy = retryConfig.isThrowableRetryable(th);
Expand All @@ -111,8 +127,8 @@ protected void onSessionClosed(Status status, Throwable th) {
}
} else {
logger.warn("[{}] Error in {} stream session: {}", id, getStreamName(), status);
retryPolicy = retryConfig.isStatusRetryable(status.getCode());
}
retryPolicy = retryConfig.isStatusRetryable(status.getCode());
}

if (isStopped.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableList;

import tech.ydb.common.retry.RetryConfig;
import tech.ydb.topic.impl.GrpcStreamRetrier;

/**
* @author Nikolay Perfilov
Expand Down Expand Up @@ -69,7 +70,7 @@ public static class Builder {
private boolean readWithoutConsumer = false;
private String readerName = null;
private List<TopicReadSettings> topics = new ArrayList<>();
private RetryConfig retryConfig = RetryConfig.idempotentRetryForever();
private RetryConfig retryConfig = GrpcStreamRetrier.RETRY_ALL;
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
private Executor decompressionExecutor = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import tech.ydb.common.retry.RetryConfig;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.impl.GrpcStreamRetrier;

/**
* @author Nikolay Perfilov
Expand Down Expand Up @@ -75,7 +76,7 @@ public static class Builder {
private String messageGroupId = null;
private Long partitionId = null;
private Codec codec = Codec.GZIP;
private RetryConfig retryConfig = RetryConfig.idempotentRetryForever();
private RetryConfig retryConfig = GrpcStreamRetrier.RETRY_ALL;
private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT;
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import org.junit.Assert;
import org.junit.Test;

import tech.ydb.common.retry.RetryConfig;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.topic.settings.RetryMode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.SyncWriter;
Expand All @@ -18,10 +19,10 @@
*
* @author Aleksandr Gorshenin
*/
public class RetryModeTest extends BaseMockedTest {
public class TopicRetriesTest extends BaseMockedTest {

@Test
public void alwaysRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
public void defaultRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
mockStreams()
.then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE))
.then(defaultStreamMockAnswer())
Expand All @@ -30,7 +31,6 @@ public void alwaysRetryWriterTest() throws InterruptedException, ExecutionExcept

SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder()
.setTopicPath("/mocked_topic")
.setRetryMode(RetryMode.ALWAYS)
.build());
writer.init();

Expand All @@ -47,7 +47,7 @@ public void alwaysRetryWriterTest() throws InterruptedException, ExecutionExcept
stream1.nextMsg().isWrite().hasWrite(2, 1);
stream1.responseWriteWritten(1, 1);

stream1.complete(Status.SUCCESS);
stream1.complete(Status.of(StatusCode.SUCCESS));

// Retry #2 - Stream is closed by server
getScheduler().hasTasks(1).executeNextTasks(1);
Expand Down Expand Up @@ -88,7 +88,7 @@ public void disabledRetryNetworkErrorTest() throws InterruptedException, Executi

WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath("/mocked_topic")
.setRetryMode(RetryMode.NONE)
.setRetryConfig(RetryConfig.noRetries())
.build();

SyncWriter writer = client.createSyncWriter(settings);
Expand All @@ -109,7 +109,7 @@ public void disabledRetryNetworkErrorTest() throws InterruptedException, Executi
public void disabledRetryStreamCloseTest() throws InterruptedException, ExecutionException, TimeoutException {
WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath("/mocked_topic")
.setRetryMode(RetryMode.NONE)
.setRetryConfig(RetryConfig.noRetries())
.build();

SyncWriter writer = client.createSyncWriter(settings);
Expand All @@ -134,7 +134,7 @@ public void disabledRetryStreamCloseTest() throws InterruptedException, Executio
public void disabledRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException {
WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath("/mocked_topic")
.setRetryMode(RetryMode.NONE)
.setRetryConfig(RetryConfig.noRetries())
.build();

SyncWriter writer = client.createSyncWriter(settings);
Expand Down Expand Up @@ -162,7 +162,7 @@ public void recoverRetryNetworkErrorTest() throws InterruptedException, Executio

WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath("/mocked_topic")
.setRetryMode(RetryMode.RECOVER)
.setRetryConfig(RetryConfig.noRetries())
.build();

SyncWriter writer = client.createSyncWriter(settings);
Expand All @@ -180,7 +180,7 @@ public void recoverRetryNetworkErrorTest() throws InterruptedException, Executio
}

@Test
public void recoverRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
public void idempotentRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
mockStreams()
.then(defaultStreamMockAnswer())
.then(errorStreamMockAnswer(StatusCode.OVERLOADED))
Expand All @@ -190,7 +190,7 @@ public void recoverRetryWriterTest() throws InterruptedException, ExecutionExcep

SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder()
.setTopicPath("/mocked_topic")
.setRetryMode(RetryMode.RECOVER)
.setRetryConfig(RetryConfig.idempotentRetryForever())
.build());
writer.init();

Expand All @@ -203,7 +203,9 @@ public void recoverRetryWriterTest() throws InterruptedException, ExecutionExcep
stream1.nextMsg().isWrite().hasWrite(2, 1);
stream1.responseWriteWritten(1, 1);

stream1.complete(new RuntimeException("io exception"));
stream1.complete(new RuntimeException("io exception",
new UnexpectedResultException("inner", Status.of(StatusCode.CLIENT_INTERNAL_ERROR)))
);

// Retry #1 - Stream is by runtime exception
getScheduler().hasTasks(1).executeNextTasks(1);
Expand Down

0 comments on commit 960c518

Please sign in to comment.