Skip to content

Commit

Permalink
Move workflow update polling inside of interceptor (#2159)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jul 30, 2024
1 parent 6b39e44 commit bbf2de7
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 91 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/features.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ jobs:
with:
java-repo-path: ${{github.event.pull_request.head.repo.full_name}}
version: ${{github.event.pull_request.head.ref}}
version-is-repo-ref: true
version-is-repo-ref: true
features-repo-ref: java-update-iceptor-change
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.temporal.client.UpdateHandle;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase;
import io.temporal.opentracing.OpenTracingOptions;
Expand Down Expand Up @@ -119,7 +120,7 @@ public <R> QueryOutput<R> query(QueryInput<R> input) {
}

@Override
public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
Span workflowStartUpdateSpan =
contextAccessor.writeSpanContextToHeader(
() ->
Expand Down
51 changes: 15 additions & 36 deletions temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.temporal.common.interceptors.Header;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.client.LazyUpdateHandleImpl;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.StatusUtils;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -331,42 +332,20 @@ public <R> UpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args)
options.validate();
WorkflowExecution targetExecution = execution.get();
try {
WorkflowClientCallsInterceptor.StartUpdateOutput<R> result =
workflowClientInvoker.startUpdate(
new WorkflowClientCallsInterceptor.StartUpdateInput<>(
targetExecution,
options.getUpdateName(),
Header.empty(),
options.getUpdateId(),
args,
options.getResultClass(),
options.getResultType(),
options.getFirstExecutionRunId(),
WaitPolicy.newBuilder()
.setLifecycleStage(options.getWaitForStage().getProto())
.build()));

if (result.hasResult()) {
return new CompletedUpdateHandleImpl<>(
result.getReference().getUpdateId(),
result.getReference().getWorkflowExecution(),
result.getResult());
} else {
LazyUpdateHandleImpl<R> handle =
new LazyUpdateHandleImpl<>(
workflowClientInvoker,
workflowType.orElse(null),
options.getUpdateName(),
result.getReference().getUpdateId(),
result.getReference().getWorkflowExecution(),
options.getResultClass(),
options.getResultType());
if (options.getWaitForStage() == WorkflowUpdateStage.COMPLETED) {
// Don't return the handle until completed, since that's what's been asked for
handle.waitCompleted();
}
return handle;
}
return workflowClientInvoker.startUpdate(
new WorkflowClientCallsInterceptor.StartUpdateInput<>(
targetExecution,
workflowType,
options.getUpdateName(),
Header.empty(),
options.getUpdateId(),
args,
options.getResultClass(),
options.getResultType(),
options.getFirstExecutionRunId(),
WaitPolicy.newBuilder()
.setLifecycleStage(options.getWaitForStage().getProto())
.build()));
} catch (Exception e) {
Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.update.v1.UpdateRef;
import io.temporal.api.update.v1.WaitPolicy;
import io.temporal.client.UpdateHandle;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.Experimental;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -78,7 +78,7 @@ public interface WorkflowClientCallsInterceptor {
<R> QueryOutput<R> query(QueryInput<R> input);

@Experimental
<R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input);
<R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input);

@Experimental
<R> PollWorkflowUpdateOutput<R> pollWorkflowUpdate(PollWorkflowUpdateInput<R> input);
Expand Down Expand Up @@ -383,6 +383,7 @@ public WorkflowExecution getWorkflowExecution() {
@Experimental
final class StartUpdateInput<R> {
private final WorkflowExecution workflowExecution;
private final Optional<String> workflowType;
private final String updateName;
private final Header header;
private final Object[] arguments;
Expand All @@ -394,6 +395,7 @@ final class StartUpdateInput<R> {

public StartUpdateInput(
WorkflowExecution workflowExecution,
Optional<String> workflowType,
String updateName,
Header header,
String updateId,
Expand All @@ -403,6 +405,7 @@ public StartUpdateInput(
String firstExecutionRunId,
WaitPolicy waitPolicy) {
this.workflowExecution = workflowExecution;
this.workflowType = workflowType;
this.header = header;
this.updateId = updateId;
this.updateName = updateName;
Expand All @@ -417,6 +420,10 @@ public WorkflowExecution getWorkflowExecution() {
return workflowExecution;
}

public Optional<String> getWorkflowType() {
return workflowType;
}

public String getUpdateName() {
return updateName;
}
Expand Down Expand Up @@ -450,44 +457,6 @@ public WaitPolicy getWaitPolicy() {
}
}

@Experimental
final class UpdateOutput<R> {
private final R result;

public UpdateOutput(R result) {
this.result = result;
}

public R getResult() {
return result;
}
}

@Experimental
final class StartUpdateOutput<R> {
private final UpdateRef reference;
private final R result;
private final boolean hasResult;

public StartUpdateOutput(UpdateRef reference, boolean hasResult, R result) {
this.reference = reference;
this.result = result;
this.hasResult = hasResult;
}

public UpdateRef getReference() {
return reference;
}

public boolean hasResult() {
return hasResult;
}

public R getResult() {
return result;
}
}

@Experimental
final class PollWorkflowUpdateInput<R> {
private final WorkflowExecution workflowExecution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.common.interceptors;

import io.temporal.client.UpdateHandle;
import java.util.concurrent.TimeoutException;

/** Convenience base class for {@link WorkflowClientCallsInterceptor} implementations. */
Expand Down Expand Up @@ -62,7 +63,7 @@ public <R> QueryOutput<R> query(QueryInput<R> input) {
}

@Override
public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
return next.startUpdate(input);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
* limitations under the License.
*/

package io.temporal.client;
package io.temporal.internal.client;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.UpdateHandle;
import io.temporal.common.Experimental;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Experimental
final class CompletedUpdateHandleImpl<T> implements UpdateHandle<T> {
public final class CompletedUpdateHandleImpl<T> implements UpdateHandle<T> {

private final String id;
private final WorkflowExecution execution;
private final T result;

CompletedUpdateHandleImpl(String id, WorkflowExecution execution, T result) {
public CompletedUpdateHandleImpl(String id, WorkflowExecution execution, T result) {
this.id = id;
this.execution = execution;
this.result = result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
* limitations under the License.
*/

package io.temporal.client;
package io.temporal.internal.client;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.UpdateHandle;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowServiceException;
import io.temporal.common.Experimental;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.serviceclient.CheckedExceptionWrapper;
Expand All @@ -33,7 +36,7 @@
import java.util.concurrent.TimeoutException;

@Experimental
final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {
public final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {

private final WorkflowClientCallsInterceptor workflowClientInvoker;
private final String workflowType;
Expand All @@ -44,7 +47,7 @@ final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {
private final Type resultType;
private WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput<T> waitCompletedPollCall;

LazyUpdateHandleImpl(
public LazyUpdateHandleImpl(
WorkflowClientCallsInterceptor workflowClientInvoker,
String workflowType,
String updateName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.update.v1.*;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowUpdateException;
import io.temporal.client.*;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.internal.client.external.GenericWorkflowClient;
Expand Down Expand Up @@ -298,7 +297,7 @@ public <R> QueryOutput<R> query(QueryInput<R> input) {
}

@Override
public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
DataConverter dataConverterWithWorkflowContext =
clientOptions
.getDataConverter()
Expand Down Expand Up @@ -337,10 +336,11 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
// Re-attempt the update until it is at least accepted, or passes the lifecycle stage specified
// by the user.
UpdateWorkflowExecutionResponse result;
UpdateWorkflowExecutionLifecycleStage waitForStage = input.getWaitPolicy().getLifecycleStage();
do {
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS);
result = genericClient.update(updateRequest, pollTimeoutDeadline);
} while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber()
} while (result.getStage().getNumber() < waitForStage.getNumber()
&& result.getStage().getNumber()
< UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
Expand All @@ -356,7 +356,10 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
input.getResultClass(),
input.getResultType(),
dataConverterWithWorkflowContext);
return new StartUpdateOutput<R>(result.getUpdateRef(), true, resultValue);
return new CompletedUpdateHandleImpl<>(
result.getUpdateRef().getUpdateId(),
result.getUpdateRef().getWorkflowExecution(),
resultValue);
case FAILURE:
throw new WorkflowUpdateException(
result.getUpdateRef().getWorkflowExecution(),
Expand All @@ -370,7 +373,20 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
+ result.getOutcome().getValueCase());
}
} else {
return new StartUpdateOutput<R>(result.getUpdateRef(), false, null);
LazyUpdateHandleImpl<R> handle =
new LazyUpdateHandleImpl<>(
this,
input.getWorkflowType().orElse(null),
input.getUpdateName(),
result.getUpdateRef().getUpdateId(),
result.getUpdateRef().getWorkflowExecution(),
input.getResultClass(),
input.getResultType());
if (waitForStage == WorkflowUpdateStage.COMPLETED.getProto()) {
// Don't return the handle until completed, since that's what's been asked for
handle.waitCompleted();
}
return handle;
}
}

Expand Down
Loading

0 comments on commit bbf2de7

Please sign in to comment.