From 2daa9b221e4a76ccd82f95f1132a2000db4a281e Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Fri, 4 Oct 2024 23:45:22 -0400 Subject: [PATCH] Add E2E Test for RTX Assert that generation of NACKs and sending of RTX operates as expected. --- go.mod | 2 +- go.sum | 4 +- peerconnection_media_test.go | 74 +++++++++++++++++++++++++++++ rtpreceiver_go_test.go | 91 ------------------------------------ vnet_test.go | 7 +-- 5 files changed, 79 insertions(+), 99 deletions(-) diff --git a/go.mod b/go.mod index a1c10b719ec..7bdc0e201f0 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/pion/datachannel v1.5.9 github.com/pion/dtls/v3 v3.0.2 github.com/pion/ice/v4 v4.0.1 - github.com/pion/interceptor v0.1.34 + github.com/pion/interceptor v0.1.35 github.com/pion/logging v0.2.2 github.com/pion/randutil v0.1.0 github.com/pion/rtcp v1.2.14 diff --git a/go.sum b/go.sum index bec6efe011e..3bf33337947 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,8 @@ github.com/pion/dtls/v3 v3.0.2 h1:425DEeJ/jfuTTghhUDW0GtYZYIwwMtnKKJNMcWccTX0= github.com/pion/dtls/v3 v3.0.2/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k= github.com/pion/ice/v4 v4.0.1 h1:2d3tPoTR90F3TcGYeXUwucGlXI3hds96cwv4kjZmb9s= github.com/pion/ice/v4 v4.0.1/go.mod h1:2dpakjpd7+74L5j3TAe6gvkbI5UIzOgAnkimm9SuHvA= -github.com/pion/interceptor v0.1.34 h1:jb1MG9LTdQ4VVCSZDUbUzjeJNngzz4dBXcr2dL+ejfA= -github.com/pion/interceptor v0.1.34/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= +github.com/pion/interceptor v0.1.35 h1:CshR7vY/1uF20GbD2/edgtcsxwkeCt3G3FUTr0MMwwM= +github.com/pion/interceptor v0.1.35/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM= diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index 9887336a1df..1473308b284 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "io" + "math/rand" "regexp" "strings" "sync" @@ -1780,3 +1781,76 @@ func TestPeerConnection_Zero_PayloadType(t *testing.T) { closePairNow(t, pcOffer, pcAnswer) } + +// Assert that NACKs work E2E with no extra configuration. If media is sent over a lossy connection +// the user gets retransmitted RTP packets with no extra configuration +func Test_PeerConnection_RTX_E2E(t *testing.T) { + defer test.TimeOut(time.Second * 30).Stop() + + pcOffer, pcAnswer, wan := createVNetPair(t) + + wan.AddChunkFilter(func(vnet.Chunk) bool { + return rand.Intn(5) != 4 //nolint: gosec + }) + + track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id") + assert.NoError(t, err) + + rtpSender, err := pcOffer.AddTrack(track) + assert.NoError(t, err) + + go func() { + rtcpBuf := make([]byte, 1500) + for { + if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { + return + } + } + }() + + rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC + ssrc := rtpSender.GetParameters().Encodings[0].SSRC + + rtxRead, rtxReadCancel := context.WithCancel(context.Background()) + pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) { + for { + pkt, attributes, readRTPErr := track.ReadRTP() + if errors.Is(readRTPErr, io.EOF) { + return + } else if pkt.PayloadType == 0 { + continue + } + + assert.NotNil(t, pkt) + assert.Equal(t, pkt.SSRC, uint32(ssrc)) + assert.Equal(t, pkt.PayloadType, uint8(96)) + + rtxPayloadType := attributes.Get(AttributeRtxPayloadType) + rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber) + rtxSSRC := attributes.Get(AttributeRtxSsrc) + if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil { + assert.Equal(t, rtxPayloadType, uint8(97)) + assert.Equal(t, rtxSSRC, uint32(rtxSsrc)) + + rtxReadCancel() + } + } + }) + + assert.NoError(t, signalPair(pcOffer, pcAnswer)) + + func() { + for { + select { + case <-time.After(20 * time.Millisecond): + writeErr := track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}) + assert.NoError(t, writeErr) + case <-rtxRead.Done(): + return + } + } + }() + + assert.NoError(t, wan.Stop()) + closePairNow(t, pcOffer, pcAnswer) +} diff --git a/rtpreceiver_go_test.go b/rtpreceiver_go_test.go index 60c3fd8e95b..911f0c83822 100644 --- a/rtpreceiver_go_test.go +++ b/rtpreceiver_go_test.go @@ -8,15 +8,10 @@ package webrtc import ( "context" - "encoding/binary" - "errors" - "io" "testing" "time" - "github.com/pion/rtp" "github.com/pion/sdp/v3" - "github.com/pion/transport/v3/test" "github.com/pion/webrtc/v4/pkg/media" "github.com/stretchr/testify/assert" ) @@ -75,89 +70,3 @@ func TestSetRTPParameters(t *testing.T) { assert.NoError(t, wan.Stop()) closePairNow(t, sender, receiver) } - -// Assert the behavior of reading a RTX with a distinct SSRC -// All the attributes should be populated and the packet unpacked -func Test_RTX_Read(t *testing.T) { - defer test.TimeOut(time.Second * 30).Stop() - - pcOffer, pcAnswer, err := newPair() - assert.NoError(t, err) - - track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id") - assert.NoError(t, err) - - rtpSender, err := pcOffer.AddTrack(track) - assert.NoError(t, err) - - rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC - ssrc := rtpSender.GetParameters().Encodings[0].SSRC - - rtxRead, rtxReadCancel := context.WithCancel(context.Background()) - pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) { - for { - pkt, attributes, readRTPErr := track.ReadRTP() - if errors.Is(readRTPErr, io.EOF) { - return - } else if pkt.PayloadType == 0 { - continue - } - - assert.NoError(t, readRTPErr) - assert.NotNil(t, pkt) - assert.Equal(t, pkt.SSRC, uint32(ssrc)) - assert.Equal(t, pkt.PayloadType, uint8(96)) - assert.Equal(t, pkt.Payload, []byte{0xB, 0xA, 0xD}) - - rtxPayloadType := attributes.Get(AttributeRtxPayloadType) - rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber) - rtxSSRC := attributes.Get(AttributeRtxSsrc) - if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil { - assert.Equal(t, rtxPayloadType, uint8(97)) - assert.Equal(t, rtxSSRC, uint32(rtxSsrc)) - assert.Equal(t, rtxSequenceNumber, pkt.SequenceNumber+500) - - rtxReadCancel() - } - } - }) - - assert.NoError(t, signalPair(pcOffer, pcAnswer)) - - func() { - for i := uint16(0); ; i++ { - pkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SSRC: uint32(ssrc), - PayloadType: 96, - SequenceNumber: i, - }, - Payload: []byte{0xB, 0xA, 0xD}, - } - - select { - case <-time.After(20 * time.Millisecond): - // Send the original packet - err = track.WriteRTP(&pkt) - assert.NoError(t, err) - - rtxPayload := []byte{0x0, 0x0, 0xB, 0xA, 0xD} - binary.BigEndian.PutUint16(rtxPayload[0:2], pkt.Header.SequenceNumber) - - // Send the RTX - _, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{ - Version: 2, - SSRC: uint32(rtxSsrc), - PayloadType: 97, - SequenceNumber: i + 500, - }, rtxPayload) - assert.NoError(t, err) - case <-rtxRead.Done(): - return - } - } - }() - - closePairNow(t, pcOffer, pcAnswer) -} diff --git a/vnet_test.go b/vnet_test.go index 3cb89044986..5cf2a64ce03 100644 --- a/vnet_test.go +++ b/vnet_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - "github.com/pion/interceptor" "github.com/pion/logging" "github.com/pion/transport/v3/vnet" "github.com/stretchr/testify/assert" @@ -53,12 +52,10 @@ func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Route // Start the virtual network by calling Start() on the root router assert.NoError(t, wan.Start()) - offerInterceptorRegistry := &interceptor.Registry{} - offerPeerConnection, err := NewAPI(WithSettingEngine(offerSettingEngine), WithInterceptorRegistry(offerInterceptorRegistry)).NewPeerConnection(Configuration{}) + offerPeerConnection, err := NewAPI(WithSettingEngine(offerSettingEngine)).NewPeerConnection(Configuration{}) assert.NoError(t, err) - answerInterceptorRegistry := &interceptor.Registry{} - answerPeerConnection, err := NewAPI(WithSettingEngine(answerSettingEngine), WithInterceptorRegistry(answerInterceptorRegistry)).NewPeerConnection(Configuration{}) + answerPeerConnection, err := NewAPI(WithSettingEngine(answerSettingEngine)).NewPeerConnection(Configuration{}) assert.NoError(t, err) return offerPeerConnection, answerPeerConnection, wan