diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index af6f408..8d12ef1 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -40,7 +40,7 @@ jobs: name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})" strategy: matrix: - sdk-test-suite: [ "1.6" ] + sdk-test-suite: [ "2.1" ] permissions: contents: read issues: read diff --git a/test-services/exclusions.yaml b/test-services/exclusions.yaml index 4a1fbb8..0358b96 100644 --- a/test-services/exclusions.yaml +++ b/test-services/exclusions.yaml @@ -1,4 +1,4 @@ exclusions: - "default": [] - "alwaysSuspending": [] - "singleThreadSinglePartition": [] + "default": ["RunRetry"] + "alwaysSuspending": ["RunRetry"] + "singleThreadSinglePartition": ["RunRetry"] diff --git a/test-services/testutils.go b/test-services/testutils.go index 2cee784..2e5f0de 100644 --- a/test-services/testutils.go +++ b/test-services/testutils.go @@ -1,6 +1,7 @@ package main import ( + "os" "strings" "sync/atomic" "time" @@ -21,6 +22,17 @@ type CreateAwakeableAndAwaitItResponse struct { Value *string `json:"value,omitempty"` } +type InterpretRequest struct { + ListName string `json:"listName"` + Commands []Command `json:"commands"` +} + +type Command struct { + Type string `json:"type"` + AwakeableKey string `json:"awakeableKey"` + EnvName string `json:"envName"` +} + func init() { REGISTRY.AddDefinition( restate.NewService("TestUtilsService"). @@ -36,6 +48,10 @@ func init() { func(ctx restate.Context, _ restate.Void) (map[string]string, error) { return ctx.Request().Headers, nil })). + Handler("rawEcho", restate.NewServiceHandler( + func(ctx restate.Context, input []byte) ([]byte, error) { + return input, nil + }, restate.WithBinary)). Handler("createAwakeableAndAwaitIt", restate.NewServiceHandler( func(ctx restate.Context, req CreateAwakeableAndAwaitItRequest) (CreateAwakeableAndAwaitItResponse, error) { awakeable := restate.Awakeable[string](ctx) @@ -95,5 +111,48 @@ func init() { }) } return invokedSideEffects.Load(), nil - }))) + })). + Handler("getEnvVariable", restate.NewServiceHandler( + func(ctx restate.Context, env string) (string, error) { + return restate.Run(ctx, func(ctx restate.RunContext) (string, error) { + return os.Getenv(env), nil + }) + })). + Handler("interpretCommands", restate.NewServiceHandler( + func(ctx restate.Context, req InterpretRequest) (restate.Void, error) { + // restate.ObjectSend(ctx, "ListObject", req.ListName, "append") + + for _, command := range req.Commands { + switch command.Type { + case "createAwakeableAndAwaitIt": + result, err := createAwakeableAndAwaitIt(ctx, command.AwakeableKey) + if err != nil { + return restate.Void{}, err + } + restate.ObjectSend(ctx, "ListObject", req.ListName, "append").Send(result) + case "getEnvVariable": + result, err := getEnvVariable(ctx, command.EnvName) + if err != nil { + return restate.Void{}, err + } + restate.ObjectSend(ctx, "ListObject", req.ListName, "append").Send(result) + } + } + return restate.Void{}, nil + })), + ) +} + +func createAwakeableAndAwaitIt(ctx restate.Context, awakeableKey string) (string, error) { + awakeable := restate.Awakeable[string](ctx) + if _, err := restate.Object[restate.Void](ctx, "AwakeableHolder", awakeableKey, "hold").Request(awakeable.Id()); err != nil { + return "", err + } + return awakeable.Result() +} + +func getEnvVariable(ctx restate.Context, envName string) (string, error) { + return restate.Run(ctx, func(ctx restate.RunContext) (string, error) { + return os.Getenv(envName), nil + }) }