Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

client: correctly sort tracks of multivariant playlists #120

Merged
merged 1 commit into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type ClientOnDataMPEG4AudioFunc func(pts time.Duration, aus [][]byte)
// ClientOnDataOpusFunc is the prototype of the function passed to OnDataOpus().
type ClientOnDataOpusFunc func(pts time.Duration, packets [][]byte)

type clientOnStreamTracksFunc func(context.Context, clientStreamProcessor) bool

func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) {
u, err := url.Parse(relative)
if err != nil {
Expand Down
20 changes: 12 additions & 8 deletions client_downloader_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ func pickAudioPlaylist(alternatives []*playlist.MultivariantRendition, groupID s
return candidates[0]
}

type clientTimeSync interface{}

type clientDownloaderPrimary struct {
primaryPlaylistURL *url.URL
httpClient *http.Client
Expand All @@ -112,15 +110,15 @@ type clientDownloaderPrimary struct {
leadingTimeSync clientTimeSync

// in
streamTracks chan []*Track
chStreamTracks chan clientStreamProcessor

// out
startStreaming chan struct{}
leadingTimeSyncReady chan struct{}
}

func (d *clientDownloaderPrimary) initialize() {
d.streamTracks = make(chan []*Track)
d.chStreamTracks = make(chan clientStreamProcessor)
d.startStreaming = make(chan struct{})
d.leadingTimeSyncReady = make(chan struct{})
}
Expand Down Expand Up @@ -221,8 +219,14 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {

for i := 0; i < streamCount; i++ {
select {
case streamTracks := <-d.streamTracks:
tracks = append(tracks, streamTracks...)
case streamProc := <-d.chStreamTracks:
if streamProc.getIsLeading() {
prevTracks := tracks
tracks = append([]*Track(nil), streamProc.getTracks()...)
tracks = append(tracks, prevTracks...)
} else {
tracks = append(tracks, streamProc.getTracks()...)
}
case <-ctx.Done():
return fmt.Errorf("terminated")
}
Expand All @@ -242,9 +246,9 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
return nil
}

func (d *clientDownloaderPrimary) onStreamTracks(ctx context.Context, tracks []*Track) bool {
func (d *clientDownloaderPrimary) onStreamTracks(ctx context.Context, streamProc clientStreamProcessor) bool {
select {
case d.streamTracks <- tracks:
case d.chStreamTracks <- streamProc:
case <-ctx.Done():
return false
}
Expand Down
6 changes: 3 additions & 3 deletions client_downloader_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type clientDownloaderStream struct {
playlistURL *url.URL
initialPlaylist *playlist.Media
rp *clientRoutinePool
onStreamTracks func(context.Context, []*Track) bool
onStreamTracks clientOnStreamTracksFunc
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}
Expand Down Expand Up @@ -70,7 +70,7 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
return err
}

proc := &clientProcessorFMP4{
proc := &clientStreamProcessorFMP4{
ctx: ctx,
isLeading: d.isLeading,
initFile: byts,
Expand All @@ -88,7 +88,7 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {

d.rp.add(proc)
} else {
proc := &clientProcessorMPEGTS{
proc := &clientStreamProcessorMPEGTS{
onDecodeError: d.onDecodeError,
isLeading: d.isLeading,
segmentQueue: segmentQueue,
Expand Down
6 changes: 6 additions & 0 deletions client_stream_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package gohlslib

type clientStreamProcessor interface {
getIsLeading() bool
getTracks() []*Track
}
95 changes: 57 additions & 38 deletions client_processor_fmp4.go → client_stream_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,33 @@
return init.Tracks[0].ID
}

type clientProcessorFMP4 struct {
func findPartTrackOfLeadingTrack(parts []*fmp4.Part, leadingTrackID int) *fmp4.PartTrack {
for _, part := range parts {
for _, partTrack := range part.Tracks {
if partTrack.ID == leadingTrackID {
return partTrack
}
}
}
return nil

Check warning on line 34 in client_stream_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_stream_processor_fmp4.go#L34

Added line #L34 was not covered by tests
}

func findTimeScaleOfLeadingTrack(tracks []*fmp4.InitTrack, leadingTrackID int) uint32 {
for _, track := range tracks {
if track.ID == leadingTrackID {
return track.TimeScale
}
}
return 0

Check warning on line 43 in client_stream_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_stream_processor_fmp4.go#L43

Added line #L43 was not covered by tests
}

type clientStreamProcessorFMP4 struct {
ctx context.Context
isLeading bool
initFile []byte
segmentQueue *clientSegmentQueue
rp *clientRoutinePool
onStreamTracks func(context.Context, []*Track) bool
onStreamTracks clientOnStreamTracksFunc
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}
Expand All @@ -40,11 +60,11 @@
prePreProcessFuncs map[int]func(context.Context, *fmp4.PartTrack) error

// in
subpartProcessed chan struct{}
chPartTrackProcessed chan struct{}
}

func (p *clientProcessorFMP4) initialize() error {
p.subpartProcessed = make(chan struct{}, clientFMP4MaxPartTracksPerSegment)
func (p *clientStreamProcessorFMP4) initialize() error {
p.chPartTrackProcessed = make(chan struct{}, clientFMP4MaxPartTracksPerSegment)

err := p.init.Unmarshal(bytes.NewReader(p.initFile))
if err != nil {
Expand All @@ -60,15 +80,23 @@
}
}

ok := p.onStreamTracks(p.ctx, p.tracks)
ok := p.onStreamTracks(p.ctx, p)
if !ok {
return fmt.Errorf("terminated")
}

return nil
}

func (p *clientProcessorFMP4) run(ctx context.Context) error {
func (p *clientStreamProcessorFMP4) getIsLeading() bool {
return p.isLeading
}

func (p *clientStreamProcessorFMP4) getTracks() []*Track {
return p.tracks
}

func (p *clientStreamProcessorFMP4) run(ctx context.Context) error {
for {
seg, ok := p.segmentQueue.pull(ctx)
if !ok {
Expand All @@ -82,34 +110,33 @@
}
}

func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) error {
func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, byts []byte) error {
var parts fmp4.Parts
err := parts.Unmarshal(byts)
if err != nil {
return err
}

if p.prePreProcessFuncs == nil {
err := p.initializeTrackProcessors(ctx, parts)
if err != nil {
return err
}

Check warning on line 124 in client_stream_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_stream_processor_fmp4.go#L123-L124

Added lines #L123 - L124 were not covered by tests
}

processingCount := 0

for _, part := range parts {
for _, partTrack := range part.Tracks {
err := p.initializeTrackProcs(ctx, partTrack)
if err != nil {
if err == errSkipSilently {
continue
}
return err
if processingCount >= (clientFMP4MaxPartTracksPerSegment - 1) {
return fmt.Errorf("too many part tracks at once")

Check warning on line 132 in client_stream_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_stream_processor_fmp4.go#L132

Added line #L132 was not covered by tests
}

prePreProcess, ok := p.prePreProcessFuncs[partTrack.ID]
if !ok {
continue
}

if processingCount >= (clientFMP4MaxPartTracksPerSegment - 1) {
return fmt.Errorf("too many part tracks at once")
}

err = prePreProcess(ctx, partTrack)
if err != nil {
return err
Expand All @@ -121,7 +148,7 @@

for i := 0; i < processingCount; i++ {
select {
case <-p.subpartProcessed:
case <-p.chPartTrackProcessed:
case <-ctx.Done():
return fmt.Errorf("terminated")
}
Expand All @@ -130,38 +157,30 @@
return nil
}

func (p *clientProcessorFMP4) onPartTrackProcessed(ctx context.Context) {
func (p *clientStreamProcessorFMP4) onPartTrackProcessed(ctx context.Context) {
select {
case p.subpartProcessed <- struct{}{}:
case p.chPartTrackProcessed <- struct{}{}:
case <-ctx.Done():
}
}

func (p *clientProcessorFMP4) initializeTrackProcs(ctx context.Context, track *fmp4.PartTrack) error {
if p.prePreProcessFuncs != nil {
return nil
}

func (p *clientStreamProcessorFMP4) initializeTrackProcessors(
ctx context.Context,
parts []*fmp4.Part,
) error {
var timeSync *clientTimeSyncFMP4
isLeadingTrack := (track.ID == p.leadingTrackID)

if p.isLeading {
if !isLeadingTrack {
return errSkipSilently
trackPart := findPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if trackPart == nil {
return nil

Check warning on line 176 in client_stream_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_stream_processor_fmp4.go#L176

Added line #L176 was not covered by tests
}

timeScale := func() uint32 {
for _, track := range p.init.Tracks {
if isLeadingTrack {
return track.TimeScale
}
}
return 0
}()
timeScale := findTimeScaleOfLeadingTrack(p.init.Tracks, p.leadingTrackID)

timeSync = &clientTimeSyncFMP4{
timeScale: timeScale,
baseTime: track.BaseTime,
baseTime: trackPart.BaseTime,
}
timeSync.initialize()

Expand Down
26 changes: 19 additions & 7 deletions client_processor_mpegts.go → client_stream_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func (r *switchableReader) Read(p []byte) (int, error) {
return r.r.Read(p)
}

type clientProcessorMPEGTS struct {
type clientStreamProcessorMPEGTS struct {
onDecodeError ClientOnDecodeErrorFunc
isLeading bool
segmentQueue *clientSegmentQueue
rp *clientRoutinePool
onStreamTracks func(context.Context, []*Track) bool
onStreamTracks clientOnStreamTracksFunc
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}
Expand All @@ -54,7 +54,15 @@ type clientProcessorMPEGTS struct {
timeSync *clientTimeSyncMPEGTS
}

func (p *clientProcessorMPEGTS) run(ctx context.Context) error {
func (p *clientStreamProcessorMPEGTS) getIsLeading() bool {
return p.isLeading
}

func (p *clientStreamProcessorMPEGTS) getTracks() []*Track {
return p.tracks
}

func (p *clientStreamProcessorMPEGTS) run(ctx context.Context) error {
for {
seg, ok := p.segmentQueue.pull(ctx)
if !ok {
Expand All @@ -68,7 +76,7 @@ func (p *clientProcessorMPEGTS) run(ctx context.Context) error {
}
}

func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) error {
func (p *clientStreamProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) error {
if p.switchableReader == nil {
p.switchableReader = &switchableReader{bytes.NewReader(byts)}

Expand Down Expand Up @@ -99,7 +107,7 @@ func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte)
}
}

ok := p.onStreamTracks(ctx, p.tracks)
ok := p.onStreamTracks(ctx, p)
if !ok {
return fmt.Errorf("terminated")
}
Expand Down Expand Up @@ -128,7 +136,7 @@ func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte)
}

prePreProcess := func(pts int64, dts int64, postProcess func(time.Duration, time.Duration)) error {
err := p.initializeTrackProcs(ctx, isLeadingTrack, dts)
err := p.initializeTrackProcessors(ctx, isLeadingTrack, dts)
if err != nil {
if err == errSkipSilently {
return nil
Expand Down Expand Up @@ -190,7 +198,11 @@ func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte)
}
}

func (p *clientProcessorMPEGTS) initializeTrackProcs(ctx context.Context, isLeadingTrack bool, dts int64) error {
func (p *clientStreamProcessorMPEGTS) initializeTrackProcessors(
ctx context.Context,
isLeadingTrack bool,
dts int64,
) error {
if p.trackProcs != nil {
return nil
}
Expand Down
Loading
Loading