-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsend_fec_stream_test.go
144 lines (125 loc) · 4.36 KB
/
send_fec_stream_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package quic
import (
"crypto/rand"
mrand "math/rand"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/quic-go/quic-go/internal/mocks"
"github.com/quic-go/quic-go/internal/protocol"
"go.uber.org/mock/gomock"
)
/*
TODO
- Use the test from send_stream_test.go It("retransmits data until everything has been acknowledged", func() { ... } to drop random source symbols in order to trigger the repair functionality
go run github.com/onsi/ginkgo/v2/ginkgo -r -v -cover -randomize-all -randomize-suites -trace -skip-package integrationtests
Expecting 1357 but got 1343. That's a difference of 14 bytes.
maybeGetShortHeaderPacket
*/
var _ = Describe("Send FEC Stream", func() {
// TODO (ddritzenhoff) used to be 1337
const streamID protocol.StreamID = 1338
var (
str *sendStream
// strWithTimeout io.Writer // str wrapped with gbytes.TimeoutWriter
mockFC *mocks.MockStreamFlowController
mockSender *MockStreamSender
)
BeforeEach(func() {
mockSender = NewMockStreamSender(mockCtrl)
mockFC = mocks.NewMockStreamFlowController(mockCtrl)
// str = newSendStream(streamID, mockSender, mockFC)
str = newSendStreamWithFEC(streamID, mockSender, mockFC, true)
// timeout := scaleDuration(250 * time.Millisecond)
// strWithTimeout = gbytes.TimeoutWriter(str, timeout)
})
// This test is kind of an integration test.
// It writes 4 MB of data, and pops STREAM frames that sometimes are and sometimes aren't limited by flow control.
// Half of these STREAM frames are then received and their content saved, while the other half is reported lost
// and has to be retransmitted.
It("retransmits data until everything has been acknowledged", func() {
const dataLen = 1 << 22 // 4 MB
mockSender.EXPECT().onHasStreamData(streamID).AnyTimes()
mockFC.EXPECT().SendWindowSize().DoAndReturn(func() protocol.ByteCount {
return protocol.ByteCount(mrand.Intn(500)) + 50
}).AnyTimes()
mockFC.EXPECT().AddBytesSent(gomock.Any()).AnyTimes()
data := make([]byte, dataLen)
_, err := rand.Read(data)
Expect(err).ToNot(HaveOccurred())
done := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(done)
_, err := str.Write(data)
Expect(err).ToNot(HaveOccurred())
str.Close()
}()
var completed bool
mockSender.EXPECT().onStreamCompleted(streamID).Do(func(protocol.StreamID) { completed = true })
received := make([]byte, dataLen)
for {
if completed {
break
}
f, ok, _ := str.popStreamFrame(protocol.ByteCount(mrand.Intn(300)+100), protocol.Version1)
if !ok {
continue
}
sf := f.Frame
// 50%: acknowledge the frame and save the data
// 50%: lose the frame
if mrand.Intn(100) < 50 {
copy(received[sf.Offset:sf.Offset+sf.DataLen()], sf.Data)
f.Handler.OnAcked(f.Frame)
} else {
f.Handler.OnLost(f.Frame)
}
}
Expect(received).To(Equal(data))
})
// This test is kind of an integration test.
// It writes 4 MB of data, and pops STREAM frames that sometimes are and sometimes aren't limited by flow control.
// Half of these STREAM frames are then received and their content saved, while the other half is reported lost
// and has to be retransmitted.
It("purposely drop the second packet to force a repair", func() {
const dataLen = 2000 // 4 MB
mockSender.EXPECT().onHasStreamData(streamID).AnyTimes()
mockFC.EXPECT().SendWindowSize().DoAndReturn(func() protocol.ByteCount {
return protocol.ByteCount(mrand.Intn(500)) + 50
}).AnyTimes()
mockFC.EXPECT().AddBytesSent(gomock.Any()).AnyTimes()
data := make([]byte, dataLen)
_, err := rand.Read(data)
Expect(err).ToNot(HaveOccurred())
done := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(done)
_, err := str.Write(data)
Expect(err).ToNot(HaveOccurred())
str.Close()
}()
var completed bool
mockSender.EXPECT().onStreamCompleted(streamID).Do(func(protocol.StreamID) { completed = true })
received := make([]byte, dataLen)
for {
if completed {
break
}
f, ok, _ := str.popStreamFrame(protocol.ByteCount(mrand.Intn(300)+100), protocol.Version1)
if !ok {
continue
}
sf := f.Frame
// 50%: acknowledge the frame and save the data
// 50%: lose the frame
if mrand.Intn(100) < 50 {
copy(received[sf.Offset:sf.Offset+sf.DataLen()], sf.Data)
f.Handler.OnAcked(f.Frame)
} else {
f.Handler.OnLost(f.Frame)
}
}
Expect(received).To(Equal(data))
})
})