From bbf2de7a7e1f72a8f1d4a6008006813dd7e56f85 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 30 Jul 2024 09:53:48 -0700 Subject: [PATCH] Move workflow update polling inside of interceptor (#2159) --- .github/workflows/features.yml | 3 +- ...TracingWorkflowClientCallsInterceptor.java | 3 +- .../io/temporal/client/WorkflowStubImpl.java | 51 ++++++------------ .../WorkflowClientCallsInterceptor.java | 49 ++++------------- .../WorkflowClientCallsInterceptorBase.java | 3 +- .../client/CompletedUpdateHandleImpl.java | 7 +-- .../client/LazyUpdateHandleImpl.java | 9 ++-- .../client/RootWorkflowClientInvoker.java | 28 +++++++--- .../workflow/updateTest/UpdateTest.java | 53 +++++++++++++++++++ 9 files changed, 115 insertions(+), 91 deletions(-) rename temporal-sdk/src/main/java/io/temporal/{ => internal}/client/CompletedUpdateHandleImpl.java (86%) rename temporal-sdk/src/main/java/io/temporal/{ => internal}/client/LazyUpdateHandleImpl.java (94%) diff --git a/.github/workflows/features.yml b/.github/workflows/features.yml index 41891b1b6..9c0a1b8e7 100644 --- a/.github/workflows/features.yml +++ b/.github/workflows/features.yml @@ -7,4 +7,5 @@ jobs: with: java-repo-path: ${{github.event.pull_request.head.repo.full_name}} version: ${{github.event.pull_request.head.ref}} - version-is-repo-ref: true \ No newline at end of file + version-is-repo-ref: true + features-repo-ref: java-update-iceptor-change \ No newline at end of file diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java index 3ddd42d29..fecd48d32 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -23,6 +23,7 @@ import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.Tracer; +import io.temporal.client.UpdateHandle; import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase; import io.temporal.opentracing.OpenTracingOptions; @@ -119,7 +120,7 @@ public QueryOutput query(QueryInput input) { } @Override - public StartUpdateOutput startUpdate(StartUpdateInput input) { + public UpdateHandle startUpdate(StartUpdateInput input) { Span workflowStartUpdateSpan = contextAccessor.writeSpanContextToHeader( () -> diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 023283477..51fe9cc37 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -31,6 +31,7 @@ import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.failure.CanceledFailure; +import io.temporal.internal.client.LazyUpdateHandleImpl; import io.temporal.serviceclient.CheckedExceptionWrapper; import io.temporal.serviceclient.StatusUtils; import java.lang.reflect.Type; @@ -331,42 +332,20 @@ public UpdateHandle startUpdate(UpdateOptions options, Object... args) options.validate(); WorkflowExecution targetExecution = execution.get(); try { - WorkflowClientCallsInterceptor.StartUpdateOutput result = - workflowClientInvoker.startUpdate( - new WorkflowClientCallsInterceptor.StartUpdateInput<>( - targetExecution, - options.getUpdateName(), - Header.empty(), - options.getUpdateId(), - args, - options.getResultClass(), - options.getResultType(), - options.getFirstExecutionRunId(), - WaitPolicy.newBuilder() - .setLifecycleStage(options.getWaitForStage().getProto()) - .build())); - - if (result.hasResult()) { - return new CompletedUpdateHandleImpl<>( - result.getReference().getUpdateId(), - result.getReference().getWorkflowExecution(), - result.getResult()); - } else { - LazyUpdateHandleImpl handle = - new LazyUpdateHandleImpl<>( - workflowClientInvoker, - workflowType.orElse(null), - options.getUpdateName(), - result.getReference().getUpdateId(), - result.getReference().getWorkflowExecution(), - options.getResultClass(), - options.getResultType()); - if (options.getWaitForStage() == WorkflowUpdateStage.COMPLETED) { - // Don't return the handle until completed, since that's what's been asked for - handle.waitCompleted(); - } - return handle; - } + return workflowClientInvoker.startUpdate( + new WorkflowClientCallsInterceptor.StartUpdateInput<>( + targetExecution, + workflowType, + options.getUpdateName(), + Header.empty(), + options.getUpdateId(), + args, + options.getResultClass(), + options.getResultType(), + options.getFirstExecutionRunId(), + WaitPolicy.newBuilder() + .setLifecycleStage(options.getWaitForStage().getProto()) + .build())); } catch (Exception e) { Throwable throwable = throwAsWorkflowFailureException(e, targetExecution); throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index 66cd5a6bc..0d32e05dd 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -22,8 +22,8 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.WorkflowExecutionStatus; -import io.temporal.api.update.v1.UpdateRef; import io.temporal.api.update.v1.WaitPolicy; +import io.temporal.client.UpdateHandle; import io.temporal.client.WorkflowOptions; import io.temporal.common.Experimental; import java.lang.reflect.Type; @@ -78,7 +78,7 @@ public interface WorkflowClientCallsInterceptor { QueryOutput query(QueryInput input); @Experimental - StartUpdateOutput startUpdate(StartUpdateInput input); + UpdateHandle startUpdate(StartUpdateInput input); @Experimental PollWorkflowUpdateOutput pollWorkflowUpdate(PollWorkflowUpdateInput input); @@ -383,6 +383,7 @@ public WorkflowExecution getWorkflowExecution() { @Experimental final class StartUpdateInput { private final WorkflowExecution workflowExecution; + private final Optional workflowType; private final String updateName; private final Header header; private final Object[] arguments; @@ -394,6 +395,7 @@ final class StartUpdateInput { public StartUpdateInput( WorkflowExecution workflowExecution, + Optional workflowType, String updateName, Header header, String updateId, @@ -403,6 +405,7 @@ public StartUpdateInput( String firstExecutionRunId, WaitPolicy waitPolicy) { this.workflowExecution = workflowExecution; + this.workflowType = workflowType; this.header = header; this.updateId = updateId; this.updateName = updateName; @@ -417,6 +420,10 @@ public WorkflowExecution getWorkflowExecution() { return workflowExecution; } + public Optional getWorkflowType() { + return workflowType; + } + public String getUpdateName() { return updateName; } @@ -450,44 +457,6 @@ public WaitPolicy getWaitPolicy() { } } - @Experimental - final class UpdateOutput { - private final R result; - - public UpdateOutput(R result) { - this.result = result; - } - - public R getResult() { - return result; - } - } - - @Experimental - final class StartUpdateOutput { - private final UpdateRef reference; - private final R result; - private final boolean hasResult; - - public StartUpdateOutput(UpdateRef reference, boolean hasResult, R result) { - this.reference = reference; - this.result = result; - this.hasResult = hasResult; - } - - public UpdateRef getReference() { - return reference; - } - - public boolean hasResult() { - return hasResult; - } - - public R getResult() { - return result; - } - } - @Experimental final class PollWorkflowUpdateInput { private final WorkflowExecution workflowExecution; diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java index 511b4f251..26ce5ecaf 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java @@ -20,6 +20,7 @@ package io.temporal.common.interceptors; +import io.temporal.client.UpdateHandle; import java.util.concurrent.TimeoutException; /** Convenience base class for {@link WorkflowClientCallsInterceptor} implementations. */ @@ -62,7 +63,7 @@ public QueryOutput query(QueryInput input) { } @Override - public StartUpdateOutput startUpdate(StartUpdateInput input) { + public UpdateHandle startUpdate(StartUpdateInput input) { return next.startUpdate(input); } diff --git a/temporal-sdk/src/main/java/io/temporal/client/CompletedUpdateHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/CompletedUpdateHandleImpl.java similarity index 86% rename from temporal-sdk/src/main/java/io/temporal/client/CompletedUpdateHandleImpl.java rename to temporal-sdk/src/main/java/io/temporal/internal/client/CompletedUpdateHandleImpl.java index 40566be0a..06fe6040b 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/CompletedUpdateHandleImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/CompletedUpdateHandleImpl.java @@ -18,21 +18,22 @@ * limitations under the License. */ -package io.temporal.client; +package io.temporal.internal.client; import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.UpdateHandle; import io.temporal.common.Experimental; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @Experimental -final class CompletedUpdateHandleImpl implements UpdateHandle { +public final class CompletedUpdateHandleImpl implements UpdateHandle { private final String id; private final WorkflowExecution execution; private final T result; - CompletedUpdateHandleImpl(String id, WorkflowExecution execution, T result) { + public CompletedUpdateHandleImpl(String id, WorkflowExecution execution, T result) { this.id = id; this.execution = execution; this.result = result; diff --git a/temporal-sdk/src/main/java/io/temporal/client/LazyUpdateHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/LazyUpdateHandleImpl.java similarity index 94% rename from temporal-sdk/src/main/java/io/temporal/client/LazyUpdateHandleImpl.java rename to temporal-sdk/src/main/java/io/temporal/internal/client/LazyUpdateHandleImpl.java index 024774d13..e48c49808 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/LazyUpdateHandleImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/LazyUpdateHandleImpl.java @@ -18,11 +18,14 @@ * limitations under the License. */ -package io.temporal.client; +package io.temporal.internal.client; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.UpdateHandle; +import io.temporal.client.WorkflowException; +import io.temporal.client.WorkflowServiceException; import io.temporal.common.Experimental; import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.serviceclient.CheckedExceptionWrapper; @@ -33,7 +36,7 @@ import java.util.concurrent.TimeoutException; @Experimental -final class LazyUpdateHandleImpl implements UpdateHandle { +public final class LazyUpdateHandleImpl implements UpdateHandle { private final WorkflowClientCallsInterceptor workflowClientInvoker; private final String workflowType; @@ -44,7 +47,7 @@ final class LazyUpdateHandleImpl implements UpdateHandle { private final Type resultType; private WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput waitCompletedPollCall; - LazyUpdateHandleImpl( + public LazyUpdateHandleImpl( WorkflowClientCallsInterceptor workflowClientInvoker, String workflowType, String updateName, diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 8a337f024..439304e3f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -31,8 +31,7 @@ import io.temporal.api.query.v1.WorkflowQuery; import io.temporal.api.update.v1.*; import io.temporal.api.workflowservice.v1.*; -import io.temporal.client.WorkflowClientOptions; -import io.temporal.client.WorkflowUpdateException; +import io.temporal.client.*; import io.temporal.common.converter.DataConverter; import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.internal.client.external.GenericWorkflowClient; @@ -298,7 +297,7 @@ public QueryOutput query(QueryInput input) { } @Override - public StartUpdateOutput startUpdate(StartUpdateInput input) { + public UpdateHandle startUpdate(StartUpdateInput input) { DataConverter dataConverterWithWorkflowContext = clientOptions .getDataConverter() @@ -337,10 +336,11 @@ public StartUpdateOutput startUpdate(StartUpdateInput input) { // Re-attempt the update until it is at least accepted, or passes the lifecycle stage specified // by the user. UpdateWorkflowExecutionResponse result; + UpdateWorkflowExecutionLifecycleStage waitForStage = input.getWaitPolicy().getLifecycleStage(); do { Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS); result = genericClient.update(updateRequest, pollTimeoutDeadline); - } while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber() + } while (result.getStage().getNumber() < waitForStage.getNumber() && result.getStage().getNumber() < UpdateWorkflowExecutionLifecycleStage .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED @@ -356,7 +356,10 @@ public StartUpdateOutput startUpdate(StartUpdateInput input) { input.getResultClass(), input.getResultType(), dataConverterWithWorkflowContext); - return new StartUpdateOutput(result.getUpdateRef(), true, resultValue); + return new CompletedUpdateHandleImpl<>( + result.getUpdateRef().getUpdateId(), + result.getUpdateRef().getWorkflowExecution(), + resultValue); case FAILURE: throw new WorkflowUpdateException( result.getUpdateRef().getWorkflowExecution(), @@ -370,7 +373,20 @@ public StartUpdateOutput startUpdate(StartUpdateInput input) { + result.getOutcome().getValueCase()); } } else { - return new StartUpdateOutput(result.getUpdateRef(), false, null); + LazyUpdateHandleImpl handle = + new LazyUpdateHandleImpl<>( + this, + input.getWorkflowType().orElse(null), + input.getUpdateName(), + result.getUpdateRef().getUpdateId(), + result.getUpdateRef().getWorkflowExecution(), + input.getResultClass(), + input.getResultType()); + if (waitForStage == WorkflowUpdateStage.COMPLETED.getProto()) { + // Don't return the handle until completed, since that's what's been asked for + handle.waitCompleted(); + } + return handle; } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java index 9be543336..9b9470618 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java @@ -30,7 +30,11 @@ import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionRequest; import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionResponse; import io.temporal.client.*; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase; +import io.temporal.common.interceptors.WorkflowClientInterceptorBase; import io.temporal.failure.ApplicationFailure; +import io.temporal.internal.client.CompletedUpdateHandleImpl; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.WorkerOptions; @@ -103,6 +107,55 @@ public void testUpdate() { assertEquals("Execute-Hello Update Execute-Hello Update 2", result); } + private static class FakesResultUpdateInterceptor extends WorkflowClientInterceptorBase { + @Override + public WorkflowClientCallsInterceptor workflowClientCallsInterceptor( + WorkflowClientCallsInterceptor next) { + return new WorkflowClientCallsInterceptorBase(next) { + @Override + public UpdateHandle startUpdate(StartUpdateInput input) { + super.startUpdate(input); + return new CompletedUpdateHandleImpl<>( + "someid", input.getWorkflowExecution(), (R) "fake"); + } + }; + } + } + + @Test + public void testUpdateIntercepted() { + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = + WorkflowClient.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + WorkflowClientOptions.newBuilder(testWorkflowRule.getWorkflowClient().getOptions()) + .setInterceptors(new FakesResultUpdateInterceptor()) + .validateAndBuildWithDefaults()); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .build(); + WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options); + // To execute workflow client.execute() would do. But we want to start workflow and immediately + // return. + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + SDKTestWorkflowRule.waitForOKQuery(workflow); + assertEquals("initial", workflow.getState()); + assertEquals(workflowId, execution.getWorkflowId()); + + assertEquals("fake", workflow.update(0, "Hello Update")); + assertEquals("fake", workflow.update(1, "Hello Update 2")); + workflow.complete(); + + String result = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(execution, Optional.empty()) + .getResult(String.class); + assertEquals("Execute-Hello Update Execute-Hello Update 2", result); + } + @Test public void testUpdateUntyped() throws ExecutionException, InterruptedException { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();