Can we add stream information to SubscriptionListener.SubscriptionContext? #540


Closed
laststem opened this issue Apr 26, 2024 · 3 comments

@laststem
Contributor

Is your feature request related to a problem? Please describe.

It would be nice if SubscriptionListener exposed stream information (stream name, consumer name, etc.) in addition to the offset.

Describe the solution you'd like

Add this information to SubscriptionListener.SubscriptionContext; it must be read-only.

Describe alternatives you've considered

No response

Additional context

No response

@laststem laststem added the enhancement New feature or request label Apr 26, 2024
@acogoluegnes
Contributor

What would be the use case?

@acogoluegnes acogoluegnes added the good first issue Good for newcomers label Apr 26, 2024
@laststem
Contributor Author

laststem commented Apr 26, 2024

Like ConsumerUpdateListener.Context, the subscription context could give library users more information, which would improve usability.

For example, a subscription listener implementation (MyStreamSubscriptionListener) shared by several consumers cannot tell which stream (my-stream or my-stream2) it is being called for.

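MyStreamSubscriptionListener myStreamSubscriptionListener = new MyStreamSubscriptionListener();

// Both consumers share the same listener instance (message handler omitted for brevity)
environment.consumerBuilder()
    .stream("my-stream")
    .subscriptionListener(myStreamSubscriptionListener)
    .build();
environment.consumerBuilder()
    .stream("my-stream2")
    .subscriptionListener(myStreamSubscriptionListener)
    .build();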

If SubscriptionListener.SubscriptionContext carried more information, MyStreamSubscriptionListener could distinguish the streams like this:

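import com.rabbitmq.stream.OffsetSpecification
import com.rabbitmq.stream.SubscriptionListener

class MyStreamSubscriptionListener : SubscriptionListener {

    // Offsets stored outside the broker, keyed by stream name (left empty here for brevity)
    private val externalOffsetSource = mapOf<String, Long>()

    override fun preSubscribe(subscriptionContext: SubscriptionListener.SubscriptionContext) {
        // stream() is the accessor requested in this issue.
        // Keep the default offset specification when nothing is stored for this stream.
        val externalOffset = externalOffsetSource[subscriptionContext.stream()] ?: return
        subscriptionContext.offsetSpecification(OffsetSpecification.offset(externalOffset))
    }
}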
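
The same idea can be tried without a broker. Below is a minimal, self-contained Java sketch. The nested SubscriptionContext and SubscriptionListener interfaces are simplified, hypothetical stand-ins for the library types (plain long offsets instead of OffsetSpecification), and sharedListener and DemoContext are names made up for this illustration.

```java
import java.util.Map;

public class SharedListenerSketch {

    // Hypothetical stand-in for SubscriptionListener.SubscriptionContext,
    // including the stream() accessor requested in this issue.
    interface SubscriptionContext {
        String stream();
        long offset();
        void offset(long offset);
    }

    // Hypothetical stand-in for SubscriptionListener.
    interface SubscriptionListener {
        void preSubscribe(SubscriptionContext context);
    }

    // One listener shared by several consumers: it looks up the externally
    // stored offset by stream name and leaves the context untouched otherwise.
    static SubscriptionListener sharedListener(Map<String, Long> externalOffsets) {
        return context -> {
            Long stored = externalOffsets.get(context.stream());
            if (stored != null) {
                context.offset(stored);
            }
        };
    }

    // Minimal mutable context, used only to exercise the listener.
    static final class DemoContext implements SubscriptionContext {
        private final String stream;
        private long offset;

        DemoContext(String stream, long offset) {
            this.stream = stream;
            this.offset = offset;
        }

        @Override public String stream() { return stream; }
        @Override public long offset() { return offset; }
        @Override public void offset(long offset) { this.offset = offset; }
    }

    public static void main(String[] args) {
        SubscriptionListener listener =
            sharedListener(Map.of("my-stream", 42L, "my-stream2", 7L));
        DemoContext first = new DemoContext("my-stream", 0L);
        DemoContext second = new DemoContext("my-stream2", 0L);
        listener.preSubscribe(first);
        listener.preSubscribe(second);
        System.out.println(first.stream() + " -> " + first.offset());   // my-stream -> 42
        System.out.println(second.stream() + " -> " + second.offset()); // my-stream2 -> 7
    }
}
```

With the stream name available on the context, a single listener instance is enough for any number of consumers.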

I also run multiple stream consumers in a single application, but I cannot tell which stream an offset belongs to from the log statement below (in ConsumersCoordinator.add), because it does not include the stream name:

              LOGGER.info(
                  "Requested offset specification {} not used in favor of stored offset found for reference {}",
                  offsetSpecification,
                  offsetTrackingReference);

Yes, I can attach a lambda subscriptionListener to each consumerBuilder() to distinguish the streams,
and I can log the offset together with the stream name in that lambda.
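
A rough sketch of that workaround, in self-contained Java: each consumer gets its own listener, created by a small factory that captures the stream name. The SubscriptionContext and SubscriptionListener interfaces here are simplified, hypothetical stand-ins for the library types (the context exposes only an offset, as described above), and forStream is a name invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class PerStreamListenerSketch {

    // Hypothetical stand-in for the offset-only subscription context.
    interface SubscriptionContext {
        long offset();
    }

    // Hypothetical stand-in for SubscriptionListener.
    interface SubscriptionListener {
        void preSubscribe(SubscriptionContext context);
    }

    // Creates a listener bound to one stream: the lambda captures the stream
    // name, so it can record it next to the offset it receives.
    static SubscriptionListener forStream(String stream, List<String> log) {
        return context -> log.add("stream=" + stream + ", offset=" + context.offset());
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SubscriptionListener first = forStream("my-stream", log);
        SubscriptionListener second = forStream("my-stream2", log);
        first.preSubscribe(() -> 10L);
        second.preSubscribe(() -> 20L);
        log.forEach(System.out::println);
        // stream=my-stream, offset=10
        // stream=my-stream2, offset=20
    }
}
```

This works, but every consumerBuilder() call needs its own listener instance, which is the duplication a richer context would remove.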

But, as with ConsumerUpdateListener.Context, a richer context would give library users more information and improve usability.
Thank you.

@acogoluegnes acogoluegnes added usability and removed enhancement New feature or request labels Apr 26, 2024
@acogoluegnes acogoluegnes added this to the 0.16.0 milestone Apr 26, 2024
@acogoluegnes
Contributor

I added the stream name to the context; you can try it with the snapshots.
