Skip to content

3021_add_subscriptionMap_log_subscription #3335

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extra/rediscensus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ require (
)

retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
v9.5.3 // This version was accidentally released.
)
2 changes: 1 addition & 1 deletion extra/rediscmd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ require (
)

retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
v9.5.3 // This version was accidentally released.
)
2 changes: 1 addition & 1 deletion extra/redisotel/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ require (
)

retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
v9.5.3 // This version was accidentally released.
)
2 changes: 1 addition & 1 deletion extra/redisprometheus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ require (
)

retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
v9.5.3 // This version was accidentally released.
)
36 changes: 32 additions & 4 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redis
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -571,6 +572,9 @@ type channel struct {
chanSize int
chanSendTimeout time.Duration
checkInterval time.Duration

subscriptions map[string]struct{}
subscriptionsMu sync.RWMutex
}

func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
Expand All @@ -580,6 +584,8 @@ func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
chanSize: 100,
chanSendTimeout: time.Minute,
checkInterval: 3 * time.Second,

subscriptions: make(map[string]struct{}),
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -618,6 +624,20 @@ func (c *channel) initHealthCheck() {
}()
}

// Helper function to format subscription information
func (c *channel) getSubscriptionInfo() string {
if len(c.subscriptions) == 0 {
return "none"
}

subs := make([]string, 0, len(c.subscriptions))
for sub := range c.subscriptions {
subs = append(subs, sub)
}
sort.Strings(subs) // Sort for consistent output
return strings.Join(subs, ", ")
}

// initMsgChan must be in sync with initAllChan.
func (c *channel) initMsgChan() {
ctx := context.TODO()
Expand Down Expand Up @@ -663,9 +683,13 @@ func (c *channel) initMsgChan() {
<-timer.C
}
case <-timer.C:
c.subscriptionsMu.RLock()
subInfo := c.getSubscriptionInfo()
c.subscriptionsMu.RUnlock()

internal.Logger.Printf(
ctx, "redis: %s channel is full for %s (message is dropped)",
c, c.chanSendTimeout)
ctx, "redis: Channel is full for %s (message is dropped), subscriptions: %s",
c.chanSendTimeout, subInfo)
}
default:
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
Expand Down Expand Up @@ -717,9 +741,13 @@ func (c *channel) initAllChan() {
<-timer.C
}
case <-timer.C:
c.subscriptionsMu.RLock()
subInfo := c.getSubscriptionInfo()
c.subscriptionsMu.RUnlock()

internal.Logger.Printf(
ctx, "redis: %s channel is full for %s (message is dropped)",
c, c.chanSendTimeout)
ctx, "redis: Channel is full for %s (message is dropped), subscriptions: %s",
c.chanSendTimeout, subInfo)
}
default:
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
Expand Down
Loading