Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Cherry-picks for 2.10.25-RC.3 #6384

Merged
merged 8 commits into from
Jan 22, 2025
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.23.4"
- "1.22.10"
- "1.23.5"
- "1.22.11"

go_import_path: github.com/nats-io/nats-server

Expand Down
7 changes: 5 additions & 2 deletions server/ipqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,16 @@ func (q *ipQueue[T]) len() int {
}

// Empty the queue and consumes the notification signal if present.
// Returns the number of items that were drained from the queue.
// Note that this could cause a reader go routine that has been
// notified that there is something in the queue (reading from queue's `ch`)
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
func (q *ipQueue[T]) drain() {
func (q *ipQueue[T]) drain() int {
if q == nil {
return
return 0
}
q.Lock()
olen := len(q.elts)
if q.elts != nil {
q.resetAndReturnToPool(&q.elts)
q.elts, q.pos = nil, 0
Expand All @@ -209,6 +211,7 @@ func (q *ipQueue[T]) drain() {
default:
}
q.Unlock()
return olen
}

// Since the length of the queue goes to 0 after a pop(), it is good to
Expand Down
5 changes: 3 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
limit := atomic.LoadInt64(&js.queueLimit)
if pending >= int(limit) {
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
s.jsAPIRoutedReqs.drain()
drained := int64(s.jsAPIRoutedReqs.drain())
atomic.AddInt64(&js.apiInflight, -drained)

s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
Expand All @@ -846,7 +847,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
},
Server: s.Name(),
Domain: js.config.Domain,
Dropped: int64(pending),
Dropped: drained,
})
}
}
Expand Down
20 changes: 9 additions & 11 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2443,7 +2443,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
}
key := bytesToString(sub.sid)
osub := c.subs[key]
updateGWs := false
if osub == nil {
c.subs[key] = sub
// Now place into the account sl.
Expand All @@ -2454,7 +2453,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
c.sendErr("Invalid Subscription")
return nil
}
updateGWs = srv.gateway.enabled
} else if sub.queue != nil {
// For a queue we need to update the weight.
delta = sub.qw - atomic.LoadInt32(&osub.qw)
Expand All @@ -2477,7 +2475,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
if !spoke {
// If we are routing add to the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, delta)
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
}
}
Expand Down Expand Up @@ -2519,27 +2517,27 @@ func (c *client) processLeafUnsub(arg []byte) error {
return nil
}

updateGWs := false
spoke := c.isSpokeLeafNode()
// We store local subs by account and subject and optionally queue name.
// LS- will have the arg exactly as the key.
sub, ok := c.subs[string(arg)]
if !ok {
// If not found, don't try to update routes/gws/leaf nodes.
c.mu.Unlock()
return nil
}
delta := int32(1)
if ok && len(sub.queue) > 0 {
if len(sub.queue) > 0 {
delta = sub.qw
}
c.mu.Unlock()

if ok {
c.unsubscribe(acc, sub, true, true)
updateGWs = srv.gateway.enabled
}

c.unsubscribe(acc, sub, true, true)
if !spoke {
// If we are routing subtract from the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, -delta)
// Gateways
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
}
}
Expand Down
Loading
Loading