Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add saga interceptor #222

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions saga/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package saga
ychensha marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not completely convinced hiding this stuff behind an interceptor is the clearest way. Here's an alternative from a comment on the other PR at temporalio/sdk-go#936 (comment) that I didn't test (just wrote there in comment):

func testWorkflow(ctx workflow.Context, a int) error {
	var txn txn
	zap.L().Debug("enter workflow")
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	})

	var id string
	if err := workflow.ExecuteActivity(ctx, "createOrder", a).Get(ctx, &id); err != nil {
		return err
	}
	zap.L().Debug("create order, id:", zap.String("id", id))
	defer txn.executeRollbackActivity(ctx, "deleteOrder", id)

	if err := workflow.ExecuteActivity(ctx, "stockDeduct", a).Get(ctx, nil); err != nil {
		return err
	}
	defer txn.executeRollbackActivity(ctx, "stockInc", a)

	if err := workflow.ExecuteActivity(ctx, "createPay", a).Get(ctx, nil); err != nil {
		return err
	}
	txn.markSuccessful()
	return nil
}

type txn bool

func (t *txn) markSuccessful() { *s = true }

func (t *txn) executeRollbackActivity(ctx workflow.Context, activity interface{}, args ...interface{}) {
	if !*txn {
		ctx, cancel := workflow.NewDisconnectedContext(ctx)
		defer cancel()
		if err := workflow.ExecuteActivity(ctx, activity, args...).Get(ctx); err != nil {
			workflow.GetLogger(ctx).Error("failed to convert to compensate req", zap.Error(err))
		}
	}
}

You can obviously convert the arg wherever you want. This uses much more familiar Go constructs like defer and is clearer to the developer what is called on rollback. There is no hiding things on interceptor and there is no pre-registration of what is run on which activity, etc.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the concerns, but interceptor itself is hidden behind workflow. Action/Compensation is registered aside with workflow like the testing code, the removal of defer is by design.

Copy link
Member

