Skip to content

fix(DeadlockTest): Handle draining of closed channel, speed up test. #1957

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1879,7 +1879,11 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
drain := func() {
for {
select {
case <-s.sendCh:
case _, ok := <-s.sendCh:
if !ok {
// Channel is closed.
return
}
default:
return
}
Expand Down
18 changes: 14 additions & 4 deletions publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package badger
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/require"
Expand All @@ -40,14 +40,18 @@ func TestPublisherDeadlock(t *testing.T) {
var firstUpdate sync.WaitGroup
firstUpdate.Add(1)

var allUpdatesDone sync.WaitGroup
allUpdatesDone.Add(1)
var subDone sync.WaitGroup
subDone.Add(1)
go func() {
subWg.Done()
match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""}
err := db.Subscribe(context.Background(), func(kvs *pb.KVList) error {
firstUpdate.Done()
time.Sleep(time.Second * 20)
// Before exiting Subscribe process, we will wait until each of the
// 1110 updates (defined below) have been completed.
allUpdatesDone.Wait()
return errors.New("error returned")
}, []pb.Match{match})
require.Error(t, err, errors.New("error returned"))
Expand All @@ -65,7 +69,6 @@ func TestPublisherDeadlock(t *testing.T) {
firstUpdate.Wait()
var req atomic.Int64
for i := 1; i < 1110; i++ {
time.Sleep(time.Millisecond * 10)
go func(i int) {
err := db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
Expand All @@ -79,8 +82,15 @@ func TestPublisherDeadlock(t *testing.T) {
if req.Load() == 1109 {
break
}
time.Sleep(time.Second)
// FYI: This does the same as "thread.yield()" from other languages.
// In other words, it tells the go-routine scheduler to switch
// to another go-routine. This is strongly preferred over
// time.Sleep(...).
runtime.Gosched()
}
// Free up the subscriber, which is waiting for updates to finish.
allUpdatesDone.Done()
// Exit when the subscription process has been exited.
subDone.Wait()
})
}
Expand Down