diff --git a/db.go b/db.go index 30aa2c13a..015b32862 100644 --- a/db.go +++ b/db.go @@ -1879,7 +1879,11 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches drain := func() { for { select { - case <-s.sendCh: + case _, ok := <-s.sendCh: + if !ok { + // Channel is closed. + return + } default: return } diff --git a/publisher_test.go b/publisher_test.go index 1113ba2e8..f94d12ebc 100644 --- a/publisher_test.go +++ b/publisher_test.go @@ -18,10 +18,10 @@ package badger import ( "context" "fmt" + "runtime" "sync" "sync/atomic" "testing" - "time" "github.com/pkg/errors" "github.com/stretchr/testify/require" @@ -40,6 +40,8 @@ func TestPublisherDeadlock(t *testing.T) { var firstUpdate sync.WaitGroup firstUpdate.Add(1) + var allUpdatesDone sync.WaitGroup + allUpdatesDone.Add(1) var subDone sync.WaitGroup subDone.Add(1) go func() { @@ -47,7 +49,9 @@ func TestPublisherDeadlock(t *testing.T) { match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""} err := db.Subscribe(context.Background(), func(kvs *pb.KVList) error { firstUpdate.Done() - time.Sleep(time.Second * 20) + // Before exiting Subscribe process, we will wait until each of the + // 1110 updates (defined below) have been completed. + allUpdatesDone.Wait() return errors.New("error returned") }, []pb.Match{match}) require.Error(t, err, errors.New("error returned")) @@ -65,7 +69,6 @@ func TestPublisherDeadlock(t *testing.T) { firstUpdate.Wait() var req atomic.Int64 for i := 1; i < 1110; i++ { - time.Sleep(time.Millisecond * 10) go func(i int) { err := db.Update(func(txn *Txn) error { e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i))) @@ -79,8 +82,15 @@ func TestPublisherDeadlock(t *testing.T) { if req.Load() == 1109 { break } - time.Sleep(time.Second) + // FYI: This does the same as "thread.yield()" from other languages. + // In other words, it tells the go-routine scheduler to switch + // to another go-routine. This is strongly preferred over + // time.Sleep(...). + runtime.Gosched() } + // Free up the subscriber, which is waiting for updates to finish. + allUpdatesDone.Done() + // Exit when the subscription process has been exited. subDone.Wait() }) }