forked from anacrolix/torrent
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpeer-conn-msg-writer.go
124 lines (113 loc) · 3.03 KB
/
peer-conn-msg-writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package torrent
import (
"bytes"
"io"
"time"
"github.com/anacrolix/chansync"
"github.com/anacrolix/log"
"github.com/anacrolix/sync"
pp "github.com/anacrolix/torrent/peer_protocol"
)
// initMessageWriter (re)builds pc.messageWriter from the connection's current
// state: its closed flag, logger and underlying writer, plus callbacks into
// the PeerConn for filling the buffer and deciding on keepalives. The writer
// starts with a fresh, empty write buffer.
func (pc *PeerConn) initMessageWriter() {
	pc.messageWriter = peerConnMsgWriter{
		writeBuffer: new(bytes.Buffer),
		w:           pc.w,
		logger:      pc.logger,
		closed:      &pc.closed,
		// Delegates the keepalive decision to the connection.
		keepAlive: func(lock bool, lockTorrent bool) bool {
			return pc.useful(lock, lockTorrent)
		},
		// Only top up the buffer while the connection is still open.
		fillWriteBuffer: func() {
			if !pc.closed.IsSet() {
				pc.fillWriteBuffer()
			}
		},
	}
}
// startMessageWriter initializes pc.messageWriter and then starts
// messageWriterRunner in its own goroutine.
func (pc *PeerConn) startMessageWriter() {
// The writer is populated before the goroutine that runs it is started.
pc.initMessageWriter()
go pc.messageWriterRunner()
}
// messageWriterRunner drives the connection's message writer using the
// client's configured keepalive timeout, and closes the connection once the
// writer loop exits for any reason.
func (pc *PeerConn) messageWriterRunner() {
	// Registered before anything else so the close happens on every exit path.
	defer pc.close(true)

	keepAliveTimeout := pc.t.cl.config.KeepAliveTimeout
	pc.messageWriter.run(keepAliveTimeout)
}
// peerConnMsgWriter buffers outgoing peer-protocol messages and flushes them
// to w from its run loop.
type peerConnMsgWriter struct {
// Must not be called with the local mutex held, as it will call back into the write method.
// run invokes it at the top of every loop iteration, before taking mu.
fillWriteBuffer func()
// Checked by run to decide when to stop; its Done channel also wakes run
// while it is waiting for data.
closed *chansync.SetOnce
// Receives a debug-level message when a write to w fails.
logger log.Logger
// Destination for the marshalled messages.
w io.Writer
// Consulted by run (with both arguments true) to decide whether a keepalive
// may be queued once the keepalive timeout has elapsed since the last write.
keepAlive func(lock bool, lockTorrent bool) bool
// Held by run and write while they touch writeBuffer.
mu sync.Mutex
// Broadcast by write after appending; run waits on it when writeBuffer is empty.
writeCond chansync.BroadcastCond
// Pointer so we can swap with the "front buffer".
writeBuffer *bytes.Buffer
}
// run is the routine that writes to the peer. Some of what to write is
// buffered by activity elsewhere in the Client (through write), and some is
// determined locally when the connection is writable (through
// fillWriteBuffer).
//
// keepAliveTimeout is how long the connection may go without a successful
// write before a keepalive message is queued, provided the keepAlive callback
// reports true and nothing else is waiting to be sent.
//
// run returns when the connection is marked closed or when a write to the
// underlying writer fails; the failure is logged at debug level.
func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
	lastWrite := time.Now()
	keepAliveTimer := time.NewTimer(keepAliveTimeout)
	// Release the timer as soon as the writer exits rather than leaving it
	// pending until it would have fired.
	defer keepAliveTimer.Stop()
	frontBuf := new(bytes.Buffer)
	for {
		if cn.closed.IsSet() {
			return
		}
		// Called without cn.mu held: it calls back into write, which takes cn.mu.
		cn.fillWriteBuffer()
		keepAlive := cn.keepAlive(true, true)
		cn.mu.Lock()
		if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
			cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			torrent.Add("written keepalives", 1)
		}
		if cn.writeBuffer.Len() == 0 {
			// Grab the signal channel while still holding the lock so a
			// Broadcast from write between Unlock and select is not missed.
			writeCond := cn.writeCond.Signaled()
			cn.mu.Unlock()
			select {
			case <-cn.closed.Done():
			case <-writeCond:
			case <-keepAliveTimer.C:
			}
			continue
		}
		// Flip the buffers.
		frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
		cn.mu.Unlock()
		if frontBuf.Len() == 0 {
			panic("expected non-empty front buffer")
		}
		var err error
		for frontBuf.Len() != 0 {
			// Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59.
			next := frontBuf.Next(1<<16 - 1)
			var n int
			n, err = cn.w.Write(next)
			if err == nil && n != len(next) {
				panic("expected full write")
			}
			if err != nil {
				break
			}
		}
		if err != nil {
			cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
			return
		}
		lastWrite = time.Now()
		// The timer may have fired while we were busy writing. Stop it and
		// discard any unread tick before re-arming, so the next wait is not
		// woken by a stale expiry.
		if !keepAliveTimer.Stop() {
			select {
			case <-keepAliveTimer.C:
			default:
			}
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	}
}
// write marshals msg, appends it to the pending write buffer and wakes the
// run loop. It reports whether the buffer is still below its high-water mark
// after the append, i.e. false means the buffer is now full.
func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
	cn.mu.Lock()
	defer cn.mu.Unlock()

	encoded := msg.MustMarshalBinary()
	cn.writeBuffer.Write(encoded)
	// Let a waiting run loop know there is something to send.
	cn.writeCond.Broadcast()

	hasRoom := !cn.writeBufferFull()
	return hasRoom
}
// writeBufferFull reports whether the pending write buffer has reached
// writeBufferHighWaterLen bytes. It does no locking of its own; write calls
// it while holding cn.mu.
func (cn *peerConnMsgWriter) writeBufferFull() bool {
	buffered := cn.writeBuffer.Len()
	return buffered >= writeBufferHighWaterLen
}