From f5fb8bf97a9a28028654a4a5c2a3f0b5a0eb38f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=8C=AF=E6=B1=9F?= Date: Wed, 29 Dec 2021 15:44:52 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E9=80=9A=E8=BF=87=20Router=20=E5=AF=B9?= =?UTF-8?q?=E8=B1=A1=E6=84=9F=E7=9F=A5=20ReceiverTrack=E7=9A=84Add?= =?UTF-8?q?=E4=B8=8ERemove?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/signal/grpc/server/server.go | 25 +++++++++++++++++++++++++ pkg/sfu/router.go | 25 +++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/cmd/signal/grpc/server/server.go b/cmd/signal/grpc/server/server.go index 9fb67ca4b..2262a5c96 100644 --- a/cmd/signal/grpc/server/server.go +++ b/cmd/signal/grpc/server/server.go @@ -251,6 +251,31 @@ func (s *SFUServer) Signal(sig rtc.RTC_SignalServer) error { publisher := peer.Publisher() if publisher != nil { + router:=publisher.GetRouter() + if router!=nil { + publisher.GetRouter().OnAddReceiverTrack(func(receiver sfu.Receiver) { + log.Debugf("[S=>C] OnAddReceiverTrack: \nKind %v, \nUid: %v, \nMsid: %v,\nTrackID: %v", receiver.Kind(), uid, receiver.StreamID(), receiver.TrackID()) + var peerTracks []*rtc.TrackInfo + peerTracks = append(peerTracks, &rtc.TrackInfo{ + Id: receiver.TrackID(), + Kind: receiver.Kind().String(), + StreamId: receiver.StreamID(), + Muted: false, + }) + s.BroadcastTrackEvent(uid, peerTracks, rtc.TrackEvent_ADD) + }) + publisher.GetRouter().OnDelReceiverTrack(func(receiver sfu.Receiver) { + log.Debugf("[S=>C] OnDelReceiverTrack: \nKind %v, \nUid: %v, \nMsid: %v,\nTrackID: %v", receiver.Kind(), uid, receiver.StreamID(), receiver.TrackID()) + var peerTracks []*rtc.TrackInfo + peerTracks = append(peerTracks, &rtc.TrackInfo{ + Id: receiver.TrackID(), + Kind: receiver.Kind().String(), + StreamId: receiver.StreamID(), + Muted: false, + }) + s.BroadcastTrackEvent(uid, peerTracks, rtc.TrackEvent_REMOVE) + }) + } var once sync.Once publisher.OnPublisherTrack(func(pt sfu.PublisherTrack) { log.Debugf("[S=>C] OnPublisherTrack: \nKind %v, \nUid: %v, \nMsid: %v,\nTrackID: %v", pt.Track.Kind(), uid, pt.Track.Msid(), pt.Track.ID()) diff --git a/pkg/sfu/router.go b/pkg/sfu/router.go index 60a32ae92..97de76850 100644 --- a/pkg/sfu/router.go +++ b/pkg/sfu/router.go @@ -2,6 +2,7 @@ package sfu import ( "sync" + "sync/atomic" "github.com/pion/ion-sfu/pkg/buffer" "github.com/pion/ion-sfu/pkg/stats" @@ -18,6 +19,9 @@ type Router interface { SetRTCPWriter(func([]rtcp.Packet) error) AddDownTrack(s *Subscriber, r Receiver) (*DownTrack, error) Stop() + GetReceiver() map[string]Receiver + OnAddReceiverTrack(f func(receiver Receiver)) + OnDelReceiverTrack(f func(receiver Receiver)) } // RouterConfig defines Router configurations @@ -43,6 +47,8 @@ type router struct { receivers map[string]Receiver bufferFactory *buffer.Factory writeRTCP func([]rtcp.Packet) error + onAddTrack atomic.Value // func(Receiver) + onDelTrack atomic.Value // func(Receiver) } // newRouter for routing rtp/rtcp packets @@ -66,6 +72,18 @@ func newRouter(id string, session Session, config *WebRTCTransportConfig) Router return r } +func (r *router) GetReceiver() map[string]Receiver { + return r.receivers +} + +func (r *router) OnAddReceiverTrack(f func(receiver Receiver)) { + r.onAddTrack.Store(f) +} + +func (r *router) OnDelReceiverTrack(f func(receiver Receiver)) { + r.onDelTrack.Store(f) +} + func (r *router) ID() string { return r.id } @@ -162,6 +180,10 @@ func (r *router) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRe r.deleteReceiver(trackID, uint32(track.SSRC())) }) publish = true + + if handler, ok := r.onAddTrack.Load().(func(Receiver)); ok && handler != nil { + handler(recv) + } } recv.AddUpTrack(track, buff, r.config.Simulcast.BestQualityFirst) @@ -269,6 +291,9 @@ func (r *router) AddDownTrack(sub *Subscriber, recv Receiver) (*DownTrack, error func (r *router) deleteReceiver(track string, ssrc uint32) { r.Lock() + if handler, ok := r.onDelTrack.Load().(func(Receiver)); ok && handler != nil { + handler(r.receivers[track]) + } delete(r.receivers, track) delete(r.stats, ssrc) r.Unlock() From 1234f82365e814957d7631e47878e244dc827444 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=8C=AF=E6=B1=9F?= Date: Wed, 29 Dec 2021 16:03:07 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/signal/grpc/server/server.go | 4 ++-- pkg/sfu/router.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/signal/grpc/server/server.go b/cmd/signal/grpc/server/server.go index 2262a5c96..561d1bb51 100644 --- a/cmd/signal/grpc/server/server.go +++ b/cmd/signal/grpc/server/server.go @@ -251,8 +251,8 @@ func (s *SFUServer) Signal(sig rtc.RTC_SignalServer) error { publisher := peer.Publisher() if publisher != nil { - router:=publisher.GetRouter() - if router!=nil { + router := publisher.GetRouter() + if router != nil { publisher.GetRouter().OnAddReceiverTrack(func(receiver sfu.Receiver) { log.Debugf("[S=>C] OnAddReceiverTrack: \nKind %v, \nUid: %v, \nMsid: %v,\nTrackID: %v", receiver.Kind(), uid, receiver.StreamID(), receiver.TrackID()) var peerTracks []*rtc.TrackInfo diff --git a/pkg/sfu/router.go b/pkg/sfu/router.go index 97de76850..9540afd94 100644 --- a/pkg/sfu/router.go +++ b/pkg/sfu/router.go @@ -19,7 +19,7 @@ type Router interface { SetRTCPWriter(func([]rtcp.Packet) error) AddDownTrack(s *Subscriber, r Receiver) (*DownTrack, error) Stop() - GetReceiver() map[string]Receiver + GetReceiver() map[string]Receiver OnAddReceiverTrack(f func(receiver Receiver)) OnDelReceiverTrack(f func(receiver Receiver)) }