Skip to content

Commit

Permalink
Properly handle non-media probes
Browse files Browse the repository at this point in the history
libwebrtc has started sending media probes on an unannounced SSRC(0).
Currently Pion will ignore this as the SSRC hasn't been declared
explicitly and no RID/MID RTP Headers.

This adds a special case to accept SSRC 0 and Read the RTP packets. This
allows the TWCC reports to properly be generated.
  • Loading branch information
Sean-Der committed Jul 21, 2024
1 parent 19d0224 commit c85269b
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 16 deletions.
55 changes: 51 additions & 4 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,59 @@ func Test_InterceptorRegistry_Build(t *testing.T) {
},
})

peerConnectionA, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
assert.NoError(t, err)

peerConnectionB, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
peerConnectionA, peerConnectionB, err := NewAPI(WithInterceptorRegistry(ir)).newPair(Configuration{})
assert.NoError(t, err)

assert.Equal(t, 2, registryBuildCount)
closePairNow(t, peerConnectionA, peerConnectionB)
}

func Test_Interceptor_ZeroSSRC(t *testing.T) {
to := test.TimeOut(time.Second * 20)
defer to.Stop()

report := test.CheckRoutines(t)
defer report()

track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)

offerer, answerer, err := newPair()
assert.NoError(t, err)

_, err = offerer.AddTrack(track)
assert.NoError(t, err)

probeReceiverCreated := make(chan struct{})

go func() {
sequenceNumber := uint16(0)
for range time.NewTicker(time.Millisecond * 20).C {
track.mu.Lock()
if len(track.bindings) == 1 {
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
Version: 2,
SSRC: 0,
SequenceNumber: sequenceNumber,
}, []byte{0, 1, 2, 3, 4, 5})
assert.NoError(t, err)
}
sequenceNumber++
track.mu.Unlock()

if nonMediaBandwidthProbe, ok := answerer.nonMediaBandwidthProbe.Load().(*RTPReceiver); ok {
assert.Equal(t, len(nonMediaBandwidthProbe.Tracks()), 1)
close(probeReceiverCreated)
return
}
}
}()

assert.NoError(t, signalPair(offerer, answerer))

peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer)
peerConnectionConnected.Wait()

<-probeReceiverCreated
closePairNow(t, offerer, answerer)
}
37 changes: 36 additions & 1 deletion peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type PeerConnection struct {
// should be defined (see JSEP 3.4.1).
greaterMid int

rtpTransceivers []*RTPTransceiver
rtpTransceivers []*RTPTransceiver
nonMediaBandwidthProbe atomic.Value // RTPReceiver

onSignalingStateChangeHandler func(SignalingState)
onICEConnectionStateChangeHandler atomic.Value // func(ICEConnectionState)
Expand Down Expand Up @@ -1524,6 +1525,32 @@ func (pc *PeerConnection) handleUndeclaredSSRC(ssrc SSRC, remoteDescription *Ses
return true, nil
}

// Chrome sends probing traffic on SSRC 0. This reads the packets to ensure that we properly
// generate TWCC reports for it. Since this isn't actually media we don't pass this to the user
func (pc *PeerConnection) handleNonMediaBandwidthProbe() {
nonMediaBandwidthProbe, err := pc.api.NewRTPReceiver(RTPCodecTypeVideo, pc.dtlsTransport)
if err != nil {
pc.log.Errorf("handleNonMediaBandwidthProbe failed to create RTPReceiver: %v", err)
return
}

if err = nonMediaBandwidthProbe.Receive(RTPReceiveParameters{
Encodings: []RTPDecodingParameters{{RTPCodingParameters: RTPCodingParameters{}}},
}); err != nil {
pc.log.Errorf("handleNonMediaBandwidthProbe failed to start RTPReceiver: %v", err)
return
}

pc.nonMediaBandwidthProbe.Store(nonMediaBandwidthProbe)
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
for {
if _, _, err = nonMediaBandwidthProbe.readRTP(b, nonMediaBandwidthProbe.Track()); err != nil {
pc.log.Tracef("handleNonMediaBandwidthProbe read exiting: %v", err)
return
}
}
}

func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) error { //nolint:gocognit
remoteDescription := pc.RemoteDescription()
if remoteDescription == nil {
Expand Down Expand Up @@ -1656,6 +1683,11 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() {
continue
}

if ssrc == 0 {
go pc.handleNonMediaBandwidthProbe()
continue
}

pc.dtlsTransport.storeSimulcastStream(stream)

if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines {
Expand Down Expand Up @@ -2072,6 +2104,9 @@ func (pc *PeerConnection) Close() error {
closeErrs = append(closeErrs, t.Stop())
}
}
if nonMediaBandwidthProbe, ok := pc.nonMediaBandwidthProbe.Load().(*RTPReceiver); ok {
closeErrs = append(closeErrs, nonMediaBandwidthProbe.Stop())
}
pc.mu.Unlock()

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5)
Expand Down
6 changes: 3 additions & 3 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1528,7 +1528,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
SequenceNumber: sequenceNumber,
PayloadType: 96,
Padding: true,
SSRC: uint32(i),
SSRC: uint32(i + 1),
},
Payload: []byte{0x00, 0x02},
}
Expand All @@ -1547,7 +1547,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 96,
SSRC: uint32(i),
SSRC: uint32(i + 1),
},
Payload: []byte{0x00},
}
Expand Down Expand Up @@ -1591,7 +1591,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 96,
SSRC: uint32(i),
SSRC: uint32(i + 1),
},
Payload: []byte{0x00},
}
Expand Down
2 changes: 1 addition & 1 deletion peerconnection_renegotiation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func TestPeerConnection_Renegotiation_Simulcast(t *testing.T) {
for ssrc, rid := range rids {
header := &rtp.Header{
Version: 2,
SSRC: uint32(ssrc),
SSRC: uint32(ssrc + 1),
SequenceNumber: sequenceNumber,
PayloadType: 96,
}
Expand Down
12 changes: 5 additions & 7 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {

var t *trackStreams
for idx, ts := range r.tracks {
if ts.track != nil && parameters.Encodings[i].SSRC != 0 && ts.track.SSRC() == parameters.Encodings[i].SSRC {
if ts.track != nil && ts.track.SSRC() == parameters.Encodings[i].SSRC {
t = &r.tracks[idx]
break
}
Expand All @@ -210,12 +210,10 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {
return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, parameters.Encodings[i].SSRC)
}

if parameters.Encodings[i].SSRC != 0 {
t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
var err error
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
return err
}
t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
var err error
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
return err
}

if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
Expand Down

0 comments on commit c85269b

Please sign in to comment.