-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path engineForkJoin.go
112 lines (94 loc) · 2.92 KB
/
engineForkJoin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package sup
import (
"context"
"sync/atomic"
)
// superviseFJ is a fork-join supervisor: it launches all of its bound
// tasks at once, then waits for every one of them to finish, cancelling
// the whole group on the first child error or on parent cancellation.
type superviseFJ struct {
	name  string       // human-readable supervisor name
	tasks []*boundTask // the fixed set of children to fork
	// phase holds the current Phase; always accessed via sync/atomic so
	// Phase() can be observed concurrently with Run().
	phase       uint32
	reportCh    <-chan reportMsg // completion reports from child goroutines
	groupCancel func()           // cancels the context shared by all children
	// awaiting tracks children that were launched but have not yet
	// reported; results records each child's final outcome (nil = success).
	awaiting map[*boundTask]struct{}
	results  map[*boundTask]*ErrChild
	// firstErr is the first failure observed (child error or parent
	// cancellation); it becomes Run's return value.
	firstErr error
}
// Phase reports the supervisor's current lifecycle phase.
//
// The receiver must be a pointer: a value receiver copies the whole
// struct, and that copy performs a plain (non-atomic) read of
// mgr.phase concurrently with the atomic stores done by the phase
// functions during Run — a data race under -race. The atomic load
// would also target the copy's field rather than the shared one.
// Run already uses a pointer receiver, so only *superviseFJ satisfies
// Supervisor and this change is interface-compatible.
func (mgr *superviseFJ) Phase() Phase {
	return Phase(atomic.LoadUint32(&mgr.phase))
}
// init binds the given tasks and returns a ready-to-Run Supervisor.
//
// NOTE: the value receiver is deliberate — calling init copies the
// receiver, so a configured superviseFJ acts as a prototype: each call
// yields an independent supervisor (whose address is returned) and
// leaves the original value untouched for reuse.
func (mgr superviseFJ) init(tasks []Task) Supervisor {
	mgr.phase = uint32(Phase_init)
	mgr.tasks = bindTasks(tasks)
	return &mgr
}
// Name returns the supervisor's configured name.
//
// Pointer receiver on purpose: a value receiver would copy the entire
// struct, including a plain read of mgr.phase, which races with the
// atomic stores performed while Run is executing. Run already has a
// pointer receiver, so only *superviseFJ satisfies Supervisor and this
// change does not affect interface satisfaction.
func (mgr *superviseFJ) Name() string {
	return mgr.name
}
// Run executes the fork-join supervision: it launches every bound task,
// waits for all of them to complete (or for the first child failure or
// parent-context cancellation), and returns the first error observed,
// if any. Run may be called at most once per supervisor; a second call
// panics.
func (mgr *superviseFJ) Run(parentCtx context.Context) error {
	// Enforce single-run via an atomic compare-and-swap on the phase
	// word (there is no mutex here; the CAS alone guards the transition).
	ok := atomic.CompareAndSwapUint32(&mgr.phase, uint32(Phase_init), uint32(Phase_collecting))
	if !ok {
		panic("supervisor can only be Run() once!")
	}
	// Allocate statekeepers.
	mgr.awaiting = make(map[*boundTask]struct{}, len(mgr.tasks))
	mgr.results = make(map[*boundTask]*ErrChild, len(mgr.tasks))
	// Step through phases (the halting phase will return a nil next phase).
	for phase := mgr._running; phase != nil; {
		phase = phase(parentCtx)
	}
	return mgr.firstErr
}
// _running is the fork half of the pattern: it wires up the reporting
// channel and group context, starts one goroutine per task, and hands
// control straight to the collecting phase.
func (mgr *superviseFJ) _running(parentCtx context.Context) phaseFn {
	// A cancellable context shared by the whole group of children lets
	// us stop them all in bulk later.
	groupCtx, cancelAll := context.WithCancel(parentCtx)
	mgr.groupCancel = cancelAll
	// The channel on which children report their completion.
	ch := make(chan reportMsg)
	mgr.reportCh = ch
	// Fork: mark each task as outstanding, then start its goroutine.
	// Fork-join keeps this delightfully simple — no dynamic task intake.
	for _, t := range mgr.tasks {
		mgr.awaiting[t] = struct{}{}
		go childLaunch(groupCtx, ch, t)
	}
	// The join happens in the next phase.
	return mgr._collecting
}
// _collecting is the join half: no new tasks can arrive, so we simply
// drain child reports until none remain, bailing out to the halting
// phase on the first child error or on parent cancellation.
func (mgr *superviseFJ) _collecting(parentCtx context.Context) phaseFn {
	atomic.StoreUint32(&mgr.phase, uint32(Phase_collecting))
	for len(mgr.awaiting) != 0 {
		select {
		case <-parentCtx.Done():
			// Parent asked us to stop; remember why and go halt.
			mgr.firstErr = parentCtx.Err()
			return mgr._halting
		case rpt := <-mgr.reportCh:
			delete(mgr.awaiting, rpt.task)
			mgr.results[rpt.task] = rpt.result
			if rpt.result == nil {
				continue // clean exit; keep collecting
			}
			// First child failure wins; cancel the rest via _halting.
			mgr.firstErr = rpt.result
			return mgr._halting
		}
	}
	// Every child finished cleanly.
	return mgr._halt
}
// _halting is entered on any unhappy path (child error or parent
// cancellation): it cancels every remaining child, then blocks until
// each outstanding child has reported in, so no goroutine is leaked.
func (mgr *superviseFJ) _halting(_ context.Context) phaseFn {
	atomic.StoreUint32(&mgr.phase, uint32(Phase_halting))
	// Tell the whole group to stop...
	mgr.groupCancel()
	// ...then keep receiving reports until every child has checked in.
	for len(mgr.awaiting) != 0 {
		rpt := <-mgr.reportCh
		delete(mgr.awaiting, rpt.task)
		mgr.results[rpt.task] = rpt.result
	}
	// Everyone is accounted for; move to the terminal phase.
	return mgr._halt
}
// _halt is the terminal phase: it publishes Phase_halt and returns a
// nil phaseFn, which ends the phase-stepping loop in Run.
func (mgr *superviseFJ) _halt(_ context.Context) phaseFn {
	atomic.StoreUint32(&mgr.phase, uint32(Phase_halt))
	return nil
}