Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Required wait update stage and polling improvements #251

Merged
merged 3 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Temporalio.Client.Interceptors
/// <param name="FirstExecutionRunId">Workflow first execution run ID if any.</param>
/// <param name="Update">Update name.</param>
/// <param name="Args">Update arguments.</param>
/// <param name="Options">Options if any.</param>
/// <param name="Options">Options.</param>
/// <param name="Headers">Headers if any. These will be encoded using the codec before sent
/// to the server.</param>
/// <remarks>
Expand All @@ -24,6 +24,6 @@ public record StartWorkflowUpdateInput(
string? FirstExecutionRunId,
string Update,
IReadOnlyCollection<object?> Args,
WorkflowUpdateOptions? Options,
WorkflowUpdateStartOptions Options,
IDictionary<string, Payload>? Headers);
}
48 changes: 32 additions & 16 deletions src/Temporalio/Client/TemporalClient.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ await Client.Options.DataConverter.ToPayloadsAsync(
public async override Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsync<TResult>(
StartWorkflowUpdateInput input)
{
if (input.Options.WaitForStage == WorkflowUpdateStage.None)
{
throw new ArgumentException("WaitForStage is required to start workflow update");
}
else if (input.Options.WaitForStage == WorkflowUpdateStage.Admitted)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just allow the server to reject this call? That way you don't need to update the SDK if the server adds support .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing was mentioned at temporalio/sdk-python#521 (comment). I think we need to document what works and what doesn't here anyways, so that'll have to be updated anyways, and hopefully this becomes a default parameter in some durable-update future.

{
throw new ArgumentException(
"Admitted is not an allowed wait stage to start workflow update");
}
// Build request
var req = new UpdateWorkflowExecutionRequest()
{
Expand All @@ -242,23 +251,17 @@ public async override Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsy
{
Meta = new()
{
UpdateId = input.Options?.UpdateID ?? Guid.NewGuid().ToString(),
UpdateId = input.Options.Id ?? Guid.NewGuid().ToString(),
Identity = Client.Connection.Options.Identity,
},
Input = new() { Name = input.Update },
},
WaitPolicy = new()
{
// Default is Accepted, but may be overridden later
LifecycleStage = UpdateWorkflowExecutionLifecycleStage.Accepted,
LifecycleStage = (UpdateWorkflowExecutionLifecycleStage)input.Options.WaitForStage,
},
FirstExecutionRunId = input.FirstExecutionRunId ?? string.Empty,
};
if (input.Options is { } options &&
options.WaitForStage != UpdateWorkflowExecutionLifecycleStage.Unspecified)
{
req.WaitPolicy.LifecycleStage = options.WaitForStage;
}
if (input.Args.Count > 0)
{
req.Request.Input.Args = new Payloads();
Expand All @@ -280,15 +283,28 @@ await Client.Options.DataConverter.ToPayloadsAsync(
}
}
}
// Invoke
var resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync(
req, DefaultRetryOptions(input.Options?.Rpc)).ConfigureAwait(false);
// Build handle for result
return new(Client, req.Request.Meta.UpdateId, input.Id, input.RunId)

// Continually try to start until the user-asked stage is reached or the stage is
// accepted
UpdateWorkflowExecutionResponse resp;
do
{
// Put outcome on the handle (may be null)
KnownOutcome = resp.Outcome,
};
resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync(
req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false);
}
while (resp.Stage < req.WaitPolicy.LifecycleStage &&
resp.Stage < UpdateWorkflowExecutionLifecycleStage.Accepted);

// If the requested stage is completed, wait for result, but discard the update
// exception, that will come when _they_ call get result
var handle = new WorkflowUpdateHandle<TResult>(
Client, req.Request.Meta.UpdateId, input.Id, input.RunId)
{ KnownOutcome = resp.Outcome };
if (input.Options.WaitForStage == WorkflowUpdateStage.Completed)
{
await handle.PollUntilOutcomeAsync(input.Options.Rpc).ConfigureAwait(false);
}
return handle;
}

