-
Notifications
You must be signed in to change notification settings - Fork 0
/
commands.go
357 lines (291 loc) · 10.6 KB
/
commands.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
package rtimpus
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/pwbh/rtimpus/amf"
"github.com/pwbh/rtimpus/utils"
)
// Connect holds the fields decoded from a "connect" command payload by
// getConnectResult.
type Connect struct {
	// CommandName is set to CONNECT by getConnectResult.
	CommandName string
	// TransactionID is the second decoded value of the payload, asserted to be a float64.
	TransactionID float64
	// CommandObject is the third decoded value of the payload.
	CommandObject amf.Object
}
// CallResponse holds the fields decoded from a command payload that is treated
// as a call. getCallResponseResult fills all four fields; getCreateStream fills
// every field except Response.
type CallResponse struct {
	// ProcedureName is the command name decoded as the first value of the payload.
	ProcedureName string
	// TransactionID is the second decoded value of the payload, asserted to be a float64.
	TransactionID float64
	// CommandObject is the third decoded value of the payload.
	CommandObject amf.Object
	// Response is the fourth decoded value; only getCallResponseResult sets it.
	Response amf.Object
}
// CreateStream has the same field layout as Connect.
// NOTE(review): nothing in the visible code constructs this type — the
// "createStream" branch of UnmarshalCommand returns a *CallResponse from
// getCreateStream instead. Confirm whether that is intended.
type CreateStream struct {
	CommandName string
	TransactionID float64
	CommandObject amf.Object
}
// Command names that UnmarshalCommand matches against the first decoded value
// of a command payload.
const (
	// CONNECT selects getConnectResult.
	CONNECT = "connect"
	// CREATE_STREAM selects getCreateStream.
	CREATE_STREAM = "createStream"
)
// UnmarshalCommand decodes the AMF0-encoded command carried in chunk's payload.
//
// The first decoded value selects the concrete result: CONNECT yields a
// *Connect, CREATE_STREAM yields a *CallResponse built by getCreateStream, and
// anything else is handed to getCallResponseResult as an incoming RPC call.
// A decoding error is returned unchanged.
func UnmarshalCommand(chunk *Chunk) (interface{}, error) {
	decoder := amf.NewAMF0Decoder(bytes.NewBuffer(chunk.payload.data))

	name, err := decoder.Decode()
	if err != nil {
		return nil, err
	}

	if name == CONNECT {
		return getConnectResult(decoder)
	}
	if name == CREATE_STREAM {
		return getCreateStream(decoder)
	}

	// No known command matched, so this has to be an incoming RPC call.
	return getCallResponseResult(decoder, name)
}
// getConnectResult decodes the remainder of a "connect" command from decoder,
// which is expected to have already consumed the command name.
//
// It reads two values in order: the transaction ID, which must be a float64,
// and the command object, which must be an amf.Object. It returns the populated
// *Connect, or an error if decoding fails or a value has an unexpected type.
func getConnectResult(decoder *amf.AMF0Decoder) (*Connect, error) {
	transactionID, err := decoder.Decode()
	if err != nil {
		return nil, err
	}
	tranID, ok := transactionID.(float64)
	if !ok {
		// The assertion above is against float64, so the message names that type.
		return nil, errors.New("transactionID is not of type float64")
	}

	commandObject, err := decoder.Decode()
	if err != nil {
		return nil, err
	}
	commObj, ok := commandObject.(amf.Object)
	if !ok {
		return nil, errors.New("commandObject is not of type Object")
	}

	return &Connect{
		CommandName:   CONNECT,
		TransactionID: tranID,
		CommandObject: commObj,
	}, nil
}
// getCallResponseResult decodes the remainder of a call command from decoder.
//
// value is the already-decoded first element of the payload and must be a
// string; it becomes the procedure name. Three more values are then read in
// order: the transaction ID (must be a float64), the command object and the
// response (both must be amf.Object). It returns the populated *CallResponse,
// or an error if decoding fails or a value has an unexpected type.
func getCallResponseResult(decoder *amf.AMF0Decoder, value interface{}) (*CallResponse, error) {
	procedureName, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("unknown value decoded %v", value)
	}

	transactionID, err := decoder.Decode()
	if err != nil {
		return nil, err
	}
	tranID, ok := transactionID.(float64)
	if !ok {
		// The assertion above is against float64, so the message names that type.
		return nil, errors.New("transactionID is not of type float64")
	}

	commandObject, err := decoder.Decode()
	if err != nil {
		return nil, err
	}
	commObj, ok := commandObject.(amf.Object)
	if !ok {
		return nil, errors.New("commandObject is not of type Object")
	}

	response, err := decoder.Decode()
	if err != nil {
		return nil, err
	}
	res, ok := response.(amf.Object)
	if !ok {
		return nil, errors.New("response is not of type Object")
	}

	return &CallResponse{
		ProcedureName: procedureName,
		TransactionID: tranID,
		CommandObject: commObj,
		Response:      res,
	}, nil
}
// getCreateStream decodes the remainder of a "createStream" command from
// decoder, which is expected to have already consumed the command name.
//
// It reads the transaction ID (must be a float64) and the command object (must
// be an amf.Object) and returns them in a *CallResponse whose ProcedureName is
// CREATE_STREAM; the Response field is left at its zero value. An error is
// returned if decoding fails or a value has an unexpected type.
func getCreateStream(decoder *amf.AMF0Decoder) (*CallResponse, error) {
	transactionID, err := decoder.Decode()
	if err != nil {
		return nil, err
	}
	tranID, ok := transactionID.(float64)
	if !ok {
		// The assertion above is against float64, so the message names that type.
		return nil, errors.New("transactionID is not of type float64")
	}

	commandObject, err := decoder.Decode()
	if err != nil {
		return nil, err
	}
	commObj, ok := commandObject.(amf.Object)
	if !ok {
		return nil, errors.New("commandObject is not of type Object")
	}

	return &CallResponse{
		ProcedureName: CREATE_STREAM,
		TransactionID: tranID,
		CommandObject: commObj,
	}, nil
}
// RTMP Chunk Stream uses message type IDs 1, 2, 3, 5, and 6 for protocol control messages.
// These messages contain information needed by the RTMP Chunk Stream protocol.
// These protocol control messages MUST have message stream ID 0 (known as the control stream)
// and be sent in chunk stream ID 2. Protocol control messages take effect as soon as they are received;
// their timestamps are ignored.
// func createProtocolMessageHeader(messageType byte, payloadLength uint32) ([]byte, error) {
// if messageType > 6 {
// return nil, errors.New("valid messageType ids 1-6, received >6")
// }
// buf := make([]byte, 12)
// buf[0] = 2 // Chunk Stream ID
// buf[1] = messageType // Message Type
// utils.PutUint24(buf[2:], payloadLength) // Payload length
// binary.BigEndian.PutUint32(buf[5:], 0) // Timestamp is ignored
// utils.PutUint24(buf[9:], 0) // Message Stream ID
// return buf, nil
// }
func createProtocolMessageHeader(messageTypeID byte, payloadLength uint32) ([]byte, error) {
if messageTypeID > 6 {
return nil, errors.New("valid messageTypeID ids 1-6, received >6")
}
buf := make([]byte, 12)
buf[0] = 2 // Chunk Stream ID
utils.PutUint24(buf[1:], 0) // Timestamp is ignored
utils.PutUint24(buf[4:], payloadLength) // Message length
buf[7] = messageTypeID // Message Type
binary.LittleEndian.PutUint32(buf[8:], 0) // Message Stream ID
return buf, nil
}
// sendSetChunkSize sends protocol control message 1, Set Chunk Size, which
// notifies the peer of a new maximum chunk size.
//
// The maximum chunk size defaults to 128 bytes, but either the client or the
// server can change it and informs its peer with this message. For example, a
// client that wants to send 131 bytes of audio data while the chunk size is 128
// can send this message to announce a chunk size of 131 bytes and then send the
// audio data in a single chunk.
//
// The maximum chunk size SHOULD be at least 128 bytes and MUST be at least
// 1 byte. It is maintained independently for each direction. This function
// does not validate size.
//
// The message written to c is the header from createProtocolMessageHeader
// followed by size as a 4-byte big-endian integer. The error from c.Write is
// returned.
func sendSetChunkSize(c *Connection, size uint32) error {
	const payloadLen = 4

	header, err := createProtocolMessageHeader(1, payloadLen)
	if err != nil {
		return err
	}

	msg := make([]byte, len(header)+payloadLen)
	offset := copy(msg, header)
	binary.BigEndian.PutUint32(msg[offset:], size)

	_, err = c.Write(msg)
	return err
}
// sendAbortMessage sends protocol control message 2, Abort Message.
//
// It tells a peer that is waiting for further chunks to complete a message to
// discard the partially received message on a chunk stream. The peer receives
// the chunk stream ID as this message's payload. An application may send this
// message when closing in order to indicate that further processing of the
// messages is not required.
//
// The message written to c is the header from createProtocolMessageHeader
// followed by streamID as a 4-byte big-endian integer. The error from c.Write
// is returned.
func sendAbortMessage(c *Connection, streamID uint32) error {
	const payloadLen = 4

	header, err := createProtocolMessageHeader(2, payloadLen)
	if err != nil {
		return err
	}

	msg := make([]byte, len(header)+payloadLen)
	offset := copy(msg, header)
	binary.BigEndian.PutUint32(msg[offset:], streamID)

	_, err = c.Write(msg)
	return err
}
// sendAcknowledgement sends protocol control message 3, Acknowledgement.
//
// The client or the server MUST send an acknowledgement to the peer after
// receiving bytes equal to the window size. The window size is the maximum
// number of bytes that the sender sends without receiving an acknowledgement
// from the receiver.
//
// sequenceNumber is the number of bytes received so far. The message written
// to c is the header from createProtocolMessageHeader followed by
// sequenceNumber as a 4-byte big-endian integer. The error from c.Write is
// returned.
func sendAcknowledgement(c *Connection, sequenceNumber uint32) error {
	const payloadLen = 4

	header, err := createProtocolMessageHeader(3, payloadLen)
	if err != nil {
		return err
	}

	msg := make([]byte, len(header)+payloadLen)
	offset := copy(msg, header)
	binary.BigEndian.PutUint32(msg[offset:], sequenceNumber)

	_, err = c.Write(msg)
	return err
}
// sendWindowAcknowledgementSize sends protocol control message 5, Window
// Acknowledgement Size.
//
// The client or the server sends this message to inform the peer of the window
// size to use between sending acknowledgements. The sender expects an
// acknowledgement from its peer after the sender sends window-size bytes. The
// receiving peer MUST send an Acknowledgement (Section 5.4.3) after receiving
// the indicated number of bytes since the last Acknowledgement was sent, or
// from the beginning of the session if no Acknowledgement has yet been sent.
// In short, it controls how often the peers acknowledge received data; the
// value is also known as ServerBW.
//
// The message written to c is the header from createProtocolMessageHeader
// followed by size as a 4-byte big-endian integer. c.ServerBW is updated to
// size only after the write succeeds, so a failed send does not leave the
// connection recording a window size the peer never received.
func sendWindowAcknowledgementSize(c *Connection, size uint32) error {
	payloadLength := 4
	header, err := createProtocolMessageHeader(5, uint32(payloadLength))
	if err != nil {
		return err
	}

	headerLength := len(header)
	buf := make([]byte, headerLength+payloadLength)
	copy(buf[:headerLength], header)
	binary.BigEndian.PutUint32(buf[headerLength:], size)

	if _, wErr := c.Write(buf); wErr != nil {
		return wErr
	}
	c.ServerBW = size
	return nil
}
// sendSetPeerBandwith sends protocol control message 6, Set Peer Bandwidth.
//
// The client or the server sends this message to limit the output bandwidth of
// its peer. The peer receiving this message limits its output bandwidth by
// limiting the amount of sent but unacknowledged data to the window size
// indicated in this message. The peer receiving this message SHOULD respond
// with a Window Acknowledgement Size message if the window size is different
// from the last one sent to the sender of this message.
//
// The limit type is one of the following values:
//
//	0 - Hard: the peer SHOULD limit its output bandwidth to the indicated
//	    window size.
//	1 - Soft: the peer SHOULD limit its output bandwidth to the window
//	    indicated in this message or the limit already in effect, whichever
//	    is smaller.
//	2 - Dynamic: if the previous limit type was Hard, treat this message as
//	    though it was marked Hard, otherwise ignore this message.
//
// size is the data throughput, also known as ClientBW. A limit above 2 is only
// reported on standard output; the message is still sent. The message written
// to c is the header from createProtocolMessageHeader, then size as a 4-byte
// big-endian integer, then the limit byte. c.ClientBW is set to size once the
// write has succeeded.
func sendSetPeerBandwith(c *Connection, size uint32, limit byte) error {
	const payloadLen = 5

	if limit > 2 {
		fmt.Printf("limit exceeds maximum of 2, received %d\n", limit)
	}

	header, err := createProtocolMessageHeader(6, payloadLen)
	if err != nil {
		return err
	}

	msg := make([]byte, len(header)+payloadLen)
	offset := copy(msg, header)
	binary.BigEndian.PutUint32(msg[offset:], size)
	msg[len(msg)-1] = limit // the limit type occupies the final payload byte

	if _, err := c.Write(msg); err != nil {
		return err
	}
	c.ClientBW = size
	return nil
}
// sendConnectResult encodes a "_result" reply to a connect command and writes
// it to c.
//
// The AMF0 payload consists of the command name "_result", the transaction
// ID 1 and the object returned by getServerProperties. It is prefixed with the
// header produced by createChunkHeader(2, 0, 20, <payload length>). Any
// encoding, header-construction or write error is returned.
func sendConnectResult(c *Connection) error {
	objBuf := bytes.NewBuffer([]byte{})
	encoder := amf.NewAMF0Encoder(objBuf)

	if err := encoder.Encode("_result"); err != nil {
		return err
	}
	// NOTE(review): the transaction ID is encoded from an untyped int constant;
	// confirm the encoder writes it as an AMF0 number.
	if err := encoder.Encode(1); err != nil {
		return err
	}
	if err := encoder.Encode(getServerProperties()); err != nil {
		return err
	}

	// Check the header error before touching header.
	header, err := createChunkHeader(2, 0, 20, encoder.Length())
	if err != nil {
		return err
	}

	headerLength := len(header)
	buf := make([]byte, headerLength+encoder.Length())
	copy(buf[:headerLength], header)
	copy(buf[headerLength:], objBuf.Bytes())

	// Actually send the assembled message to the peer.
	_, wErr := c.Write(buf)
	return wErr
}
// getServerProperties returns the object sent back in the connect "_result"
// reply: a success code, a human-readable description and the object encoding
// constant OBJECT_ENCODING_AMF0.
func getServerProperties() amf.Object {
	return amf.Object{
		"code":           "NetConnection.Connect.Success",
		"description":    "Connection successful",
		"objectEncoding": OBJECT_ENCODING_AMF0,
	}
}