forked from Azure/go-amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanualCreditor.go
99 lines (79 loc) · 2.21 KB
/
manualCreditor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package amqp
import (
"context"
"errors"
"sync"
)
// manualCreditor accumulates credit and drain requests until they are
// consumed by FlowBits. Every method in this file takes mu before touching
// the other fields.
type manualCreditor struct {
// mu guards all fields below.
mu sync.Mutex
// future values for the next flow frame.
// pendingDrain is cleared by FlowBits.
// NOTE(review): nothing visible in this file sets pendingDrain to true or
// reads it - confirm whether it is still used elsewhere in the package.
pendingDrain bool
// creditsToAdd is increased by IssueCredit and reset to zero by FlowBits.
creditsToAdd uint32
// drained is set when a drain is active and we're waiting
// for the corresponding flow from the remote.
// It is created by Drain, and closed and reset to nil by EndDrain.
drained chan struct{}
}
// Sentinel errors reported by manualCreditor while a drain is active.
var (
	// errLinkDraining is returned by IssueCredit when credits are requested
	// while a drain is active.
	errLinkDraining = errors.New("link is currently draining, no credits can be added")

	// errAlreadyDraining is returned by Drain when a drain is already active.
	errAlreadyDraining = errors.New("drain already in process")
)
// EndDrain finishes the active drain, if there is one, by closing the drain
// channel so that a Drain call blocked on it returns. Calling it when no
// drain is active is a no-op.
func (mc *manualCreditor) EndDrain() {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	active := mc.drained
	if active == nil {
		// No drain in progress; nothing to signal.
		return
	}

	// Clear the field and close the channel under the same lock so the
	// channel can never be closed twice.
	mc.drained = nil
	close(active)
}
// FlowBits gets the proper values for the next flow frame
// and resets the internal state.
//
// currentCredits is added to the credits queued by IssueCredit to form the
// returned credit value. If that sum does not fit in a uint32 it is clamped
// to the maximum uint32 value instead of wrapping around (a wrapped sum of 0
// would otherwise be indistinguishable from "no flow needed").
//
// Returns:
//
//	(drain: true, credits: 0) if a flow is needed (drain)
//	(drain: false, credits > 0) if a flow is needed (issue credit)
//	(drain: false, credits == 0) if no flow needed.
//
// The queued credits and the pending-drain flag are cleared on every call,
// including calls made while a drain is active.
func (mc *manualCreditor) FlowBits(currentCredits uint32) (bool, uint32) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	drain := mc.drained != nil
	pending := mc.creditsToAdd

	// The queued state is consumed regardless of which result is returned.
	mc.creditsToAdd = 0
	mc.pendingDrain = false

	// either:
	// drain is true (ie, we're going to send a drain frame, and the credits for it should be 0)
	// pending == 0 (no flow frame needed, no new credits are being issued)
	if drain || pending == 0 {
		return drain, 0
	}

	total := pending + currentCredits
	if total < pending {
		// Unsigned addition wrapped around; saturate at the largest
		// representable credit value.
		total = ^uint32(0)
	}
	return false, total
}
// Drain marks a drain as active and then blocks until EndDrain is called
// or ctx is done.
//
// It returns errAlreadyDraining if a drain is already active, nil once
// EndDrain closes the drain channel, or ctx.Err() if ctx finishes first.
// When ctx finishes first the drain is left marked as active; only EndDrain
// clears it.
func (mc *manualCreditor) Drain(ctx context.Context) error {
	// Register the drain under the lock and keep a private reference to the
	// channel, so the wait below never reads mc.drained while EndDrain may be
	// resetting it.
	signal, err := func() (<-chan struct{}, error) {
		mc.mu.Lock()
		defer mc.mu.Unlock()

		if mc.drained != nil {
			return nil, errAlreadyDraining
		}
		ch := make(chan struct{})
		mc.drained = ch
		return ch, nil
	}()
	if err != nil {
		return err
	}

	// send drain, wait for responding flow frame
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-signal:
		return nil
	}
}
// IssueCredit queues up additional credits to be requested at the next
// call of FlowBits().
//
// It returns errLinkDraining, leaving the queued amount untouched, if a
// drain is currently active. The queued total saturates at the maximum
// uint32 value rather than wrapping around, so repeated calls can never
// silently shrink the amount that has been requested.
func (mc *manualCreditor) IssueCredit(credits uint32) error {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	if mc.drained != nil {
		return errLinkDraining
	}

	total := mc.creditsToAdd + credits
	if total < credits {
		// Unsigned addition wrapped around; clamp to the largest
		// representable credit value.
		total = ^uint32(0)
	}
	mc.creditsToAdd = total
	return nil
}