From 1b12c936803fa1936c3298587c2585bac908321e Mon Sep 17 00:00:00 2001 From: notedit Date: Fri, 8 May 2020 20:01:45 +0800 Subject: [PATCH] clean code --- incomingstream.go | 51 +++++++++++++++++++----------------------- incomingstreamtrack.go | 2 -- outgoingstream.go | 43 +++++++++++++++++------------------ transponder.go | 10 --------- transport.go | 17 +++++++------- wrapper/module.go | 1 + 6 files changed, 52 insertions(+), 72 deletions(-) diff --git a/incomingstream.go b/incomingstream.go index 3876ed0..d762a60 100644 --- a/incomingstream.go +++ b/incomingstream.go @@ -15,20 +15,13 @@ import ( type IncomingStream struct { id string info *sdp.StreamInfo - transport transportWrapper + transport native.DTLSICETransport receiver native.RTPReceiverFacade tracks map[string]*IncomingStreamTrack - onStopListeners []func() onStreamAddIncomingTrackListeners []func(*IncomingStreamTrack) - sync.Mutex + l sync.Mutex } -// internal use -type transportWrapper interface { - AddIncomingSourceGroup(group native.RTPIncomingSourceGroup) bool - RemoveIncomingSourceGroup(group native.RTPIncomingSourceGroup) bool - GetTimeService() native.TimeService -} // NewIncomingStream Create new incoming stream // TODO: make this public @@ -39,7 +32,6 @@ func newIncomingStream(transport native.DTLSICETransport, receiver native.RTPRec stream.receiver = receiver stream.tracks = make(map[string]*IncomingStreamTrack) - stream.onStopListeners = make([]func(), 0) stream.onStreamAddIncomingTrackListeners = make([]func(*IncomingStreamTrack), 0) for _, track := range info.GetTracks() { @@ -79,13 +71,15 @@ func (i *IncomingStream) GetStats() map[string]map[string]*IncomingAllStats { // GetTrack Get track by id func (i *IncomingStream) GetTrack(trackID string) *IncomingStreamTrack { - i.Lock() - defer i.Unlock() + i.l.Lock() + defer i.l.Unlock() return i.tracks[trackID] } // GetTracks Get all tracks in this stream func (i *IncomingStream) GetTracks() []*IncomingStreamTrack { + i.l.Lock() + defer i.l.Unlock() tracks := []*IncomingStreamTrack{} for _, track := range i.tracks { tracks = append(tracks, track) @@ -95,6 +89,8 @@ func (i *IncomingStream) GetTracks() []*IncomingStreamTrack { // GetAudioTracks get all audio tracks func (i *IncomingStream) GetAudioTracks() []*IncomingStreamTrack { + i.l.Lock() + defer i.l.Unlock() audioTracks := []*IncomingStreamTrack{} for _, track := range i.tracks { if strings.ToLower(track.GetMedia()) == "audio" { @@ -106,6 +102,8 @@ func (i *IncomingStream) GetAudioTracks() []*IncomingStreamTrack { // GetVideoTracks get all video tracks func (i *IncomingStream) GetVideoTracks() []*IncomingStreamTrack { + i.l.Lock() + defer i.l.Unlock() videoTracks := []*IncomingStreamTrack{} for _, track := range i.tracks { if strings.ToLower(track.GetMedia()) == "video" { @@ -118,8 +116,8 @@ func (i *IncomingStream) GetVideoTracks() []*IncomingStreamTrack { // AddTrack Adds an incoming stream track created using the Transpocnder.CreateIncomingStreamTrack to this stream func (i *IncomingStream) AddTrack(track *IncomingStreamTrack) error { - i.Lock() - defer i.Unlock() + i.l.Lock() + defer i.l.Unlock() if _, ok := i.tracks[track.GetID()]; ok { return errors.New("Track id already present in stream") } @@ -130,7 +128,10 @@ func (i *IncomingStream) AddTrack(track *IncomingStreamTrack) error { func (i *IncomingStream) RemoveTrack(track *IncomingStreamTrack) error { - // TODO + i.l.Lock() + defer i.l.Unlock() + + delete(i.tracks,track.GetID()) return nil } @@ -268,22 +269,13 @@ func (i *IncomingStream) CreateTrack(track *sdp.TrackInfo) *IncomingStreamTrack incomingTrack := NewIncomingStreamTrack(track.GetMedia(), track.GetID(), i.receiver, sources) - i.Lock() + i.l.Lock() i.tracks[track.GetID()] = incomingTrack - i.Unlock() - - for _, ontrack := range i.onStreamAddIncomingTrackListeners { - ontrack(incomingTrack) - } + i.l.Unlock() return incomingTrack } -// OnTrack register addtrack listener -func (i *IncomingStream) OnTrack(ontrack func(*IncomingStreamTrack)) { - i.onStreamAddIncomingTrackListeners = append(i.onStreamAddIncomingTrackListeners, ontrack) -} - // Stop Removes the media strem from the transport and also detaches from any attached incoming stream func (i *IncomingStream) Stop() { @@ -291,13 +283,16 @@ func (i *IncomingStream) Stop() { return } + i.l.Lock() + defer i.l.Unlock() + for k, track := range i.tracks { track.Stop() - i.Lock() delete(i.tracks, k) - i.Unlock() } + native.DeleteRTPReceiverFacade(i.receiver) // other module maybe need delete i.receiver = nil + i.transport = nil } diff --git a/incomingstreamtrack.go b/incomingstreamtrack.go index 110897a..71a3dac 100644 --- a/incomingstreamtrack.go +++ b/incomingstreamtrack.go @@ -157,9 +157,7 @@ func getStatsFromIncomingSource(source native.RTPIncomingSource) *IncomingStats } for _, layer2 := range individual { - if layer2.SpatialLayerId <= aggregated.SpatialLayerId && layer2.TemporalLayerId <= aggregated.TemporalLayerId { - aggregated.TotalBytes += layer2.TotalBytes aggregated.NumPackets += layer2.NumPackets aggregated.Bitrate += layer2.Bitrate diff --git a/outgoingstream.go b/outgoingstream.go index 53d2a90..0f98ebc 100644 --- a/outgoingstream.go +++ b/outgoingstream.go @@ -16,9 +16,8 @@ type OutgoingStream struct { muted bool tracks map[string]*OutgoingStreamTrack onStopListeners []func() - onMuteListeners []func(bool) onAddTrackListeners []func(*OutgoingStreamTrack) - sync.Mutex + l sync.Mutex } // NewOutgoingStream create outgoing stream @@ -35,7 +34,6 @@ func NewOutgoingStream(transport native.DTLSICETransport, info *sdp.StreamInfo) } stream.onStopListeners = make([]func(), 0) - stream.onMuteListeners = make([]func(bool), 0) stream.onAddTrackListeners = make([]func(*OutgoingStreamTrack), 0) return stream @@ -70,9 +68,6 @@ func (o *OutgoingStream) Mute(muting bool) { if o.muted != muting { o.muted = muting - for _, muteFunc := range o.onMuteListeners { - muteFunc(muting) - } } } @@ -116,6 +111,8 @@ func (o *OutgoingStream) AttachTo(incomingStream *IncomingStream) []*Transponder // Detach Stop listening for media func (o *OutgoingStream) Detach() { + o.l.Lock() + defer o.l.Unlock() for _, track := range o.tracks { track.Detach() } @@ -123,19 +120,20 @@ func (o *OutgoingStream) Detach() { // GetStreamInfo get the stream info func (o *OutgoingStream) GetStreamInfo() *sdp.StreamInfo { - return o.info } // GetTrack get one track func (o *OutgoingStream) GetTrack(trackID string) *OutgoingStreamTrack { - o.Lock() - defer o.Unlock() + o.l.Lock() + defer o.l.Unlock() return o.tracks[trackID] } // GetTracks get all the tracks func (o *OutgoingStream) GetTracks() []*OutgoingStreamTrack { + o.l.Lock() + defer o.l.Unlock() tracks := []*OutgoingStreamTrack{} for _, track := range o.tracks { tracks = append(tracks, track) @@ -145,6 +143,8 @@ func (o *OutgoingStream) GetTracks() []*OutgoingStreamTrack { // GetAudioTracks Get an array of the media stream audio tracks func (o *OutgoingStream) GetAudioTracks() []*OutgoingStreamTrack { + o.l.Lock() + defer o.l.Unlock() audioTracks := []*OutgoingStreamTrack{} for _, track := range o.tracks { if strings.ToLower(track.GetMedia()) == "audio" { @@ -156,6 +156,8 @@ func (o *OutgoingStream) GetAudioTracks() []*OutgoingStreamTrack { // GetVideoTracks Get an array of the media stream video tracks func (o *OutgoingStream) GetVideoTracks() []*OutgoingStreamTrack { + o.l.Lock() + defer o.l.Unlock() videoTracks := []*OutgoingStreamTrack{} for _, track := range o.tracks { if strings.ToLower(track.GetMedia()) == "video" { @@ -167,9 +169,8 @@ func (o *OutgoingStream) GetVideoTracks() []*OutgoingStreamTrack { // AddTrack add one outgoing track func (o *OutgoingStream) AddTrack(track *OutgoingStreamTrack) { - - o.Lock() - defer o.Unlock() + o.l.Lock() + defer o.l.Unlock() if _, ok := o.tracks[track.GetID()]; ok { return @@ -178,7 +179,10 @@ func (o *OutgoingStream) AddTrack(track *OutgoingStreamTrack) { } func (o *OutgoingStream) RemoveTrack(track *OutgoingStreamTrack) { - // TODO + o.l.Lock() + defer o.l.Unlock() + + delete(o.tracks, track.GetID()) } // CreateTrack Create new track from a TrackInfo object and add it to this stream @@ -216,13 +220,14 @@ func (o *OutgoingStream) CreateTrack(track *sdp.TrackInfo) *OutgoingStreamTrack outgoingTrack := newOutgoingStreamTrack(track.GetMedia(), track.GetID(), native.TransportToSender(o.transport), source) + // TODO // runtime.SetFinalizer(source, func(source native.RTPOutgoingSourceGroup) { // o.transport.RemoveOutgoingSourceGroup(source) // }) - o.Lock() + o.l.Lock() o.tracks[outgoingTrack.GetID()] = outgoingTrack - o.Unlock() + o.l.Unlock() for _, addTrackFunc := range o.onAddTrackListeners { addTrackFunc(outgoingTrack) @@ -236,10 +241,6 @@ func (o *OutgoingStream) OnTrack(listener func(*OutgoingStreamTrack)) { o.onAddTrackListeners = append(o.onAddTrackListeners, listener) } -// OnMute register onmute listener -func (o *OutgoingStream) OnMute(listener func(bool)) { - o.onMuteListeners = append(o.onMuteListeners, listener) -} // Stop stop the remote stream func (o *OutgoingStream) Stop() { @@ -252,10 +253,6 @@ func (o *OutgoingStream) Stop() { track.Stop() } - for _, stopFunc := range o.onStopListeners { - stopFunc() - } - o.tracks = make(map[string]*OutgoingStreamTrack, 0) o.transport = nil diff --git a/transponder.go b/transponder.go index 49ebd9d..e92b47e 100644 --- a/transponder.go +++ b/transponder.go @@ -28,7 +28,6 @@ type Transponder struct { temporalLayerId int maxSpatialLayerId int maxTemporalLayerId int - onMuteListeners []func(bool) onStopListeners []func() } @@ -42,7 +41,6 @@ func NewTransponder(transponderFacade native.RTPStreamTransponderFacade) *Transp transponder.maxSpatialLayerId = MaxLayerId transponder.maxTemporalLayerId = MaxLayerId - transponder.onMuteListeners = make([]func(bool), 0) transponder.onStopListeners = make([]func(), 0) return transponder @@ -109,10 +107,6 @@ func (t *Transponder) Mute(muting bool) { if t.transponder != nil { t.transponder.Mute(muting) } - - for _, mutefunc := range t.onMuteListeners { - mutefunc(muting) - } } } @@ -302,10 +296,6 @@ func (t *Transponder) SetMaximumLayers(maxSpatialLayerId, maxTemporalLayerId int t.maxTemporalLayerId = maxTemporalLayerId } -// OnMute register mute listener -func (t *Transponder) OnMute(listener func(bool)) { - t.onMuteListeners = append(t.onMuteListeners, listener) -} // Stop stop this transponder func (t *Transponder) Stop() { diff --git a/transport.go b/transport.go index b8023a3..6757d68 100644 --- a/transport.go +++ b/transport.go @@ -89,7 +89,7 @@ type Transport struct { incomingStreamTracks map[string]*IncomingStreamTrack outgoingStreamTracks map[string]*OutgoingStreamTrack - iceStats ICEStats + iceStats *ICEStats senderSideListener senderSideEstimatorListener dtlsICEListener dtlsICETransportListener @@ -133,7 +133,7 @@ func NewTransport(bundle native.RTPBundleTransport, remoteIce *sdp.ICEInfo, remo transport.connection = bundle.AddICETransport(transport.username, properties) transport.transport = transport.connection.GetTransport() - transport.iceStats = ICEStats{} + transport.iceStats = &ICEStats{} native.DeletePropertiesFacade(properties) @@ -206,15 +206,14 @@ func (t *Transport) GetDTLSState() string { } // GetICEStats get ice stats -func (t *Transport) GetICEStats() ICEStats { +func (t *Transport) GetICEStats() *ICEStats { - iceStats := ICEStats{} - iceStats.RequestsSent = t.connection.GetIceRequestsSent() - iceStats.RequestsReceived = t.connection.GetIceRequestsReceived() - iceStats.ResponsesSent = t.connection.GetIceResponsesSent() - iceStats.ResponsesReceived = t.connection.GetIceResponsesReceived() + t.iceStats.RequestsSent = t.connection.GetIceRequestsSent() + t.iceStats.RequestsReceived = t.connection.GetIceRequestsReceived() + t.iceStats.ResponsesSent = t.connection.GetIceResponsesSent() + t.iceStats.ResponsesReceived = t.connection.GetIceResponsesReceived() - return iceStats + return t.iceStats } // SetRemoteProperties Set remote RTP properties diff --git a/wrapper/module.go b/wrapper/module.go index 38e76e3..3989d0d 100644 --- a/wrapper/module.go +++ b/wrapper/module.go @@ -2,6 +2,7 @@ package native /* +#cgo CFLAGS: -Wno-deprecated #cgo CXXFLAGS: -std=c++1z #cgo CPPFLAGS: -I${SRCDIR}/../thirdparty/openssl/build/include/ #cgo CPPFLAGS: -I${SRCDIR}/../thirdparty/libsrtp/build/include/