From 8fce45dfa7182924d8d60fffd9a51b245ada3546 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 1 Feb 2022 12:17:08 -0800 Subject: [PATCH] Under certain scenarios the pending for a consumer could appear to get stuck. Under the covers we were calculating pending per msg block incorrectly when a single message existed beyond the requested sequence. Signed-off-by: Derek Collison --- server/filestore.go | 2 +- server/filestore_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/server/filestore.go b/server/filestore.go index 0aad6e2189d..d4b299e55fb 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1115,7 +1115,7 @@ func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (tot for i, subj := range subs { // If the starting seq is less then or equal that means we want all and we do not need to load any messages. ss := mb.fss[subj] - if ss == nil { + if ss == nil || seq > ss.Last { continue } diff --git a/server/filestore_test.go b/server/filestore_test.go index e9f4a5e1318..a6316024305 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -3480,6 +3480,33 @@ func TestFileStoreRemoveLastWriteIndex(t *testing.T) { } } +func TestFileStoreFilteredPendingBug(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "TEST", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + fs.StoreMsg("foo", nil, []byte("msg")) + fs.StoreMsg("bar", nil, []byte("msg")) + fs.StoreMsg("baz", nil, []byte("msg")) + + fs.mu.Lock() + mb := fs.lmb + fs.mu.Unlock() + + total, f, l := mb.filteredPending("foo", false, 3) + if total != 0 { + t.Fatalf("Expected total of 0 but got %d", total) + } + if f != 0 || l != 0 { + t.Fatalf("Expected first and last to be 0 as well, but got %d %d", f, l) + } +} + // Test to optimize the selectMsgBlock with lots of blocks. func TestFileStoreFetchPerf(t *testing.T) { // Comment out to run.