diff --git a/datachannel.go b/datachannel.go index 01e38b77c4a..f7c9511b350 100644 --- a/datachannel.go +++ b/datachannel.go @@ -40,7 +40,6 @@ type DataChannel struct { readyState atomic.Value // DataChannelState bufferedAmountLowThreshold uint64 detachCalled bool - readLoopActive chan struct{} // The binaryType represents attribute MUST, on getting, return the value to // which it was last set. On setting, if the new value is either the string @@ -328,7 +327,6 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread defer d.mu.Unlock() if !d.api.settingEngine.detach.DataChannels { - d.readLoopActive = make(chan struct{}) go d.readLoop() } } @@ -358,7 +356,6 @@ var rlBufPool = sync.Pool{New: func() interface{} { }} func (d *DataChannel) readLoop() { - defer close(d.readLoopActive) for { buffer := rlBufPool.Get().([]byte) //nolint:forcetypeassert n, isString, err := d.dataChannel.ReadDataChannel(buffer) @@ -441,22 +438,6 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) { // Close Closes the DataChannel. It may be called regardless of whether // the DataChannel object was created by this peer or the remote peer. func (d *DataChannel) Close() error { - return d.close(false) -} - -// Normally, close only stops writes from happening, so waitForReadsDone=true -// will wait for reads to be finished based on underlying SCTP association -// closure or a SCTP reset stream from the other side. This is safe to call -// with waitForReadsDone=true after tearing down a PeerConnection but not -// necessarily before. For example, if you used a vnet and dropped all packets -// right before closing the DataChannel, you'd need never see a reset stream. -func (d *DataChannel) close(waitForReadsDone bool) error { - if waitForReadsDone && d.readLoopActive != nil { - defer func() { - <-d.readLoopActive - }() - } - d.mu.Lock() haveSctpTransport := d.dataChannel != nil d.mu.Unlock() diff --git a/peerconnection.go b/peerconnection.go index 6a018a937b3..92977b5f763 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -56,7 +56,6 @@ type PeerConnection struct { idpLoginURL *string isClosed *atomicBool - isClosedDone chan struct{} isNegotiationNeeded *atomicBool updateNegotiationNeededFlagOnEmptyChain *atomicBool @@ -128,7 +127,6 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, ICECandidatePoolSize: 0, }, isClosed: &atomicBool{}, - isClosedDone: make(chan struct{}), isNegotiationNeeded: &atomicBool{}, updateNegotiationNeededFlagOnEmptyChain: &atomicBool{}, lastOffer: "", @@ -2036,31 +2034,14 @@ func (pc *PeerConnection) writeRTCP(pkts []rtcp.Packet, _ interceptor.Attributes return pc.dtlsTransport.WriteRTCP(pkts) } -// Close ends the PeerConnection. -// It will make a best effort to wait for all underlying goroutines it spawned to finish, -// except for cases that would cause deadlocks with itself. +// Close ends the PeerConnection func (pc *PeerConnection) Close() error { // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1) // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2) if pc.isClosed.swap(true) { - // someone else got here first but may still be closing (e.g. via DTLS close_notify) - <-pc.isClosedDone return nil } - defer close(pc.isClosedDone) - // Try closing everything and collect the errors - // Shutdown strategy: - // 1. Close all data channels. - // 2. All Conn close by closing their underlying Conn. - // 3. A Mux stops this chain. It won't close the underlying - // Conn if one of the endpoints is closed down. To - // continue the chain the Mux has to be closed. - pc.sctpTransport.lock.Lock() - closeErrs := make([]error, 0, 4+len(pc.sctpTransport.dataChannels)) - pc.sctpTransport.lock.Unlock() - - // canon steps // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3) pc.signalingState.Set(SignalingStateClosed) @@ -2070,6 +2051,7 @@ func (pc *PeerConnection) Close() error { // 2. A Mux stops this chain. It won't close the underlying // Conn if one of the endpoints is closed down. To // continue the chain the Mux has to be closed. + closeErrs := make([]error, 4) closeErrs = append(closeErrs, pc.api.interceptor.Close()) @@ -2096,6 +2078,7 @@ func (pc *PeerConnection) Close() error { // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7) closeErrs = append(closeErrs, pc.dtlsTransport.Stop()) + // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10) if pc.iceTransport != nil { closeErrs = append(closeErrs, pc.iceTransport.Stop()) @@ -2104,13 +2087,6 @@ func (pc *PeerConnection) Close() error { // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11) pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State()) - // non-canon steps - pc.sctpTransport.lock.Lock() - for _, d := range pc.sctpTransport.dataChannels { - closeErrs = append(closeErrs, d.close(true)) - } - pc.sctpTransport.lock.Unlock() - return util.FlattenErrs(closeErrs) } diff --git a/peerconnection_close_test.go b/peerconnection_close_test.go index 6835e458d56..df7a6526b1e 100644 --- a/peerconnection_close_test.go +++ b/peerconnection_close_test.go @@ -7,8 +7,6 @@ package webrtc import ( - "runtime" - "strings" "testing" "time" @@ -181,103 +179,3 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) { t.Error("pcOffer.Close() Timeout") } } - -func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) { - // Limit runtime in case of deadlocks - lim := test.TimeOut(time.Second * 20) - defer lim.Stop() - - report := CheckRoutinesIntolerant(t) - defer report() - - pcOffer, pcAnswer, err := newPair() - if err != nil { - t.Fatal(err) - } - - var dcAnswer *DataChannel - answerDataChannelOpened := make(chan struct{}) - pcAnswer.OnDataChannel(func(d *DataChannel) { - // Make sure this is the data channel we were looking for. (Not the one - // created in signalPair). - if d.Label() != "data" { - return - } - dcAnswer = d - close(answerDataChannelOpened) - }) - - dcOffer, err := pcOffer.CreateDataChannel("data", nil) - if err != nil { - t.Fatal(err) - } - - offerDataChannelOpened := make(chan struct{}) - dcOffer.OnOpen(func() { - close(offerDataChannelOpened) - }) - - err = signalPair(pcOffer, pcAnswer) - if err != nil { - t.Fatal(err) - } - - <-offerDataChannelOpened - <-answerDataChannelOpened - - msgNum := 0 - dcOffer.OnMessage(func(_ DataChannelMessage) { - t.Log("msg", msgNum) - msgNum++ - }) - - // send 50 messages, then close pcOffer, and then send another 50 - for i := 0; i < 100; i++ { - if i == 50 { - err = pcOffer.Close() - if err != nil { - t.Fatal(err) - } - } - _ = dcAnswer.Send([]byte("hello!")) - } - - err = pcAnswer.Close() - if err != nil { - t.Fatal(err) - } -} - -// CheckRoutinesIntolerant is used to check for leaked go-routines. -// It differs from test.CheckRoutines in that it won't wait at all -// for lingering goroutines. This is helpful for tests that need -// to ensure clean closure of resources. -func CheckRoutinesIntolerant(t *testing.T) func() { - return func() { - routines := getRoutines() - if len(routines) == 0 { - return - } - t.Fatalf("%s: \n%s", "Unexpected routines on test end", strings.Join(routines, "\n\n")) // nolint - } -} - -func getRoutines() []string { - buf := make([]byte, 2<<20) - buf = buf[:runtime.Stack(buf, true)] - return filterRoutines(strings.Split(string(buf), "\n\n")) -} - -func filterRoutines(routines []string) []string { - result := []string{} - for _, stack := range routines { - if stack == "" || // Empty - strings.Contains(stack, "testing.Main(") || // Tests - strings.Contains(stack, "testing.(*T).Run(") || // Test run - strings.Contains(stack, "getRoutines(") { // This routine - continue - } - result = append(result, stack) - } - return result -} diff --git a/peerconnection_go_test.go b/peerconnection_go_test.go index b51a0c65057..f610d1c4311 100644 --- a/peerconnection_go_test.go +++ b/peerconnection_go_test.go @@ -1623,3 +1623,37 @@ func TestPeerConnectionState(t *testing.T) { assert.NoError(t, pc.Close()) assert.Equal(t, PeerConnectionStateClosed, pc.ConnectionState()) } + +func TestPeerConnectionDeadlock(t *testing.T) { + lim := test.TimeOut(time.Second * 5) + defer lim.Stop() + + report := test.CheckRoutines(t) + defer report() + + closeHdlr := func(peerConnection *PeerConnection) { + peerConnection.OnICEConnectionStateChange(func(i ICEConnectionState) { + if i == ICEConnectionStateFailed || i == ICEConnectionStateClosed { + if err := peerConnection.Close(); err != nil { + assert.NoError(t, err) + } + } + }) + } + + pcOffer, pcAnswer, err := NewAPI().newPair(Configuration{}) + assert.NoError(t, err) + + assert.NoError(t, signalPair(pcOffer, pcAnswer)) + + onDataChannel, onDataChannelCancel := context.WithCancel(context.Background()) + pcAnswer.OnDataChannel(func(*DataChannel) { + onDataChannelCancel() + }) + <-onDataChannel.Done() + + closeHdlr(pcOffer) + closeHdlr(pcAnswer) + + closePairNow(t, pcOffer, pcAnswer) +}