diff --git a/client.go b/client.go index 6e0b02b..36ddb28 100644 --- a/client.go +++ b/client.go @@ -52,6 +52,8 @@ type ClientOnDataMPEG4AudioFunc func(pts time.Duration, aus [][]byte) // ClientOnDataOpusFunc is the prototype of the function passed to OnDataOpus(). type ClientOnDataOpusFunc func(pts time.Duration, packets [][]byte) +type clientOnStreamTracksFunc func(context.Context, clientStreamProcessor) bool + func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) { u, err := url.Parse(relative) if err != nil { diff --git a/client_downloader_primary.go b/client_downloader_primary.go index 579ce52..b33d40d 100644 --- a/client_downloader_primary.go +++ b/client_downloader_primary.go @@ -96,8 +96,6 @@ func pickAudioPlaylist(alternatives []*playlist.MultivariantRendition, groupID s return candidates[0] } -type clientTimeSync interface{} - type clientDownloaderPrimary struct { primaryPlaylistURL *url.URL httpClient *http.Client @@ -112,7 +110,7 @@ type clientDownloaderPrimary struct { leadingTimeSync clientTimeSync // in - streamTracks chan []*Track + chStreamTracks chan clientStreamProcessor // out startStreaming chan struct{} @@ -120,7 +118,7 @@ type clientDownloaderPrimary struct { } func (d *clientDownloaderPrimary) initialize() { - d.streamTracks = make(chan []*Track) + d.chStreamTracks = make(chan clientStreamProcessor) d.startStreaming = make(chan struct{}) d.leadingTimeSyncReady = make(chan struct{}) } @@ -221,8 +219,14 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error { for i := 0; i < streamCount; i++ { select { - case streamTracks := <-d.streamTracks: - tracks = append(tracks, streamTracks...) + case streamProc := <-d.chStreamTracks: + if streamProc.getIsLeading() { + prevTracks := tracks + tracks = append([]*Track(nil), streamProc.getTracks()...) + tracks = append(tracks, prevTracks...) + } else { + tracks = append(tracks, streamProc.getTracks()...) + } case <-ctx.Done(): return fmt.Errorf("terminated") } @@ -242,9 +246,9 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error { return nil } -func (d *clientDownloaderPrimary) onStreamTracks(ctx context.Context, tracks []*Track) bool { +func (d *clientDownloaderPrimary) onStreamTracks(ctx context.Context, streamProc clientStreamProcessor) bool { select { - case d.streamTracks <- tracks: + case d.chStreamTracks <- streamProc: case <-ctx.Done(): return false } diff --git a/client_downloader_stream.go b/client_downloader_stream.go index bc9de0e..4ee9dc9 100644 --- a/client_downloader_stream.go +++ b/client_downloader_stream.go @@ -38,7 +38,7 @@ type clientDownloaderStream struct { playlistURL *url.URL initialPlaylist *playlist.Media rp *clientRoutinePool - onStreamTracks func(context.Context, []*Track) bool + onStreamTracks clientOnStreamTracksFunc onSetLeadingTimeSync func(clientTimeSync) onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool) onData map[*Track]interface{} @@ -70,7 +70,7 @@ func (d *clientDownloaderStream) run(ctx context.Context) error { return err } - proc := &clientProcessorFMP4{ + proc := &clientStreamProcessorFMP4{ ctx: ctx, isLeading: d.isLeading, initFile: byts, @@ -88,7 +88,7 @@ func (d *clientDownloaderStream) run(ctx context.Context) error { d.rp.add(proc) } else { - proc := &clientProcessorMPEGTS{ + proc := &clientStreamProcessorMPEGTS{ onDecodeError: d.onDecodeError, isLeading: d.isLeading, segmentQueue: segmentQueue, diff --git a/client_stream_processor.go b/client_stream_processor.go new file mode 100644 index 0000000..37235b5 --- /dev/null +++ b/client_stream_processor.go @@ -0,0 +1,6 @@ +package gohlslib + +type clientStreamProcessor interface { + getIsLeading() bool + getTracks() []*Track +} diff --git a/client_processor_fmp4.go b/client_stream_processor_fmp4.go similarity index 77% rename from client_processor_fmp4.go rename to client_stream_processor_fmp4.go index a3bcd98..5b456b2 100644 --- a/client_processor_fmp4.go +++ b/client_stream_processor_fmp4.go @@ -23,13 +23,33 @@ func fmp4PickLeadingTrack(init *fmp4.Init) int { return init.Tracks[0].ID } -type clientProcessorFMP4 struct { +func findPartTrackOfLeadingTrack(parts []*fmp4.Part, leadingTrackID int) *fmp4.PartTrack { + for _, part := range parts { + for _, partTrack := range part.Tracks { + if partTrack.ID == leadingTrackID { + return partTrack + } + } + } + return nil +} + +func findTimeScaleOfLeadingTrack(tracks []*fmp4.InitTrack, leadingTrackID int) uint32 { + for _, track := range tracks { + if track.ID == leadingTrackID { + return track.TimeScale + } + } + return 0 +} + +type clientStreamProcessorFMP4 struct { ctx context.Context isLeading bool initFile []byte segmentQueue *clientSegmentQueue rp *clientRoutinePool - onStreamTracks func(context.Context, []*Track) bool + onStreamTracks clientOnStreamTracksFunc onSetLeadingTimeSync func(clientTimeSync) onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool) onData map[*Track]interface{} @@ -40,11 +60,11 @@ type clientProcessorFMP4 struct { prePreProcessFuncs map[int]func(context.Context, *fmp4.PartTrack) error // in - subpartProcessed chan struct{} + chPartTrackProcessed chan struct{} } -func (p *clientProcessorFMP4) initialize() error { - p.subpartProcessed = make(chan struct{}, clientFMP4MaxPartTracksPerSegment) +func (p *clientStreamProcessorFMP4) initialize() error { + p.chPartTrackProcessed = make(chan struct{}, clientFMP4MaxPartTracksPerSegment) err := p.init.Unmarshal(bytes.NewReader(p.initFile)) if err != nil { @@ -60,7 +80,7 @@ func (p *clientProcessorFMP4) initialize() error { } } - ok := p.onStreamTracks(p.ctx, p.tracks) + ok := p.onStreamTracks(p.ctx, p) if !ok { return fmt.Errorf("terminated") } @@ -68,7 +88,15 @@ func (p *clientProcessorFMP4) initialize() error { return nil } -func (p *clientProcessorFMP4) run(ctx context.Context) error { +func (p *clientStreamProcessorFMP4) getIsLeading() bool { + return p.isLeading +} + +func (p *clientStreamProcessorFMP4) getTracks() []*Track { + return p.tracks +} + +func (p *clientStreamProcessorFMP4) run(ctx context.Context) error { for { seg, ok := p.segmentQueue.pull(ctx) if !ok { @@ -82,23 +110,26 @@ func (p *clientProcessorFMP4) run(ctx context.Context) error { } } -func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) error { +func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, byts []byte) error { var parts fmp4.Parts err := parts.Unmarshal(byts) if err != nil { return err } + if p.prePreProcessFuncs == nil { + err := p.initializeTrackProcessors(ctx, parts) + if err != nil { + return err + } + } + processingCount := 0 for _, part := range parts { for _, partTrack := range part.Tracks { - err := p.initializeTrackProcs(ctx, partTrack) - if err != nil { - if err == errSkipSilently { - continue - } - return err + if processingCount >= (clientFMP4MaxPartTracksPerSegment - 1) { + return fmt.Errorf("too many part tracks at once") } prePreProcess, ok := p.prePreProcessFuncs[partTrack.ID] @@ -106,10 +137,6 @@ func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) e continue } - if processingCount >= (clientFMP4MaxPartTracksPerSegment - 1) { - return fmt.Errorf("too many part tracks at once") - } - err = prePreProcess(ctx, partTrack) if err != nil { return err @@ -121,7 +148,7 @@ func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) e for i := 0; i < processingCount; i++ { select { - case <-p.subpartProcessed: + case <-p.chPartTrackProcessed: case <-ctx.Done(): return fmt.Errorf("terminated") } @@ -130,38 +157,30 @@ func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) e return nil } -func (p *clientProcessorFMP4) onPartTrackProcessed(ctx context.Context) { +func (p *clientStreamProcessorFMP4) onPartTrackProcessed(ctx context.Context) { select { - case p.subpartProcessed <- struct{}{}: + case p.chPartTrackProcessed <- struct{}{}: case <-ctx.Done(): } } -func (p *clientProcessorFMP4) initializeTrackProcs(ctx context.Context, track *fmp4.PartTrack) error { - if p.prePreProcessFuncs != nil { - return nil - } - +func (p *clientStreamProcessorFMP4) initializeTrackProcessors( + ctx context.Context, + parts []*fmp4.Part, +) error { var timeSync *clientTimeSyncFMP4 - isLeadingTrack := (track.ID == p.leadingTrackID) if p.isLeading { - if !isLeadingTrack { - return errSkipSilently + trackPart := findPartTrackOfLeadingTrack(parts, p.leadingTrackID) + if trackPart == nil { + return nil } - timeScale := func() uint32 { - for _, track := range p.init.Tracks { - if isLeadingTrack { - return track.TimeScale - } - } - return 0 - }() + timeScale := findTimeScaleOfLeadingTrack(p.init.Tracks, p.leadingTrackID) timeSync = &clientTimeSyncFMP4{ timeScale: timeScale, - baseTime: track.BaseTime, + baseTime: trackPart.BaseTime, } timeSync.initialize() diff --git a/client_processor_mpegts.go b/client_stream_processor_mpegts.go similarity index 88% rename from client_processor_mpegts.go rename to client_stream_processor_mpegts.go index f4b7424..b43d5e4 100644 --- a/client_processor_mpegts.go +++ b/client_stream_processor_mpegts.go @@ -37,12 +37,12 @@ func (r *switchableReader) Read(p []byte) (int, error) { return r.r.Read(p) } -type clientProcessorMPEGTS struct { +type clientStreamProcessorMPEGTS struct { onDecodeError ClientOnDecodeErrorFunc isLeading bool segmentQueue *clientSegmentQueue rp *clientRoutinePool - onStreamTracks func(context.Context, []*Track) bool + onStreamTracks clientOnStreamTracksFunc onSetLeadingTimeSync func(clientTimeSync) onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool) onData map[*Track]interface{} @@ -54,7 +54,15 @@ type clientProcessorMPEGTS struct { timeSync *clientTimeSyncMPEGTS } -func (p *clientProcessorMPEGTS) run(ctx context.Context) error { +func (p *clientStreamProcessorMPEGTS) getIsLeading() bool { + return p.isLeading +} + +func (p *clientStreamProcessorMPEGTS) getTracks() []*Track { + return p.tracks +} + +func (p *clientStreamProcessorMPEGTS) run(ctx context.Context) error { for { seg, ok := p.segmentQueue.pull(ctx) if !ok { @@ -68,7 +76,7 @@ func (p *clientProcessorMPEGTS) run(ctx context.Context) error { } } -func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) error { +func (p *clientStreamProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) error { if p.switchableReader == nil { p.switchableReader = &switchableReader{bytes.NewReader(byts)} @@ -99,7 +107,7 @@ func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) } } - ok := p.onStreamTracks(ctx, p.tracks) + ok := p.onStreamTracks(ctx, p) if !ok { return fmt.Errorf("terminated") } @@ -128,7 +136,7 @@ func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) } prePreProcess := func(pts int64, dts int64, postProcess func(time.Duration, time.Duration)) error { - err := p.initializeTrackProcs(ctx, isLeadingTrack, dts) + err := p.initializeTrackProcessors(ctx, isLeadingTrack, dts) if err != nil { if err == errSkipSilently { return nil @@ -190,7 +198,11 @@ func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) } } -func (p *clientProcessorMPEGTS) initializeTrackProcs(ctx context.Context, isLeadingTrack bool, dts int64) error { +func (p *clientStreamProcessorMPEGTS) initializeTrackProcessors( + ctx context.Context, + isLeadingTrack bool, + dts int64, +) error { if p.trackProcs != nil { return nil } diff --git a/client_test.go b/client_test.go index aeb18f1..0dab4dd 100644 --- a/client_test.go +++ b/client_test.go @@ -14,11 +14,11 @@ import ( "github.com/asticode/go-astits" "github.com/aler9/writerseeker" + "github.com/bluenviron/gohlslib/pkg/codecs" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/gin-gonic/gin" "github.com/stretchr/testify/require" - "github.com/bluenviron/gohlslib/pkg/codecs" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" ) @@ -127,100 +127,19 @@ func mpegtsSegment(t *testing.T, w io.Writer) { require.NoError(t, err) } -func mp4Init(t *testing.T, w io.Writer) { - i := &fmp4.Init{ - Tracks: []*fmp4.InitTrack{ - { - ID: 1, - TimeScale: 90000, - Codec: &fmp4.CodecH264{ - SPS: []byte{ - 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, - 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, - 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, - 0x20, - }, - PPS: []byte{0x01, 0x02, 0x03, 0x04}, - }, - }, - }, - } - - ws := &writerseeker.WriterSeeker{} - err := i.Marshal(ws) - require.NoError(t, err) - - _, err = w.Write(ws.Bytes()) - require.NoError(t, err) +type marshaler interface { + Marshal(w io.WriteSeeker) error } -func mp4Segment(t *testing.T, w io.Writer) { - payload, _ := h264.AVCCMarshal([][]byte{ - {7, 1, 2, 3}, // SPS - {8}, // PPS - {5}, // IDR - }) - - p := &fmp4.Part{ - Tracks: []*fmp4.PartTrack{ - { - ID: 1, - Samples: []*fmp4.PartSample{{ - Duration: 90000 / 30, - PTSOffset: 90000 * 2, - Payload: payload, - }}, - }, - }, - } - +func mp4ToWriter(i marshaler, w io.Writer) error { ws := &writerseeker.WriterSeeker{} - err := p.Marshal(ws) - require.NoError(t, err) - - _, err = w.Write(ws.Bytes()) - require.NoError(t, err) -} - -type testHLSServer struct { - s *http.Server -} - -func newTestHLSServer(router http.Handler, isTLS bool) (*testHLSServer, error) { - ln, err := net.Listen("tcp", "localhost:5780") + err := i.Marshal(ws) if err != nil { - return nil, err - } - - s := &testHLSServer{ - s: &http.Server{Handler: router}, - } - - if isTLS { - go func() { - serverCertFpath, err := writeTempFile(serverCert) - if err != nil { - panic(err) - } - defer os.Remove(serverCertFpath) - - serverKeyFpath, err := writeTempFile(serverKey) - if err != nil { - panic(err) - } - defer os.Remove(serverKeyFpath) - - s.s.ServeTLS(ln, serverCertFpath, serverKeyFpath) - }() - } else { - go s.s.Serve(ln) + return err } - return s, nil -} - -func (s *testHLSServer) close() { - s.s.Shutdown(context.Background()) + _, err = w.Write(ws.Bytes()) + return err } func TestClientMPEGTS(t *testing.T) { @@ -232,11 +151,6 @@ func TestClientMPEGTS(t *testing.T) { t.Run(ca, func(t *testing.T) { gin.SetMode(gin.ReleaseMode) router := gin.New() - - segment := "segment.ts" - if ca == "segment with query" { - segment = "segment.ts?key=val" - } sent := false router.GET("/stream.m3u8", func(ctx *gin.Context) { @@ -245,6 +159,11 @@ func TestClientMPEGTS(t *testing.T) { } sent = true + segment := "segment.ts" + if ca == "segment with query" { + segment = "segment.ts?key=val" + } + ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U #EXT-X-VERSION:3 @@ -265,9 +184,32 @@ func TestClientMPEGTS(t *testing.T) { mpegtsSegment(t, ctx.Writer) }) - s, err := newTestHLSServer(router, ca == "tls") + ln, err := net.Listen("tcp", "localhost:5780") require.NoError(t, err) - defer s.close() + + s := &http.Server{Handler: router} + + if ca == "tls" { + go func() { + serverCertFpath, err := writeTempFile(serverCert) + if err != nil { + panic(err) + } + defer os.Remove(serverCertFpath) + + serverKeyFpath, err := writeTempFile(serverKey) + if err != nil { + panic(err) + } + defer os.Remove(serverKeyFpath) + + s.ServeTLS(ln, serverCertFpath, serverKeyFpath) + }() + } else { + go s.Serve(ln) + } + + defer s.Shutdown(context.Background()) packetRecv := make(chan struct{}) @@ -276,17 +218,6 @@ func TestClientMPEGTS(t *testing.T) { prefix = "https" } - onH264 := func(pts time.Duration, dts time.Duration, au [][]byte) { - require.Equal(t, 2*time.Second, pts) - require.Equal(t, time.Duration(0), dts) - require.Equal(t, [][]byte{ - {7, 1, 2, 3}, - {8}, - {5}, - }, au) - close(packetRecv) - } - var c *Client c = &Client{ URI: prefix + "://localhost:5780/stream.m3u8", @@ -298,9 +229,21 @@ func TestClientMPEGTS(t *testing.T) { }, }, OnTracks: func(tracks []*Track) error { - require.Equal(t, 1, len(tracks)) - require.Equal(t, &codecs.H264{}, tracks[0].Codec) - c.OnDataH26x(tracks[0], onH264) + require.Equal(t, []*Track{{ + Codec: &codecs.H264{}, + }}, tracks) + + c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) { + require.Equal(t, 2*time.Second, pts) + require.Equal(t, time.Duration(0), dts) + require.Equal(t, [][]byte{ + {7, 1, 2, 3}, + {8}, + {5}, + }, au) + close(packetRecv) + }) + return nil }, } @@ -336,39 +279,107 @@ segment.mp4 router.GET("/init.mp4", func(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `video/mp4`) - mp4Init(t, ctx.Writer) + err := mp4ToWriter(&fmp4.Init{ + Tracks: []*fmp4.InitTrack{ + { + ID: 99, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: testSPS, + PPS: testPPS, + }, + }, + { + ID: 98, + TimeScale: 44100, + Codec: &fmp4.CodecMPEG4Audio{ + Config: testConfig, + }, + }, + }, + }, ctx.Writer) + require.NoError(t, err) }) router.GET("/segment.mp4", func(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `video/mp4`) - mp4Segment(t, ctx.Writer) + + payload, _ := h264.AVCCMarshal([][]byte{ + {7, 1, 2, 3}, // SPS + {8}, // PPS + {5}, // IDR + }) + + err := mp4ToWriter(&fmp4.Part{ + Tracks: []*fmp4.PartTrack{ + { + ID: 98, + BaseTime: 44100 * 6, + Samples: []*fmp4.PartSample{{ + Duration: 44100 / 30, + Payload: []byte{1, 2, 3, 4}, + }}, + }, + { + ID: 99, + BaseTime: 90000 * 6, + Samples: []*fmp4.PartSample{{ + Duration: 90000 / 30, + PTSOffset: 90000 * 2, + Payload: payload, + }}, + }, + }, + }, ctx.Writer) + require.NoError(t, err) }) - s, err := newTestHLSServer(router, false) + ln, err := net.Listen("tcp", "localhost:5780") require.NoError(t, err) - defer s.close() - - packetRecv := make(chan struct{}) - - onH264 := func(pts time.Duration, dts time.Duration, au [][]byte) { - require.Equal(t, 2*time.Second, pts) - require.Equal(t, time.Duration(0), dts) - require.Equal(t, [][]byte{ - {7, 1, 2, 3}, - {8}, - {5}, - }, au) - close(packetRecv) - } + + s := &http.Server{Handler: router} + go s.Serve(ln) + defer s.Shutdown(context.Background()) + + packetRecv := make(chan struct{}, 2) var c *Client c = &Client{ URI: "http://localhost:5780/stream.m3u8", OnTracks: func(tracks []*Track) error { - require.Equal(t, 1, len(tracks)) - _, ok := tracks[0].Codec.(*codecs.H264) - require.Equal(t, true, ok) - c.OnDataH26x(tracks[0], onH264) + require.Equal(t, []*Track{ + { + Codec: &codecs.H264{ + SPS: testSPS, + PPS: testPPS, + }, + }, + { + Codec: &codecs.MPEG4Audio{ + Config: testConfig, + }, + }, + }, tracks) + + c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) { + require.Equal(t, 2*time.Second, pts) + require.Equal(t, time.Duration(0), dts) + require.Equal(t, [][]byte{ + {7, 1, 2, 3}, + {8}, + {5}, + }, au) + packetRecv <- struct{}{} + }) + + c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) { + require.Equal(t, 0*time.Second, pts) + require.Equal(t, [][]byte{ + {1, 2, 3, 4}, + }, aus) + packetRecv <- struct{}{} + }) + return nil }, } @@ -376,13 +387,196 @@ segment.mp4 err = c.Start() require.NoError(t, err) - <-packetRecv + for i := 0; i < 2; i++ { + <-packetRecv + } c.Close() <-c.Wait() } -func TestClientInvalidSequenceID(t *testing.T) { +func TestClientFMP4MultiRenditions(t *testing.T) { + gin.SetMode(gin.ReleaseMode) + router := gin.New() + + router.GET("/index.m3u8", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) + io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U +#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="aac",NAME="English",DEFAULT=YES,AUTOSELECT=YES,LANGUAGE="en",URI="audio.m3u8" +#EXT-X-STREAM-INF:BANDWIDTH=7680000,CODECS="avc1.640015,mp4a.40.5",AUDIO="aac" +video.m3u8 +`))) + }) + + router.GET("/video.m3u8", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) + io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U +#EXT-X-VERSION:7 +#EXT-X-MEDIA-SEQUENCE:20 +#EXT-X-INDEPENDENT-SEGMENTS +#EXT-X-TARGETDURATION:2 +#EXT-X-MAP:URI="init_video.mp4" +#EXTINF:2, +segment_video.mp4 +#EXT-X-ENDLIST +`))) + }) + + router.GET("/audio.m3u8", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) + io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U +#EXT-X-VERSION:7 +#EXT-X-MEDIA-SEQUENCE:20 +#EXT-X-INDEPENDENT-SEGMENTS +#EXT-X-TARGETDURATION:2 +#EXT-X-MAP:URI="init_audio.mp4" +#EXTINF:2, +segment_audio.mp4 +#EXT-X-ENDLIST +`))) + }) + + router.GET("/init_video.mp4", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `video/mp4`) + + err := mp4ToWriter(&fmp4.Init{ + Tracks: []*fmp4.InitTrack{ + { + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: testSPS, + PPS: testPPS, + }, + }, + }, + }, ctx.Writer) + require.NoError(t, err) + }) + + router.GET("/init_audio.mp4", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `video/mp4`) + + err := mp4ToWriter(&fmp4.Init{ + Tracks: []*fmp4.InitTrack{ + { + ID: 1, + TimeScale: 44100, + Codec: &fmp4.CodecMPEG4Audio{ + Config: testConfig, + }, + }, + }, + }, ctx.Writer) + require.NoError(t, err) + }) + + router.GET("/segment_video.mp4", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `video/mp4`) + + payload, _ := h264.AVCCMarshal([][]byte{ + {7, 1, 2, 3}, // SPS + {8}, // PPS + {5}, // IDR + }) + + err := mp4ToWriter(&fmp4.Part{ + Tracks: []*fmp4.PartTrack{ + { + ID: 1, + Samples: []*fmp4.PartSample{{ + Duration: 90000, + PTSOffset: 90000 * 3, + Payload: payload, + }}, + }, + }, + }, ctx.Writer) + require.NoError(t, err) + }) + + router.GET("/segment_audio.mp4", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `video/mp4`) + + err := mp4ToWriter(&fmp4.Part{ + Tracks: []*fmp4.PartTrack{ + { + ID: 1, + Samples: []*fmp4.PartSample{{ + Duration: 44100, + Payload: []byte{1, 2, 3, 4}, + }}, + }, + }, + }, ctx.Writer) + require.NoError(t, err) + }) + + ln, err := net.Listen("tcp", "localhost:5780") + require.NoError(t, err) + + s := &http.Server{Handler: router} + go s.Serve(ln) + defer s.Shutdown(context.Background()) + + packetRecv := make(chan struct{}, 2) + tracksRecv := make(chan struct{}, 1) + + var c *Client + c = &Client{ + URI: "http://localhost:5780/index.m3u8", + OnTracks: func(tracks []*Track) error { + close(tracksRecv) + + require.Equal(t, []*Track{ + { + Codec: &codecs.H264{ + SPS: testSPS, + PPS: testPPS, + }, + }, + { + Codec: &codecs.MPEG4Audio{ + Config: testConfig, + }, + }, + }, tracks) + + c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) { + require.Equal(t, 3*time.Second, pts) + require.Equal(t, time.Duration(0), dts) + require.Equal(t, [][]byte{ + {7, 1, 2, 3}, + {8}, + {5}, + }, au) + packetRecv <- struct{}{} + }) + + c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) { + require.Equal(t, 0*time.Second, pts) + require.Equal(t, [][]byte{ + {1, 2, 3, 4}, + }, aus) + packetRecv <- struct{}{} + }) + + return nil + }, + } + + err = c.Start() + require.NoError(t, err) + + for i := 0; i < 2; i++ { + <-packetRecv + } + + c.Close() + <-c.Wait() +} + +func TestClientErrorInvalidSequenceID(t *testing.T) { router := gin.New() firstPlaylist := true @@ -426,9 +620,12 @@ segment1.ts mpegtsSegment(t, ctx.Writer) }) - s, err := newTestHLSServer(router, false) + ln, err := net.Listen("tcp", "localhost:5780") require.NoError(t, err) - defer s.close() + + s := &http.Server{Handler: router} + go s.Serve(ln) + defer s.Shutdown(context.Background()) c := &Client{ URI: "http://localhost:5780/stream.m3u8", diff --git a/client_timesync.go b/client_timesync.go new file mode 100644 index 0000000..ad3464b --- /dev/null +++ b/client_timesync.go @@ -0,0 +1,3 @@ +package gohlslib + +type clientTimeSync interface{} diff --git a/client_timesync_fmp4.go b/client_timesync_fmp4.go index ebe68d0..670ab33 100644 --- a/client_timesync_fmp4.go +++ b/client_timesync_fmp4.go @@ -33,8 +33,11 @@ func (ts *clientTimeSyncFMP4) initialize() { ts.startDTS = durationMp4ToGo(ts.baseTime, ts.timeScale) } -func (ts *clientTimeSyncFMP4) convertAndSync(ctx context.Context, timeScale uint32, - rawDTS uint64, ptsOffset int32, +func (ts *clientTimeSyncFMP4) convertAndSync( + ctx context.Context, + timeScale uint32, + rawDTS uint64, + ptsOffset int32, ) (time.Duration, time.Duration, error) { pts := durationMp4ToGo(rawDTS+uint64(ptsOffset), timeScale) dts := durationMp4ToGo(rawDTS, timeScale) diff --git a/muxer_test.go b/muxer_test.go index b829847..21e0737 100644 --- a/muxer_test.go +++ b/muxer_test.go @@ -29,6 +29,14 @@ var testSPS = []byte{ 0x20, } +var testPPS = []byte{0x01, 0x02, 0x03, 0x04} + +var testConfig = mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, +} + var testVideoTrack = &Track{ Codec: &codecs.H264{ SPS: testSPS,