Skip to content

Commit

Permalink
refactor: move data flow suspension call in transfer process manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Feb 11, 2025
1 parent fc34b5a commit 489cff2
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.eclipse.edc.connector.controlplane.services.transferprocess.TransferProcessProtocolServiceImpl;
import org.eclipse.edc.connector.controlplane.services.transferprocess.TransferProcessServiceImpl;
import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser;
import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable;
import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore;
Expand Down Expand Up @@ -184,9 +183,6 @@ public class ControlPlaneServicesExtension implements ServiceExtension {
@Inject
private ProtocolVersionRegistry protocolVersionRegistry;

@Inject
private DataFlowManager dataFlowManager;

@Inject
private TransferTypeParser transferTypeParser;

Expand Down Expand Up @@ -273,7 +269,7 @@ public TransferProcessService transferProcessService() {
public TransferProcessProtocolService transferProcessProtocolService() {
return new TransferProcessProtocolServiceImpl(transferProcessStore, transactionContext, contractNegotiationStore,
contractValidationService, protocolTokenValidator(), dataAddressValidator, transferProcessObservable, clock,
monitor, telemetry, dataFlowManager);
monitor, telemetry);
}

@Provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.edc.connector.controlplane.contract.spi.validation.ContractValidationService;
import org.eclipse.edc.connector.controlplane.services.spi.protocol.ProtocolTokenValidator;
import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessProtocolService;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable;
import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessStartedData;
import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore;
Expand Down Expand Up @@ -67,14 +66,13 @@ public class TransferProcessProtocolServiceImpl implements TransferProcessProtoc
private final Clock clock;
private final Monitor monitor;
private final Telemetry telemetry;
private final DataFlowManager dataFlowManager;

public TransferProcessProtocolServiceImpl(TransferProcessStore transferProcessStore,
TransactionContext transactionContext, ContractNegotiationStore negotiationStore,
ContractValidationService contractValidationService,
ProtocolTokenValidator protocolTokenValidator,
DataAddressValidatorRegistry dataAddressValidator, TransferProcessObservable observable,
Clock clock, Monitor monitor, Telemetry telemetry, DataFlowManager dataFlowManager) {
Clock clock, Monitor monitor, Telemetry telemetry) {
this.transferProcessStore = transferProcessStore;
this.transactionContext = transactionContext;
this.negotiationStore = negotiationStore;
Expand All @@ -85,7 +83,6 @@ public TransferProcessProtocolServiceImpl(TransferProcessStore transferProcessSt
this.clock = clock;
this.monitor = monitor;
this.telemetry = telemetry;
this.dataFlowManager = dataFlowManager;
}

@Override
Expand Down Expand Up @@ -212,18 +209,11 @@ private ServiceResult<TransferProcess> completedAction(TransferCompletionMessage

@NotNull
private ServiceResult<TransferProcess> suspendedAction(TransferSuspensionMessage message, TransferProcess transferProcess) {
if (transferProcess.getType() == PROVIDER) {
var suspension = dataFlowManager.suspend(transferProcess);
if (suspension.failed()) {
return ServiceResult.conflict("Cannot suspend transfer process %s: %s".formatted(transferProcess.getId(), suspension.getFailureDetail()));
}
}
if (transferProcess.canBeTerminated()) {
if (transferProcess.canBeSuspended()) {
var reason = message.getReason().stream().map(Object::toString).collect(joining(", "));
transferProcess.transitionSuspended(reason);
transferProcess.transitionSuspendingRequested(reason);
transferProcess.protocolMessageReceived(message.getId());
update(transferProcess);
observable.invokeForEach(l -> l.suspended(transferProcess));
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be suspended"));
Expand Down
Loading

0 comments on commit 489cff2

Please sign in to comment.