@cretz cretz Oct 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I understand it is by design, the question is if that is the best design. Explicit defer at the callsite is clearer to a code writer than a hidden equivalent at the registration site. Using interceptors for this, besides hiding what is happening from someone reading the workflow code and matching against history, also is a bit error prone due to making sure that interceptors are always setup the same way for all workers and that the compensations never change in successive deployments. Not to mention interceptors also add more code, more complexity, and are more limiting feature wise (e.g. can't skip a compensation if it doesn't make sense, can't use a local variable as a parameter to the compensation, etc).

Copy link
Author

@ychensha ychensha Oct 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is good enough to quick start. When number of code in workflow is large, and activity executions is in multi functions, defer is not a easy way to handle error and do compensations. Interceptor is designed to seperate non business logic apart.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So interceptor is not welcome to implement saga pattern?

Copy link
Member

@cretz cretz Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is good enough to quick start

Sure, the point is that it's easy to run something on complete when your code fails

defer is not a easy way to handle error and do compensations.

I disagree, I think it s very easy and common in Go. Many Go functions run defer after success of something but on failure of the function as a whole.

Interceptor is designed to seperate non business logic apart.

I am not sure we want to encourage that separation. Hiding some steps of a workflow can make it hard to match against history for readers. We want workflow actions to be explicit not implicit.

So interceptor is not welcome to implement saga pattern?

I am not sure it's the best way to implement the saga pattern as it hides so much from the user as opposed to the defer pattern. These samples are meant to document Temporal best practices, and I am not sure we want to encourage use of interceptors this way.

Let me see if I can get others' opinions here.


import (
"errors"
"time"

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/workflow"
"go.uber.org/multierr"
"go.uber.org/zap"
ychensha marked this conversation as resolved.
Show resolved Hide resolved
)

const (
malformedActivity = "args of saga activity != 1"
)

var (
defaultActivityOpts = workflow.ActivityOptions{
ScheduleToStartTimeout: 1 * time.Minute,
StartToCloseTimeout: 5 * time.Minute,
}
)

type (
// TransactionOptions is options for a saga transactional workflow
TransactionOptions struct{}

// CompensationOptions is options for compensate.
CompensationOptions struct {
// ActivityType is the name of compensate activity.
ActivityType string
// ActivityOptions is the activity execute options, local activity is not supported.
ActivityOptions *workflow.ActivityOptions
// Convertor optional. Convert req & response to request for compensate activity.
// currently, activity func is not available for worker, so decode futures should be done by developer.
Convertor func(ctx workflow.Context, f workflow.Future, args interface{}) (interface{}, error)
ychensha marked this conversation as resolved.
Show resolved Hide resolved
}

//InterceptorOptions is options for saga interceptor.
InterceptorOptions struct {
// WorkflowRegistry names for workflow to be treated as Saga transaction.
WorkflowRegistry map[string]TransactionOptions
// ActivityRegistry Action -> CompensateAction, key is activity type for action.
ActivityRegistry map[string]CompensationOptions
}

sagaInterceptor struct {
interceptor.WorkerInterceptorBase
options InterceptorOptions
}

workflowInboundInterceptor struct {
interceptor.WorkflowInboundInterceptorBase
root *sagaInterceptor
ctx sagaContext
}
workflowOutboundInterceptor struct {
ychensha marked this conversation as resolved.
Show resolved Hide resolved
interceptor.WorkflowOutboundInterceptorBase
root *sagaInterceptor
ctx *sagaContext
}

action struct {
ActivityType string
Future workflow.Future
Arg interface{}
}

sagaContext struct {
Actions []*action
}
)

// NewInterceptor creates an interceptor for execute in Saga patterns.
// when workflow fails, registered compensate activities will be executed automatically.
// NOTE: action&compensate activity has only one arg,
// like func(ctx context.Context, arg interface{}) (interface{}, error),
// or func(ctx context.Context, arg interface{}) error.
func NewInterceptor(options InterceptorOptions) (interceptor.WorkerInterceptor, error) {
return &sagaInterceptor{options: options}, nil
}

func (s *sagaInterceptor) InterceptWorkflow(
ctx workflow.Context,
next interceptor.WorkflowInboundInterceptor,
) interceptor.WorkflowInboundInterceptor {
if _, ok := s.options.WorkflowRegistry[workflow.GetInfo(ctx).WorkflowType.Name]; !ok {
return next
}

workflow.GetLogger(ctx).Debug("intercept saga workflow")
i := &workflowInboundInterceptor{root: s}
i.Next = next
return i
}

func (w *workflowInboundInterceptor) Init(outbound interceptor.WorkflowOutboundInterceptor) error {
i := &workflowOutboundInterceptor{root: w.root, ctx: &w.ctx}
i.Next = outbound
return w.Next.Init(i)
}

func (w *workflowInboundInterceptor) ExecuteWorkflow(
ctx workflow.Context,
in *interceptor.ExecuteWorkflowInput,
) (ret interface{}, err error) {
workflow.GetLogger(ctx).Debug("intercept ExecuteWorkflow")
ret, wferr := w.Next.ExecuteWorkflow(ctx, in)
if wferr == nil || len(w.ctx.Actions) == 0 {
return ret, wferr
}

ctx, cancel := workflow.NewDisconnectedContext(ctx)
defer cancel()
for i := len(w.ctx.Actions) - 1; i >= 0; i-- {
a := w.ctx.Actions[i]
opt, ok := w.root.options.ActivityRegistry[a.ActivityType]
if !ok {
workflow.GetLogger(ctx).Warn("action in history has no compensate activity",
"activity_type", a.ActivityType)
continue
}

// only compensate action with success
if err := a.Future.Get(ctx, nil); err != nil {
continue
}

// add opts if not config
activityOpts := opt.ActivityOptions
if activityOpts == nil {
activityOpts = &defaultActivityOpts
}
ctx = workflow.WithActivityOptions(ctx, *activityOpts)

// use arg in action as default for compensate
arg := a.Arg
if opt.Convertor != nil {
arg, err = opt.Convertor(ctx, a.Future, arg)
if err != nil {
workflow.GetLogger(ctx).Error("failed to convert to compensate req", zap.Error(err))
return ret, multierr.Append(wferr, err)
}
}

if err := workflow.ExecuteActivity(ctx, opt.ActivityType, arg).Get(ctx, nil); err != nil {
ychensha marked this conversation as resolved.
Show resolved Hide resolved
return ret, multierr.Append(wferr, err)
}
}
return ret, wferr
}

func (w *workflowOutboundInterceptor) ExecuteActivity(
ctx workflow.Context,
activityType string,
args ...interface{},
) workflow.Future {
workflow.GetLogger(ctx).Debug("intercept ExecuteActivity")
f := w.Next.ExecuteActivity(ctx, activityType, args...)
if _, ok := w.root.options.ActivityRegistry[activityType]; ok {
ychensha marked this conversation as resolved.
Show resolved Hide resolved
workflow.GetLogger(ctx).Debug("save action future", "activity_type", activityType)
if len(args) != 1 {
ychensha marked this conversation as resolved.
Show resolved Hide resolved
f, set := workflow.NewFuture(ctx)
set.SetError(errors.New(malformedActivity))
return f
ychensha marked this conversation as resolved.
Show resolved Hide resolved
}
w.ctx.Actions = append(w.ctx.Actions, &action{
ActivityType: activityType,
Future: f,
Arg: args[0],
})
}

return f
}

func (w *workflowOutboundInterceptor) ExecuteLocalActivity(
ctx workflow.Context,
activityType string,
args ...interface{},
) workflow.Future {
workflow.GetLogger(ctx).Debug("intercept ExecuteLocalActivity")
f := w.Next.ExecuteLocalActivity(ctx, activityType, args...)
if _, ok := w.root.options.ActivityRegistry[activityType]; ok {
workflow.GetLogger(ctx).Debug("save action future", "activity_type", activityType)
if len(args) != 1 {
f, set := workflow.NewFuture(ctx)
set.SetError(errors.New(malformedActivity))
return f
}
w.ctx.Actions = append(w.ctx.Actions, &action{
ActivityType: activityType,
Future: f,
Arg: args[0],
})
}

return f
}
139 changes: 139 additions & 0 deletions saga/interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package saga

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.uber.org/zap"
)

