Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[FIXED] LeafNodes: Queue interest may be lost in super cluster #6377

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2525,7 +2525,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
}
key := bytesToString(sub.sid)
osub := c.subs[key]
updateGWs := false
if osub == nil {
c.subs[key] = sub
// Now place into the account sl.
Expand All @@ -2536,7 +2535,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
c.sendErr("Invalid Subscription")
return nil
}
updateGWs = srv.gateway.enabled
} else if sub.queue != nil {
// For a queue we need to update the weight.
delta = sub.qw - atomic.LoadInt32(&osub.qw)
Expand All @@ -2559,7 +2557,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
if !spoke {
// If we are routing add to the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, delta)
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
}
}
Expand Down Expand Up @@ -2601,27 +2599,27 @@ func (c *client) processLeafUnsub(arg []byte) error {
return nil
}

updateGWs := false
spoke := c.isSpokeLeafNode()
// We store local subs by account and subject and optionally queue name.
// LS- will have the arg exactly as the key.
sub, ok := c.subs[string(arg)]
if !ok {
// If not found, don't try to update routes/gws/leaf nodes.
c.mu.Unlock()
return nil
}
delta := int32(1)
if ok && len(sub.queue) > 0 {
if len(sub.queue) > 0 {
delta = sub.qw
}
c.mu.Unlock()

if ok {
c.unsubscribe(acc, sub, true, true)
updateGWs = srv.gateway.enabled
}

c.unsubscribe(acc, sub, true, true)
if !spoke {
// If we are routing subtract from the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, -delta)
// Gateways
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
}
}
Expand Down
Loading
Loading