diff --git a/README.md b/README.md index d6b3d7dc..73084462 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ Features: ## RTP Payload Formats -In RTSP, media streams are routed between server and clients by using RTP packets, which are encoded in a specific, codec-dependent, format. This library supports formats for the following codecs: +In RTSP, media streams are transmitted by using RTP packets, which are encoded in a specific, codec-dependent, format. This library supports formats for the following codecs: ### Video diff --git a/client.go b/client.go index 712f7911..54791f18 100644 --- a/client.go +++ b/client.go @@ -31,6 +31,14 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/sdp" ) +// avoid an int64 overflow and preserve resolution by splitting division into two parts: +// first add the integer part, then the decimal part. +func multiplyAndDivide(v, m, d time.Duration) time.Duration { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + // convert an URL into an address, in particular: // * add default port // * handle IPv6 with or without square brackets. @@ -329,8 +337,7 @@ type Client struct { closeError error writer asyncProcessor reader *clientReader - timeDecoder *rtptime.GlobalDecoder - timeDecoder2 *rtptime.GlobalDecoder2 + timeDecoder *rtptime.GlobalDecoder2 mustClose bool // in @@ -812,8 +819,7 @@ func (c *Client) startReadRoutines() { c.writer.allocateBuffer(8) } - c.timeDecoder = rtptime.NewGlobalDecoder() - c.timeDecoder2 = rtptime.NewGlobalDecoder2() + c.timeDecoder = rtptime.NewGlobalDecoder2() for _, cm := range c.medias { cm.start() @@ -855,7 +861,6 @@ func (c *Client) stopReadRoutines() { } c.timeDecoder = nil - c.timeDecoder2 = nil } func (c *Client) startWriter() { @@ -1900,7 +1905,13 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) { cm := c.medias[medi] ct := cm.formats[pkt.PayloadType] - return c.timeDecoder.Decode(ct.format, pkt) + + v, ok := c.timeDecoder.Decode(ct.format, pkt) + if !ok { + return 0, false + } + + return multiplyAndDivide(time.Duration(v), time.Second, time.Duration(ct.format.ClockRate())), true } // PacketPTS returns the PTS of an incoming RTP packet. @@ -1908,7 +1919,7 @@ func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Durat func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) { cm := c.medias[medi] ct := cm.formats[pkt.PayloadType] - return c.timeDecoder2.Decode(ct.format, pkt) + return c.timeDecoder.Decode(ct.format, pkt) } // PacketNTP returns the NTP timestamp of an incoming RTP packet. diff --git a/pkg/rtptime/global_decoder.go b/pkg/rtptime/global_decoder.go index 620ad9a8..a11cad46 100644 --- a/pkg/rtptime/global_decoder.go +++ b/pkg/rtptime/global_decoder.go @@ -1,7 +1,6 @@ package rtptime import ( - "sync" "time" "github.com/pion/rtp" @@ -45,6 +44,8 @@ func (d *globalDecoderTrackData) decode(ts uint32) time.Duration { } // GlobalDecoderTrack is a track (RTSP format or WebRTC track) of a GlobalDecoder. +// +// Deprecated: replaced by GlobalDecoderTrack2 type GlobalDecoderTrack interface { ClockRate() int PTSEqualsDTS(*rtp.Packet) bool @@ -54,11 +55,7 @@ type GlobalDecoderTrack interface { // // Deprecated: replaced by GlobalDecoder2. type GlobalDecoder struct { - mutex sync.Mutex - leadingTrack GlobalDecoderTrack - startNTP time.Time - startPTS time.Duration - tracks map[GlobalDecoderTrack]*globalDecoderTrackData + wrapped *GlobalDecoder2 } // NewGlobalDecoder allocates a GlobalDecoder. @@ -66,7 +63,7 @@ type GlobalDecoder struct { // Deprecated: replaced by NewGlobalDecoder2. func NewGlobalDecoder() *GlobalDecoder { return &GlobalDecoder{ - tracks: make(map[GlobalDecoderTrack]*globalDecoderTrackData), + wrapped: NewGlobalDecoder2(), } } @@ -75,47 +72,10 @@ func (d *GlobalDecoder) Decode( track GlobalDecoderTrack, pkt *rtp.Packet, ) (time.Duration, bool) { - if track.ClockRate() == 0 { - return 0, false - } - - d.mutex.Lock() - defer d.mutex.Unlock() - - df, ok := d.tracks[track] - - // track never seen before + v, ok := d.wrapped.Decode(track, pkt) if !ok { - if !track.PTSEqualsDTS(pkt) { - return 0, false - } - - now := timeNow() - - if d.leadingTrack == nil { - d.leadingTrack = track - d.startNTP = now - d.startPTS = 0 - } - - df = newGlobalDecoderTrackData( - d.startPTS+now.Sub(d.startNTP), - track.ClockRate(), - pkt.Timestamp) - - d.tracks[track] = df - - return df.startPTS, true - } - - pts := df.decode(pkt.Timestamp) - - // update startNTP / startPTS - if d.leadingTrack == track && track.PTSEqualsDTS(pkt) { - now := timeNow() - d.startNTP = now - d.startPTS = pts + return 0, false } - return pts, true + return multiplyAndDivide(time.Duration(v), time.Second, time.Duration(track.ClockRate())), true } diff --git a/server_session.go b/server_session.go index 12b2157f..5f90a1ed 100644 --- a/server_session.go +++ b/server_session.go @@ -247,8 +247,7 @@ type ServerSession struct { udpLastPacketTime *int64 // publish udpCheckStreamTimer *time.Timer writer asyncProcessor - timeDecoder *rtptime.GlobalDecoder - timeDecoder2 *rtptime.GlobalDecoder2 + timeDecoder *rtptime.GlobalDecoder2 // in chHandleRequest chan sessionRequestReq @@ -952,8 +951,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( v := ss.s.timeNow().Unix() ss.udpLastPacketTime = &v - ss.timeDecoder = rtptime.NewGlobalDecoder() - ss.timeDecoder2 = rtptime.NewGlobalDecoder2() + ss.timeDecoder = rtptime.NewGlobalDecoder2() for _, sm := range ss.setuppedMedias { sm.start() @@ -1039,8 +1037,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( v := ss.s.timeNow().Unix() ss.udpLastPacketTime = &v - ss.timeDecoder = rtptime.NewGlobalDecoder() - ss.timeDecoder2 = rtptime.NewGlobalDecoder2() + ss.timeDecoder = rtptime.NewGlobalDecoder2() for _, sm := range ss.setuppedMedias { sm.start() @@ -1095,7 +1092,6 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } ss.timeDecoder = nil - ss.timeDecoder2 = nil switch ss.state { case ServerSessionStatePlay: @@ -1268,7 +1264,13 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) { sm := ss.setuppedMedias[medi] sf := sm.formats[pkt.PayloadType] - return ss.timeDecoder.Decode(sf.format, pkt) + + v, ok := ss.timeDecoder.Decode(sf.format, pkt) + if !ok { + return 0, false + } + + return multiplyAndDivide(time.Duration(v), time.Second, time.Duration(sf.format.ClockRate())), true } // PacketPTS2 returns the PTS of an incoming RTP packet. @@ -1276,7 +1278,7 @@ func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (ti func (ss *ServerSession) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) { sm := ss.setuppedMedias[medi] sf := sm.formats[pkt.PayloadType] - return ss.timeDecoder2.Decode(sf.format, pkt) + return ss.timeDecoder.Decode(sf.format, pkt) } // PacketNTP returns the NTP timestamp of an incoming RTP packet.