-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
changefeedccl/kvfeed: pass consumer id correctly #138939
Conversation
Previously, we introduced the concept of a consumer ID to prevent a single changefeed job from over-consuming the catch-up scan quota and blocking other consumers from making progress on the server side. However, the changefeed client-side code requires the consumer ID to be passed again in the rangefeed options during rangefeedFactory.Run. This was missing in the previous PR, causing the changefeed job ID to default to zero. This patch fixes the issue by ensuring the consumer ID is correctly passed in the rangefeed options. Related: cockroachdb#133789 Release note: None
It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR? 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
@@ -97,6 +97,7 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang | |||
if cfg.RangeObserver != nil { | |||
rfOpts = append(rfOpts, kvcoord.WithRangeObserver(cfg.RangeObserver)) | |||
} | |||
rfOpts = append(rfOpts, kvcoord.WithConsumerID(cfg.ConsumerID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this have a != 0
check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value should be zero, so setting it shouldn't make a difference
cockroach/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Lines 183 to 186 in ad6f23c
var cfg rangeFeedConfig | |
for _, opt := range opts { | |
opt.set(&cfg) | |
} |
rangefeedOpts = append(rangefeedOpts, kvcoord.WithConsumerID(f.consumerID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it'd still be nice to not pass the functional option if we know it's not going to have an effect but no strong feelings either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks :D Sorry about that.
TFTR! bors r=andyyang890, stevendanna |
Previously, we introduced the concept of a consumer ID to prevent a single
changefeed job from over-consuming the catch-up scan quota and blocking other
consumers from making progress on the server side. However, the changefeed
client-side code requires the consumer ID to be passed again in the rangefeed
options during rangefeedFactory.Run. This was missing in the previous
PR, causing the changefeed job ID to default to zero. This patch fixes
the issue by ensuring the consumer ID is correctly passed in the
rangefeed options.
Related: #133789
Release note: none
Epic: none