Skip to content

Commit

Permalink
flow improvements (#52)
Browse files Browse the repository at this point in the history
* flow improvements

Signed-off-by: Vasiliy Tolstov <[email protected]>
  • Loading branch information
vtolstov authored Jun 30, 2021
1 parent 2a54863 commit bd4d4c3
Show file tree
Hide file tree
Showing 4 changed files with 620 additions and 3 deletions.
34 changes: 34 additions & 0 deletions flow/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package flow

import (
"context"
)

type flowKey struct{}

// FromContext returns Flow from context
func FromContext(ctx context.Context) (Flow, bool) {
if ctx == nil {
return nil, false
}
c, ok := ctx.Value(flowKey{}).(Flow)
return c, ok
}

// NewContext stores Flow to context
func NewContext(ctx context.Context, f Flow) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, flowKey{}, f)
}

// SetOption returns a function to setup a context with given value
func SetOption(k, v interface{}) Option {
return func(o *Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
319 changes: 319 additions & 0 deletions flow/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
package flow

import (
"context"
"fmt"
"sync"

"github.com/google/uuid"
"github.com/silas/dag"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
)

type microFlow struct {
opts Options
}

type microWorkflow struct {
id string
g *dag.AcyclicGraph
init bool
sync.RWMutex
opts Options
steps map[string]Step
}

func (w *microWorkflow) ID() string {
return w.id
}

func (w *microWorkflow) Steps() [][]Step {
return nil
}

func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error {
return nil
}

func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error {
return nil
}

func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) {
w.Lock()
if !w.init {
if err := w.g.Validate(); err != nil {
w.Unlock()
return "", err
}
w.g.TransitiveReduction()
w.init = true
}
w.Unlock()

uid, err := uuid.NewRandom()
if err != nil {
return "", err
}

options := NewExecuteOptions(opts...)
var steps [][]Step
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]Step, 1)
steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]Step, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]Step, 0, 1)
}
steps[idx] = append(steps[idx], n.(Step))
return nil
}

var root dag.Vertex
if options.Start != "" {
var ok bool
w.RLock()
root, ok = w.steps[options.Start]
w.RUnlock()
if !ok {
return "", ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return "", err
}
}
if options.Reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
if err != nil {
return "", err
}

var wg sync.WaitGroup
cherr := make(chan error, 1)
defer close(cherr)

nctx, cancel := context.WithCancel(ctx)
defer cancel()
nopts := make([]ExecuteOption, 0, len(opts)+5)
nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store))

go func() {
for idx := range steps {
wg.Add(len(steps[idx]))
for nidx := range steps[idx] {
go func(step Step) {
defer wg.Done()
if err = step.Execute(nctx, req, nopts...); err != nil {
cherr <- err
cancel()
}
}(steps[idx][nidx])
}
wg.Wait()
}
cherr <- nil
}()

err = <-cherr

return uid.String(), err
}

func NewFlow(opts ...Option) Flow {
options := NewOptions(opts...)
return &microFlow{opts: options}
}

func (f *microFlow) Options() Options {
return f.opts
}

func (f *microFlow) Init(opts ...Option) error {
for _, o := range opts {
o(&f.opts)
}
if err := f.opts.Client.Init(); err != nil {
return err
}
if err := f.opts.Tracer.Init(); err != nil {
return err
}
if err := f.opts.Logger.Init(); err != nil {
return err
}
if err := f.opts.Meter.Init(); err != nil {
return err
}
if err := f.opts.Store.Init(); err != nil {
return err
}
return nil
}

func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) {
return nil, nil
}

func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
w := &microWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))}

for _, s := range steps {
w.steps[s.String()] = s
w.g.Add(s)
}

for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
return nil, ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
}
}

if err := w.g.Validate(); err != nil {
return nil, err
}
w.g.TransitiveReduction()

w.init = true

return w, nil
}

func (f *microFlow) WorkflowRemove(ctx context.Context, id string) error {
return nil
}

func (f *microFlow) WorkflowSave(ctx context.Context, w Workflow) error {
return nil
}

func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, error) {
return nil, nil
}

type microCallStep struct {
opts StepOptions
service string
method string
}

func (s *microCallStep) ID() string {
return s.String()
}

func (s *microCallStep) Options() StepOptions {
return s.opts
}

func (s *microCallStep) Endpoint() string {
return s.method
}

func (s *microCallStep) Requires() []string {
return s.opts.Requires
}

func (s *microCallStep) Require(steps ...Step) error {
for _, step := range steps {
s.opts.Requires = append(s.opts.Requires, step.String())
}
return nil
}

func (s *microCallStep) String() string {
if s.opts.ID != "" {
return s.opts.ID
}
return fmt.Sprintf("%s.%s", s.service, s.method)
}

func (s *microCallStep) Name() string {
return s.String()
}

func (s *microCallStep) Hashcode() interface{} {
return s.String()
}

func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
options := NewExecuteOptions(opts...)
if options.Client == nil {
return fmt.Errorf("client not set")
}
rsp := &codec.Frame{}
copts := []client.CallOption{client.WithRetries(0)}
if options.Timeout > 0 {
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
}
err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp)
return err
}

type microPublishStep struct {
opts StepOptions
topic string
}

func (s *microPublishStep) ID() string {
return s.String()
}

func (s *microPublishStep) Options() StepOptions {
return s.opts
}

func (s *microPublishStep) Endpoint() string {
return s.topic
}

func (s *microPublishStep) Requires() []string {
return s.opts.Requires
}

func (s *microPublishStep) Require(steps ...Step) error {
for _, step := range steps {
s.opts.Requires = append(s.opts.Requires, step.String())
}
return nil
}

func (s *microPublishStep) String() string {
if s.opts.ID != "" {
return s.opts.ID
}
return fmt.Sprintf("%s", s.topic)
}

func (s *microPublishStep) Name() string {
return s.String()
}

func (s *microPublishStep) Hashcode() interface{} {
return s.String()
}

func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
return nil
}

func NewCallStep(service string, method string, opts ...StepOption) Step {
options := NewStepOptions(opts...)
return &microCallStep{service: service, method: method, opts: options}
}

func NewPublishStep(topic string, opts ...StepOption) Step {
options := NewStepOptions(opts...)
return &microPublishStep{topic: topic, opts: options}
}
Loading

0 comments on commit bd4d4c3

Please sign in to comment.