diff --git a/cmd/run.go b/cmd/run.go index 5e995d73..2772ab58 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -30,8 +30,9 @@ import ( ) const ( - summaryListenAddr = "127.0.0.1:0" - FeaturePassed = "PASSED" + proxyExecutableAuto = "auto" + freePortListenAddr = "127.0.0.1:0" + FeaturePassed = "PASSED" ) func runCmd() *cli.Command { @@ -66,6 +67,40 @@ type RunConfig struct { RetainTempDir bool SummaryURI string HTTPProxyURL string + ProxyControlURI string + ProxyListenHostPort string +} + +func (config RunConfig) appendFlags(out []string) ([]string, error) { + out = append(out, "--server="+config.Server) + out = append(out, "--namespace="+config.Namespace) + if config.ClientCertPath != "" { + clientCertPath, err := filepath.Abs(config.ClientCertPath) + if err != nil { + return nil, err + } + out = append(out, "--client-cert-path="+clientCertPath) + } + if config.ClientKeyPath != "" { + clientKeyPath, err := filepath.Abs(config.ClientKeyPath) + if err != nil { + return nil, err + } + out = append(out, "--client-key-path="+clientKeyPath) + } + if config.SummaryURI != "" { + out = append(out, "--summary-uri="+config.SummaryURI) + } + if config.HTTPProxyURL != "" { + out = append(out, "--http-proxy-url", config.HTTPProxyURL) + } + if config.ProxyControlURI != "" { + out = append(out, "--proxy-control-uri="+config.ProxyControlURI) + } + if config.ProxyListenHostPort != "" { + out = append(out, "--proxy-listen-host-port="+config.ProxyListenHostPort) + } + return out, nil } // dockerRunFlags are a subset of flags that apply when running in a docker container @@ -188,13 +223,17 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { } // Aa task queue to every feature run := &cmd.Run{Features: make([]cmd.RunFeature, len(features))} - var expectsProxy bool + var expectsHTTPProxy bool + var expectsGrpcProxy bool for i, feature := range features { run.Features[i].Dir = feature.Dir run.Features[i].TaskQueue = fmt.Sprintf("features-%v-%v", feature.Dir, uuid.NewString()) run.Features[i].Config = feature.Config if feature.Config.ExpectUnauthedProxyCount > 0 || feature.Config.ExpectAuthedProxyCount > 0 { - expectsProxy = true + expectsHTTPProxy = true + } + if feature.Config.GrpcProxy { + expectsGrpcProxy = true } } @@ -228,15 +267,32 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { } // If any feature requires an HTTP proxy, we must run it - var proxyServer *harness.HTTPConnectProxyServer - if expectsProxy { - proxyServer, err = harness.StartHTTPConnectProxyServer(harness.HTTPConnectProxyServerOptions{Log: r.log}) + var httpProxyServer *harness.HTTPConnectProxyServer + if expectsHTTPProxy { + httpProxyServer, err = harness.StartHTTPConnectProxyServer(harness.HTTPConnectProxyServerOptions{Log: r.log}) + if err != nil { + return fmt.Errorf("could not start http proxy server: %w", err) + } + r.config.HTTPProxyURL = "http://" + httpProxyServer.Address + r.log.Info("Started HTTP CONNECT proxy server", "address", httpProxyServer.Address) + defer httpProxyServer.Close() + } + + // if any feature requires a gRPC proxy, we must run it + if expectsGrpcProxy { + grpcProxyServer, err := harness.StartGRPCProxyServer(harness.GRPCProxyServerOptions{ + DialAddress: r.config.Server, + ClientCertPath: r.config.ClientCertPath, + ClientKeyPath: r.config.ClientKeyPath, + Log: r.log, + }) if err != nil { - return fmt.Errorf("could not start proxy server: %w", err) + return fmt.Errorf("could not start grpc proxy server: %w", err) } - r.config.HTTPProxyURL = "http://" + proxyServer.Address - r.log.Info("Started HTTP CONNECT proxy server", "address", proxyServer.Address) - defer proxyServer.Close() + r.config.ProxyListenHostPort = grpcProxyServer.ProxyAddress() + r.config.ProxyControlURI = "http://" + grpcProxyServer.ControlAddress() + r.log.Info("Started gRPC proxy server", "address", grpcProxyServer.ProxyAddress(), "control", grpcProxyServer.ControlAddress()) + defer grpcProxyServer.Close() } // Ensure any created temp dir is cleaned on ctrl-c or normal exit @@ -251,14 +307,14 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { defer r.destroyTempDir() } - l, err := net.Listen("tcp", summaryListenAddr) + summaryListener, err := net.Listen("tcp", freePortListenAddr) if err != nil { return err } - defer l.Close() + defer summaryListener.Close() summaryChan := make(chan Summary) - go r.summaryServer(l, summaryChan) - r.config.SummaryURI = "tcp://" + l.Addr().String() + go r.summaryServer(summaryListener, summaryChan) + r.config.SummaryURI = "tcp://" + summaryListener.Addr().String() err = nil switch r.config.Lang { @@ -273,12 +329,14 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { } } else { err = cmd.NewRunner(cmd.RunConfig{ - Server: r.config.Server, - Namespace: r.config.Namespace, - ClientCertPath: r.config.ClientCertPath, - ClientKeyPath: r.config.ClientKeyPath, - SummaryURI: r.config.SummaryURI, - HTTPProxyURL: r.config.HTTPProxyURL, + Server: r.config.Server, + Namespace: r.config.Namespace, + ClientCertPath: r.config.ClientCertPath, + ClientKeyPath: r.config.ClientKeyPath, + SummaryURI: r.config.SummaryURI, + HTTPProxyURL: r.config.HTTPProxyURL, + ProxyControlURI: r.config.ProxyControlURI, + ProxyListenHostPort: r.config.ProxyListenHostPort, }).Run(ctx, run) } case "java": @@ -315,7 +373,7 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { if err != nil { return err } - l.Close() + summaryListener.Close() summary, ok := <-summaryChan if !ok { r.log.Debug("did not receive a test run summary - adopting legacy behavior of assuming no tests were skipped") @@ -327,7 +385,7 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { // For features that expected proxy connections, count how many expected // ignoring skips and compare count with actual. If any failed we don't need // even do the comparison. - if proxyServer != nil { + if httpProxyServer != nil { var anyFailed bool var expectUnauthedProxyCount, expectAuthedProxyCount int for _, summ := range summary { @@ -345,21 +403,26 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { } } if !anyFailed { - if proxyServer.UnauthedConnectionsTunneled.Load() != uint32(expectUnauthedProxyCount) { + if httpProxyServer.UnauthedConnectionsTunneled.Load() != uint32(expectUnauthedProxyCount) { return fmt.Errorf("expected %v unauthed HTTP proxy connections, got %v", - expectUnauthedProxyCount, proxyServer.UnauthedConnectionsTunneled.Load()) - } else if proxyServer.AuthedConnectionsTunneled.Load() != uint32(expectAuthedProxyCount) { + expectUnauthedProxyCount, httpProxyServer.UnauthedConnectionsTunneled.Load()) + } else if httpProxyServer.AuthedConnectionsTunneled.Load() != uint32(expectAuthedProxyCount) { return fmt.Errorf("expected %v authed HTTP proxy connections, got %v", - expectAuthedProxyCount, proxyServer.AuthedConnectionsTunneled.Load()) + expectAuthedProxyCount, httpProxyServer.AuthedConnectionsTunneled.Load()) } else { r.log.Debug("Matched expected HTTP proxy connections", - "expectUnauthed", expectUnauthedProxyCount, "actualUnauthed", proxyServer.UnauthedConnectionsTunneled.Load(), - "expectAuthed", expectAuthedProxyCount, "actualAuthed", proxyServer.AuthedConnectionsTunneled.Load()) + "expectUnauthed", expectUnauthedProxyCount, "actualUnauthed", httpProxyServer.UnauthedConnectionsTunneled.Load(), + "expectAuthed", expectAuthedProxyCount, "actualAuthed", httpProxyServer.AuthedConnectionsTunneled.Load()) } } } - return r.handleHistory(ctx, run, summary) + err = r.handleHistory(ctx, run, summary) + if err != nil { + return err + } + + return nil } func (r *Runner) handleHistory(ctx context.Context, run *cmd.Run, summary Summary) error { diff --git a/cmd/run_dotnet.go b/cmd/run_dotnet.go index 05df1e40..c5176ad7 100644 --- a/cmd/run_dotnet.go +++ b/cmd/run_dotnet.go @@ -64,11 +64,15 @@ func (r *Runner) RunDotNetExternal(ctx context.Context, run *cmd.Run) error { } } - args := []string{"--server", r.config.Server, "--namespace", r.config.Namespace} - if r.config.ClientCertPath != "" { - args = append(args, "--client-cert-path", r.config.ClientCertPath, "--client-key-path", r.config.ClientKeyPath) + // Build args + args := make([]string, 0, 64) + args, err := r.config.appendFlags(args) + if err != nil { + return err } args = append(args, run.ToArgs()...) + + // Run cmd, err := r.program.NewCommand(ctx, args...) if err == nil { r.log.Debug("Running Go separately", "Args", cmd.Args) diff --git a/cmd/run_go.go b/cmd/run_go.go index 6af576c7..86cc7c75 100644 --- a/cmd/run_go.go +++ b/cmd/run_go.go @@ -68,6 +68,12 @@ func (r *Runner) RunGoExternal(ctx context.Context, run *cmd.Run) error { if r.config.HTTPProxyURL != "" { args = append(args, "--http-proxy-url", r.config.HTTPProxyURL) } + if r.config.ProxyControlURI != "" { + args = append(args, "--proxy-control-uri", r.config.ProxyControlURI) + } + if r.config.ProxyListenHostPort != "" { + args = append(args, "--proxy-listen-host-port", r.config.ProxyListenHostPort) + } args = append(args, run.ToArgs()...) cmd, err := r.program.NewCommand(ctx, args...) if err == nil { diff --git a/cmd/run_java.go b/cmd/run_java.go index 27c62c2b..b6eeea1b 100644 --- a/cmd/run_java.go +++ b/cmd/run_java.go @@ -3,7 +3,6 @@ package cmd import ( "context" "fmt" - "path/filepath" "github.com/temporalio/features/harness/go/cmd" "github.com/temporalio/features/sdkbuild" @@ -40,26 +39,10 @@ func (r *Runner) RunJavaExternal(ctx context.Context, run *cmd.Run) error { } // Build args - args := []string{"--server", r.config.Server, "--namespace", r.config.Namespace} - if r.config.ClientCertPath != "" { - clientCertPath, err := filepath.Abs(r.config.ClientCertPath) - if err != nil { - return err - } - args = append(args, "--client-cert-path", clientCertPath) - } - if r.config.ClientKeyPath != "" { - clientKeyPath, err := filepath.Abs(r.config.ClientKeyPath) - if err != nil { - return err - } - args = append(args, "--client-key-path", clientKeyPath) - } - if r.config.SummaryURI != "" { - args = append(args, "--summary-uri", r.config.SummaryURI) - } - if r.config.HTTPProxyURL != "" { - args = append(args, "--http-proxy-url", r.config.HTTPProxyURL) + args := make([]string, 0, 64) + args, err := r.config.appendFlags(args) + if err != nil { + return err } args = append(args, run.ToArgs()...) diff --git a/cmd/run_python.go b/cmd/run_python.go index 52d840a5..81707c0a 100644 --- a/cmd/run_python.go +++ b/cmd/run_python.go @@ -60,23 +60,11 @@ func (r *Runner) RunPythonExternal(ctx context.Context, run *cmd.Run) error { } // Build args - args := []string{"harness.python.main", "--server", r.config.Server, "--namespace", r.config.Namespace} - if r.config.ClientCertPath != "" { - clientCertPath, err := filepath.Abs(r.config.ClientCertPath) - if err != nil { - return err - } - args = append(args, "--client-cert-path", clientCertPath) - } - if r.config.ClientKeyPath != "" { - clientKeyPath, err := filepath.Abs(r.config.ClientKeyPath) - if err != nil { - return err - } - args = append(args, "--client-key-path", clientKeyPath) - } - if r.config.HTTPProxyURL != "" { - args = append(args, "--http-proxy-url", r.config.HTTPProxyURL) + args := make([]string, 0, 64) + args = append(args, "harness.python.main") + args, err := r.config.appendFlags(args) + if err != nil { + return err } args = append(args, run.ToArgs()...) diff --git a/cmd/run_typescript.go b/cmd/run_typescript.go index ae2aa905..228b77fc 100644 --- a/cmd/run_typescript.go +++ b/cmd/run_typescript.go @@ -58,26 +58,11 @@ func (r *Runner) RunTypeScriptExternal(ctx context.Context, run *cmd.Run) error } // Build args - args := []string{ - "./tslib/harness/ts/main.js", - "--server", - r.config.Server, - "--namespace", - r.config.Namespace, - } - if r.config.ClientCertPath != "" { - clientCertPath, err := filepath.Abs(r.config.ClientCertPath) - if err != nil { - return err - } - args = append(args, "--client-cert-path", clientCertPath) - } - if r.config.ClientKeyPath != "" { - clientKeyPath, err := filepath.Abs(r.config.ClientKeyPath) - if err != nil { - return err - } - args = append(args, "--client-key-path", clientKeyPath) + args := make([]string, 0, 64) + args = append(args, "./tslib/harness/ts/main.js") + args, err := r.config.appendFlags(args) + if err != nil { + return err } if r.config.HTTPProxyURL != "" { args = append(args, "--http-proxy-url", r.config.HTTPProxyURL) diff --git a/features/features.go b/features/features.go index 82f2862f..f944aea1 100644 --- a/features/features.go +++ b/features/features.go @@ -26,6 +26,9 @@ import ( data_converter_json_protobuf "github.com/temporalio/features/features/data_converter/json_protobuf" eager_activity_non_remote_activities_worker "github.com/temporalio/features/features/eager_activity/non_remote_activities_worker" eager_workflow_successful_start "github.com/temporalio/features/features/eager_workflow/successful_start" + grpc_retry_server_frozen_for_initiator "github.com/temporalio/features/features/grpc_retry/server_frozen_for_initiator" + grpc_retry_server_restarted_for_initiator "github.com/temporalio/features/features/grpc_retry/server_restarted_for_initiator" + grpc_retry_server_unavailable_for_initiator "github.com/temporalio/features/features/grpc_retry/server_unavailable_for_initiator" query_successful_query "github.com/temporalio/features/features/query/successful_query" query_timeout_due_to_no_active_workers "github.com/temporalio/features/features/query/timeout_due_to_no_active_workers" query_unexpected_arguments "github.com/temporalio/features/features/query/unexpected_arguments" @@ -71,15 +74,18 @@ func init() { client_http_proxy.Feature, client_http_proxy_auth.Feature, continue_as_new_continue_as_same.Feature, - data_converter_binary_protobuf.Feature, data_converter_binary.Feature, + data_converter_binary_protobuf.Feature, data_converter_codec.Feature, data_converter_empty.Feature, data_converter_failure.Feature, - data_converter_json_protobuf.Feature, data_converter_json.Feature, + data_converter_json_protobuf.Feature, eager_activity_non_remote_activities_worker.Feature, eager_workflow_successful_start.Feature, + grpc_retry_server_frozen_for_initiator.Feature, + grpc_retry_server_restarted_for_initiator.Feature, + grpc_retry_server_unavailable_for_initiator.Feature, query_successful_query.Feature, query_timeout_due_to_no_active_workers.Feature, query_unexpected_arguments.Feature, @@ -96,8 +102,8 @@ func init() { update_activities.Feature, update_async_accepted.Feature, update_basic.Feature, - update_deduplication.Feature, update_client_interceptor.Feature, + update_deduplication.Feature, update_non_durable_reject.Feature, update_self.Feature, update_task_failure.Feature, diff --git a/features/grpc_retry/server_frozen_for_initiator/.config.json b/features/grpc_retry/server_frozen_for_initiator/.config.json new file mode 100644 index 00000000..976135b4 --- /dev/null +++ b/features/grpc_retry/server_frozen_for_initiator/.config.json @@ -0,0 +1,3 @@ +{ + "grpcProxy": true +} \ No newline at end of file diff --git a/features/grpc_retry/server_frozen_for_initiator/README.md b/features/grpc_retry/server_frozen_for_initiator/README.md new file mode 100644 index 00000000..84c4d199 --- /dev/null +++ b/features/grpc_retry/server_frozen_for_initiator/README.md @@ -0,0 +1,3 @@ +# Server frozen for initiator + +A connected client will retry requests while the server is temporarily not responding. \ No newline at end of file diff --git a/features/grpc_retry/server_frozen_for_initiator/feature.go b/features/grpc_retry/server_frozen_for_initiator/feature.go new file mode 100644 index 00000000..db1583d8 --- /dev/null +++ b/features/grpc_retry/server_frozen_for_initiator/feature.go @@ -0,0 +1,39 @@ +package server_frozen_for_initiator + +import ( + "context" + "sync" + "time" + + "github.com/temporalio/features/harness/go/harness" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" +) + +var Feature = harness.Feature{ + Workflows: Workflow, + ClientUsesProxy: true, + Execute: func(ctx context.Context, runner *harness.Runner) (client.WorkflowRun, error) { + var wg sync.WaitGroup + defer wg.Wait() + if err := runner.ProxyFreezeAndThaw(ctx, &wg, 5*time.Second); err != nil { + return nil, err + } + + opts := client.StartWorkflowOptions{ + TaskQueue: runner.TaskQueue, + WorkflowExecutionTimeout: 1 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Millisecond, + MaximumInterval: 100 * time.Millisecond, + BackoffCoefficient: 2.0, + }, + } + return runner.Client.ExecuteWorkflow(ctx, opts, Workflow) + }, +} + +func Workflow(ctx workflow.Context) (string, error) { + return "OK", nil +} diff --git a/features/grpc_retry/server_frozen_for_initiator/feature.java b/features/grpc_retry/server_frozen_for_initiator/feature.java new file mode 100644 index 00000000..26259fd3 --- /dev/null +++ b/features/grpc_retry/server_frozen_for_initiator/feature.java @@ -0,0 +1,36 @@ +package grpc_retry.server_frozen_for_initiator; + +import io.temporal.client.WorkflowOptions; +import io.temporal.common.RetryOptions; +import io.temporal.sdkfeatures.Feature; +import io.temporal.sdkfeatures.Run; +import io.temporal.sdkfeatures.Runner; +import io.temporal.sdkfeatures.SimpleWorkflow; +import java.time.Duration; + +public interface feature extends Feature, SimpleWorkflow { + class Impl implements feature { + @Override + public Run execute(Runner runner) throws Exception { + return runner.proxyFreezeAndThaw(Duration.ofSeconds(5), () -> feature.super.execute(runner)); + } + + @Override + public void workflowOptions(WorkflowOptions.Builder builder) { + builder.setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(1)) + .setMaximumInterval(Duration.ofMillis(100)) + .setBackoffCoefficient(2.0) + .validateBuildWithDefaults()); + } + + @Override + public void workflow() {} + + @Override + public boolean initiatorUsesProxy() { + return true; + } + } +} diff --git a/features/grpc_retry/server_restarted_for_initiator/.config.json b/features/grpc_retry/server_restarted_for_initiator/.config.json new file mode 100644 index 00000000..976135b4 --- /dev/null +++ b/features/grpc_retry/server_restarted_for_initiator/.config.json @@ -0,0 +1,3 @@ +{ + "grpcProxy": true +} \ No newline at end of file diff --git a/features/grpc_retry/server_restarted_for_initiator/README.md b/features/grpc_retry/server_restarted_for_initiator/README.md new file mode 100644 index 00000000..e13af440 --- /dev/null +++ b/features/grpc_retry/server_restarted_for_initiator/README.md @@ -0,0 +1,3 @@ +# Server restarted for initiator + +A connected client request succeed after the server has gone down and come back up. \ No newline at end of file diff --git a/features/grpc_retry/server_restarted_for_initiator/feature.go b/features/grpc_retry/server_restarted_for_initiator/feature.go new file mode 100644 index 00000000..4b17349e --- /dev/null +++ b/features/grpc_retry/server_restarted_for_initiator/feature.go @@ -0,0 +1,36 @@ +package server_restarted_for_initiator + +import ( + "context" + "time" + + "github.com/temporalio/features/harness/go/harness" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" +) + +var Feature = harness.Feature{ + Workflows: Workflow, + ClientUsesProxy: true, + Execute: func(ctx context.Context, runner *harness.Runner) (client.WorkflowRun, error) { + if err := runner.ProxyRestart(ctx, 5*time.Second, true); err != nil { + return nil, err + } + + opts := client.StartWorkflowOptions{ + TaskQueue: runner.TaskQueue, + WorkflowExecutionTimeout: 1 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Millisecond, + MaximumInterval: 100 * time.Millisecond, + BackoffCoefficient: 2.0, + }, + } + return runner.Client.ExecuteWorkflow(ctx, opts, Workflow) + }, +} + +func Workflow(ctx workflow.Context) (string, error) { + return "OK", nil +} diff --git a/features/grpc_retry/server_restarted_for_initiator/feature.java b/features/grpc_retry/server_restarted_for_initiator/feature.java new file mode 100644 index 00000000..c0da3d49 --- /dev/null +++ b/features/grpc_retry/server_restarted_for_initiator/feature.java @@ -0,0 +1,39 @@ +package grpc_retry.server_restarted_for_initiator; + +import io.temporal.activity.ActivityInterface; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.RetryOptions; +import io.temporal.sdkfeatures.Feature; +import io.temporal.sdkfeatures.Run; +import io.temporal.sdkfeatures.Runner; +import io.temporal.sdkfeatures.SimpleWorkflow; +import java.time.Duration; + +@ActivityInterface +public interface feature extends Feature, SimpleWorkflow { + class Impl implements feature { + @Override + public Run execute(Runner runner) throws Exception { + runner.proxyRestart(Duration.ofSeconds(5), true); + return feature.super.execute(runner); + } + + @Override + public void workflowOptions(WorkflowOptions.Builder builder) { + builder.setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(1)) + .setMaximumInterval(Duration.ofMillis(100)) + .setBackoffCoefficient(2.0) + .validateBuildWithDefaults()); + } + + @Override + public void workflow() {} + + @Override + public boolean initiatorUsesProxy() { + return true; + } + } +} diff --git a/features/grpc_retry/server_unavailable_for_initiator/.config.json b/features/grpc_retry/server_unavailable_for_initiator/.config.json new file mode 100644 index 00000000..976135b4 --- /dev/null +++ b/features/grpc_retry/server_unavailable_for_initiator/.config.json @@ -0,0 +1,3 @@ +{ + "grpcProxy": true +} \ No newline at end of file diff --git a/features/grpc_retry/server_unavailable_for_initiator/README.md b/features/grpc_retry/server_unavailable_for_initiator/README.md new file mode 100644 index 00000000..7964880d --- /dev/null +++ b/features/grpc_retry/server_unavailable_for_initiator/README.md @@ -0,0 +1,3 @@ +# Server unavailable for initiator + +A connected client will retry requests while the server is returning gRPC status code UNAVAILABLE. \ No newline at end of file diff --git a/features/grpc_retry/server_unavailable_for_initiator/feature.go b/features/grpc_retry/server_unavailable_for_initiator/feature.go new file mode 100644 index 00000000..332a66a2 --- /dev/null +++ b/features/grpc_retry/server_unavailable_for_initiator/feature.go @@ -0,0 +1,39 @@ +package server_unavailable_for_initiator + +import ( + "context" + "sync" + "time" + + "github.com/temporalio/features/harness/go/harness" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" +) + +var Feature = harness.Feature{ + Workflows: Workflow, + ClientUsesProxy: true, + Execute: func(ctx context.Context, runner *harness.Runner) (client.WorkflowRun, error) { + var wg sync.WaitGroup + defer wg.Wait() + if err := runner.ProxyRejectAndAccept(ctx, &wg, 5*time.Second); err != nil { + return nil, err + } + + opts := client.StartWorkflowOptions{ + TaskQueue: runner.TaskQueue, + WorkflowExecutionTimeout: 1 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Millisecond, + MaximumInterval: 100 * time.Millisecond, + BackoffCoefficient: 2.0, + }, + } + return runner.Client.ExecuteWorkflow(ctx, opts, Workflow) + }, +} + +func Workflow(ctx workflow.Context) (string, error) { + return "OK", nil +} diff --git a/features/grpc_retry/server_unavailable_for_initiator/feature.java b/features/grpc_retry/server_unavailable_for_initiator/feature.java new file mode 100644 index 00000000..d5d1b759 --- /dev/null +++ b/features/grpc_retry/server_unavailable_for_initiator/feature.java @@ -0,0 +1,39 @@ +package grpc_retry.server_unavailable_for_initiator; + +import io.temporal.activity.ActivityInterface; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.RetryOptions; +import io.temporal.sdkfeatures.Feature; +import io.temporal.sdkfeatures.Run; +import io.temporal.sdkfeatures.Runner; +import io.temporal.sdkfeatures.SimpleWorkflow; +import java.time.Duration; + +@ActivityInterface +public interface feature extends Feature, SimpleWorkflow { + class Impl implements feature { + @Override + public Run execute(Runner runner) throws Exception { + return runner.proxyRejectAndAccept( + Duration.ofSeconds(5), () -> feature.super.execute(runner)); + } + + @Override + public void workflowOptions(WorkflowOptions.Builder builder) { + builder.setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(1)) + .setMaximumInterval(Duration.ofMillis(100)) + .setBackoffCoefficient(2.0) + .validateBuildWithDefaults()); + } + + @Override + public void workflow() {} + + @Override + public boolean initiatorUsesProxy() { + return true; + } + } +} diff --git a/harness/dotnet/Temporalio.Features.Harness/App.cs b/harness/dotnet/Temporalio.Features.Harness/App.cs index 0a1ad1bb..099420e1 100644 --- a/harness/dotnet/Temporalio.Features.Harness/App.cs +++ b/harness/dotnet/Temporalio.Features.Harness/App.cs @@ -27,6 +27,10 @@ public static class App name: "--client-key-path", description: "Path to a client key for TLS"); + private static readonly Option summaryUriOption = new( + name: "--summary-uri", + description: "Where to stream the test summary JSONL (not implemented)"); + private static readonly Argument> featuresArgument = new( name: "features", parse: result => result.Tokens.Select(token => @@ -121,4 +125,4 @@ private static async Task RunCommandAsync(InvocationContext ctx) logger.LogInformation("All features passed"); } -} \ No newline at end of file +} diff --git a/harness/go/cmd/run.go b/harness/go/cmd/run.go index 4c101074..e37fddd4 100644 --- a/harness/go/cmd/run.go +++ b/harness/go/cmd/run.go @@ -79,6 +79,7 @@ type RunFeatureConfig struct { Go RunFeatureConfigGo `json:"go"` ExpectUnauthedProxyCount int `json:"expectUnauthedProxyCount"` ExpectAuthedProxyCount int `json:"expectAuthedProxyCount"` + GrpcProxy bool `json:"grpcProxy"` } // RunFeatureConfigGo is go-specific configuration in the JSON file. @@ -88,12 +89,14 @@ type RunFeatureConfigGo struct { // RunConfig is configuration for NewRunner. type RunConfig struct { - Server string - Namespace string - ClientCertPath string - ClientKeyPath string - SummaryURI string - HTTPProxyURL string + Server string + Namespace string + ClientCertPath string + ClientKeyPath string + SummaryURI string + ProxyControlURI string + ProxyListenHostPort string + HTTPProxyURL string } func (r *RunConfig) flags() []cli.Flag { @@ -128,6 +131,16 @@ func (r *RunConfig) flags() []cli.Flag { Usage: "URL for an HTTP CONNECT proxy to the server", Destination: &r.HTTPProxyURL, }, + &cli.StringFlag{ + Name: "proxy-control-uri", + Usage: "how to simulate network outages via temporal-features-test-proxy (optional)", + Destination: &r.ProxyControlURI, + }, + &cli.StringFlag{ + Name: "proxy-listen-host-port", + Usage: "The host:port of the gRPC proxy", + Destination: &r.ProxyListenHostPort, + }, } } @@ -177,6 +190,15 @@ func (r *Runner) Run(ctx context.Context, run *Run) error { return err } defer summary.Close() + + var proxyControlURL *url.URL + if r.config.ProxyControlURI != "" { + proxyControlURL, err = url.Parse(r.config.ProxyControlURI) + if err != nil { + return err + } + } + var failureCount int failureSummary := "" allFeatures := harness.RegisteredFeatures() @@ -218,13 +240,15 @@ func (r *Runner) Run(ctx context.Context, run *Run) error { } runnerConfig := harness.RunnerConfig{ - ServerHostPort: r.config.Server, - Namespace: r.config.Namespace, - ClientCertPath: r.config.ClientCertPath, - ClientKeyPath: r.config.ClientKeyPath, - TaskQueue: runFeature.TaskQueue, - Log: r.log, - HTTPProxyURL: r.config.HTTPProxyURL, + ServerHostPort: r.config.Server, + Namespace: r.config.Namespace, + ClientCertPath: r.config.ClientCertPath, + ClientKeyPath: r.config.ClientKeyPath, + TaskQueue: runFeature.TaskQueue, + Log: r.log, + HTTPProxyURL: r.config.HTTPProxyURL, + ProxyControlURL: proxyControlURL, + ProxyListenHostPort: r.config.ProxyListenHostPort, } err := r.runFeature(ctx, runnerConfig, feature) diff --git a/harness/go/harness/feature.go b/harness/go/harness/feature.go index 714a6d1b..982e018f 100644 --- a/harness/go/harness/feature.go +++ b/harness/go/harness/feature.go @@ -50,6 +50,12 @@ type Feature struct { // DisableWorkflowPanicPolicyOverride field to true. WorkerOptions worker.Options + // BeforeDial provides a hook that will be called just before calling client.Dial. + BeforeDial func(runner *Runner) error + + // BeforeWorkerStart provides a hook that will be called just before calling Worker.Start. + BeforeWorkerStart func(runner *Runner) error + // Can modify the workflow options that are used by the default executor. Some values such as // task queue and workflow execution timeout, are set by default (but may be overridden by this // mutator). @@ -75,6 +81,16 @@ type Feature struct { // If non-empty, this feature will be skipped without checking any other // values. SkipReason string + + // ClientUsesProxy indicates if the default client used by test harness + // should be one that goes through the gRPC proxy + // instead of talking directly to the server. + ClientUsesProxy bool + + // WorkerUsesProxy indicates if the client used to run the worker + // should be one that goes through the gRPC proxy + // instead of talking directly to the server. + WorkerUsesProxy bool } type WorkflowWithOptions struct { diff --git a/harness/go/harness/grpcproxy.go b/harness/go/harness/grpcproxy.go new file mode 100644 index 00000000..e67596db --- /dev/null +++ b/harness/go/harness/grpcproxy.go @@ -0,0 +1,659 @@ +package harness + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "io/fs" + "net" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "sync" + "time" + + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/log" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +const HelpText = `The test proxy exposes the following control endpoints: + +- POST /restart + Gracefully shut down the gRPC server, then start it again. + + - Query param: sleep= + Forces the restart to block for the given duration; default: 0s. + + - Query param: forceful= + If true, forces a non-graceful shutdown; default: false. + +- POST /reject + Immediately reject incoming gRPC requests with UNAVAILABLE. + +- POST /accept + Accept incoming gRPC requests; this is the default. + +- POST /freeze + Block on incoming accepted gRPC requests. + +- POST /thaw + Process incoming accepted gRPC requests immediately; this is the default. +` + +// GRPCProxyServer +type GRPCProxyServer struct { + proxyServer *ProxyServer + controlServer *ControlServer + log log.Logger +} + +// GRPCProxyServerOptions are options for GRPC proxy. +type GRPCProxyServerOptions struct { + DialAddress string + ClientCertPath string + ClientKeyPath string + Log log.Logger +} + +// StartGRPCProxyServer starts up a GRPC proxy +func StartGRPCProxyServer(options GRPCProxyServerOptions) (*GRPCProxyServer, error) { + if options.Log == nil { + options.Log = DefaultLogger + } + options.Log.Info("Starting GRPC proxy server") + proxyServer, err := newProxyServer("127.0.0.1:", options.DialAddress, options.ClientCertPath, options.ClientKeyPath, options.Log) + if err != nil { + return nil, err + } + err = proxyServer.Run() + if err != nil { + options.Log.Error("failed to start gRPC proxy server", "error", err) + return nil, err + } + + controlServer, err := startControlServer(proxyServer, "127.0.0.1:", options.Log) + if err != nil { + options.Log.Error("failed to start HTTP control server", "error", err) + return nil, err + } + err = controlServer.Run() + if err != nil { + return nil, err + } + + srv := &GRPCProxyServer{ + proxyServer: proxyServer, + controlServer: controlServer, + log: options.Log, + } + return srv, nil +} + +// Close immediately stops the proxy. +func (g *GRPCProxyServer) Close() error { + err := g.controlServer.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + g.log.Warn("failed to gracefully shut down HTTP control server", "error", err) + } + + err = g.proxyServer.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + g.log.Warn("failed to gracefully shut down HTTP proxy server", "error", err) + } + return nil +} + +// ProxyAddress returns the address of the proxy server. +func (g *GRPCProxyServer) ProxyAddress() string { + return g.proxyServer.listen +} + +// ControlAddress returns the address of the control server. +func (g *GRPCProxyServer) ControlAddress() string { + return g.controlServer.listen +} + +var ErrUnknownCommand = errors.New("unknown command") + +type queryKeyType struct{} + +var queryKey queryKeyType + +type ActionFunc = func(context.Context) error + +func (c *ControlServer) actionRestart(ctx context.Context) error { + q, _ := ctx.Value(queryKey).(url.Values) + + var sleep time.Duration + if q.Has("sleep") { + d, err := time.ParseDuration(q.Get("sleep")) + if err != nil { + return err + } + if d < 0 { + d = 0 + } + sleep = d + } + + var forceful bool + if q.Has("forceful") { + b, err := strconv.ParseBool(q.Get("forceful")) + if err != nil { + return err + } + forceful = b + } + + c.Log.Info("/restart: restarting proxy", "forceful", forceful) + + mode := "gracefully" + fn := c.ps.Shutdown + if forceful { + mode = "forcefully" + fn = func(context.Context) error { + return c.ps.Close() + } + } + + err := fn(ctx) + if IsErrClosed(err) { + err = nil + } + if err != nil { + c.Log.Warn("failed to shut down gRPC proxy server", "mode", mode, "error", err) + } + + c.ps.ForceClose() + + if sleep > 0 { + c.Log.Info("/restart: sleeping", "duration", sleep) + time.Sleep(sleep) + } + + err = c.ps.Run() + if err != nil { + return err + } + + c.Log.Info("/restart: proxy has been restarted") + return nil +} + +func (cs *ControlServer) actionReject(ctx context.Context) error { + cs.ps.mu.Lock() + defer cs.ps.mu.Unlock() + + if cs.ps.stateRejecting { + return nil + } + cs.ps.stateRejecting = true + cs.Log.Info("/reject: proxy is rejecting requests") + return nil +} + +func (cs *ControlServer) actionAccept(ctx context.Context) error { + cs.ps.mu.Lock() + defer cs.ps.mu.Unlock() + + if !cs.ps.stateRejecting { + return nil + } + cs.ps.stateRejecting = false + cs.Log.Info("/accept: proxy is NOT rejecting requests") + return nil +} + +func (cs *ControlServer) actionFreeze(ctx context.Context) error { + cs.ps.mu.Lock() + defer cs.ps.mu.Unlock() + + if cs.ps.stateFrozen { + return nil + } + cs.ps.stateFrozen = true + cs.Log.Info("/freeze: proxy is stalling requests") + return nil +} + +func (cs *ControlServer) actionThaw(ctx context.Context) error { + cs.ps.mu.Lock() + defer cs.ps.mu.Unlock() + + if !cs.ps.stateFrozen { + return nil + } + cs.ps.stateFrozen = false + cs.ps.stateCond.Broadcast() + cs.Log.Info("/thaw: proxy is NOT stalling requests") + return nil +} + +func HandleHelp(w http.ResponseWriter, r *http.Request) { + if path.Clean(r.URL.Path) != "/" { + http.NotFound(w, r) + return + } + if !CheckMethod(w, r, http.MethodGet, http.MethodHead) { + return + } + body := []byte(strings.ReplaceAll(HelpText, "\n", "\r\n")) + h := w.Header() + h.Set("Content-Type", "text/plain; charset=utf-8") + h.Set("Content-Length", fmt.Sprint(len(body))) + w.WriteHeader(http.StatusOK) + w.Write(body) +} + +func HandleAction(action ActionFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !CheckMethod(w, r, http.MethodPost) { + return + } + + q, err := url.ParseQuery(r.URL.RawQuery) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + ctx := r.Context() + ctx = context.WithValue(ctx, queryKey, q) + if err := action(ctx); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + +func CheckMethod(w http.ResponseWriter, r *http.Request, allowed ...string) bool { + for _, item := range allowed { + if r.Method == item { + return true + } + } + h := w.Header() + h.Set("Allow", strings.Join(allowed, ", ")) + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return false +} + +type ControlServer struct { + listen string + mu sync.Mutex + cv *sync.Cond + l net.Listener + quitCh chan struct{} + server http.Server + mux http.ServeMux + Log log.Logger + ps *ProxyServer +} + +func startControlServer(proxyServer *ProxyServer, listen string, log log.Logger) (*ControlServer, error) { + var s ControlServer + s.ps = proxyServer + s.Log = log + s.listen = listen + s.cv = sync.NewCond(&s.mu) + s.l = nil + s.quitCh = nil + s.server.Handler = &s.mux + s.server.ReadTimeout = 30 * time.Second + s.server.WriteTimeout = 30 * time.Second + s.server.IdleTimeout = 60 * time.Second + s.mux.HandleFunc("/", HandleHelp) + s.mux.HandleFunc("/restart", HandleAction(s.actionRestart)) + s.mux.HandleFunc("/reject", HandleAction(s.actionReject)) + s.mux.HandleFunc("/accept", HandleAction(s.actionAccept)) + s.mux.HandleFunc("/freeze", HandleAction(s.actionFreeze)) + s.mux.HandleFunc("/thaw", HandleAction(s.actionThaw)) + return &s, nil +} + +func (s *ControlServer) Run() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.quitCh != nil { + panic("BUG! ControlServer is already running") + } + + l, err := net.Listen("tcp", s.listen) + s.listen = l.Addr().String() + if err != nil { + return fmt.Errorf("failed to listen on %q: %w", s.listen, err) + } + + s.l = l + s.quitCh = make(chan struct{}) + + go s.serveThread() + return nil +} + +func (s *ControlServer) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.quitCh == nil { + return nil + } + + close(s.quitCh) + err := s.server.Close() + for s.quitCh != nil { + s.cv.Wait() + } + return err +} + +func (s *ControlServer) ForceClose() { + err := s.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.Log.Warn("failed to stop HTTP control server", "error", err) + } +} + +func (s *ControlServer) serveThread() { + defer s.finish() + + err := s.server.Serve(s.l) + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.Log.Error("failed to serve HTTP control server", "error", err) + } + + err = s.l.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.Log.Error("failed to close listener for HTTP control server", "error", err) + } +} + +func (s *ControlServer) finish() { + s.mu.Lock() + s.l = nil + s.quitCh = nil + s.cv.Broadcast() + s.mu.Unlock() +} + +type ProxyServer struct { + listen string + dial string + tlsConfig *tls.Config + mu sync.Mutex + cv *sync.Cond + gc *grpc.ClientConn + gs *grpc.Server + l net.Listener + wc workflowservice.WorkflowServiceClient + ws workflowservice.WorkflowServiceServer + quitCh chan struct{} + log log.Logger + stateCond *sync.Cond + stateRejecting bool + stateFrozen bool +} + +func newProxyServer(listen, dial, clientCertPath, clientKeyPath string, log log.Logger) (*ProxyServer, error) { + p := &ProxyServer{ + listen: listen, + dial: dial, + log: log, + } + if clientCertPath != "" { + cert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath) + if err != nil { + return nil, fmt.Errorf("failed to load certs: %s", err) + } + p.tlsConfig = &tls.Config{Certificates: []tls.Certificate{cert}} + } + p.cv = sync.NewCond(&p.mu) + p.stateCond = sync.NewCond(&p.mu) + return p, nil +} + +func (s *ProxyServer) Run() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.quitCh != nil { + panic("BUG! gRPC proxy server is already running") + } + + l, err := net.Listen("tcp", s.listen) + // keep a stable address across restarts + s.listen = l.Addr().String() + if err != nil { + return fmt.Errorf("failed to listen on %q: %w", s.listen, err) + } + s.log.Info("gRPC proxy server is running on", "address", l.Addr().String()) + + needListenerClose := true + defer func() { + if needListenerClose { + err := l.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.log.Warn("failed to close listener for gRPC proxy server", "error", err) + } + } + }() + + opts := []grpc.DialOption{grpc.WithBlock()} + if s.tlsConfig != nil { + creds := credentials.NewTLS(s.tlsConfig) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + gc, err := grpc.Dial( + s.dial, + opts..., + ) + if err != nil { + return fmt.Errorf("failed to dial %q: %w", s.dial, err) + } + + needClientClose := true + defer func() { + if needClientClose { + err := gc.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.log.Warn("failed to close gRPC client connection", "error", err) + } + } + }() + + wc := workflowservice.NewWorkflowServiceClient(gc) + ws, err := client.NewWorkflowServiceProxyServer(client.WorkflowServiceProxyOptions{Client: wc}) + if err != nil { + return fmt.Errorf("failed to create WorkflowService proxy server: %w", err) + } + + gs := grpc.NewServer( + grpc.UnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + s.log.Debug("incoming gRPC request", "method", info.FullMethod) + if err := s.awaitPermitted(); err != nil { + return nil, err + } + return handler(ctx, req) + }), + grpc.StreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if err := s.awaitPermitted(); err != nil { + return err + } + return handler(srv, ss) + }), + ) + grpc_health_v1.RegisterHealthServer(gs, &TrivialHealthServer{}) + workflowservice.RegisterWorkflowServiceServer(gs, ws) + + needClientClose = false + needListenerClose = false + s.gc = gc + s.gs = gs + s.l = l + s.wc = wc + s.ws = ws + s.quitCh = make(chan struct{}) + + go s.serveThread() + return nil +} + +func (s *ProxyServer) Shutdown(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.quitCh == nil { + return nil + } + + close(s.quitCh) + s.gs.GracefulStop() + for s.quitCh != nil { + s.cv.Wait() + } + return nil +} + +func (s *ProxyServer) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.quitCh == nil { + return nil + } + + close(s.quitCh) + s.gs.Stop() + for s.quitCh != nil { + s.cv.Wait() + } + return nil +} + +func (s *ProxyServer) ForceClose() { + err := s.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.log.Warn("failed to stop gRPC proxy server", "error", err) + } +} + +type Stopper interface { + Stop() +} + +func (s *ProxyServer) serveThread() { + defer s.finish() + + err := s.gs.Serve(s.l) + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.log.Error("failed to serve gRPC proxy server", "error", err) + } + + err = s.l.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.log.Warn("failed to close listener for gRPC proxy server", "error", err) + } + + err = s.gc.Close() + if IsErrClosed(err) { + err = nil + } + if err != nil { + s.log.Warn("failed to close gRPC client connection", "error", err) + } +} + +func (s *ProxyServer) finish() { + s.mu.Lock() + s.gc = nil + s.gs = nil + s.l = nil + s.wc = nil + s.ws = nil + s.quitCh = nil + s.cv.Broadcast() + s.mu.Unlock() +} + +func (s *ProxyServer) awaitPermitted() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.stateRejecting { + return status.Error(codes.Unavailable, "proxy unavailable") + } + for s.stateFrozen { + s.stateCond.Wait() + } + return nil +} + +type TrivialHealthServer struct { + grpc_health_v1.UnimplementedHealthServer +} + +func (*TrivialHealthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (resp *grpc_health_v1.HealthCheckResponse, err error) { + return &grpc_health_v1.HealthCheckResponse{}, nil +} + +func IsErrClosed(err error) bool { + switch { + case err == nil: + return false + case errors.Is(err, io.EOF): + return true + case errors.Is(err, fs.ErrClosed): + return true + case errors.Is(err, net.ErrClosed): + return true + case errors.Is(err, http.ErrServerClosed): + return true + default: + return false + } +} diff --git a/harness/go/harness/runner.go b/harness/go/harness/runner.go index 33d529a9..9a90e309 100644 --- a/harness/go/harness/runner.go +++ b/harness/go/harness/runner.go @@ -4,8 +4,13 @@ import ( "context" "errors" "fmt" + "io" + "net/http" + "net/url" + "path" "path/filepath" "reflect" + "strconv" "strings" "sync" "time" @@ -31,10 +36,11 @@ type skipFeatureError struct { // Runner represents a runner that can run a feature. type Runner struct { RunnerConfig - Client client.Client - Worker worker.Worker - Feature *PreparedFeature - CreateTime time.Time + Client client.Client + DirectClient client.Client + Worker worker.Worker + Feature *PreparedFeature + CreateTime time.Time Assert *assert.Assertions LastAssertErr error @@ -43,13 +49,15 @@ type Runner struct { // RunnerConfig is configuration for NewRunner. type RunnerConfig struct { - ServerHostPort string - Namespace string - TaskQueue string - ClientCertPath string - ClientKeyPath string - Log log.Logger - HTTPProxyURL string + ServerHostPort string + Namespace string + TaskQueue string + ClientCertPath string + ClientKeyPath string + ProxyControlURL *url.URL + ProxyListenHostPort string + Log log.Logger + HTTPProxyURL string } // NewRunner creates a new runner for the given config and feature. @@ -78,22 +86,42 @@ func NewRunner(config RunnerConfig, feature *PreparedFeature) (*Runner, error) { }() // Create client + var err error + tlsCfg, err := LoadTLSConfig(r.ClientCertPath, r.ClientKeyPath) + if err != nil { + return nil, err + } + r.Feature.ClientOptions.HostPort = r.ServerHostPort + if r.Feature.ClientUsesProxy { + r.Feature.ClientOptions.HostPort = r.ProxyListenHostPort + } else { + // Don't use TLS for the proxy connection + r.Feature.ClientOptions.ConnectionOptions.TLS = tlsCfg + } r.Feature.ClientOptions.Namespace = r.Namespace if r.Feature.ClientOptions.Logger == nil { r.Feature.ClientOptions.Logger = r.Log } - var err error - tlsCfg, err := LoadTLSConfig(r.ClientCertPath, r.ClientKeyPath) - if err != nil { - return nil, err + + if r.Feature.BeforeDial != nil { + if err = r.Feature.BeforeDial(r); err != nil { + return nil, err + } } - r.Feature.ClientOptions.ConnectionOptions.TLS = tlsCfg if r.Client, err = client.Dial(r.Feature.ClientOptions); err != nil { return nil, fmt.Errorf("failed creating client: %w", err) } + savedValue := r.Feature.ClientOptions.HostPort + r.Feature.ClientOptions.HostPort = r.ServerHostPort + r.Feature.ClientOptions.ConnectionOptions.TLS = tlsCfg + if r.DirectClient, err = client.Dial(r.Feature.ClientOptions); err != nil { + return nil, fmt.Errorf("failed creating client: %w", err) + } + r.Feature.ClientOptions.HostPort = savedValue + // Create worker r.CreateTime = time.Now() if !r.Feature.DisableWorkflowPanicPolicyOverride { @@ -351,6 +379,104 @@ func (r *Runner) DoUntilEventually( } } +func (r *Runner) ProxyRestart(ctx context.Context, sleep time.Duration, forceful bool) error { + sleepStr := sleep.String() + forcefulStr := strconv.FormatBool(forceful) + return r.proxySendCommand(ctx, "restart", "sleep", sleepStr, "forceful", forcefulStr) +} + +func (r *Runner) ProxyReject(ctx context.Context) error { + return r.proxySendCommand(ctx, "reject") +} + +func (r *Runner) ProxyAccept(ctx context.Context) error { + return r.proxySendCommand(ctx, "accept") +} + +func (r *Runner) ProxyFreeze(ctx context.Context) error { + return r.proxySendCommand(ctx, "freeze") +} + +func (r *Runner) ProxyThaw(ctx context.Context) error { + return r.proxySendCommand(ctx, "thaw") +} + +func (r *Runner) ProxyRejectAndAccept(ctx context.Context, wg *sync.WaitGroup, sleep time.Duration) error { + if err := r.ProxyReject(ctx); err != nil { + return err + } + + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(sleep) + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + _ = r.ProxyAccept(ctx) + }() + return nil +} + +func (r *Runner) ProxyFreezeAndThaw(ctx context.Context, wg *sync.WaitGroup, sleep time.Duration) error { + if err := r.ProxyFreeze(ctx); err != nil { + return err + } + + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(sleep) + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + _ = r.ProxyThaw(ctx) + }() + return nil +} + +func (r *Runner) proxySendCommand(ctx context.Context, command string, args ...string) error { + if r.ProxyControlURL == nil { + return r.Skip("temporal-features-test-proxy is required for this test") + } + + u := *r.ProxyControlURL + u.Path = path.Join(u.Path, command) + if numArgs := len(args); numArgs != 0 { + q := make(url.Values, numArgs/2) + for i := 0; i < numArgs; i += 2 { + key, value := args[i], args[i+1] + q.Add(key, value) + } + u.RawQuery = q.Encode() + } + + reqMethod := http.MethodPost + reqURL := u.String() + reqName := fmt.Sprintf("%s %s", reqMethod, reqURL) + req, err := http.NewRequestWithContext(ctx, reqMethod, reqURL, nil) + if err != nil { + return fmt.Errorf("failed to create net/http.Request %q: %w", reqName, err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to perform HTTP request %q: %w", reqName, err) + } + + _, err = io.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return fmt.Errorf("failed to read body for HTTP %03d response to request %q: %w", resp.StatusCode, reqName, err) + } + + if resp.StatusCode >= 400 { + return fmt.Errorf("HTTP %03d response to request %q", resp.StatusCode, reqName) + } + + return nil +} + // Close closes this runner. func (r *Runner) Close() { if r.Worker != nil { @@ -396,7 +522,12 @@ func (r *Runner) StartWorker() error { if r.Worker != nil { return errors.New("worker is currently running, cannot start a new one") } - r.Worker = worker.New(r.Client, r.RunnerConfig.TaskQueue, r.Feature.WorkerOptions) + + c := r.DirectClient + if r.Feature.WorkerUsesProxy { + c = r.Client + } + r.Worker = worker.New(c, r.RunnerConfig.TaskQueue, r.Feature.WorkerOptions) // Register the workflows and activities for _, workflow := range r.Feature.Workflows { @@ -418,6 +549,12 @@ func (r *Runner) StartWorker() error { } } + if r.Feature.BeforeWorkerStart != nil { + if err := r.Feature.BeforeWorkerStart(r); err != nil { + return err + } + } + // Start the worker if err := r.Worker.Start(); err != nil { return fmt.Errorf("failed starting worker: %w", err) diff --git a/harness/java/io/temporal/sdkfeatures/Feature.java b/harness/java/io/temporal/sdkfeatures/Feature.java index e9eb8f90..975a1899 100644 --- a/harness/java/io/temporal/sdkfeatures/Feature.java +++ b/harness/java/io/temporal/sdkfeatures/Feature.java @@ -12,11 +12,10 @@ public interface Feature { - @SuppressWarnings("unchecked") default T activities(Class activityIface, Consumer builderFunc) { var builder = ActivityOptions.newBuilder(); builderFunc.accept(builder); - return (T) Workflow.newActivityStub(activityIface, builder.build()); + return Workflow.newActivityStub(activityIface, builder.build()); } default void workflowServiceOptions(WorkflowServiceStubsOptions.Builder builder) {} @@ -29,6 +28,14 @@ default void workerOptions(WorkerOptions.Builder builder) {} default void workflowOptions(WorkflowOptions.Builder builder) {} + default boolean workerUsesProxy() { + return false; + } + + default boolean initiatorUsesProxy() { + return false; + } + default Run execute(Runner runner) throws Exception { return runner.executeSingleParameterlessWorkflow(); } diff --git a/harness/java/io/temporal/sdkfeatures/Main.java b/harness/java/io/temporal/sdkfeatures/Main.java index 83267c73..ba2b5691 100644 --- a/harness/java/io/temporal/sdkfeatures/Main.java +++ b/harness/java/io/temporal/sdkfeatures/Main.java @@ -10,6 +10,8 @@ import java.io.*; import java.net.Socket; import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; @@ -17,6 +19,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; +import picocli.CommandLine.Parameters; @Command(name = "features", description = "Runs Java features") public class Main implements Runnable { @@ -57,7 +62,8 @@ BufferedWriter createSummaryServerWriter() { switch (uri.getScheme()) { case "tcp": Socket socket = new Socket(uri.getHost(), uri.getPort()); - return new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8")); + return new BufferedWriter( + new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)); case "file": FileWriter fileWriter = new FileWriter(uri.getPath(), true); return new BufferedWriter(fileWriter); @@ -89,6 +95,16 @@ BufferedWriter createSummaryServerWriter() { @Option(names = "--http-proxy-url", description = "URL for an HTTP CONNECT proxy to the server") private String httpProxyUrl; + @Option( + names = "--proxy-control-uri", + description = "The URL of temporal-features-test-proxy (optional)") + private String proxyControlUri; + + @Option( + names = "--proxy-listen-host-port", + description = "The host:port of the server, bypassing the temporal-features-test-proxy") + private String proxyListenHotsPort; + @Parameters(description = "Features as dir + ':' + task queue") private List features; @@ -113,6 +129,16 @@ public void run() { throw new RuntimeException("Client cert path must be specified since key path is"); } + // Parse proxyControlUri if present + URI proxyControl = null; + if (proxyControlUri != null && !proxyControlUri.isEmpty()) { + try { + proxyControl = new URI(proxyControlUri); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + try (BufferedWriter writer = createSummaryServerWriter()) { ObjectMapper mapper = new ObjectMapper(); @@ -140,6 +166,8 @@ public void run() { config.namespace = namespace; config.httpProxyUrl = httpProxyUrl; config.sslContext = sslContext; + config.proxyControl = proxyControl; + config.proxyListenHostPort = proxyListenHotsPort; config.taskQueue = pieces[1]; Outcome outcome = Outcome.PASSED; String message = ""; diff --git a/harness/java/io/temporal/sdkfeatures/PreparedFeature.java b/harness/java/io/temporal/sdkfeatures/PreparedFeature.java index 78a32b94..1508dd8a 100644 --- a/harness/java/io/temporal/sdkfeatures/PreparedFeature.java +++ b/harness/java/io/temporal/sdkfeatures/PreparedFeature.java @@ -7,8 +7,8 @@ public class PreparedFeature { static PreparedFeature[] ALL = PreparedFeature.prepareFeatures( activity.basic_no_workflow_timeout.feature.Impl.class, - activity.retry_on_error.feature.Impl.class, activity.cancel_try_cancel.feature.Impl.class, + activity.retry_on_error.feature.Impl.class, child_workflow.result.feature.Impl.class, child_workflow.signal.feature.Impl.class, client.http_proxy.feature.Impl.class, @@ -21,6 +21,9 @@ public class PreparedFeature { data_converter.json.feature.Impl.class, data_converter.json_protobuf.feature.Impl.class, eager_activity.non_remote_activities_worker.feature.Impl.class, + grpc_retry.server_frozen_for_initiator.feature.Impl.class, + grpc_retry.server_restarted_for_initiator.feature.Impl.class, + grpc_retry.server_unavailable_for_initiator.feature.Impl.class, query.successful_query.feature.Impl.class, query.timeout_due_to_no_active_workers.feature.Impl.class, query.unexpected_arguments.feature.Impl.class, @@ -34,12 +37,12 @@ public class PreparedFeature { signal.external.feature.Impl.class, update.activities.feature.Impl.class, update.async_accepted.feature.Impl.class, - update.deduplication.feature.Impl.class, update.client_interceptor.feature.Impl.class, + update.deduplication.feature.Impl.class, update.non_durable_reject.feature.Impl.class, update.task_failure.feature.Impl.class, - update.worker_restart.feature.Impl.class, update.validation_replay.feature.Impl.class, + update.worker_restart.feature.Impl.class, update.self.feature.Impl.class); @SafeVarargs diff --git a/harness/java/io/temporal/sdkfeatures/Runner.java b/harness/java/io/temporal/sdkfeatures/Runner.java index 83a169e2..c354c788 100644 --- a/harness/java/io/temporal/sdkfeatures/Runner.java +++ b/harness/java/io/temporal/sdkfeatures/Runner.java @@ -12,11 +12,12 @@ import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.history.v1.History; +import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.workflow.v1.WorkflowExecutionInfo; import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; import io.temporal.client.*; +import io.temporal.common.WorkflowExecutionHistory; import io.temporal.internal.client.WorkflowClientHelper; -import io.temporal.internal.common.WorkflowExecutionHistory; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.worker.Worker; @@ -24,6 +25,8 @@ import io.temporal.worker.WorkerFactoryOptions; import io.temporal.worker.WorkerOptions; import java.io.Closeable; +import java.io.IOException; +import java.net.*; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; @@ -43,6 +46,8 @@ public static class Config { public Scope metricsScope = new NoopScope(); public SslContext sslContext; public String httpProxyUrl; + public URI proxyControl; + public String proxyListenHostPort; } public final Config config; @@ -50,6 +55,8 @@ public static class Config { public final Feature feature; public final WorkflowServiceStubs service; public final WorkflowClient client; + public final WorkflowServiceStubs directService; + public final WorkflowClient directClient; private WorkerFactory workerFactory; private Worker worker; @@ -62,28 +69,169 @@ public static class Config { feature = featureInfo.newInstance(); // Build service - var serviceBuild = + final var serviceBuild = WorkflowServiceStubsOptions.newBuilder() .setTarget(config.serverHostPort) .setSslContext(config.sslContext) .setMetricsScope(config.metricsScope); + if (feature.initiatorUsesProxy()) { + serviceBuild.setTarget(config.proxyListenHostPort); + serviceBuild.setSslContext(null); + } feature.workflowServiceOptions(serviceBuild); - service = WorkflowServiceStubs.newServiceStubs(serviceBuild.build()); + final var serviceOptions = serviceBuild.build(); + final var directServiceOptions = + serviceBuild.setTarget(config.serverHostPort).setSslContext(config.sslContext).build(); + + service = WorkflowServiceStubs.newConnectedServiceStubs(serviceOptions, Duration.ofSeconds(10)); // Shutdown service on failure try { - // Build client - var clientBuild = WorkflowClientOptions.newBuilder().setNamespace(config.namespace); - feature.workflowClientOptions(clientBuild); - client = WorkflowClient.newInstance(service, clientBuild.build()); - - // Build worker - restartWorker(); + directService = + WorkflowServiceStubs.newConnectedServiceStubs( + directServiceOptions, Duration.ofSeconds(10)); + try { + // Build client + var clientBuild = WorkflowClientOptions.newBuilder().setNamespace(config.namespace); + feature.workflowClientOptions(clientBuild); + client = WorkflowClient.newInstance(service, clientBuild.build()); + directClient = WorkflowClient.newInstance(directService, clientBuild.build()); + + // Build worker + restartWorker(); + } catch (Throwable e) { + try { + directService.shutdownNow(); + } catch (Throwable ignored) { + } + throw e; + } } catch (Throwable e) { - service.shutdownNow(); + try { + service.shutdownNow(); + } catch (Throwable ignored) { + } throw e; } } + public WorkflowClient workerClient() { + if (feature.workerUsesProxy()) { + return client; + } else { + return directClient; + } + } + + public WorkflowClient initiatorClient() { + return client; + } + + public WorkflowServiceStubs initiatorService() { + return service; + } + + public void proxyReject() throws IOException { + proxySendCommand("reject"); + } + + public void proxyAccept() throws IOException { + proxySendCommand("accept"); + } + + public void proxyFreeze() throws IOException { + proxySendCommand("freeze"); + } + + public void proxyThaw() throws IOException { + proxySendCommand("thaw"); + } + + public void proxyRestart(Duration sleep, boolean forceful) throws IOException { + final var sleepStr = sleep.toMillis() + "ms"; + final var forcefulStr = forceful ? "true" : "false"; + proxySendCommand("restart", "sleep", sleepStr, "forceful", forcefulStr); + } + + public T proxyRejectAndAccept( + Duration sleep, CheckedCallable runnable) throws E, IOException { + return proxyFirstAndSecond(sleep, runnable, this::proxyReject, this::proxyAccept); + } + + public T proxyFreezeAndThaw( + Duration sleep, CheckedCallable callable) throws E, IOException { + return proxyFirstAndSecond(sleep, callable, this::proxyFreeze, this::proxyThaw); + } + + private T proxyFirstAndSecond( + Duration sleep, + CheckedCallable callable, + CheckedRunnable first, + CheckedRunnable second) + throws E, IOException { + first.run(); + final var thread = + new Thread( + () -> { + try { + Thread.sleep(sleep.toMillis()); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + try { + second.run(); + } catch (IOException ignored) { + } + }); + thread.start(); + try { + return callable.call(); + } finally { + try { + thread.join(); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + } + + public void proxySendCommand(String method, String... args) throws IOException { + if (config.proxyControl == null) { + skip("temporal-features-test-proxy is required for this test"); + } + + final StringBuilder sb = new StringBuilder(); + sb.append('/'); + sb.append(method); + if (args != null && args.length != 0) { + char separator = '?'; + for (int i = 0; i < args.length; i += 2) { + String key = args[i]; + String value = args[i + 1]; + sb.append(separator); + sb.append(URLEncoder.encode(key, StandardCharsets.UTF_8)); + sb.append('='); + sb.append(URLEncoder.encode(value, StandardCharsets.UTF_8)); + separator = '&'; + } + } + final URI uri = config.proxyControl.resolve(sb.toString()); + log.info("proxySendCommand: {}", uri); + var connection = (HttpURLConnection) uri.toURL().openConnection(); + connection.setConnectTimeout(1000); + connection.setReadTimeout(10000); + connection.setInstanceFollowRedirects(false); + connection.setRequestMethod("POST"); + try { + connection.connect(); + final int code = connection.getResponseCode(); + if (code >= 400) { + throw new IOException("proxy command failed with HTTP code " + code); + } + } finally { + connection.disconnect(); + } + } + /** * Instantiates a new worker, replacing the existing worker and workerFactory. You should shut * down the worker factory before calling this. @@ -91,7 +239,7 @@ public static class Config { public void restartWorker() { var factoryBuild = WorkerFactoryOptions.newBuilder(); feature.workerFactoryOptions(factoryBuild); - this.workerFactory = WorkerFactory.newInstance(client, factoryBuild.build()); + this.workerFactory = WorkerFactory.newInstance(workerClient(), factoryBuild.build()); var workerBuild = WorkerOptions.newBuilder(); feature.workerOptions(workerBuild); this.worker = workerFactory.newWorker(config.taskQueue, workerBuild.build()); @@ -160,7 +308,7 @@ public Run executeSingleWorkflow(WorkflowOptions options, Object... args) { options = builder.build(); } - var stub = client.newUntypedWorkflowStub(methods.get(0).getName(), options); + var stub = initiatorClient().newUntypedWorkflowStub(methods.get(0).getName(), options); // Call workflow with args return new Run(methods.get(0), stub.start(args)); @@ -174,7 +322,7 @@ public Object waitForRunResult(Run run) { } public T waitForRunResult(Run run, Class type) { - var stub = client.newUntypedWorkflowStub(run.execution, Optional.empty()); + var stub = initiatorClient().newUntypedWorkflowStub(run.execution, Optional.empty()); return stub.getResult(type); } @@ -196,7 +344,7 @@ public WorkflowExecution executeWorkflow( public History getWorkflowHistory(Run run) throws Exception { var eventIter = WorkflowClientHelper.getHistory( - service, config.namespace, run.execution, config.metricsScope); + initiatorService(), config.namespace, run.execution, config.metricsScope); return History.newBuilder().addAllEvents(() -> eventIter).build(); } @@ -204,7 +352,7 @@ public Payload getWorkflowResultPayload(Run run) throws Exception { var history = getWorkflowHistory(run); var event = history.getEventsList().stream() - .filter(e -> e.hasWorkflowExecutionCompletedEventAttributes()) + .filter(HistoryEvent::hasWorkflowExecutionCompletedEventAttributes) .findFirst(); return event.get().getWorkflowExecutionCompletedEventAttributes().getResult().getPayloads(0); } @@ -213,7 +361,7 @@ public Payload getWorkflowArgumentPayload(Run run) throws Exception { var history = getWorkflowHistory(run); var event = history.getEventsList().stream() - .filter(e -> e.hasWorkflowExecutionStartedEventAttributes()) + .filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes) .findFirst(); return event.get().getWorkflowExecutionStartedEventAttributes().getInput().getPayloads(0); } @@ -225,7 +373,7 @@ public WorkflowExecutionInfo getWorkflowExecutionInfo(Run run) throws Exception .setExecution(run.execution) .build(); var exec = - this.client + this.initiatorClient() .getWorkflowServiceStubs() .blockingStub() .describeWorkflowExecution(describeRequest); @@ -251,7 +399,6 @@ public void checkCurrentAndPastHistories(Run run) throws Exception { } } - @SuppressWarnings("UnstableApiUsage") public Map loadPastHistories() throws Exception { var pkg = featureInfo.dir.replace('/', '.') + ".history"; var jsonPaths = new Reflections(pkg, Scanners.Resources).getResources(".*\\.json"); @@ -302,6 +449,20 @@ public Map loadPastHistories() throws Excep public void close() { try { workerFactory.shutdownNow(); + } catch (Throwable e) { + try { + directService.shutdownNow(); + } catch (Throwable ignored) { + } + try { + service.shutdownNow(); + } catch (Throwable ignored) { + } + throw e; + } + + try { + directService.shutdownNow(); } catch (Throwable e) { try { service.shutdownNow(); @@ -324,14 +485,14 @@ public void requireNoUpdateRejectedEvents(Run run) throws Exception { var history = getWorkflowHistory(run); var event = history.getEventsList().stream() - .filter(e -> e.hasWorkflowExecutionUpdateRejectedEventAttributes()) + .filter(HistoryEvent::hasWorkflowExecutionUpdateRejectedEventAttributes) .findFirst(); Assertions.assertFalse(event.isPresent()); } public void skipIfUpdateNotSupported() { try { - client.newUntypedWorkflowStub("fake").update("also_fake", Void.class); + initiatorClient().newUntypedWorkflowStub("fake").update("also_fake", Void.class); } catch (WorkflowNotFoundException exception) { return; } catch (WorkflowServiceException exception) { @@ -342,6 +503,8 @@ public void skipIfUpdateNotSupported() { "server support for update is disabled; set frontend.enableUpdateWorkflowExecution=true in dynamic config to enable"); case UNIMPLEMENTED: skip("server version too old to support update"); + default: + break; } } skip("unknown"); @@ -349,7 +512,7 @@ public void skipIfUpdateNotSupported() { public void skipIfAsyncAcceptedUpdateNotSupported() { try { - client.newUntypedWorkflowStub("fake").startUpdate("also_fake", Void.class); + initiatorClient().newUntypedWorkflowStub("fake").startUpdate("also_fake", Void.class); } catch (WorkflowNotFoundException exception) { return; } catch (WorkflowServiceException exception) { @@ -360,6 +523,8 @@ public void skipIfAsyncAcceptedUpdateNotSupported() { "server support for async accepted update is disabled; set frontend.enableUpdateWorkflowExecutionAsyncAccepted=true in dynamic config to enable"); case UNIMPLEMENTED: skip("server version too old to support update"); + default: + break; } } skip("unknown"); @@ -382,4 +547,14 @@ public void retry(Supplier fn, int retries, Duration sleepBetweenRetrie } Assertions.fail("retry limit exceeded"); } + + @FunctionalInterface + public interface CheckedRunnable { + void run() throws E; + } + + @FunctionalInterface + public interface CheckedCallable { + T call() throws E; + } } diff --git a/harness/python/main.py b/harness/python/main.py index d07d2f8a..44cde4df 100644 --- a/harness/python/main.py +++ b/harness/python/main.py @@ -21,6 +21,10 @@ async def run(): "--client-cert-path", help="Path to a client certificate for TLS" ) parser.add_argument("--client-key-path", help="Path to a client key for TLS") + parser.add_argument( + "--summary-uri", + help="where to stream the test summary JSONL (not implemented)", + ) parser.add_argument("--log-level", help="Log level", default="INFO") parser.add_argument("--http-proxy-url", help="HTTP proxy URL") parser.add_argument( diff --git a/harness/ts/main.ts b/harness/ts/main.ts index 3fbb42f4..9a51ce64 100644 --- a/harness/ts/main.ts +++ b/harness/ts/main.ts @@ -14,6 +14,7 @@ async function run() { .option('--client-cert-path ', 'Path to a client certificate for TLS') .option('--client-key-path ', 'Path to a client key for TLS') .option('--http-proxy-url ', 'HTTP proxy URL') + .option('--summary-uri ', 'where to stream the test summary JSONL (not implemented)') .argument('', 'Features as dir + ":" + task queue'); const opts = program.parse(process.argv).opts<{