/// <inheritdoc />
Expand Down
52 changes: 26 additions & 26 deletions src/Temporalio/Client/WorkflowHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,11 @@ public Task<TQueryResult> QueryAsync<TQueryResult>(
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle> StartUpdateAsync<TWorkflow>(
Expression<Func<TWorkflow, Task>> updateCall, WorkflowUpdateOptions? options = null)
Expression<Func<TWorkflow, Task>> updateCall, WorkflowUpdateStartOptions options)
{
var (method, args) = ExpressionUtil.ExtractCall(updateCall);
return StartUpdateAsync(
Expand All @@ -326,12 +326,12 @@ public Task<WorkflowUpdateHandle> StartUpdateAsync<TWorkflow>(
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <typeparam name="TUpdateResult">Update result type.</typeparam>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TWorkflow, TUpdateResult>(
Expression<Func<TWorkflow, Task<TUpdateResult>>> updateCall,
WorkflowUpdateOptions? options = null)
WorkflowUpdateStartOptions options)
{
var (method, args) = ExpressionUtil.ExtractCall(updateCall);
return StartUpdateAsync<TUpdateResult>(
Expand All @@ -345,11 +345,11 @@ public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TWorkflow, TUp
/// </summary>
/// <param name="update">Name of the update.</param>
/// <param name="args">Arguments for the update.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle> StartUpdateAsync(
string update, IReadOnlyCollection<object?> args, WorkflowUpdateOptions? options = null) =>
string update, IReadOnlyCollection<object?> args, WorkflowUpdateStartOptions options) =>
StartUpdateAsync<ValueTuple>(update, args, options).ContinueWith<WorkflowUpdateHandle>(
t => t.Result, TaskScheduler.Current);

Expand All @@ -359,11 +359,11 @@ public Task<WorkflowUpdateHandle> StartUpdateAsync(
/// <typeparam name="TUpdateResult">Update result type.</typeparam>
/// <param name="update">Name of the update.</param>
/// <param name="args">Arguments for the update.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TUpdateResult>(
string update, IReadOnlyCollection<object?> args, WorkflowUpdateOptions? options = null) =>
string update, IReadOnlyCollection<object?> args, WorkflowUpdateStartOptions options) =>
Client.OutboundInterceptor.StartWorkflowUpdateAsync<TUpdateResult>(new(
Id: Id,
RunId: RunId,
Expand All @@ -375,7 +375,7 @@ public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TUpdateResult>

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync{TWorkflow}(Expression{Func{TWorkflow, Task}}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync{TWorkflow}(Expression{Func{TWorkflow, Task}}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand All @@ -398,7 +398,7 @@ public async Task ExecuteUpdateAsync<TWorkflow>(

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync{TWorkflow, TUpdateResult}(Expression{Func{TWorkflow, Task{TUpdateResult}}}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync{TWorkflow, TUpdateResult}(Expression{Func{TWorkflow, Task{TUpdateResult}}}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle{TResult}.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand All @@ -421,7 +421,7 @@ public async Task<TUpdateResult> ExecuteUpdateAsync<TWorkflow, TUpdateResult>(

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync(string, IReadOnlyCollection{object?}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync(string, IReadOnlyCollection{object?}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand All @@ -444,7 +444,7 @@ public async Task ExecuteUpdateAsync(

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync{TUpdateResult}(string, IReadOnlyCollection{object?}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync{TUpdateResult}(string, IReadOnlyCollection{object?}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand Down Expand Up @@ -603,14 +603,14 @@ private async IAsyncEnumerable<HistoryEvent> FetchHistoryEventsInternalAsync(
/// </summary>
/// <param name="options">Options to use as base.</param>
/// <returns>New options.</returns>
private protected static WorkflowUpdateOptions UpdateOptionsWithDefaultsForExecute(
WorkflowUpdateOptions? options)
{
var newOptions = options == null ? new() : (WorkflowUpdateOptions)options.Clone();
// Force override the wait for stage to completed
newOptions.WaitForStage = UpdateWorkflowExecutionLifecycleStage.Completed;
return newOptions;
}
private protected static WorkflowUpdateStartOptions UpdateOptionsWithDefaultsForExecute(
WorkflowUpdateOptions? options) =>
(WorkflowUpdateStartOptions)new WorkflowUpdateStartOptions()
{
Id = options?.Id,
Rpc = options?.Rpc,
WaitForStage = WorkflowUpdateStage.Completed,
}.Clone();
}

/// <summary>
Expand Down Expand Up @@ -678,29 +678,29 @@ public Task<TQueryResult> QueryAsync<TQueryResult>(
/// Start a workflow update via a call to a WorkflowUpdate attributed method.
/// </summary>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle> StartUpdateAsync(
Expression<Func<TWorkflow, Task>> updateCall, WorkflowUpdateOptions? options = null) =>
Expression<Func<TWorkflow, Task>> updateCall, WorkflowUpdateStartOptions options) =>
StartUpdateAsync<TWorkflow>(updateCall, options);

/// <summary>
/// Start a workflow update via a call to a WorkflowUpdate attributed method.
/// </summary>
/// <typeparam name="TUpdateResult">Update result type.</typeparam>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TUpdateResult>(
Expression<Func<TWorkflow, Task<TUpdateResult>>> updateCall,
WorkflowUpdateOptions? options = null) =>
WorkflowUpdateStartOptions options) =>
StartUpdateAsync<TWorkflow, TUpdateResult>(updateCall, options);

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync(Expression{Func{TWorkflow, Task}}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync(Expression{Func{TWorkflow, Task}}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand All @@ -721,7 +721,7 @@ public async Task ExecuteUpdateAsync(

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync{TUpdateResult}(Expression{Func{TWorkflow, Task{TUpdateResult}}}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync{TUpdateResult}(Expression{Func{TWorkflow, Task{TUpdateResult}}}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle{TResult}.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand Down
Loading
Loading