forked from jiyeyuran/mediasoup-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdataproducer_test.go
254 lines (221 loc) · 8.51 KB
/
dataproducer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
package mediasoup
import (
"testing"
"github.com/stretchr/testify/suite"
)
func TestDataProducerTestingSuite(t *testing.T) {
suite.Run(t, new(DataProducerTestingSuite))
}
// DataProducerTestingSuite holds the fixtures shared by the DataProducer
// tests. SetupTest populates every field and TearDownTest closes the worker.
type DataProducerTestingSuite struct {
	TestingSuite

	// worker is obtained from CreateTestWorker; router is created on it.
	worker *Worker
	router *Router

	// transport1 comes from CreateWebRtcTransport and transport2 from
	// CreatePlainTransport; both are created with EnableSctp set to true.
	transport1, transport2 ITransport
}
// SetupTest builds the fixtures used by the tests: a test worker, a router on
// that worker, a WebRTC transport (transport1) and a plain transport
// (transport2), both listening on 127.0.0.1 with SCTP enabled.
//
// Transport creation errors abort the test immediately instead of being
// discarded, so a broken fixture is reported here rather than as a nil
// dereference inside an unrelated test.
func (suite *DataProducerTestingSuite) SetupTest() {
	var err error

	suite.worker = CreateTestWorker()
	suite.router = CreateRouter(suite.worker)

	suite.transport1, err = suite.router.CreateWebRtcTransport(WebRtcTransportOptions{
		ListenIps: []TransportListenIp{
			{Ip: "127.0.0.1"},
		},
		EnableSctp: true,
	})
	suite.Require().NoError(err)

	suite.transport2, err = suite.router.CreatePlainTransport(PlainTransportOptions{
		ListenIp: TransportListenIp{
			Ip: "127.0.0.1",
		},
		EnableSctp: true,
	})
	suite.Require().NoError(err)
}
// TearDownTest is the testify per-test teardown hook; it closes the worker
// that SetupTest stored on the suite.
func (suite *DataProducerTestingSuite) TearDownTest() {
	worker := suite.worker
	worker.Close()
}
// TestTransport1ProduceDataSucceeds produces data on transport1 with stream
// id 666 and verifies:
//   - the transport observer fires "newdataproducer" once with the producer,
//   - the producer's getters reflect the options (ordered, no
//     MaxPacketLifeTime, no MaxRetransmits),
//   - the router and transport dumps list the new producer and no consumers.
//
// Preconditions that later lines dereference (creation error, SCTP
// parameters, dump errors) are checked with Require so a failure stops the
// test instead of panicking further down.
func (suite *DataProducerTestingSuite) TestTransport1ProduceDataSucceeds() {
	onObserverNewDataProducer := NewMockFunc(suite.T())
	suite.transport1.Observer().Once("newdataproducer", onObserverNewDataProducer.Fn())

	dataProducer1, err := suite.transport1.ProduceData(DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{StreamId: 666},
		Label:                "foo",
		Protocol:             "bar",
		AppData:              H{"foo": 1, "bar": "2"},
	})
	suite.Require().NoError(err)

	onObserverNewDataProducer.ExpectCalledTimes(1)
	onObserverNewDataProducer.ExpectCalledWith(dataProducer1)

	suite.False(dataProducer1.Closed())
	suite.Equal(DataProducerType_Sctp, dataProducer1.Type())

	suite.Require().NotNil(dataProducer1.SctpStreamParameters())
	sctpParams := dataProducer1.SctpStreamParameters()
	suite.EqualValues(666, sctpParams.StreamId)
	suite.Require().NotNil(sctpParams.Ordered)
	suite.True(*sctpParams.Ordered)
	suite.Zero(sctpParams.MaxPacketLifeTime)
	suite.Zero(sctpParams.MaxRetransmits)

	suite.Equal("foo", dataProducer1.Label())
	suite.Equal("bar", dataProducer1.Protocol())
	suite.Equal(H{"foo": 1, "bar": "2"}, dataProducer1.AppData())

	routerDump, err := suite.router.Dump()
	suite.Require().NoError(err)
	expectedDump := RouterDump{
		MapDataProducerIdDataConsumerIds: map[string][]string{
			dataProducer1.Id(): {},
		},
		MapDataConsumerIdDataProducerId: map[string]string{},
	}
	suite.Equal(expectedDump.MapDataProducerIdDataConsumerIds, routerDump.MapDataProducerIdDataConsumerIds)
	suite.Equal(expectedDump.MapDataConsumerIdDataProducerId, routerDump.MapDataConsumerIdDataProducerId)

	transportDump, err := suite.transport1.Dump()
	suite.Require().NoError(err)
	suite.Equal(suite.transport1.Id(), transportDump.Id)
	suite.Equal([]string{dataProducer1.Id()}, transportDump.DataProducerIds)
	suite.Empty(transportDump.DataConsumerIds)
}
// TestTransport2ProduceDataSucceeds produces data on transport2 with stream
// id 777 and MaxRetransmits 3 and verifies:
//   - the transport observer fires "newdataproducer" once with the producer,
//   - the producer's getters reflect the options (Ordered reported false,
//     MaxRetransmits 3),
//   - the router and transport dumps list the new producer and no consumers.
//
// Preconditions that later lines dereference (creation error, SCTP
// parameters, dump errors) are checked with Require so a failure stops the
// test instead of panicking further down.
func (suite *DataProducerTestingSuite) TestTransport2ProduceDataSucceeds() {
	onObserverNewDataProducer := NewMockFunc(suite.T())
	suite.transport2.Observer().Once("newdataproducer", onObserverNewDataProducer.Fn())

	dataProducer2, err := suite.transport2.ProduceData(DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{StreamId: 777, MaxRetransmits: 3},
		Label:                "foo",
		Protocol:             "bar",
		AppData:              H{"foo": 1, "bar": "2"},
	})
	suite.Require().NoError(err)

	onObserverNewDataProducer.ExpectCalledTimes(1)
	onObserverNewDataProducer.ExpectCalledWith(dataProducer2)

	suite.False(dataProducer2.Closed())
	suite.Equal(DataProducerType_Sctp, dataProducer2.Type())

	suite.Require().NotNil(dataProducer2.SctpStreamParameters())
	sctpParams := dataProducer2.SctpStreamParameters()
	suite.EqualValues(777, sctpParams.StreamId)
	suite.Require().NotNil(sctpParams.Ordered)
	suite.False(*sctpParams.Ordered)
	suite.Zero(sctpParams.MaxPacketLifeTime)
	suite.EqualValues(3, sctpParams.MaxRetransmits)

	suite.Equal("foo", dataProducer2.Label())
	suite.Equal("bar", dataProducer2.Protocol())
	suite.Equal(H{"foo": 1, "bar": "2"}, dataProducer2.AppData())

	routerDump, err := suite.router.Dump()
	suite.Require().NoError(err)
	expectedDump := RouterDump{
		MapDataProducerIdDataConsumerIds: map[string][]string{
			dataProducer2.Id(): {},
		},
		MapDataConsumerIdDataProducerId: map[string]string{},
	}
	suite.Equal(expectedDump.MapDataProducerIdDataConsumerIds, routerDump.MapDataProducerIdDataConsumerIds)
	suite.Equal(expectedDump.MapDataConsumerIdDataProducerId, routerDump.MapDataConsumerIdDataProducerId)

	transportDump, err := suite.transport2.Dump()
	suite.Require().NoError(err)
	suite.Equal(suite.transport2.Id(), transportDump.Id)
	suite.Equal([]string{dataProducer2.Id()}, transportDump.DataProducerIds)
	suite.Empty(transportDump.DataConsumerIds)
}
// TestProduceDataTypeError expects ProduceData on transport1 to return an
// error when it is given zero-valued DataProducerOptions.
func (suite *DataProducerTestingSuite) TestProduceDataTypeError() {
	var emptyOptions DataProducerOptions

	_, produceErr := suite.transport1.ProduceData(emptyOptions)
	suite.Error(produceErr)
}
// TestProduceDataWithAlreadyUsedStreamIdWithError produces data twice on
// transport1 with the same stream id (666) and expects the second call to
// return an error.
//
// The first call must succeed; otherwise the second error would not prove
// that the stream id collision was what got rejected.
func (suite *DataProducerTestingSuite) TestProduceDataWithAlreadyUsedStreamIdWithError() {
	options := DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{
			StreamId: 666,
		},
	}

	_, err := suite.transport1.ProduceData(options)
	suite.Require().NoError(err)

	// Use a fresh parameters value so the second call does not share the
	// pointer handed to the first one.
	_, err = suite.transport1.ProduceData(DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{
			StreamId: 666,
		},
	})
	suite.Error(err)
}
// TestProduceDataWithOrderedAndMaxPacketLifeTimeRejectsWithTypeErrorError
// expects ProduceData on transport1 to return an error when the SCTP stream
// parameters set both Ordered=true and a MaxPacketLifeTime.
func (suite *DataProducerTestingSuite) TestProduceDataWithOrderedAndMaxPacketLifeTimeRejectsWithTypeErrorError() {
	conflictingParams := &SctpStreamParameters{
		StreamId:          999,
		Ordered:           Bool(true),
		MaxPacketLifeTime: 4000,
	}
	options := DataProducerOptions{SctpStreamParameters: conflictingParams}

	_, produceErr := suite.transport1.ProduceData(options)
	suite.Error(produceErr)
}
// TestDataProducerDumpSucceeds creates one data producer on each transport
// and checks that Dump() reports the id, the "sctp" type, the SCTP stream
// parameters, the label and the protocol that the producer was created with.
//
// Creation and dump errors, and the pointers dereferenced afterwards, are
// checked with Require so a failure stops the test instead of panicking.
func (suite *DataProducerTestingSuite) TestDataProducerDumpSucceeds() {
	dataProducer1, err := suite.transport1.ProduceData(DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{
			StreamId: 666,
		},
		Label:    "foo",
		Protocol: "bar",
	})
	suite.Require().NoError(err)

	data, err := dataProducer1.Dump()
	suite.Require().NoError(err)

	suite.Equal(dataProducer1.Id(), data.Id)
	suite.Equal("sctp", data.Type)
	suite.Require().NotNil(dataProducer1.SctpStreamParameters())
	suite.Require().NotNil(data.SctpStreamParameters)
	suite.Equal(dataProducer1.SctpStreamParameters().StreamId, data.SctpStreamParameters.StreamId)
	suite.Require().NotNil(data.SctpStreamParameters.Ordered)
	suite.True(*data.SctpStreamParameters.Ordered)
	suite.Zero(data.SctpStreamParameters.MaxPacketLifeTime)
	suite.Zero(data.SctpStreamParameters.MaxRetransmits)
	suite.Equal("foo", data.Label)
	suite.Equal("bar", data.Protocol)

	dataProducer2, err := suite.transport2.ProduceData(DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{
			StreamId:       777,
			MaxRetransmits: 3,
		},
		Label:    "foo",
		Protocol: "bar",
		AppData:  H{"foo": 1, "bar": "2"},
	})
	suite.Require().NoError(err)

	data, err = dataProducer2.Dump()
	suite.Require().NoError(err)

	suite.Equal(dataProducer2.Id(), data.Id)
	suite.Equal("sctp", data.Type)
	suite.Require().NotNil(dataProducer2.SctpStreamParameters())
	suite.Require().NotNil(data.SctpStreamParameters)
	suite.Equal(dataProducer2.SctpStreamParameters().StreamId, data.SctpStreamParameters.StreamId)
	suite.Require().NotNil(data.SctpStreamParameters.Ordered)
	suite.False(*data.SctpStreamParameters.Ordered)
	suite.Zero(data.SctpStreamParameters.MaxPacketLifeTime)
	suite.EqualValues(3, data.SctpStreamParameters.MaxRetransmits)
	suite.Equal("foo", data.Label)
	suite.Equal("bar", data.Protocol)
}
// TestDataProducerCloseSucceeds closes a data producer created on transport1
// and verifies that its observer fires "close" once, that Closed() reports
// true, and that neither the router dump nor the transport dump still lists
// the producer.
//
// Creation and dump errors abort the test rather than being discarded.
func (suite *DataProducerTestingSuite) TestDataProducerCloseSucceeds() {
	onObserverClose := NewMockFunc(suite.T())

	dataProducer1, err := suite.transport1.ProduceData(DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{
			StreamId: 666,
		},
	})
	suite.Require().NoError(err)

	dataProducer1.Observer().Once("close", onObserverClose.Fn())
	dataProducer1.Close()

	onObserverClose.ExpectCalledTimes(1)
	suite.True(dataProducer1.Closed())

	routerDump, err := suite.router.Dump()
	suite.Require().NoError(err)
	expectedDump := RouterDump{
		MapDataProducerIdDataConsumerIds: map[string][]string{},
		MapDataConsumerIdDataProducerId:  map[string]string{},
	}
	suite.Equal(expectedDump.MapDataProducerIdDataConsumerIds, routerDump.MapDataProducerIdDataConsumerIds)
	suite.Equal(expectedDump.MapDataConsumerIdDataProducerId, routerDump.MapDataConsumerIdDataProducerId)

	transportDump, err := suite.transport1.Dump()
	suite.Require().NoError(err)
	suite.Equal(suite.transport1.Id(), transportDump.Id)
	suite.Empty(transportDump.ProducerIds)
	suite.Empty(transportDump.DataProducerIds)
}
// TestProducerMethodsRejectIfClosed closes a data producer created on
// transport2 and expects both Dump() and GetStats() to return an error
// afterwards.
//
// The producer must have been created successfully; otherwise the Close()
// call below would run on an invalid value.
func (suite *DataProducerTestingSuite) TestProducerMethodsRejectIfClosed() {
	dataProducer1, err := suite.transport2.ProduceData(DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{
			StreamId: 666,
		},
	})
	suite.Require().NoError(err)

	dataProducer1.Close()

	_, err = dataProducer1.Dump()
	suite.Error(err)

	_, err = dataProducer1.GetStats()
	suite.Error(err)
}
// TestDataProducerEmitsTransportcloseIfTransportIsClosed closes transport2
// while a data producer created on it is still open, and expects the
// producer to emit "transportclose", its observer to emit "close", and
// Closed() to report true.
//
// The trigger is the transport's Close(), not the producer's: closing the
// producer directly would only repeat TestDataProducerCloseSucceeds and
// would never exercise the transport-driven shutdown this test is named for.
func (suite *DataProducerTestingSuite) TestDataProducerEmitsTransportcloseIfTransportIsClosed() {
	dataProducer1, err := suite.transport2.ProduceData(DataProducerOptions{
		SctpStreamParameters: &SctpStreamParameters{
			StreamId: 666,
		},
	})
	suite.Require().NoError(err)

	onObserverClose := NewMockFunc(suite.T())
	dataProducer1.Observer().Once("close", onObserverClose.Fn())

	// NOTE(review): assumes DataProducer exposes the event-emitter API
	// directly (as its Observer() does) — confirm against the DataProducer type.
	onTransportClose := NewMockFunc(suite.T())
	dataProducer1.Once("transportclose", onTransportClose.Fn())

	suite.transport2.Close()

	onTransportClose.ExpectCalledTimes(1)
	onObserverClose.ExpectCalledTimes(1)
	suite.True(dataProducer1.Closed())
}