From 4be3b8260d0109ba29474954a9ec9a44d0a31fe2 Mon Sep 17 00:00:00 2001 From: Harmen Date: Wed, 19 Apr 2023 09:38:40 +0200 Subject: [PATCH] fix 'XREAD ... $' on a non-existing stream --- cmd_stream.go | 7 ++++++- integration/stream_test.go | 13 +++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/cmd_stream.go b/cmd_stream.go index bf396dc5..04ece520 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -944,7 +944,12 @@ parsing: return } else if id == "$" { db := m.DB(getCtx(c).selectedDB) - opts.ids[i] = db.streamKeys[opts.streams[i]].lastID() + stream, ok := db.streamKeys[opts.streams[i]] + if ok { + opts.ids[i] = stream.lastID() + } else { + opts.ids[i] = "0-0" + } } } args = nil diff --git a/integration/stream_test.go b/integration/stream_test.go index 3c41c4b1..bc877471 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -283,6 +283,19 @@ func TestStream(t *testing.T) { wg.Wait() c.Do("XREAD", "BLOCK", "1000", "STREAMS", "pl", "$") }) + + // special '$' ID on non-existing stream + testRaw2(t, func(c, c2 *client) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + time.Sleep(10 * time.Millisecond) + c2.Do("XADD", "pl", "60-1", "nosuch", "Mercury") + wg.Done() + }() + wg.Wait() + c.Do("XREAD", "BLOCK", "1000", "STREAMS", "nosuch", "$") + }) }) }