From 5babbef90aa8ad8bd5f4ec65dc28caf28ccc376b Mon Sep 17 00:00:00 2001 From: Eric Daniels Date: Mon, 8 Aug 2022 15:52:42 -0400 Subject: [PATCH 1/4] Properly close microphone before malgo chunk channel --- pkg/driver/microphone/microphone.go | 35 ++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/pkg/driver/microphone/microphone.go b/pkg/driver/microphone/microphone.go index 8dcec4fa..933bc2c4 100644 --- a/pkg/driver/microphone/microphone.go +++ b/pkg/driver/microphone/microphone.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "sync" "time" "unsafe" @@ -32,7 +33,8 @@ var ( type microphone struct { malgo.DeviceInfo - chunkChan chan []byte + chunkChan chan []byte + deviceCloseFunc func() } func init() { @@ -87,6 +89,9 @@ func (m *microphone) Open() error { } func (m *microphone) Close() error { + if m.deviceCloseFunc != nil { + m.deviceCloseFunc() + } if m.chunkChan != nil { close(m.chunkChan) m.chunkChan = nil @@ -136,11 +141,35 @@ func (m *microphone) AudioRecord(inputProp prop.Media) (audio.Reader, error) { return nil, err } + var closeDeviceOnce sync.Once + m.deviceCloseFunc = func() { + closeDeviceOnce.Do(func() { + closeDone := make(chan struct{}) + // we may be waiting on a chunk so keep reading until we can stop + // the malgo device. + go func() { + for { + select { + case <-closeDone: + return + default: + } + select { + case <-m.chunkChan: + case <-closeDone: + } + } + }() + device.Stop() + device.Uninit() + close(closeDone) + }) + } + var reader audio.Reader = audio.ReaderFunc(func() (wave.Audio, func(), error) { chunk, ok := <-m.chunkChan if !ok { - device.Stop() - device.Uninit() + m.deviceCloseFunc() return nil, func() {}, io.EOF } From c4a5bfc8641726ade436f69a3428f2f50cc9175a Mon Sep 17 00:00:00 2001 From: Eric Daniels Date: Mon, 8 Aug 2022 19:16:29 -0400 Subject: [PATCH 2/4] pr changes --- pkg/driver/microphone/microphone.go | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/pkg/driver/microphone/microphone.go b/pkg/driver/microphone/microphone.go index 933bc2c4..f124b4ab 100644 --- a/pkg/driver/microphone/microphone.go +++ b/pkg/driver/microphone/microphone.go @@ -1,6 +1,7 @@ package microphone import ( + "context" "encoding/binary" "errors" "fmt" @@ -126,43 +127,33 @@ func (m *microphone) AudioRecord(inputProp prop.Media) (audio.Reader, error) { return nil, errUnsupportedFormat } + cancelCtx, cancel := context.WithCancel(context.Background()) onRecvChunk := func(_, chunk []byte, framecount uint32) { - m.chunkChan <- chunk + select { + case <-cancelCtx.Done(): + case m.chunkChan <- chunk: + } } callbacks.Data = onRecvChunk device, err := malgo.InitDevice(ctx.Context, config, callbacks) if err != nil { + cancel() return nil, err } err = device.Start() if err != nil { + cancel() return nil, err } var closeDeviceOnce sync.Once m.deviceCloseFunc = func() { closeDeviceOnce.Do(func() { - closeDone := make(chan struct{}) - // we may be waiting on a chunk so keep reading until we can stop - // the malgo device. - go func() { - for { - select { - case <-closeDone: - return - default: - } - select { - case <-m.chunkChan: - case <-closeDone: - } - } - }() + cancel() // Unblock onRecvChunk device.Stop() device.Uninit() - close(closeDone) }) } From 1bf844e182545259d765878094927157ed1cf973 Mon Sep 17 00:00:00 2001 From: Eric Daniels Date: Tue, 9 Aug 2022 08:49:10 -0400 Subject: [PATCH 3/4] do not need to call stop --- pkg/driver/microphone/microphone.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/driver/microphone/microphone.go b/pkg/driver/microphone/microphone.go index f124b4ab..0f7a317b 100644 --- a/pkg/driver/microphone/microphone.go +++ b/pkg/driver/microphone/microphone.go @@ -152,7 +152,6 @@ func (m *microphone) AudioRecord(inputProp prop.Media) (audio.Reader, error) { m.deviceCloseFunc = func() { closeDeviceOnce.Do(func() { cancel() // Unblock onRecvChunk - device.Stop() device.Uninit() }) } From dcaf53c7ded216fb0df6addc684a0bc3341099ff Mon Sep 17 00:00:00 2001 From: Eric Daniels Date: Tue, 9 Aug 2022 09:08:26 -0400 Subject: [PATCH 4/4] move close --- pkg/driver/microphone/microphone.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/driver/microphone/microphone.go b/pkg/driver/microphone/microphone.go index 0f7a317b..3296e96b 100644 --- a/pkg/driver/microphone/microphone.go +++ b/pkg/driver/microphone/microphone.go @@ -93,10 +93,6 @@ func (m *microphone) Close() error { if m.deviceCloseFunc != nil { m.deviceCloseFunc() } - if m.chunkChan != nil { - close(m.chunkChan) - m.chunkChan = nil - } return nil } @@ -153,6 +149,11 @@ func (m *microphone) AudioRecord(inputProp prop.Media) (audio.Reader, error) { closeDeviceOnce.Do(func() { cancel() // Unblock onRecvChunk device.Uninit() + + if m.chunkChan != nil { + close(m.chunkChan) + m.chunkChan = nil + } }) }