Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PeerConnection.GracefulClose #2847

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type DataChannel struct {
readyState atomic.Value // DataChannelState
bufferedAmountLowThreshold uint64
detachCalled bool
readLoopActive chan struct{}
isGracefulClosed bool

// The binaryType represents attribute MUST, on getting, return the value to
// which it was last set. On setting, if the new value is either the string
Expand Down Expand Up @@ -225,6 +227,10 @@ func (d *DataChannel) OnOpen(f func()) {
func (d *DataChannel) onOpen() {
d.mu.RLock()
handler := d.onOpenHandler
if d.isGracefulClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler != nil {
Expand Down Expand Up @@ -252,6 +258,10 @@ func (d *DataChannel) OnDial(f func()) {
func (d *DataChannel) onDial() {
d.mu.RLock()
handler := d.onDialHandler
if d.isGracefulClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler != nil {
Expand All @@ -261,6 +271,10 @@ func (d *DataChannel) onDial() {

// OnClose sets an event handler which is invoked when
// the underlying data transport has been closed.
// Note: Due to backwards compatibility, there is a chance that
// OnClose can be called, even if the GracefulClose is used.
// If this is the case for you, you can deregister OnClose
// prior to GracefulClose.
func (d *DataChannel) OnClose(f func()) {
d.mu.Lock()
defer d.mu.Unlock()
Expand Down Expand Up @@ -292,6 +306,10 @@ func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
func (d *DataChannel) onMessage(msg DataChannelMessage) {
d.mu.RLock()
handler := d.onMessageHandler
if d.isGracefulClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler == nil {
Expand All @@ -302,6 +320,10 @@ func (d *DataChannel) onMessage(msg DataChannelMessage) {

func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlreadyNegotiated bool) {
d.mu.Lock()
if d.isGracefulClosed {
d.mu.Unlock()
return
}
d.dataChannel = dc
bufferedAmountLowThreshold := d.bufferedAmountLowThreshold
onBufferedAmountLow := d.onBufferedAmountLow
Expand All @@ -326,7 +348,12 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread
d.mu.Lock()
defer d.mu.Unlock()

if d.isGracefulClosed {
return
}

if !d.api.settingEngine.detach.DataChannels {
d.readLoopActive = make(chan struct{})
go d.readLoop()
}
}
Expand All @@ -342,6 +369,10 @@ func (d *DataChannel) OnError(f func(err error)) {
func (d *DataChannel) onError(err error) {
d.mu.RLock()
handler := d.onErrorHandler
if d.isGracefulClosed {
d.mu.RUnlock()
return
}
d.mu.RUnlock()

if handler != nil {
Expand All @@ -350,6 +381,12 @@ func (d *DataChannel) onError(err error) {
}

func (d *DataChannel) readLoop() {
defer func() {
d.mu.Lock()
readLoopActive := d.readLoopActive
d.mu.Unlock()
defer close(readLoopActive)
}()
buffer := make([]byte, dataChannelBufferSize)
for {
n, isString, err := d.dataChannel.ReadDataChannel(buffer)
Expand Down Expand Up @@ -449,7 +486,32 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
// Close Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer.
func (d *DataChannel) Close() error {
return d.close(false)
}

// GracefulClose Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// DataChannel callbacks or if in a callback, in its own goroutine.
func (d *DataChannel) GracefulClose() error {
return d.close(true)
}

// Normally, close only stops writes from happening, so graceful=true
// will wait for reads to be finished based on underlying SCTP association
// closure or a SCTP reset stream from the other side. This is safe to call
// with graceful=true after tearing down a PeerConnection but not
// necessarily before. For example, if you used a vnet and dropped all packets
// right before closing the DataChannel, you'd need never see a reset stream.
func (d *DataChannel) close(shouldGracefullyClose bool) error {
d.mu.Lock()
d.isGracefulClosed = true
readLoopActive := d.readLoopActive
if shouldGracefullyClose && readLoopActive != nil {
defer func() {
<-readLoopActive
}()
}
haveSctpTransport := d.dataChannel != nil
d.mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/pion/datachannel v1.5.8
github.com/pion/dtls/v3 v3.0.0
github.com/pion/ice/v3 v3.0.15
github.com/pion/ice/v3 v3.0.16
github.com/pion/interceptor v0.1.29
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/dtls/v3 v3.0.0 h1:m2hzwPkzqoBjVKXm5ymNuX01OAjht82TdFL6LoTzgi4=
github.com/pion/dtls/v3 v3.0.0/go.mod h1:tiX7NaneB0wNoRaUpaMVP7igAlkMCTQkbpiY+OfeIi0=
github.com/pion/ice/v3 v3.0.15 h1:6FFM1k1Ei36keZN1drl8/xaEs+NpMMG6M+MsVRchXho=
github.com/pion/ice/v3 v3.0.15/go.mod h1:SdmubtIsCcvdb1ZInrTUz7Iaqi90/rYd1pzbzlMxsZg=
github.com/pion/ice/v3 v3.0.16 h1:YoPlNg3jU1UT/DDTa9v/g1vH6A2/pAzehevI1o66H8E=
github.com/pion/ice/v3 v3.0.16/go.mod h1:SdmubtIsCcvdb1ZInrTUz7Iaqi90/rYd1pzbzlMxsZg=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
Expand Down
22 changes: 20 additions & 2 deletions icegatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,31 @@ func (g *ICEGatherer) Gather() error {

// Close prunes all local candidates, and closes the ports.
func (g *ICEGatherer) Close() error {
return g.close(false /* shouldGracefullyClose */)
}

// GracefulClose prunes all local candidates, and closes the ports. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// ICEGatherer callbacks or if in a callback, in its own goroutine.
func (g *ICEGatherer) GracefulClose() error {
return g.close(true /* shouldGracefullyClose */)
}

func (g *ICEGatherer) close(shouldGracefullyClose bool) error {
g.lock.Lock()
defer g.lock.Unlock()

if g.agent == nil {
return nil
} else if err := g.agent.Close(); err != nil {
return err
}
if shouldGracefullyClose {
if err := g.agent.GracefulClose(); err != nil {
return err
}
} else {
if err := g.agent.Close(); err != nil {
return err
}
}

g.agent = nil
Expand Down
24 changes: 23 additions & 1 deletion icetransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pion/ice/v3"
"github.com/pion/logging"
"github.com/pion/webrtc/v4/internal/mux"
"github.com/pion/webrtc/v4/internal/util"
)

// ICETransport allows an application access to information about the ICE
Expand Down Expand Up @@ -187,6 +188,17 @@ func (t *ICETransport) restart() error {

// Stop irreversibly stops the ICETransport.
func (t *ICETransport) Stop() error {
return t.stop(false /* shouldGracefullyClose */)
}

// GracefulStop irreversibly stops the ICETransport. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// ICETransport callbacks or if in a callback, in its own goroutine.
func (t *ICETransport) GracefulStop() error {
return t.stop(true /* shouldGracefullyClose */)
}

func (t *ICETransport) stop(shouldGracefullyClose bool) error {
t.lock.Lock()
defer t.lock.Unlock()

Expand All @@ -197,8 +209,18 @@ func (t *ICETransport) Stop() error {
}

if t.mux != nil {
return t.mux.Close()
var closeErrs []error
if shouldGracefullyClose && t.gatherer != nil {
// we can't access icegatherer/icetransport.Close via
// mux's net.Conn Close so we call it earlier here.
closeErrs = append(closeErrs, t.gatherer.GracefulClose())
}
closeErrs = append(closeErrs, t.mux.Close())
return util.FlattenErrs(closeErrs)
} else if t.gatherer != nil {
if shouldGracefullyClose {
return t.gatherer.GracefulClose()
}
return t.gatherer.Close()
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions internal/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@
}

if err = m.dispatch(buf[:n]); err != nil {
if errors.Is(err, io.ErrClosedPipe) {

Check warning on line 132 in internal/mux/mux.go

View check run for this annotation

Codecov / codecov/patch

internal/mux/mux.go#L132

Added line #L132 was not covered by tests
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to have this spuriously log when we are the ones closing the packetio.Buffer.

// if the buffer was closed, that's not an error we care to report
return

Check warning on line 134 in internal/mux/mux.go

View check run for this annotation

Codecov / codecov/patch

internal/mux/mux.go#L134

Added line #L134 was not covered by tests
}
m.log.Errorf("mux: ending readLoop dispatch error %s", err.Error())
return
}
Expand Down
71 changes: 58 additions & 13 deletions operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@

// Operations is a task executor.
type operations struct {
mu sync.Mutex
busy bool
ops *list.List
mu sync.Mutex
busyCh chan struct{}
ops *list.List

updateNegotiationNeededFlagOnEmptyChain *atomicBool
onNegotiationNeeded func()
isClosed bool
}

func newOperations(
Expand All @@ -33,21 +34,34 @@
}

// Enqueue adds a new action to be executed. If there are no actions scheduled,
// the execution will start immediately in a new goroutine.
// the execution will start immediately in a new goroutine. If the queue has been
// closed, the operation will be dropped. The queue is only deliberately closed
// by a user.
func (o *operations) Enqueue(op operation) {
o.mu.Lock()
defer o.mu.Unlock()
_ = o.tryEnqueue(op)
}

// tryEnqueue attempts to enqueue the given operation. It returns false
// if the op is invalid or the queue is closed. mu must be locked by
// tryEnqueue's caller.
func (o *operations) tryEnqueue(op operation) bool {
if op == nil {
return
return false

Check warning on line 51 in operations.go

View check run for this annotation

Codecov / codecov/patch

operations.go#L51

Added line #L51 was not covered by tests
}

o.mu.Lock()
running := o.busy
if o.isClosed {
return false
}
o.ops.PushBack(op)
o.busy = true
o.mu.Unlock()

if !running {
if o.busyCh == nil {
o.busyCh = make(chan struct{})
go o.start()
}

return true
}

// IsEmpty checks if there are tasks in the queue
Expand All @@ -62,12 +76,38 @@
func (o *operations) Done() {
var wg sync.WaitGroup
wg.Add(1)
o.Enqueue(func() {
o.mu.Lock()
enqueued := o.tryEnqueue(func() {
wg.Done()
})
o.mu.Unlock()
if !enqueued {
return
}
wg.Wait()
}

// GracefulClose waits for the operations queue to be cleared and forbids
// new operations from being enqueued.
func (o *operations) GracefulClose() {
o.mu.Lock()
if o.isClosed {
o.mu.Unlock()
return

Check warning on line 96 in operations.go

View check run for this annotation

Codecov / codecov/patch

operations.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}
// do not enqueue anymore ops from here on
// o.isClosed=true will also not allow a new busyCh
// to be created.
o.isClosed = true

busyCh := o.busyCh
o.mu.Unlock()
if busyCh == nil {
return
}
<-busyCh

Check warning on line 108 in operations.go

View check run for this annotation

Codecov / codecov/patch

operations.go#L108

Added line #L108 was not covered by tests
}

func (o *operations) pop() func() {
o.mu.Lock()
defer o.mu.Unlock()
Expand All @@ -87,12 +127,17 @@
defer func() {
o.mu.Lock()
defer o.mu.Unlock()
if o.ops.Len() == 0 {
o.busy = false
// this wil lbe the most recent busy chan
close(o.busyCh)

if o.ops.Len() == 0 || o.isClosed {
o.busyCh = nil
return
}

// either a new operation was enqueued while we
// were busy, or an operation panicked
o.busyCh = make(chan struct{})

Check warning on line 140 in operations.go

View check run for this annotation

Codecov / codecov/patch

operations.go#L140

Added line #L140 was not covered by tests
go o.start()
}()

Expand Down
Loading
Loading