Skip to content

Commit

Permalink
Add E2E Test for RTX
Browse files Browse the repository at this point in the history
Assert that generation of NACKs and sending of RTX operates as expected.
  • Loading branch information
Sean-Der committed Oct 5, 2024
1 parent 32f7063 commit 2daa9b2
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 99 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/pion/datachannel v1.5.9
github.com/pion/dtls/v3 v3.0.2
github.com/pion/ice/v4 v4.0.1
github.com/pion/interceptor v0.1.34
github.com/pion/interceptor v0.1.35
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
github.com/pion/rtcp v1.2.14
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ github.com/pion/dtls/v3 v3.0.2 h1:425DEeJ/jfuTTghhUDW0GtYZYIwwMtnKKJNMcWccTX0=
github.com/pion/dtls/v3 v3.0.2/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k=
github.com/pion/ice/v4 v4.0.1 h1:2d3tPoTR90F3TcGYeXUwucGlXI3hds96cwv4kjZmb9s=
github.com/pion/ice/v4 v4.0.1/go.mod h1:2dpakjpd7+74L5j3TAe6gvkbI5UIzOgAnkimm9SuHvA=
github.com/pion/interceptor v0.1.34 h1:jb1MG9LTdQ4VVCSZDUbUzjeJNngzz4dBXcr2dL+ejfA=
github.com/pion/interceptor v0.1.34/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/interceptor v0.1.35 h1:CshR7vY/1uF20GbD2/edgtcsxwkeCt3G3FUTr0MMwwM=
github.com/pion/interceptor v0.1.35/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=
Expand Down
74 changes: 74 additions & 0 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -1780,3 +1781,76 @@ func TestPeerConnection_Zero_PayloadType(t *testing.T) {

closePairNow(t, pcOffer, pcAnswer)
}

// Assert that NACKs work E2E with no extra configuration. If media is sent over a lossy connection
// the user gets retransmitted RTP packets with no extra configuration
func Test_PeerConnection_RTX_E2E(t *testing.T) {
defer test.TimeOut(time.Second * 30).Stop()

pcOffer, pcAnswer, wan := createVNetPair(t)

wan.AddChunkFilter(func(vnet.Chunk) bool {
return rand.Intn(5) != 4 //nolint: gosec
})

track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
assert.NoError(t, err)

rtpSender, err := pcOffer.AddTrack(track)
assert.NoError(t, err)

go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
ssrc := rtpSender.GetParameters().Encodings[0].SSRC

rtxRead, rtxReadCancel := context.WithCancel(context.Background())
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
for {
pkt, attributes, readRTPErr := track.ReadRTP()
if errors.Is(readRTPErr, io.EOF) {
return
} else if pkt.PayloadType == 0 {
continue
}

assert.NotNil(t, pkt)
assert.Equal(t, pkt.SSRC, uint32(ssrc))
assert.Equal(t, pkt.PayloadType, uint8(96))

rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
rtxSSRC := attributes.Get(AttributeRtxSsrc)
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
assert.Equal(t, rtxPayloadType, uint8(97))
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))

rtxReadCancel()
}
}
})

assert.NoError(t, signalPair(pcOffer, pcAnswer))

func() {
for {
select {
case <-time.After(20 * time.Millisecond):
writeErr := track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second})
assert.NoError(t, writeErr)
case <-rtxRead.Done():
return
}
}
}()

assert.NoError(t, wan.Stop())
closePairNow(t, pcOffer, pcAnswer)
}
91 changes: 0 additions & 91 deletions rtpreceiver_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@ package webrtc

import (
"context"
"encoding/binary"
"errors"
"io"
"testing"
"time"

"github.com/pion/rtp"
"github.com/pion/sdp/v3"
"github.com/pion/transport/v3/test"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -75,89 +70,3 @@ func TestSetRTPParameters(t *testing.T) {
assert.NoError(t, wan.Stop())
closePairNow(t, sender, receiver)
}

// Assert the behavior of reading a RTX with a distinct SSRC
// All the attributes should be populated and the packet unpacked
func Test_RTX_Read(t *testing.T) {
defer test.TimeOut(time.Second * 30).Stop()

pcOffer, pcAnswer, err := newPair()
assert.NoError(t, err)

track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
assert.NoError(t, err)

rtpSender, err := pcOffer.AddTrack(track)
assert.NoError(t, err)

rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
ssrc := rtpSender.GetParameters().Encodings[0].SSRC

rtxRead, rtxReadCancel := context.WithCancel(context.Background())
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
for {
pkt, attributes, readRTPErr := track.ReadRTP()
if errors.Is(readRTPErr, io.EOF) {
return
} else if pkt.PayloadType == 0 {
continue
}

assert.NoError(t, readRTPErr)
assert.NotNil(t, pkt)
assert.Equal(t, pkt.SSRC, uint32(ssrc))
assert.Equal(t, pkt.PayloadType, uint8(96))
assert.Equal(t, pkt.Payload, []byte{0xB, 0xA, 0xD})

rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
rtxSSRC := attributes.Get(AttributeRtxSsrc)
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
assert.Equal(t, rtxPayloadType, uint8(97))
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))
assert.Equal(t, rtxSequenceNumber, pkt.SequenceNumber+500)

rtxReadCancel()
}
}
})

assert.NoError(t, signalPair(pcOffer, pcAnswer))

func() {
for i := uint16(0); ; i++ {
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
SSRC: uint32(ssrc),
PayloadType: 96,
SequenceNumber: i,
},
Payload: []byte{0xB, 0xA, 0xD},
}

select {
case <-time.After(20 * time.Millisecond):
// Send the original packet
err = track.WriteRTP(&pkt)
assert.NoError(t, err)

rtxPayload := []byte{0x0, 0x0, 0xB, 0xA, 0xD}
binary.BigEndian.PutUint16(rtxPayload[0:2], pkt.Header.SequenceNumber)

// Send the RTX
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
Version: 2,
SSRC: uint32(rtxSsrc),
PayloadType: 97,
SequenceNumber: i + 500,
}, rtxPayload)
assert.NoError(t, err)
case <-rtxRead.Done():
return
}
}
}()

closePairNow(t, pcOffer, pcAnswer)
}
7 changes: 2 additions & 5 deletions vnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/transport/v3/vnet"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -53,12 +52,10 @@ func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Route
// Start the virtual network by calling Start() on the root router
assert.NoError(t, wan.Start())

offerInterceptorRegistry := &interceptor.Registry{}
offerPeerConnection, err := NewAPI(WithSettingEngine(offerSettingEngine), WithInterceptorRegistry(offerInterceptorRegistry)).NewPeerConnection(Configuration{})
offerPeerConnection, err := NewAPI(WithSettingEngine(offerSettingEngine)).NewPeerConnection(Configuration{})
assert.NoError(t, err)

answerInterceptorRegistry := &interceptor.Registry{}
answerPeerConnection, err := NewAPI(WithSettingEngine(answerSettingEngine), WithInterceptorRegistry(answerInterceptorRegistry)).NewPeerConnection(Configuration{})
answerPeerConnection, err := NewAPI(WithSettingEngine(answerSettingEngine)).NewPeerConnection(Configuration{})
assert.NoError(t, err)

return offerPeerConnection, answerPeerConnection, wan
Expand Down

0 comments on commit 2daa9b2

Please sign in to comment.