Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(executor): executor hang when task panic happens #8

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions examples/example.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"log"
"os"
Expand All @@ -14,24 +13,24 @@ func main() {
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU() - 1))
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
gotaskflow.NewTask("A", func(ctx *context.Context) {
gotaskflow.NewTask("A", func() {
fmt.Println("A")
}),
gotaskflow.NewTask("B", func(ctx *context.Context) {
gotaskflow.NewTask("B", func() {
fmt.Println("B")
}),
gotaskflow.NewTask("C", func(ctx *context.Context) {
gotaskflow.NewTask("C", func() {
fmt.Println("C")
})

A1, B1, C1 :=
gotaskflow.NewTask("A1", func(ctx *context.Context) {
gotaskflow.NewTask("A1", func() {
fmt.Println("A1")
}),
gotaskflow.NewTask("B1", func(ctx *context.Context) {
gotaskflow.NewTask("B1", func() {
fmt.Println("B1")
}),
gotaskflow.NewTask("C1", func(ctx *context.Context) {
gotaskflow.NewTask("C1", func() {
fmt.Println("C1")
})
A.Precede(B)
Expand All @@ -42,13 +41,13 @@ func main() {

subflow := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
A2, B2, C2 :=
gotaskflow.NewTask("A2", func(ctx *context.Context) {
gotaskflow.NewTask("A2", func() {
fmt.Println("A2")
}),
gotaskflow.NewTask("B2", func(ctx *context.Context) {
gotaskflow.NewTask("B2", func() {
fmt.Println("B2")
}),
gotaskflow.NewTask("C2", func(ctx *context.Context) {
gotaskflow.NewTask("C2", func() {
fmt.Println("C2")
})
A2.Precede(B2)
Expand All @@ -58,13 +57,13 @@ func main() {

subflow2 := gotaskflow.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
A3, B3, C3 :=
gotaskflow.NewTask("A3", func(ctx *context.Context) {
gotaskflow.NewTask("A3", func() {
fmt.Println("A3")
}),
gotaskflow.NewTask("B3", func(ctx *context.Context) {
gotaskflow.NewTask("B3", func() {
fmt.Println("B3")
}),
gotaskflow.NewTask("C3", func(ctx *context.Context) {
gotaskflow.NewTask("C3", func() {
fmt.Println("C3")
// time.Sleep(10 * time.Second)
})
Expand Down
90 changes: 51 additions & 39 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ import (

type Executor interface {
Wait()
// WaitForAll()
Profile(w io.Writer) error
Run(tf *TaskFlow) Executor
// Observe()
}

type ExecutorImpl struct {
Expand Down Expand Up @@ -52,29 +50,29 @@ func (e *ExecutorImpl) Run(tf *TaskFlow) Executor {
return e
}

func (e *ExecutorImpl) invoke_graph(g *Graph, parentSpan *span) {
func (e *ExecutorImpl) invokeGraph(g *Graph, parentSpan *span) {
ctx := context.Background()
for {
g.scheCond.L.Lock()
for g.JoinCounter() != 0 && e.wq.Len() == 0 {
for g.JoinCounter() != 0 && e.wq.Len() == 0 && !g.canceled.Load() {
g.scheCond.Wait()
}
g.scheCond.L.Unlock()

if g.JoinCounter() == 0 {
if g.JoinCounter() == 0 || g.canceled.Load() {
break
}

node := e.wq.PeakAndTake() // hang
e.invoke_node(&ctx, node, parentSpan)
e.invokeNode(&ctx, node, parentSpan)
}
}

func (e *ExecutorImpl) invoke(tf *TaskFlow) {
e.invoke_graph(tf.graph, nil)
e.invokeGraph(tf.graph, nil)
}

func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node, parentSpan *span) {
func (e *ExecutorImpl) invokeNode(ctx *context.Context, node *Node, parentSpan *span) {
// do job
switch p := node.ptr.(type) {
case *Static:
Expand All @@ -83,25 +81,30 @@ func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node, parentSpan
typ: NodeStatic,
name: node.name,
}, begin: time.Now(), parent: parentSpan}

defer func() {
span.end = time.Now()
span.extra.success = true
e.profiler.AddSpan(&span)
if r := recover(); r != nil {
fmt.Println("node", node.name, "recovered ", r)
node.g.canceled.Store(true)
} else {
e.profiler.AddSpan(&span) // remove canceled node span
}

e.wg.Done()
node.drop()
for _, n := range node.successors {
if n.JoinCounter() == 0 {
e.schedule(n)
}
}
node.g.scheCond.Signal()
}()

defer e.wg.Done()
node.state.Store(kNodeStateRunning)
defer node.state.Store(kNodeStateFinished)

p.handle(ctx)
node.drop()
for _, n := range node.successors {
// fmt.Println("put", n.Name)
if n.JoinCounter() == 0 {
e.schedule(n)
}
}
node.g.scheCond.Signal()
p.handle()
node.state.Store(kNodeStateFinished)
})
case *Subflow:
e.pool.Go(func() {
Expand All @@ -112,49 +115,58 @@ func (e *ExecutorImpl) invoke_node(ctx *context.Context, node *Node, parentSpan
defer func() {
span.end = time.Now()
span.extra.success = true
e.profiler.AddSpan(&span)
if r := recover(); r != nil {
fmt.Println("subflow", node.name, "recovered ", r)
node.g.canceled.Store(true)
p.g.canceled.Store(true)
} else {
e.profiler.AddSpan(&span) // remove canceled node span
}
e.wg.Done()
e.scheduleGraph(p.g, &span)
node.drop()

for _, n := range node.successors {
if n.JoinCounter() == 0 {
e.schedule(n)
}
}
node.g.scheCond.Signal()
}()

defer e.wg.Done()
node.state.Store(kNodeStateRunning)
defer node.state.Store(kNodeStateFinished)

if !p.g.instancelized {
p.handle(p)
}
p.g.instancelized = true
node.state.Store(kNodeStateFinished)

e.schedule_graph(p.g, &span)
node.drop()

for _, n := range node.successors {
if n.JoinCounter() == 0 {
e.schedule(n)
}
}

node.g.scheCond.Signal()
})
default:
fmt.Println("exit: ", node.name)
panic("do nothing")
panic("unsupported node")
}
}

func (e *ExecutorImpl) schedule(node *Node) {
if node.g.canceled.Load() {
node.g.scheCond.Signal()
fmt.Println("node cannot be scheduled, cuz graph canceled", node.name)
return
}

e.wg.Add(1)
e.wq.Put(node)
node.state.Store(kNodeStateWaiting)
node.g.scheCond.Signal()
}

func (e *ExecutorImpl) schedule_graph(g *Graph, parentSpan *span) {
func (e *ExecutorImpl) scheduleGraph(g *Graph, parentSpan *span) {
g.setup()
for _, node := range g.entries {
e.schedule(node)
}

e.invoke_graph(g, parentSpan)
e.invokeGraph(g, parentSpan)

g.scheCond.Signal()
}
Expand Down
13 changes: 6 additions & 7 deletions executor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gotaskflow_test

import (
"context"
"fmt"
"os"
"runtime"
Expand All @@ -14,24 +13,24 @@ func TestExecutor(t *testing.T) {
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
gotaskflow.NewTask("A", func(ctx *context.Context) {
gotaskflow.NewTask("A", func() {
fmt.Println("A")
}),
gotaskflow.NewTask("B", func(ctx *context.Context) {
gotaskflow.NewTask("B", func() {
fmt.Println("B")
}),
gotaskflow.NewTask("C", func(ctx *context.Context) {
gotaskflow.NewTask("C", func() {
fmt.Println("C")
})

A1, B1, C1 :=
gotaskflow.NewTask("A1", func(ctx *context.Context) {
gotaskflow.NewTask("A1", func() {
fmt.Println("A1")
}),
gotaskflow.NewTask("B1", func(ctx *context.Context) {
gotaskflow.NewTask("B1", func() {
fmt.Println("B1")
}),
gotaskflow.NewTask("C1", func(ctx *context.Context) {
gotaskflow.NewTask("C1", func() {
fmt.Println("C1")
})
A.Precede(B)
Expand Down
6 changes: 2 additions & 4 deletions flow.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package gotaskflow

import "context"

var FlowBuilder = flowBuilder{}

type flowBuilder struct{}

type Static struct {
handle func(ctx *context.Context)
handle func()
}

type Subflow struct {
Expand All @@ -21,7 +19,7 @@ func (sf *Subflow) Push(tasks ...*Task) {
}
}

func (fb *flowBuilder) NewStatic(name string, f func(ctx *context.Context)) *Node {
func (fb *flowBuilder) NewStatic(name string, f func()) *Node {
node := newNode(name)
node.ptr = &Static{
handle: f,
Expand Down
20 changes: 15 additions & 5 deletions graph.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package gotaskflow

import (
"fmt"
"sync"
"sync/atomic"

"github.com/noneback/go-taskflow/utils"
)
Expand All @@ -13,6 +15,7 @@ type Graph struct {
entries []*Node
scheCond *sync.Cond
instancelized bool
canceled atomic.Bool // only changes when task in graph panic
}

func newGraph(name string) *Graph {
Expand Down Expand Up @@ -70,11 +73,18 @@ func (g *Graph) instancelize() {
}

// only for visualizer
func (g *Graph) topologicalSort() ([]*Node, bool) {
g.instancelize()
func (g *Graph) topologicalSort() (sorted []*Node, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("instancelize may failed or paniced")
return
}
}()

g.instancelize() // may require panic recover
indegree := map[*Node]int{} // Node -> indegree
zeros := make([]*Node, 0) // zero deps
sorted := make([]*Node, 0, len(g.nodes))
sorted = make([]*Node, 0, len(g.nodes))

for _, node := range g.nodes {
set := map[*Node]struct{}{}
Expand Down Expand Up @@ -104,9 +114,9 @@ func (g *Graph) topologicalSort() ([]*Node, bool) {

for _, node := range g.nodes {
if indegree[node] > 0 {
return nil, false
return nil, fmt.Errorf("graph has cycles")
}
}

return sorted, true
return
}
Loading
Loading