Skip to content

Commit

Permalink
通过 Router 对象感知 ReceiverTrack的Add与Remove (#633)
Browse files Browse the repository at this point in the history
* 通过 Router 对象感知 ReceiverTrack的Add与Remove

* 格式化
  • Loading branch information
zjzhang-cn authored Jan 10, 2022
1 parent c8cea05 commit 8e1eaa5
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
25 changes: 25 additions & 0 deletions cmd/signal/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,31 @@ func (s *SFUServer) Signal(sig rtc.RTC_SignalServer) error {
publisher := peer.Publisher()

if publisher != nil {
router := publisher.GetRouter()
if router != nil {
publisher.GetRouter().OnAddReceiverTrack(func(receiver sfu.Receiver) {
log.Debugf("[S=>C] OnAddReceiverTrack: \nKind %v, \nUid: %v, \nMsid: %v,\nTrackID: %v", receiver.Kind(), uid, receiver.StreamID(), receiver.TrackID())
var peerTracks []*rtc.TrackInfo
peerTracks = append(peerTracks, &rtc.TrackInfo{
Id: receiver.TrackID(),
Kind: receiver.Kind().String(),
StreamId: receiver.StreamID(),
Muted: false,
})
s.BroadcastTrackEvent(uid, peerTracks, rtc.TrackEvent_ADD)
})
publisher.GetRouter().OnDelReceiverTrack(func(receiver sfu.Receiver) {
log.Debugf("[S=>C] OnDelReceiverTrack: \nKind %v, \nUid: %v, \nMsid: %v,\nTrackID: %v", receiver.Kind(), uid, receiver.StreamID(), receiver.TrackID())
var peerTracks []*rtc.TrackInfo
peerTracks = append(peerTracks, &rtc.TrackInfo{
Id: receiver.TrackID(),
Kind: receiver.Kind().String(),
StreamId: receiver.StreamID(),
Muted: false,
})
s.BroadcastTrackEvent(uid, peerTracks, rtc.TrackEvent_REMOVE)
})
}
var once sync.Once
publisher.OnPublisherTrack(func(pt sfu.PublisherTrack) {
log.Debugf("[S=>C] OnPublisherTrack: \nKind %v, \nUid: %v, \nMsid: %v,\nTrackID: %v", pt.Track.Kind(), uid, pt.Track.Msid(), pt.Track.ID())
Expand Down
25 changes: 25 additions & 0 deletions pkg/sfu/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sfu

import (
"sync"
"sync/atomic"

"github.com/pion/ion-sfu/pkg/buffer"
"github.com/pion/ion-sfu/pkg/stats"
Expand All @@ -18,6 +19,9 @@ type Router interface {
SetRTCPWriter(func([]rtcp.Packet) error)
AddDownTrack(s *Subscriber, r Receiver) (*DownTrack, error)
Stop()
GetReceiver() map[string]Receiver
OnAddReceiverTrack(f func(receiver Receiver))
OnDelReceiverTrack(f func(receiver Receiver))
}

// RouterConfig defines Router configurations
Expand All @@ -43,6 +47,8 @@ type router struct {
receivers map[string]Receiver
bufferFactory *buffer.Factory
writeRTCP func([]rtcp.Packet) error
onAddTrack atomic.Value // func(Receiver)
onDelTrack atomic.Value // func(Receiver)
}

// newRouter for routing rtp/rtcp packets
Expand All @@ -66,6 +72,18 @@ func newRouter(id string, session Session, config *WebRTCTransportConfig) Router
return r
}

func (r *router) GetReceiver() map[string]Receiver {
return r.receivers
}

func (r *router) OnAddReceiverTrack(f func(receiver Receiver)) {
r.onAddTrack.Store(f)
}

func (r *router) OnDelReceiverTrack(f func(receiver Receiver)) {
r.onDelTrack.Store(f)
}

func (r *router) ID() string {
return r.id
}
Expand Down Expand Up @@ -162,6 +180,10 @@ func (r *router) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRe
r.deleteReceiver(trackID, uint32(track.SSRC()))
})
publish = true

if handler, ok := r.onAddTrack.Load().(func(Receiver)); ok && handler != nil {
handler(recv)
}
}

recv.AddUpTrack(track, buff, r.config.Simulcast.BestQualityFirst)
Expand Down Expand Up @@ -269,6 +291,9 @@ func (r *router) AddDownTrack(sub *Subscriber, recv Receiver) (*DownTrack, error

func (r *router) deleteReceiver(track string, ssrc uint32) {
r.Lock()
if handler, ok := r.onDelTrack.Load().(func(Receiver)); ok && handler != nil {
handler(r.receivers[track])
}
delete(r.receivers, track)
delete(r.stats, ssrc)
r.Unlock()
Expand Down

0 comments on commit 8e1eaa5

Please # to comment.