forked from testground/sync-service
-
Notifications
You must be signed in to change notification settings - Fork 0
/
barrier.go
111 lines (88 loc) · 1.89 KB
/
barrier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package sync
import (
"context"
"sync"
)
// barrier represents a single barrier with multiple waiters.
//
// The embedded mutex is taken by wait, inc and isDone before they read or
// modify count and zcs.
type barrier struct {
sync.Mutex
// count is the number of entries signalled so far; inc adds one per call.
count int
// zcs holds zero counters appended by wait for callers that had to block;
// inc decrements each of them and isDone inspects them.
zcs []*zeroCounter
}
// wait blocks the caller until the barrier has counted at least target
// entries. When the barrier is already at or past target it returns nil
// right away; otherwise it registers a zero counter for the missing entries
// and returns whatever that counter's wait reports.
func (b *barrier) wait(ctx context.Context, target int) error {
	b.Lock()

	if b.count >= target {
		// Nothing left to wait for.
		b.Unlock()
		return nil
	}

	// The counter starts at the number of entries still missing. It is
	// registered while the lock is held so that no inc can slip in between
	// reading b.count and appending the counter.
	waiter := newZeroCounter(ctx, target-b.count)
	b.zcs = append(b.zcs, waiter)

	// Release the barrier before blocking so inc can make progress.
	b.Unlock()

	return waiter.wait()
}
// inc increments the barrier by one unit and returns the new count.
//
// Every registered zero counter is decremented to tell its waiter that a new
// entry arrived. Counters that have reached zero are then dropped from the
// barrier: their waiters have already been released, so keeping them would
// only make the slice (and the work done by each inc) grow for as long as
// the barrier lives. Dropping them does not change what isDone reports,
// because a dropped counter is by definition done and never becomes undone.
func (b *barrier) inc() int {
	b.Lock()
	defer b.Unlock()

	b.count++

	// Filter in place, reusing the backing array of b.zcs.
	pending := b.zcs[:0]
	for _, zc := range b.zcs {
		zc.dec()
		if !zc.done() {
			pending = append(pending, zc)
		}
	}

	// Nil out the unused tail so the dropped counters can be collected.
	for i := len(pending); i < len(b.zcs); i++ {
		b.zcs[i] = nil
	}
	b.zcs = pending

	return b.count
}
// isDone reports whether every zero counter currently registered on this
// barrier has reached zero. A barrier without registered counters is done.
func (b *barrier) isDone() bool {
	b.Lock()
	defer b.Unlock()

	// Stop scanning at the first counter that is still pending.
	allDone := true
	for i := 0; allDone && i < len(b.zcs); i++ {
		allDone = b.zcs[i].done()
	}
	return allDone
}
// zeroCounter counts down from an initial value and releases its waiter once
// the count reaches zero, or once the context it was created from is done.
type zeroCounter struct {
	// The embedded mutex guards count.
	sync.Mutex

	// ctx is derived from the caller's context; it is done when either the
	// caller's context is done or cancel is invoked because count hit zero.
	ctx    context.Context
	cancel context.CancelFunc

	// count is the number of decrements still needed to reach zero.
	count int
}

// newZeroCounter returns a counter that needs target calls to dec before it
// releases its waiter. The counter is also released when ctx is done.
//
// A target of zero or less describes a counter that has nothing left to wait
// for, so it is released immediately instead of blocking its waiter until
// ctx ends.
func newZeroCounter(ctx context.Context, target int) *zeroCounter {
	ctx, cancel := context.WithCancel(ctx)
	if target <= 0 {
		cancel()
	}
	return &zeroCounter{
		count:  target,
		ctx:    ctx,
		cancel: cancel,
	}
}

// dec lowers the counter by one and releases the waiter when it reaches
// zero. Calling dec on a counter that is already at zero is a no-op.
func (w *zeroCounter) dec() {
	w.Lock()
	defer w.Unlock()

	if w.count <= 0 {
		return
	}

	w.count--
	if w.count == 0 {
		// Cancelling the derived context is what wakes up wait.
		w.cancel()
	}
}

// wait blocks until the counter reaches zero or the context is done. It
// returns nil when the counter reached zero, and the context's error
// otherwise.
func (w *zeroCounter) wait() error {
	<-w.ctx.Done()

	// The derived context is done both on success (dec cancelled it) and on
	// failure (the caller's context ended), so the count decides which
	// one happened.
	if w.done() {
		return nil
	}
	return w.ctx.Err()
}

// done reports whether the counter has reached zero.
//
// The mutex is taken because wait may call done while dec is running
// concurrently, which happens when the caller's context ends before the
// counter reaches zero.
func (w *zeroCounter) done() bool {
	w.Lock()
	defer w.Unlock()
	return w.count <= 0
}