Skip to content

Commit

Permalink
client: fix panic when writing packets after connection error (#681)
Browse files Browse the repository at this point in the history
* Fix writer nullpointer panic on network reconnect

* add additional code and tests

---------

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
  • Loading branch information
SijmenHuizenga and aler9 authored Jan 18, 2025
1 parent 6240aa2 commit b2cfa93
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 23 deletions.
16 changes: 8 additions & 8 deletions async_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
type asyncProcessor struct {
bufferSize int

running bool
buffer *ringbuffer.RingBuffer
running bool
buffer *ringbuffer.RingBuffer
stopError error

chError chan error
stopped chan struct{}
}

func (w *asyncProcessor) initialize() {
Expand All @@ -22,22 +23,21 @@ func (w *asyncProcessor) initialize() {

func (w *asyncProcessor) start() {
w.running = true
w.chError = make(chan error)
w.stopped = make(chan struct{})
go w.run()
}

func (w *asyncProcessor) stop() {
if w.running {
w.buffer.Close()
<-w.chError
<-w.stopped
w.running = false
}
}

func (w *asyncProcessor) run() {
err := w.runInner()
w.chError <- err
close(w.chError)
w.stopError = w.runInner()
close(w.stopped)
}

func (w *asyncProcessor) runInner() error {
Expand Down
24 changes: 24 additions & 0 deletions async_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package gortsplib

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestAsyncProcessorStopAfterError(t *testing.T) {
p := &asyncProcessor{bufferSize: 8}
p.initialize()

p.push(func() error {
return fmt.Errorf("ok")
})

p.start()

<-p.stopped
require.EqualError(t, p.stopError, "ok")

p.stop()
}
12 changes: 5 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,9 @@ func (c *Client) runInner() error {
return nil
}()

chWriterError := func() chan error {
if c.writer != nil {
return c.writer.chError
chWriterError := func() chan struct{} {
if c.writer != nil && c.writer.running {
return c.writer.stopped
}
return nil
}()
Expand Down Expand Up @@ -637,8 +637,8 @@ func (c *Client) runInner() error {
}
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)

case err := <-chWriterError:
return err
case <-chWriterError:
return c.writer.stopError

case err := <-chReaderError:
c.reader = nil
Expand Down Expand Up @@ -911,8 +911,6 @@ func (c *Client) stopTransportRoutines() {
}

c.timeDecoder = nil

c.writer = nil
}

func (c *Client) connOpen() error {
Expand Down
4 changes: 2 additions & 2 deletions client_play_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,8 +1961,8 @@ func TestClientPlayPause(t *testing.T) {
})
require.NoError(t, err2)

req, err = conn.ReadRequest()
require.NoError(t, err)
req, err2 = conn.ReadRequest()
require.NoError(t, err2)
require.Equal(t, base.Play, req.Method)

err2 = conn.WriteResponse(&base.Response{
Expand Down
11 changes: 5 additions & 6 deletions server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,9 +626,9 @@ func (ss *ServerSession) run() {

func (ss *ServerSession) runInner() error {
for {
chWriterError := func() chan error {
if ss.writer != nil {
return ss.writer.chError
chWriterError := func() chan struct{} {
if ss.writer != nil && ss.writer.running {
return ss.writer.stopped
}
return nil
}()
Expand Down Expand Up @@ -729,8 +729,8 @@ func (ss *ServerSession) runInner() error {

ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)

case err := <-chWriterError:
return err
case <-chWriterError:
return ss.writer.stopError

case <-ss.ctx.Done():
return liberrors.ErrServerTerminated{}
Expand Down Expand Up @@ -1306,7 +1306,6 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}

ss.writer.stop()
ss.writer = nil

ss.timeDecoder = nil

Expand Down

0 comments on commit b2cfa93

Please # to comment.