Skip to content

Commit

Permalink
refactor: replace all the RetryProcess usages with RetryProcessor (#4817
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ndr-brt authored Feb 12, 2025
1 parent 12cdffc commit eee070b
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
package org.eclipse.edc.statemachine.retry;

import org.eclipse.edc.spi.retry.WaitStrategy;
import org.eclipse.edc.statemachine.retry.processor.RetryProcessor;

import java.util.function.Supplier;

/**
* Configure a {@link RetryProcess}
* Configure a {@link RetryProcessor}
*/
public record EntityRetryProcessConfiguration(int retryLimit, Supplier<WaitStrategy> delayStrategySupplier) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import java.util.function.Supplier;

/**
* Permits to instance {@link RetryProcess} implementations, that will permit a certain process acting on a {@link StatefulEntity}
* to be tried again in case of failure. Please look at {@link RetryProcess} for further details.
* Permit to instantiate a {@link RetryProcessor}. Please look at {@link RetryProcessor} for further details.
*/
public class EntityRetryProcessFactory {
private final Monitor monitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.StoreResult;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
Expand All @@ -34,7 +35,7 @@ public class FutureResultRetryProcess<E extends StatefulEntity<E>, I, O> impleme

private final String name;
private final BiFunction<E, I, CompletableFuture<StatusResult<O>>> function;
private Function<String, E> entityReload;
private Function<String, StoreResult<E>> entityReload;

public FutureResultRetryProcess(String name, BiFunction<E, I, CompletableFuture<StatusResult<O>>> function) {
this.name = name;
Expand All @@ -49,17 +50,13 @@ public CompletableFuture<ProcessContext<E, O>> execute(ProcessContext<E, I> cont
.thenCompose(asyncContext -> new ResultRetryProcess<E, I, O>(name, (e, c) -> asyncContext.content())
.execute(new ProcessContext<>(asyncContext.entity(), context.content())));
} catch (Throwable throwable) {
return failedFuture(new UnrecoverableEntityStateException(reloadEntity(context.entity()), name, throwable.getMessage()));
return failedFuture(new UnrecoverableEntityStateException(context.entity(), name, throwable.getMessage()));
}
}

public FutureResultRetryProcess<E, I, O> entityReload(Function<String, E> entityReload) {
public FutureResultRetryProcess<E, I, O> entityReload(Function<String, StoreResult<E>> entityReload) {
this.entityReload = entityReload;
return this;
}

private E reloadEntity(E entity) {
return entityReload == null ? entity : entityReload.apply(entity.getId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package org.eclipse.edc.statemachine.retry.processor;

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.result.StoreFailure;
import org.eclipse.edc.spi.result.StoreResult;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
Expand All @@ -33,7 +36,7 @@ public class FutureRetryProcess<E extends StatefulEntity<E>, I, O> implements Pr

private final String name;
private final BiFunction<E, I, CompletableFuture<O>> function;
private Function<String, E> entityReload;
private Function<String, StoreResult<E>> entityReload;

public FutureRetryProcess(String name, BiFunction<E, I, CompletableFuture<O>> function) {
this.name = name;
Expand All @@ -45,25 +48,28 @@ public CompletableFuture<ProcessContext<E, O>> execute(ProcessContext<E, I> cont
try {
return function.apply(context.entity(), context.content())
.handle((content, throwable) -> {
var reloadedEntity = reloadEntity(context.entity());
var reloadedEntity = entityReload == null
? context.entity()
: entityReload.apply(context.entity().getId()).orElseThrow(failedEntityReload(context));

if (throwable == null) {
return new ProcessContext<>(reloadedEntity, content);
} else {
throw new EntityStateException(reloadedEntity, name, throwable.getMessage());
}
});
} catch (Throwable throwable) {
return failedFuture(new UnrecoverableEntityStateException(reloadEntity(context.entity()), name, throwable.getMessage()));
return failedFuture(new UnrecoverableEntityStateException(context.entity(), name, throwable.getMessage()));
}
}

public FutureRetryProcess<E, I, O> entityReload(Function<String, E> entityReload) {
public FutureRetryProcess<E, I, O> entityReload(Function<String, StoreResult<E>> entityReload) {
this.entityReload = entityReload;
return this;
}

private E reloadEntity(E entity) {
return entityReload == null ? entity : entityReload.apply(entity.getId());
private @NotNull Function<StoreFailure, UnrecoverableEntityStateException> failedEntityReload(ProcessContext<E, I> context) {
return failure -> new UnrecoverableEntityStateException(context.entity(), name, "Cannot reload entity: " + failure.getFailureDetail());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static <E extends StatefulEntity<E>, I, O> Process<E, I, O> result(String name,
* @param function that executes the process.
* @return the process instance.
*/
static <E extends StatefulEntity<E>, I, O> Process<E, I, O> future(String name, BiFunction<E, I, CompletableFuture<O>> function) {
static <E extends StatefulEntity<E>, I, O> FutureRetryProcess<E, I, O> future(String name, BiFunction<E, I, CompletableFuture<O>> function) {
return new FutureRetryProcess<>(name, function);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.statemachine.retry.TestEntity;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -86,8 +87,8 @@ void shouldReloadEntity_whenConfigured() {
var entityId = UUID.randomUUID().toString();
var entity = TestEntity.Builder.newInstance().id(entityId).build();
var reloadedEntity = TestEntity.Builder.newInstance().id(entityId).build();
Function<String, TestEntity> entityReload = mock();
when(entityReload.apply(any())).thenReturn(reloadedEntity);
Function<String, StoreResult<TestEntity>> entityReload = mock();
when(entityReload.apply(any())).thenReturn(StoreResult.success(reloadedEntity));
var retryProcess = new FutureResultRetryProcess<TestEntity, Object, String>("process", (e, i) -> completedFuture(StatusResult.success("content")))
.entityReload(entityReload);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.edc.statemachine.retry.processor;

import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.statemachine.retry.TestEntity;
import org.junit.jupiter.api.Test;

Expand All @@ -25,6 +26,7 @@
import java.util.function.Function;

import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -38,7 +40,7 @@ class FutureRetryProcessTest {
@Test
void shouldReturnSuccess_whenFunctionSucceeds() {
var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build();
var retryProcess = new FutureRetryProcess<TestEntity, Object, String>("process", (e, i) -> CompletableFuture.completedFuture("content"));
var retryProcess = new FutureRetryProcess<TestEntity, Object, String>("process", (e, i) -> completedFuture("content"));

var future = retryProcess.execute(new ProcessContext<>(entity, "any"));

Expand Down Expand Up @@ -82,14 +84,31 @@ void shouldReloadEntity_whenConfigured() {
var entityId = UUID.randomUUID().toString();
var entity = TestEntity.Builder.newInstance().id(entityId).build();
var reloadedEntity = TestEntity.Builder.newInstance().id(entityId).build();
Function<String, TestEntity> entityReload = mock();
when(entityReload.apply(any())).thenReturn(reloadedEntity);
var retryProcess = new FutureRetryProcess<TestEntity, Object, String>("process", (e, i) -> CompletableFuture.completedFuture("content"))
Function<String, StoreResult<TestEntity>> entityReload = mock();
when(entityReload.apply(any())).thenReturn(StoreResult.success(reloadedEntity));
var retryProcess = new FutureRetryProcess<TestEntity, Object, String>("process", (e, i) -> completedFuture("content"))
.entityReload(entityReload);

var future = retryProcess.execute(new ProcessContext<>(entity, "any"));

assertThat(future).succeedsWithin(timeout).extracting(ProcessContext::entity).isSameAs(reloadedEntity);
verify(entityReload).apply(entityId);
}

@Test
void shouldReturnUnrecoverable_whenEntityReloadFailed() {
var entityId = UUID.randomUUID().toString();
var entity = TestEntity.Builder.newInstance().id(entityId).build();
var retryProcess = new FutureRetryProcess<TestEntity, Object, String>("process", (e, i) -> completedFuture("content"))
.entityReload(id -> StoreResult.alreadyLeased("error"));

var future = retryProcess.execute(new ProcessContext<>(entity, "any"));

assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class)
.extracting(Throwable::getCause).isInstanceOfSatisfying(UnrecoverableEntityStateException.class, exception -> {
assertThat(exception.getEntity()).isSameAs(entity);
assertThat(exception.getProcessName()).isEqualTo("process");
assertThat(exception.getMessage()).isEqualTo("Cannot reload entity: error");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import org.eclipse.edc.statemachine.AbstractStateEntityManager;
import org.eclipse.edc.statemachine.Processor;
import org.eclipse.edc.statemachine.ProcessorImpl;
import org.eclipse.edc.statemachine.retry.AsyncStatusResultRetryProcess;
import org.eclipse.edc.statemachine.retry.processor.Process;
import org.eclipse.edc.statemachine.retry.processor.RetryProcessor;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -74,16 +75,15 @@ protected boolean processTerminating(ContractNegotiation negotiation) {
.rejectionReason(negotiation.getErrorDetail())
.policy(negotiation.getLastContractOffer().getPolicy());

return dispatch(messageBuilder, negotiation, Object.class)
return dispatch(messageBuilder, negotiation, Object.class, "[%s] send termination".formatted(type().name()))
.onSuccess((n, result) -> transitionToTerminated(n))
.onFailure((n, throwable) -> transitionToTerminating(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminated(n, format("Failed to send termination to counter party: %s", throwable.getMessage())))
.execute("[%s] send termination".formatted(type().name()));
.onFinalFailure((n, throwable) -> transitionToTerminated(n, format("Failed to send termination to counter party: %s", throwable.getMessage())))
.execute();
}

protected <T> AsyncStatusResultRetryProcess<ContractNegotiation, T, ?> dispatch(ProcessRemoteMessage.Builder<?, ?> messageBuilder,
ContractNegotiation negotiation, Class<T> responseType) {
protected <T> RetryProcessor<ContractNegotiation, T> dispatch(ProcessRemoteMessage.Builder<?, ?> messageBuilder,
ContractNegotiation negotiation, Class<T> responseType, String name) {
messageBuilder.counterPartyAddress(negotiation.getCounterPartyAddress())
.counterPartyId(negotiation.getCounterPartyId())
.protocol(negotiation.getProtocol())
Expand All @@ -103,7 +103,8 @@ protected boolean processTerminating(ContractNegotiation negotiation) {

negotiation.lastSentProtocolMessage(message.getId());

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(responseType, message));
return entityRetryProcessFactory.retryProcessor(negotiation)
.doProcess(Process.futureResult(name, (n, v) -> dispatcherRegistry.dispatch(responseType, message)));
}

protected void transitionToInitial(ContractNegotiation negotiation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,11 @@ private boolean processRequesting(ContractNegotiation negotiation) {
.callbackAddress(callbackAddress.url())
.type(ContractRequestMessage.Type.INITIAL);

return dispatch(messageBuilder, negotiation, ContractNegotiationAck.class)
.onSuccessResult(this::transitionToRequested)
return dispatch(messageBuilder, negotiation, ContractNegotiationAck.class, "[Consumer] send request")
.onSuccess(this::transitionToRequested)
.onFailure((n, throwable) -> transitionToRequesting(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send request to provider: %s", throwable.getMessage())))
.execute("[Consumer] send request");
.onFinalFailure((n, throwable) -> transitionToTerminating(n, format("Failed to send request to provider: %s", throwable.getMessage())))
.execute();
} else {
transitionToTerminated(negotiation, "No callback address found for protocol: %s".formatted(negotiation.getProtocol()));
return true;
Expand All @@ -144,12 +143,11 @@ private boolean processRequesting(ContractNegotiation negotiation) {
private boolean processAccepting(ContractNegotiation negotiation) {
var messageBuilder = ContractNegotiationEventMessage.Builder.newInstance().type(ACCEPTED);
messageBuilder.policy(negotiation.getLastContractOffer().getPolicy());
return dispatch(messageBuilder, negotiation, Object.class)
return dispatch(messageBuilder, negotiation, Object.class, "[consumer] send acceptance")
.onSuccess((n, result) -> transitionToAccepted(n))
.onFailure((n, throwable) -> transitionToAccepting(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send acceptance to provider: %s", throwable.getMessage())))
.execute("[consumer] send acceptance");
.onFinalFailure((n, throwable) -> transitionToTerminating(n, format("Failed to send acceptance to provider: %s", throwable.getMessage())))
.execute();
}

/**
Expand All @@ -175,12 +173,11 @@ private boolean processVerifying(ContractNegotiation negotiation) {
var messageBuilder = ContractAgreementVerificationMessage.Builder.newInstance()
.policy(negotiation.getContractAgreement().getPolicy());

return dispatch(messageBuilder, negotiation, Object.class)
return dispatch(messageBuilder, negotiation, Object.class, "[consumer] send verification")
.onSuccess((n, result) -> transitionToVerified(n))
.onFailure((n, throwable) -> transitionToVerifying(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send verification to provider: %s", throwable.getMessage())))
.execute("[consumer] send verification");
.onFinalFailure((n, throwable) -> transitionToTerminating(n, format("Failed to send verification to provider: %s", throwable.getMessage())))
.execute();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,11 @@ private boolean processOffering(ContractNegotiation negotiation) {
.contractOffer(negotiation.getLastContractOffer())
.callbackAddress(callbackAddress.url());

return dispatch(messageBuilder, negotiation, ContractNegotiationAck.class)
.onSuccessResult(this::transitionToOffered)
return dispatch(messageBuilder, negotiation, ContractNegotiationAck.class, "[Provider] send offer")
.onSuccess(this::transitionToOffered)
.onFailure((n, throwable) -> transitionToOffering(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send offer to consumer: %s", throwable.getMessage())))
.execute("[Provider] send offer");
.onFinalFailure((n, throwable) -> transitionToTerminating(n, format("Failed to send offer to consumer: %s", throwable.getMessage())))
.execute();
} else {
transitionToTerminated(negotiation, "No callback address found for protocol: %s".formatted(negotiation.getProtocol()));
return true;
Expand Down Expand Up @@ -141,12 +140,11 @@ private boolean processAgreeing(ContractNegotiation negotiation) {

var messageBuilder = ContractAgreementMessage.Builder.newInstance().contractAgreement(agreement);

return dispatch(messageBuilder, negotiation, Object.class)
return dispatch(messageBuilder, negotiation, Object.class, "[Provider] send agreement")
.onSuccess((n, result) -> transitionToAgreed(n, agreement))
.onFailure((n, throwable) -> transitionToAgreeing(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send agreement to consumer: %s", throwable.getMessage())))
.execute("[Provider] send agreement");
.onFinalFailure((n, throwable) -> transitionToTerminating(n, format("Failed to send agreement to consumer: %s", throwable.getMessage())))
.execute();
}

/**
Expand All @@ -173,12 +171,11 @@ private boolean processFinalizing(ContractNegotiation negotiation) {
.type(ContractNegotiationEventMessage.Type.FINALIZED)
.policy(negotiation.getContractAgreement().getPolicy());

return dispatch(messageBuilder, negotiation, Object.class)
return dispatch(messageBuilder, negotiation, Object.class, "[Provider] send finalization")
.onSuccess((n, result) -> transitionToFinalized(n))
.onFailure((n, throwable) -> transitionToFinalizing(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send finalization to consumer: %s", throwable.getMessage())))
.execute("[Provider] send finalization");
.onFinalFailure((n, throwable) -> transitionToTerminating(n, format("Failed to send finalization to consumer: %s", throwable.getMessage())))
.execute();
}

public static class Builder extends AbstractContractNegotiationManager.Builder<ProviderContractNegotiationManagerImpl> {
Expand Down
Loading

0 comments on commit eee070b

Please sign in to comment.