type orderInfo struct {
ID string
IsDelete bool
}

var (
orders = map[string]*orderInfo{}
amount = 1
suite testsuite.WorkflowTestSuite
)

func init() {
logger, _ := zap.NewDevelopmentConfig().Build()
zap.ReplaceGlobals(logger)
}

func createOrder(ctx context.Context, amount int) (string, error) {
zap.L().Info("enter createOrder")
id := "abc"
orders[id] = &orderInfo{
ID: id,
}
return id, nil
}

func deleteOrder(ctx context.Context, id string) error {
zap.L().Info("enter deleteOrder", zap.String("id", id))
orders[id].IsDelete = true
return nil
}

func stockDeduct(ctx context.Context, in int) error {
zap.L().Info("enter stockDeduct")
amount -= in
return nil
}

func stockInc(ctx context.Context, in int) error {
zap.L().Info("enter stockInc")
amount += in
return nil
}

func createPay(ctx context.Context, in int) error {
return errors.New("must fail")
}

func testConvertor(ctx workflow.Context, f workflow.Future, req interface{}) (rsp interface{}, err error) {
zap.L().Info("convert", zap.Int("req", req.(int)))
var id string
if err := f.Get(ctx, &id); err != nil {
return nil, err
}
return id, nil
}

func testWorkflow(ctx workflow.Context, a int) error {
zap.L().Debug("enter workflow")
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
})
var id string
if err := workflow.ExecuteActivity(ctx, "createOrder", a).Get(ctx, &id); err != nil {
return err
}
zap.L().Debug("create order, id:", zap.String("id", id))
if err := workflow.ExecuteActivity(ctx, "stockDeduct", a).Get(ctx, nil); err != nil {
return err
}
if err := workflow.ExecuteActivity(ctx, "createPay", a).Get(ctx, nil); err != nil {
return err
}

return nil
}

func TestWorkflow(t *testing.T) {
env := suite.NewTestWorkflowEnvironment()
intercept, _ := NewInterceptor(InterceptorOptions{
WorkflowRegistry: map[string]TransactionOptions{
"testWorkflow": {},
},
ActivityRegistry: map[string]CompensationOptions{
"createOrder": {
ActivityType: "deleteOrder",
Convertor: testConvertor,
},
"stockDeduct": {
ActivityType: "stockInc",
},
},
})
env.SetWorkerOptions(worker.Options{Interceptors: []interceptor.WorkerInterceptor{intercept}})
env.RegisterWorkflowWithOptions(testWorkflow, workflow.RegisterOptions{
Name: "testWorkflow",
})
env.RegisterActivityWithOptions(createOrder, activity.RegisterOptions{
Name: "createOrder",
})
env.RegisterActivityWithOptions(deleteOrder, activity.RegisterOptions{
Name: "deleteOrder",
})
env.RegisterActivityWithOptions(stockDeduct, activity.RegisterOptions{
Name: "stockDeduct",
})
env.RegisterActivityWithOptions(stockInc, activity.RegisterOptions{
Name: "stockInc",
})
env.RegisterActivityWithOptions(createPay, activity.RegisterOptions{
Name: "createPay",
})

env.ExecuteWorkflow(testWorkflow, 1)
require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
require.Equal(t, 1, len(orders))
for _, order := range orders {
require.True(t, order.IsDelete)
}
require.Equal(t, 1, amount)
env.AssertExpectations(t)
}