Skip to content

Commit

Permalink
RSDK-4293 wait for answerer to finish completely (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximpertsov authored Oct 18, 2023
1 parent 60c2504 commit 429830c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
1 change: 1 addition & 0 deletions rpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
)

func TestServer(t *testing.T) {
t.Setenv(testDelayAnswererNegotiationVar, "t")
testutils.SkipUnlessInternet(t)
logger := golog.NewTestLogger(t)

Expand Down
12 changes: 6 additions & 6 deletions rpc/wrtc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,36 +223,36 @@ func dialWebRTC(
var pendingCandidates sync.WaitGroup
waitOneHost := make(chan struct{})
var waitOneHostOnce sync.Once
pc.OnICECandidate(func(i *webrtc.ICECandidate) {
pc.OnICECandidate(func(icecandidate *webrtc.ICECandidate) {
if exchangeCtx.Err() != nil {
return
}
if i != nil {
if icecandidate != nil {
pendingCandidates.Add(1)
if i.Typ == webrtc.ICECandidateTypeHost {
if icecandidate.Typ == webrtc.ICECandidateTypeHost {
waitOneHostOnce.Do(func() {
close(waitOneHost)
})
}
}
// must spin off to unblock the ICE gatherer
utils.PanicCapturingGo(func() {
if i != nil {
if icecandidate != nil {
defer pendingCandidates.Done()
}
select {
case <-remoteDescSet:
case <-exchangeCtx.Done():
return
}
if i == nil {
if icecandidate == nil {
pendingCandidates.Wait()
if err := sendDone(); err != nil {
sendErr(err)
}
return
}
iProto := iceCandidateToProto(i)
iProto := iceCandidateToProto(icecandidate)
if _, err := signalingClient.CallUpdate(exchangeCtx, &webrtcpb.CallUpdateRequest{
Uuid: uuid,
Update: &webrtcpb.CallUpdateRequest_Candidate{
Expand Down
34 changes: 25 additions & 9 deletions rpc/wrtc_signaling_answerer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
Expand All @@ -20,6 +21,8 @@ import (
webrtcpb "go.viam.com/utils/proto/rpc/webrtc/v1"
)

const testDelayAnswererNegotiationVar = "TEST_DELAY_ANSWERER_NEGOTIATION"

// A webrtcSignalingAnswerer listens for and answers calls with a given signaling service. It is
// directly connected to a Server that will handle the actual calls/connections over WebRTC
// data channels.
Expand Down Expand Up @@ -303,37 +306,50 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
return err
}

var callFlowWG sync.WaitGroup
var pendingCandidates sync.WaitGroup
waitOneHost := make(chan struct{})
var waitOneHostOnce sync.Once
pc.OnICECandidate(func(i *webrtc.ICECandidate) {
pc.OnICECandidate(func(icecandidate *webrtc.ICECandidate) {
if exchangeCtx.Err() != nil {
return
}
if i != nil {
callFlowWG.Add(1)
if i.Typ == webrtc.ICECandidateTypeHost {
if icecandidate != nil {
pendingCandidates.Add(1)
if icecandidate.Typ == webrtc.ICECandidateTypeHost {
waitOneHostOnce.Do(func() {
close(waitOneHost)
})
}
}
// must spin off to unblock the ICE gatherer
ans.activeBackgroundWorkers.Add(1)
utils.PanicCapturingGo(func() {
defer ans.activeBackgroundWorkers.Done()

if icecandidate != nil {
defer pendingCandidates.Done()
}

select {
case <-initSent:
case <-exchangeCtx.Done():
return
}
if i == nil {
callFlowWG.Wait()
// there are no more candidates coming during this negotiation
if icecandidate == nil {
if _, ok := os.LookupEnv(testDelayAnswererNegotiationVar); ok {
// RSDK-4293: Introducing a sleep here replicates the conditions
// for a prior goroutine leak.
ans.logger.Debug("Sleeping to delay the end of the negotiation")
time.Sleep(1 * time.Second)
}
pendingCandidates.Wait()
if err := sendDone(); err != nil {
sendErr(err)
}
return
}
defer callFlowWG.Done()
iProto := iceCandidateToProto(i)
iProto := iceCandidateToProto(icecandidate)
if err := client.Send(&webrtcpb.AnswerResponse{
Uuid: uuid,
Stage: &webrtcpb.AnswerResponse_Update{
Expand Down

0 comments on commit 429830c

Please sign in to comment.