forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improve][pip] PIP-374: Visibility of messages in receiverQueue for t…
…he consumers (apache#23235) Co-authored-by: vbhat6 <vinayas_bhat@intuit.com>
- Loading branch information
Showing
1 changed file
with
71 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
# PIP-374: Visibility of messages in receiverQueue for the consumers | ||
|
||
# Background knowledge | ||
|
||
When a consumer connects to the Broker, the broker starts dispatching the messages based on receiverQueueSize configured. | ||
There is no observability for the messages arrived on the consumer side if the user didn't call the receive method. It leads to ambiguities at times as | ||
the consumer application does not know whether the message was actually sent by the broker or is it lost in the network or is it lost in the receiver queue. | ||
|
||
ConsumerInterceptors is a plugin interface that intercept and possibly mutate messages received by the consumer. | ||
|
||
|
||
# Motivation | ||
|
||
* We need to receive queue filling of the event as the particular message is already on particular consumer's receiver queue and waiting for the consumer to pickup and process. It may wait in the recieverQueue longer if the consumer processing takes more time. It's very important to provide the visibility of the messages that are waiting in receiverQueue for processing. | ||
|
||
* Availability of a consumer application w.r.t any messaging system depends on the number of messages dispatched from the server/broker against the number of messages acknowledged from the consumer app. This metric defines the processing rate of a consumer. | ||
Currently, the number of acknowledged messages can be counted by having a counter in onAcknowledge() method of ConsumerInterceptor. But, there is no way to capture the number of messages arrived in Consumer. | ||
|
||
|
||
What does this solve? | ||
* Visibility about the message in receiverQueue for the consumer. | ||
* Stuck consumer state visibility | ||
* Scale the consumers to process the spikes in producer traffic | ||
* Reduce the overhead of processing the redeliveries | ||
|
||
|
||
# Goals | ||
|
||
## In Scope | ||
|
||
The proposal will add a method to the interceptor to allow users to knowthe message has been received by the consumer. | ||
|
||
Add a default abstract method in ConsumerInterceptor called onArrival() and hook this method call in the internal consumer of MultiTopicConsumerImpl and ConsumerImpl. By this way, there will be an observability of message received for the consumer. | ||
|
||
|
||
# High Level Design | ||
|
||
* Add onArrival() abstract method in ConsumerInterceptor interface. | ||
* Hook this method call where the consumer receives the batch messages at once(based on configured receiverQueueSize). | ||
|
||
|
||
# Detailed Design | ||
|
||
## Design & Implementation Details | ||
|
||
* ConsumerInterceptor.java | ||
``` | ||
default Message<T> onArrival()(Consumer<T> consumer, Message<T> message){ | ||
return message; | ||
} | ||
``` | ||
|
||
* Add hook in ConsumerImpl.messageReceived which calls onArrival method which calculates the the number of message received. | ||
``` | ||
Message<T> interceptMsg = onArrival(consumer,msg); | ||
``` | ||
|
||
# Backward & Forward Compatibility | ||
|
||
## Upgrade | ||
|
||
Since we added a default method onArrival() in interface, one who has provided the implementations for ConsumerInterceptor will not get any compile time error as it has default implementation. If user wants to give implementation from his side, he can override and provide implementation. | ||
|
||
# Links | ||
|
||
<!-- | ||
Updated afterwards | ||
--> | ||
* Mailing List discussion thread: | ||
* Mailing List voting thread: |