From 08cc7f2d7155e74d66da7ab96631fc8021e391c7 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 14 Jan 2025 09:50:09 +0000 Subject: [PATCH 1/8] Fix JS API in-flight metric After a drain this would have been misreporting, as we did not remove drained entries from the `apiInflight` count. Signed-off-by: Neil Twigg --- server/ipqueue.go | 7 +++++-- server/jetstream_api.go | 5 +++-- server/raft.go | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/ipqueue.go b/server/ipqueue.go index b26a749ed7f..95bf27457e3 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -190,14 +190,16 @@ func (q *ipQueue[T]) len() int { } // Empty the queue and consumes the notification signal if present. +// Returns the number of items that were drained from the queue. // Note that this could cause a reader go routine that has been // notified that there is something in the queue (reading from queue's `ch`) // may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`. -func (q *ipQueue[T]) drain() { +func (q *ipQueue[T]) drain() int { if q == nil { - return + return 0 } q.Lock() + olen := len(q.elts) if q.elts != nil { q.resetAndReturnToPool(&q.elts) q.elts, q.pos = nil, 0 @@ -209,6 +211,7 @@ func (q *ipQueue[T]) drain() { default: } q.Unlock() + return olen } // Since the length of the queue goes to 0 after a pop(), it is good to diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d47b6dcc901..e7fb21c1a17 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -836,7 +836,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub limit := atomic.LoadInt64(&js.queueLimit) if pending >= int(limit) { s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending) - s.jsAPIRoutedReqs.drain() + drained := int64(s.jsAPIRoutedReqs.drain()) + atomic.AddInt64(&js.apiInflight, -drained) s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ TypedEvent: TypedEvent{ @@ -846,7 +847,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub }, Server: s.Name(), Domain: js.config.Domain, - Dropped: int64(pending), + Dropped: drained, }) } } diff --git a/server/raft.go b/server/raft.go index fb70ee776b6..e3242afb711 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1848,7 +1848,7 @@ runner: // just will remove them from the central monitoring map queues := []interface { unregister() - drain() + drain() int }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} for _, q := range queues { q.drain() From 330ac8be86fabd61785f3b6d31ebd6f4195dbc36 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 15 Jan 2025 17:54:09 -0700 Subject: [PATCH 2/8] [FIXED] LeafNodes: Queue interest may be lost in super cluster If a cluster has leafnode connections and each have the same queue group, the loss of a leafnode connection could cause the server in the "hub" cluster to drop interest across a gateway for this queue group. The issue is fixed by properly accounting for all queue sub and unsub in the server gateway interest map. 
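
For illustration only — not the server's actual gateway code, and with hypothetical
type and function names — the sketch below shows the accounting idea this change
restores: each account keeps a count per "subject queue" key, every queue sub adds
its weight, every unsub subtracts it, and interest may only be withdrawn from the
remote gateway once that count reaches zero. The bug was that some leafnode unsub
paths skipped the subtraction.

// Sketch of per-account queue interest accounting (hypothetical names).
package main

import "fmt"

type interestEntry struct{ n int32 }

// account -> "subject queue" -> accumulated queue weight
type interestMap map[string]map[string]*interestEntry

func (m interestMap) update(acc, subject, queue string, delta int32) {
	key := subject + " " + queue
	st, ok := m[acc]
	if !ok {
		st = make(map[string]*interestEntry)
		m[acc] = st
	}
	e, ok := st[key]
	if !ok {
		e = &interestEntry{}
		st[key] = e
	}
	e.n += delta
	if e.n <= 0 {
		// Only at zero is it safe to drop interest across the gateway.
		delete(st, key)
	}
}

func main() {
	m := interestMap{}
	m.update("USER", "foo", "bar", 3)  // three queue subs on a leaf
	m.update("USER", "foo", "bar", -1) // one unsub: interest must remain
	fmt.Println(m["USER"]["foo bar"].n) // prints 2
}
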
Signed-off-by: Ivan Kozlovic --- server/leafnode.go | 20 ++- server/leafnode_test.go | 343 ++++++++++++++++++++++++++++++++++++++++ server/route.go | 20 ++- 3 files changed, 361 insertions(+), 22 deletions(-) diff --git a/server/leafnode.go b/server/leafnode.go index 5f3fa4583f7..6cd4b3c02f5 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2443,7 +2443,6 @@ func (c *client) processLeafSub(argo []byte) (err error) { } key := bytesToString(sub.sid) osub := c.subs[key] - updateGWs := false if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -2454,7 +2453,6 @@ func (c *client) processLeafSub(argo []byte) (err error) { c.sendErr("Invalid Subscription") return nil } - updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. delta = sub.qw - atomic.LoadInt32(&osub.qw) @@ -2477,7 +2475,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { if !spoke { // If we are routing add to the route map for the associated account. srv.updateRouteSubscriptionMap(acc, sub, delta) - if updateGWs { + if srv.gateway.enabled { srv.gatewayUpdateSubInterest(acc.Name, sub, delta) } } @@ -2519,27 +2517,27 @@ func (c *client) processLeafUnsub(arg []byte) error { return nil } - updateGWs := false spoke := c.isSpokeLeafNode() // We store local subs by account and subject and optionally queue name. // LS- will have the arg exactly as the key. sub, ok := c.subs[string(arg)] + if !ok { + // If not found, don't try to update routes/gws/leaf nodes. + c.mu.Unlock() + return nil + } delta := int32(1) - if ok && len(sub.queue) > 0 { + if len(sub.queue) > 0 { delta = sub.qw } c.mu.Unlock() - if ok { - c.unsubscribe(acc, sub, true, true) - updateGWs = srv.gateway.enabled - } - + c.unsubscribe(acc, sub, true, true) if !spoke { // If we are routing subtract from the route map for the associated account. 
srv.updateRouteSubscriptionMap(acc, sub, -delta) // Gateways - if updateGWs { + if srv.gateway.enabled { srv.gatewayUpdateSubInterest(acc.Name, sub, -delta) } } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 47d84eb299e..ceede72ecc9 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -4554,6 +4554,349 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { } } +func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testing.T) { + SetGatewaysSolicitDelay(0) + defer ResetGatewaysSolicitDelay() + + // + // D + // | + // Leaf + // | + // v + // C + // ^ ^ + // / \ + // GW GW + // / \ + // v \ + // B1 <--- route ---> B2 <----*----------* + // ^ <---* | | + // | | Leaf Leaf + // Leaf *-- Leaf ---* | | + // | | | | + // A1 <--- route ---> A2 OTHER1 OTHER2 + // + + accs := ` + accounts { + SYS: {users: [{user:sys, password: pwd}]} + USER: {users: [{user:user, password: pwd}]} + } + system_account: SYS + ` + bConf := ` + %s + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: "B" + listen: "127.0.0.1:-1" + no_advertise: true + %s + } + gateway { + name: "B" + listen: "127.0.0.1:-1" + } + leafnodes { + listen: "127.0.0.1:-1" + } + ` + sb1Conf := createConfFile(t, []byte(fmt.Sprintf(bConf, accs, "B1", _EMPTY_))) + sb1, sb1o := RunServerWithConfig(sb1Conf) + defer sb1.Shutdown() + + sb2Conf := createConfFile(t, []byte(fmt.Sprintf(bConf, accs, "B2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", sb1o.Cluster.Port)))) + sb2, sb2o := RunServerWithConfig(sb2Conf) + defer sb2.Shutdown() + + checkClusterFormed(t, sb1, sb2) + + cConf := ` + %s + server_name: C + listen: "127.0.0.1:-1" + cluster { + name: "C" + listen: "127.0.0.1:-1" + } + gateway { + name: "C" + listen: "127.0.0.1:-1" + gateways [ + { + name: B + url: "nats://127.0.0.1:%d" + } + ] + } + leafnodes { + listen: "127.0.0.1:-1" + } + ` + scConf := createConfFile(t, []byte(fmt.Sprintf(cConf, accs, sb1o.Gateway.Port))) + sc, sco := RunServerWithConfig(scConf) + defer sc.Shutdown() + + waitForOutboundGateways(t, sc, 1, 2*time.Second) + waitForOutboundGateways(t, sb1, 1, 2*time.Second) + waitForOutboundGateways(t, sb2, 1, 2*time.Second) + waitForInboundGateways(t, sc, 2, 2*time.Second) + waitForInboundGateways(t, sb1, 1, 2*time.Second) + + dConf := ` + %s + server_name: D + listen: "127.0.0.1:-1" + cluster { + name: "D" + listen: "127.0.0.1:-1" + } + leafnodes { + remotes [ + { + url: "nats://user:pwd@127.0.0.1:%d" + account: USER + } + ] + } + ` + sdConf := createConfFile(t, []byte(fmt.Sprintf(dConf, accs, sco.LeafNode.Port))) + sd, _ := RunServerWithConfig(sdConf) + defer sd.Shutdown() + + checkLeafNodeConnected(t, sc) + checkLeafNodeConnected(t, sd) + + aConf := ` + %s + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: A + listen: "127.0.0.1:-1" + no_advertise: true + %s + } + leafnodes { + remotes [ + { + url: "nats://user:pwd@127.0.0.1:%d" + account: USER + } + ] + } + ` + a1Conf := createConfFile(t, []byte(fmt.Sprintf(aConf, accs, "A1", _EMPTY_, sb1o.LeafNode.Port))) + sa1, sa1o := RunServerWithConfig(a1Conf) + defer sa1.Shutdown() + + checkLeafNodeConnected(t, sa1) + checkLeafNodeConnected(t, sb1) + + a2Conf := createConfFile(t, []byte(fmt.Sprintf(aConf, accs, "A2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", sa1o.Cluster.Port), sb1o.LeafNode.Port))) + sa2, _ := RunServerWithConfig(a2Conf) + defer sa2.Shutdown() + + checkClusterFormed(t, sa1, sa2) + checkLeafNodeConnected(t, sa2) + checkLeafNodeConnectedCount(t, sb1, 2) + + 
otherLeafsConf := ` + %s + server_name: %s + listen: "127.0.0.1:-1" + leafnodes { + remotes [ + { + url: "nats://user:pwd@127.0.0.1:%d" + account: USER + } + ] + } + ` + o1Conf := createConfFile(t, []byte(fmt.Sprintf(otherLeafsConf, accs, "OTHERLEAF1", sb2o.LeafNode.Port))) + so1, _ := RunServerWithConfig(o1Conf) + defer so1.Shutdown() + checkLeafNodeConnected(t, so1) + checkLeafNodeConnectedCount(t, sb2, 1) + + o2Conf := createConfFile(t, []byte(fmt.Sprintf(otherLeafsConf, accs, "OTHERLEAF2", sb2o.LeafNode.Port))) + so2, _ := RunServerWithConfig(o2Conf) + defer so2.Shutdown() + checkLeafNodeConnected(t, so2) + checkLeafNodeConnectedCount(t, sb2, 2) + + // Helper to check that the interest is propagated to all servers + checkInterest := func(t *testing.T, expected []int, expectedGW int32) { + t.Helper() + subj := "foo" + for i, s := range []*Server{sa1, sa2, so1, so2, sb1, sb2, sc, sd} { + if s == sc || !s.isRunning() { + continue + } + acc, err := s.LookupAccount("USER") + require_NoError(t, err) + checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { + n := acc.Interest(subj) + if n == expected[i] { + return nil + } + return fmt.Errorf("Expected interest count for server %q to be %v, got %v", s, expected[i], n) + }) + } + // For server C, need to check in gateway's account. + checkForRegisteredQSubInterest(t, sc, "B", "USER", "foo", expected[6], time.Second) + + // For server B1 and B2, check that we have the proper counts in the map. + for _, s := range []*Server{sb1, sb2} { + if !s.isRunning() { + continue + } + checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { + s.gateway.pasi.Lock() + accMap := s.gateway.pasi.m + st := accMap["USER"] + var n int32 + entry, ok := st["foo bar"] + if ok { + n = entry.n + } + s.gateway.pasi.Unlock() + if n == expectedGW { + return nil + } + return fmt.Errorf("Expected GW interest count for server %q to be %v, got %v", s, expectedGW, n) + }) + } + } + + ncA1 := natsConnect(t, sa1.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncA1.Close() + for i := 0; i < 3; i++ { + natsQueueSubSync(t, ncA1, "foo", "bar") + } + natsFlush(t, ncA1) + // With 3 queue subs on A1, we should have for servers (in order checked in checkInterest) + // for A1: 3 locals, for all others, 1 for the remote sub from A1. + // B1 and B2 GW map will be 3 (1 for each sub) + checkInterest(t, []int{3, 1, 1, 1, 1, 1, 1, 1}, 3) + + ncA2 := natsConnect(t, sa2.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncA2.Close() + ncA2qsub1 := natsQueueSubSync(t, ncA2, "foo", "bar") + ncA2qsub2 := natsQueueSubSync(t, ncA2, "foo", "bar") + natsFlush(t, ncA2) + // A1 will have 1 more for remote sub, same for A2 (2 locals + 1 remote). + // B1 will have 2 interest (1 per leaf connection) + // B1 and B2 GW map goes to 5. + checkInterest(t, []int{4, 3, 1, 1, 2, 1, 1, 1}, 5) + + ncOther1 := natsConnect(t, so1.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncOther1.Close() + natsQueueSubSync(t, ncOther1, "foo", "bar") + natsQueueSubSync(t, ncOther1, "foo", "bar") + natsFlush(t, ncOther1) + // A1, A2 will have one more because of routed interest + // O1 will have 3 (2 locals + 1 for remote interest) + // O2 has still 1 for remote interest + // B1 has 1 more because of new leaf interest and B2 because of routed interest. + // B1 and B2 GW map goes to 7. 
+ checkInterest(t, []int{5, 4, 3, 1, 3, 2, 1, 1}, 7) + + ncOther2 := natsConnect(t, so2.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncOther2.Close() + natsQueueSubSync(t, ncOther2, "foo", "bar") + natsFlush(t, ncOther2) + // O2 1 more for local interest + // B2 1 more for the new leaf interest + // B1 and B2 GW map goes to 8. + checkInterest(t, []int{5, 4, 3, 2, 3, 3, 1, 1}, 8) + + // Stop the server so1. + so1.Shutdown() + so1.WaitForShutdown() + checkLeafNodeConnectedCount(t, sb2, 1) + // Now check interest still valid, but wait a little bit to make sure that + // even with the bug where we would send an RS- through the gateway, there + // would be enough time for it to propagate before we check for interest. + time.Sleep(250 * time.Millisecond) + // O1 is stopped, so expect 0 + // B2 has 1 less because leaf connection went away. + // B1 and B2 GW map goes down to 6. + checkInterest(t, []int{5, 4, 0, 2, 3, 2, 1, 1}, 6) + + // Store server sa1. + sa1.Shutdown() + sa1.WaitForShutdown() + checkLeafNodeConnectedCount(t, sb1, 1) + time.Sleep(250 * time.Millisecond) + // A1 and O1 are gone, so 0 + // A2 has 1 less due to loss of routed interest + // B1 has 1 less because 1 leaf connection went away. + // B1 and B2 GW map goes down to 3. + checkInterest(t, []int{0, 3, 0, 2, 2, 2, 1, 1}, 3) + + // Now remove the queue subs from A2 + ncA2qsub1.Unsubscribe() + natsFlush(t, ncA2) + // A2 has 1 less + checkInterest(t, []int{0, 2, 0, 2, 2, 2, 1, 1}, 2) + + ncA2qsub2.Unsubscribe() + natsFlush(t, ncA2) + // A2 has 1 (no more locals but still interest for O2). + // O2 has 1 (no more for remote interest, only local). + // B1, B2 has 1 less since no interest from any of its leaf connections. + checkInterest(t, []int{0, 1, 0, 1, 1, 1, 1, 1}, 1) + + // Removing (closing connection) of the sub on O2 will remove + // interest globally. + ncOther2.Close() + checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0) + + // Resubscribe now, and again, interest should be propagated. + natsQueueSubSync(t, ncA2, "foo", "bar") + natsFlush(t, ncA2) + checkInterest(t, []int{0, 1, 0, 1, 1, 1, 1, 1}, 1) + + natsQueueSubSync(t, ncA2, "foo", "bar") + natsFlush(t, ncA2) + checkInterest(t, []int{0, 2, 0, 1, 1, 1, 1, 1}, 2) + + // Close the client connection that has the 2 queue subs. + ncA2.Close() + checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0) + + // Now we will test when a route is lost on a server that has gateway enabled + // that we update counts properly. + ncB2 := natsConnect(t, sb2.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncB2.Close() + natsQueueSubSync(t, ncB2, "foo", "bar") + natsQueueSubSync(t, ncB2, "foo", "bar") + natsQueueSubSync(t, ncB2, "foo", "bar") + natsFlush(t, ncB2) + checkInterest(t, []int{0, 1, 0, 1, 1, 3, 1, 1}, 3) + + ncB1 := natsConnect(t, sb1.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncB1.Close() + natsQueueSubSync(t, ncB1, "foo", "bar") + natsQueueSubSync(t, ncB1, "foo", "bar") + checkInterest(t, []int{0, 1, 0, 1, 3, 4, 1, 1}, 5) + + // Now shutdown B2 + sb2.Shutdown() + sa1.WaitForShutdown() + time.Sleep(250 * time.Millisecond) + checkInterest(t, []int{0, 1, 0, 0, 2, 0, 1, 1}, 2) + + ncB1.Close() + checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0) +} + func TestLeafNodeQueueInterestAndWeightCorrectAfterServerRestartOrConnectionClose(t *testing.T) { // Note that this is not what a normal configuration should be. 
Users should diff --git a/server/route.go b/server/route.go index 0c455547c98..a865122e615 100644 --- a/server/route.go +++ b/server/route.go @@ -1348,8 +1348,6 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) { return nil } - updateGWs := false - _keya := [128]byte{} _key := _keya[:0] @@ -1373,19 +1371,21 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) { if ok { delete(c.subs, key) acc.sl.Remove(sub) - updateGWs = srv.gateway.enabled if len(sub.queue) > 0 { delta = sub.qw } } c.mu.Unlock() - if updateGWs { - srv.gatewayUpdateSubInterest(accountName, sub, -delta) - } + // Update gateways and leaf nodes only if the subscription was found. + if ok { + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(accountName, sub, -delta) + } - // Now check on leafnode updates. - acc.updateLeafNodes(sub, -delta) + // Now check on leafnode updates. + acc.updateLeafNodes(sub, -delta) + } if c.opts.Verbose { c.sendOK() @@ -1600,7 +1600,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { // We use the sub.sid for the key of the c.subs map. key := bytesToString(sub.sid) osub := c.subs[key] - updateGWs := false if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -1611,7 +1610,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { c.sendErr("Invalid Subscription") return nil } - updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. delta = sub.qw - atomic.LoadInt32(&osub.qw) @@ -1620,7 +1618,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { } c.mu.Unlock() - if updateGWs { + if srv.gateway.enabled { srv.gatewayUpdateSubInterest(acc.Name, sub, delta) } From 1c9423b738e58dff0c479665cbc219ff3d8a2586 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 16 Jan 2025 22:29:09 +0000 Subject: [PATCH 3/8] Update to Go 1.23.5/1.22.11 Signed-off-by: Neil Twigg --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 28aa8fd3934..f1337e3b75d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,8 @@ language: go go: # This should be quoted or use .x, but should not be unquoted. # Remember that a YAML bare float drops trailing zeroes. - - "1.23.4" - - "1.22.10" + - "1.23.5" + - "1.22.11" go_import_path: github.com/nats-io/nats-server From 68607a71371fbd5687bfe753763b613bfc3bd99a Mon Sep 17 00:00:00 2001 From: Dmitry Dorofeev Date: Tue, 21 Jan 2025 15:40:10 +0300 Subject: [PATCH 4/8] fixed unclosed tmp file handle in filestore.go Signed-off-by: Dmitry Dorofeev --- server/filestore.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 245c68a7a21..b9767928ac6 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5469,16 +5469,23 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { <-dios tmpFD, err := os.OpenFile(tmpFN, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, defaultFilePerms) dios <- struct{}{} + if err != nil { return fmt.Errorf("failed to create temporary file: %w", err) } + errorCleanup := func(err error) error { + tmpFD.Close() + os.Remove(tmpFN) + return err + } + // The original buffer at this point is uncompressed, so we will now compress // it if needed. Note that if the selected algorithm is NoCompression, the // Compress function will just return the input buffer unmodified. 
cmpBuf, err := alg.Compress(origBuf) if err != nil { - return fmt.Errorf("failed to compress block: %w", err) + return errorCleanup(fmt.Errorf("failed to compress block: %w", err)) } // We only need to write out the metadata header if compression is enabled. @@ -5496,7 +5503,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { if mb.bek != nil && len(cmpBuf) > 0 { bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) if err != nil { - return err + return errorCleanup(err) } mb.bek = bek mb.bek.XORKeyStream(cmpBuf, cmpBuf) @@ -5504,11 +5511,6 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // Write the new block data (which might be compressed or encrypted) to the // temporary file. - errorCleanup := func(err error) error { - tmpFD.Close() - os.Remove(tmpFN) - return err - } if n, err := tmpFD.Write(cmpBuf); err != nil { return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err)) } else if n != len(cmpBuf) { From 71890187f51828694f51f6838abf1a99ae9af76b Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 21 Jan 2025 14:29:30 +0100 Subject: [PATCH 5/8] [FIXED] Don't timeout for retried AckAll Signed-off-by: Maurice van Veen --- server/consumer.go | 3 ++- server/jetstream_consumer_test.go | 44 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index f267718361c..77cec0b7cd1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2860,7 +2860,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() - return ackInPlace + // Return true to let caller respond back to the client. + return true } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 8e62dbb12a0..193798c3d67 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1526,3 +1526,47 @@ func TestJetStreamConsumerBackoffWhenBackoffLengthIsEqualToMaxDeliverConfig(t *t require_NoError(t, err) require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3)) } + +func TestJetStreamConsumerRetryAckAfterTimeout(t *testing.T) { + for _, ack := range []struct { + title string + policy nats.SubOpt + }{ + {title: "AckExplicit", policy: nats.AckExplicit()}, + {title: "AckAll", policy: nats.AckAll()}, + } { + t.Run(ack.title, func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + sub, err := js.PullSubscribe("foo", "CONSUMER", ack.policy) + require_NoError(t, err) + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + + msg := msgs[0] + // Send core request so the client is unaware of the ack being sent. + _, err = nc.Request(msg.Reply, nil, time.Second) + require_NoError(t, err) + + // It could be we have already acked this specific message, but we haven't received the success response. + // Retrying the ack should not time out and still signal success. 
+ err = msg.AckSync() + require_NoError(t, err) + }) + } +} From 7fa11822baaab06427bdbabc5b82bbc55d3696dd Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 21 Jan 2025 09:16:30 +0100 Subject: [PATCH 6/8] [FIXED] Stuck consumer during leader change with inflight ack Signed-off-by: Maurice van Veen --- server/consumer.go | 2 ++ server/jetstream_consumer_test.go | 49 +++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/server/consumer.go b/server/consumer.go index 77cec0b7cd1..82fd094adb2 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1373,6 +1373,8 @@ func (o *consumer) setLeader(isLeader bool) { // If we were the leader make sure to drain queued up acks. if wasLeader { o.ackMsgs.drain() + // Reset amount of acks that need to be processed. + atomic.StoreInt64(&o.awl, 0) // Also remove any pending replies since we should not be the one to respond at this point. o.replies = nil } diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 193798c3d67..1d325602f6b 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1570,3 +1570,52 @@ func TestJetStreamConsumerRetryAckAfterTimeout(t *testing.T) { }) } } + +func TestJetStreamConsumerSwitchLeaderDuringInflightAck(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + for i := 0; i < 2_000; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe( + "foo", + "CONSUMER", + nats.MaxAckPending(2_000), + nats.ManualAck(), + nats.AckExplicit(), + nats.AckWait(2*time.Second), + ) + require_NoError(t, err) + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + msgs, err := sub.Fetch(2_000) + require_NoError(t, err) + require_Len(t, len(msgs), 2_000) + + // Simulate an ack being pushed, and o.setLeader(false) being called before the ack is processed and resets o.awl + atomic.AddInt64(&o.awl, 1) + o.setLeader(false) + o.setLeader(true) + + msgs, err = sub.Fetch(1, nats.MaxWait(5*time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) +} From a441c8261b06342eff87c2833aa41ba5a2802da2 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 21 Jan 2025 09:19:52 +0100 Subject: [PATCH 7/8] [FIXED] Consumer slowdown when redelivering deleted message Signed-off-by: Maurice van Veen --- server/consumer.go | 24 +++++++----- server/jetstream_consumer_test.go | 63 +++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 82fd094adb2..54033d26803 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3603,17 +3603,21 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { } continue } - if seq > 0 { - pmsg := getJSPubMsgFromPool() - sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) - if sm == nil || err != nil { - pmsg.returnToPool() - pmsg, dc = nil, 0 - // Adjust back deliver count. - o.decDeliveryCount(seq) - } - return pmsg, dc, err + pmsg := getJSPubMsgFromPool() + sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) + if sm == nil || err != nil { + pmsg.returnToPool() + pmsg, dc = nil, 0 + // Adjust back deliver count. 
+ o.decDeliveryCount(seq) + } + // Message was scheduled for redelivery but was removed in the meantime. + if err == ErrStoreMsgNotFound || err == errDeletedMsg { + delete(o.pending, seq) + delete(o.rdc, seq) + continue } + return pmsg, dc, err } } diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 1d325602f6b..4f1b4ad310a 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1619,3 +1619,66 @@ func TestJetStreamConsumerSwitchLeaderDuringInflightAck(t *testing.T) { require_NoError(t, err) require_Len(t, len(msgs), 1) } + +func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) { + storageTypes := []nats.StorageType{nats.MemoryStorage, nats.FileStorage} + for _, storageType := range storageTypes { + t.Run(storageType.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: storageType, + }) + require_NoError(t, err) + + for i := 0; i < 3; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe( + "foo", + "CONSUMER", + nats.ManualAck(), + nats.AckExplicit(), + nats.AckWait(time.Second), + ) + require_NoError(t, err) + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + msgs, err := sub.Fetch(3) + require_NoError(t, err) + require_Len(t, len(msgs), 3) + + err = js.DeleteMsg("TEST", 2) + require_NoError(t, err) + + o.mu.Lock() + defer o.mu.Unlock() + for seq := range o.rdc { + o.removeFromRedeliverQueue(seq) + } + + o.pending = make(map[uint64]*Pending) + o.pending[2] = &Pending{} + o.addToRedeliverQueue(2) + + _, _, err = o.getNextMsg() + require_Error(t, err, ErrStoreEOF) + require_Len(t, len(o.pending), 0) + require_Len(t, len(o.rdc), 0) + }) + } +} From 8e940032ee8d42ab2cc72f222bf62e111633c862 Mon Sep 17 00:00:00 2001 From: souravagrawal Date: Sat, 21 Dec 2024 22:54:18 +0530 Subject: [PATCH 8/8] Disable JetStream on disk errors (#6292) Currently, there are scenarios where NATS JetStream may encounter permission errors when file system goes into read only mode, which can lead to an inconsistent state. In such cases, the system continues to allow publishing messages by resetting stream state, leading to a misaligned consumer stream sequence. This PR introduces changes to gracefully handle these permission errors and prevent NATS from continuing in an inconsistent state when: - Flushing stream state to disk - Deleting expired messages on startup - Creating new block for messages After this PR, If NATS is running in non-clustered mode, the user will be unable to issue write requests until the issue is resolved. In clustered mode, only the affected node will stop accepting requests, while the system will continue to function as long as the required quorum remains healthy. PR potentially fixes : #6211 which leads to consumer sequence reaching higher than stream sequence. 
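
As a minimal, self-contained illustration of the detection this patch relies on
(the isPermissionError helper added in server/store.go), the sketch below shows a
storage-style write failing once its directory goes read-only and the resulting
error being classified as a permission error. The directory and file names are
made up for the example; the server's reaction (disabling JetStream) is only noted
in a comment.

// Sketch: classify a failed storage write as a permission error, the signal
// this patch uses to stop accepting writes instead of continuing inconsistently.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// Same shape as the helper added in server/store.go.
func isPermissionError(err error) bool {
	return err != nil && os.IsPermission(err)
}

func main() {
	dir, err := os.MkdirTemp("", "blks")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Simulate the file system going read-only (as in the new filestore tests).
	if err := os.Chmod(dir, 0o555); err != nil {
		panic(err)
	}

	err = os.WriteFile(filepath.Join(dir, "1.blk"), []byte("msg"), 0o644)
	if isPermissionError(err) {
		// The server reacts here by disabling JetStream so no further
		// writes are accepted until the disk is writable again.
		fmt.Println("write refused, disabling writes:", err)
	}
}
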
Signed-off-by: Sourabh Agrawal --- server/filestore.go | 55 +++++++++++++++++++++------ server/filestore_test.go | 81 +++++++++++++++++++++++++++++++++++++++- server/jetstream.go | 11 ++++++ server/raft.go | 4 ++ server/store.go | 5 +++ server/stream.go | 8 ++++ 6 files changed, 152 insertions(+), 12 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index b9767928ac6..4e0c28ca4ab 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -495,7 +495,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Do age checks too, make sure to call in place. if fs.cfg.MaxAge != 0 { - fs.expireMsgsOnRecover() + err := fs.expireMsgsOnRecover() + if isPermissionError(err) { + return nil, err + } fs.startAgeChk() } @@ -1978,9 +1981,9 @@ func (fs *fileStore) recoverMsgs() error { // We will treat this differently in case we have a recovery // that will expire alot of messages on startup. // Should only be called on startup. -func (fs *fileStore) expireMsgsOnRecover() { +func (fs *fileStore) expireMsgsOnRecover() error { if fs.state.Msgs == 0 { - return + return nil } var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge) @@ -1992,7 +1995,7 @@ func (fs *fileStore) expireMsgsOnRecover() { // usually taken care of by fs.removeMsgBlock() but we do not call that here. var last msgId - deleteEmptyBlock := func(mb *msgBlock) { + deleteEmptyBlock := func(mb *msgBlock) error { // If we are the last keep state to remember first/last sequence. // Do this part by hand since not deleting one by one. if mb == fs.lmb { @@ -2008,8 +2011,12 @@ func (fs *fileStore) expireMsgsOnRecover() { } return true }) - mb.dirtyCloseWithRemove(true) + err := mb.dirtyCloseWithRemove(true) + if isPermissionError(err) { + return err + } deleted++ + return nil } for _, mb := range fs.blks { @@ -2023,8 +2030,11 @@ func (fs *fileStore) expireMsgsOnRecover() { if mb.last.ts <= minAge { purged += mb.msgs bytes += mb.bytes - deleteEmptyBlock(mb) + err := deleteEmptyBlock(mb) mb.mu.Unlock() + if isPermissionError(err) { + return err + } continue } @@ -2148,6 +2158,7 @@ func (fs *fileStore) expireMsgsOnRecover() { if purged > 0 { fs.dirty++ } + return nil } func copyMsgBlocks(src []*msgBlock) []*msgBlock { @@ -3463,6 +3474,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { dios <- struct{}{} if err != nil { + if isPermissionError(err) { + return nil, err + } mb.dirtyCloseWithRemove(true) return nil, fmt.Errorf("Error creating msg block file: %v", err) } @@ -7781,9 +7795,9 @@ func (mb *msgBlock) dirtyClose() { } // Should be called with lock held. -func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { +func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { if mb == nil { - return + return nil } // Stop cache expiration timer. if mb.ctmr != nil { @@ -7805,13 +7819,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { // Clear any tracking by subject if we are removing. mb.fss = nil if mb.mfn != _EMPTY_ { - os.Remove(mb.mfn) + err := os.Remove(mb.mfn) + if isPermissionError(err) { + return err + } mb.mfn = _EMPTY_ } if mb.kfn != _EMPTY_ { - os.Remove(mb.kfn) + err := os.Remove(mb.kfn) + if isPermissionError(err) { + return err + } } } + return nil } // Remove a seq from the fss and select new first. 
@@ -8224,7 +8245,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) { for { select { case <-t.C: - fs.writeFullState() + err := fs.writeFullState() + if isPermissionError(err) && fs.srv != nil { + fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return + } + case <-qch: return } @@ -8432,7 +8461,11 @@ func (fs *fileStore) _writeFullState(force bool) error { // Protect with dios. <-dios err := os.WriteFile(fn, buf, defaultFilePerms) + // if file system is not writable isPermissionError is set to true dios <- struct{}{} + if isPermissionError(err) { + return err + } // Update dirty if successful. if err == nil { diff --git a/server/filestore_test.go b/server/filestore_test.go index 458cef7a740..18493b61017 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "io" + "io/fs" "math/bits" "math/rand" "os" @@ -143,9 +144,9 @@ func TestFileStoreBasics(t *testing.T) { func TestFileStoreMsgHeaders(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) defer fs.Stop() - subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World") elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8 if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen { @@ -8194,3 +8195,81 @@ func TestFileStoreNumPendingMulti(t *testing.T) { } require_Equal(t, total, checkTotal) } + +func TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + directory := fs.fcfg.StoreDir + ORIGINAL_FILE_MODE, _ := os.Stat(directory) + READONLY_MODE := os.FileMode(0o555) + changeDirectoryPermission(directory, READONLY_MODE) + require_NoError(t, err) + totalMsgs := 10000 + i := 0 + for i = 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg("ev.1", nil, msg) + if err != nil { + break + } + } + changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode()) + require_Error(t, err, os.ErrPermission) +} + +func TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + directory := fs.fcfg.StoreDir + ORIGINAL_FILE_MODE, _ := os.Stat(directory) + READONLY_MODE := os.FileMode(0o555) + require_NoError(t, err) + totalMsgs := 10000 + i := 0 + for i = 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg("ev.1", nil, msg) + if err != nil { + break + } + } + changeDirectoryPermission(directory, READONLY_MODE) + err = fs.writeFullState() + changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode()) + require_Error(t, err, os.ErrPermission) +} + +func changeDirectoryPermission(directory string, mode fs.FileMode) error { + err := filepath.Walk(directory, func(path string, 
info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("error accessing path %q: %w", path, err) + } + + // Check if the path is a directory or file and set permissions accordingly + if info.IsDir() { + err = os.Chmod(path, mode) + if err != nil { + return fmt.Errorf("error changing directory permissions for %q: %w", path, err) + } + } else { + err = os.Chmod(path, mode) + if err != nil { + return fmt.Errorf("error changing file permissions for %q: %w", path, err) + } + } + return nil + }) + return err +} diff --git a/server/jetstream.go b/server/jetstream.go index 2e606e6a6fa..c1e709a19ef 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -2974,3 +2974,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) { cfg.Duplicates = 0 } } + +func (s *Server) handleWritePermissionError() { + //TODO Check if we should add s.jetStreamOOSPending in condition + if s.JetStreamEnabled() { + s.Errorf("File system permission denied while writing, disabling JetStream") + + go s.DisableJetStream() + + //TODO Send respective advisory if needed, same as in handleOutOfSpace + } +} diff --git a/server/raft.go b/server/raft.go index e3242afb711..427e8ce6773 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3861,6 +3861,10 @@ func (n *raft) setWriteErrLocked(err error) { n.error("Critical write error: %v", err) n.werr = err + if isPermissionError(err) { + go n.s.handleWritePermissionError() + } + if isOutOfSpaceErr(err) { // For now since this can be happening all under the covers, we will call up and disable JetStream. go n.s.handleOutOfSpace(nil) diff --git a/server/store.go b/server/store.go index 1c8f7f7ec1f..2d72f694740 100644 --- a/server/store.go +++ b/server/store.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "os" "strings" "time" "unsafe" @@ -780,3 +781,7 @@ func copyString(s string) string { copy(b, s) return bytesToString(b) } + +func isPermissionError(err error) bool { + return err != nil && os.IsPermission(err) +} diff --git a/server/stream.go b/server/stream.go index e34f8cd4ba9..a2883631d48 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4640,6 +4640,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } if err != nil { + if isPermissionError(err) { + mset.mu.Unlock() + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + mset.srv.DisableJetStream() + mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err) + return err + } // If we did not succeed put those values back and increment clfs in case we are clustered. var state StreamState mset.store.FastState(&state)