[fix][client] Fix Reader.hasMessageAvailable might return true after seeking to latest #22201
…seeking to latest

### Motivation

The Java client has the same issue as apache/pulsar-client-python#199.

After a seek operation is done, the `startMessageId` will not be updated until the reconnection due to the seek is done in `connectionOpened`. So before it's updated, `hasMessageAvailable` could compare with an outdated `startMessageId` and return a wrong value.

### Modifications

Replace `duringSeek` with a `SeekStatus` field:
- `NOT_STARTED`: initial, or a seek operation is done. `seek` could only succeed in this status.
- `IN_PROGRESS`: A seek operation has started but the client has not received the response from the broker.
- `COMPLETED`: The client has received the seek response but the seek future is not done.

After the status becomes `COMPLETED`, if the connection is not ready, the next time the connection is established the status will change from `COMPLETED` to `NOT_STARTED`, and then the seek future will be completed in the internal executor.

Add `testHasMessageAvailableAfterSeek` to cover this change.
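The three-state transition described above can be sketched as a small state machine. This is only an illustration under stated assumptions, not the actual `ConsumerImpl` code: the class `SeekStateSketch` and the methods `onSeekResponse` and `onConnectionOpened` are hypothetical names, the sketch completes the future inline rather than on an internal executor, and it uses `synchronized` methods instead of whatever concurrency primitive the real client uses.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the SeekStatus transitions; not the real ConsumerImpl.
class SeekStateSketch {
    enum SeekStatus { NOT_STARTED, IN_PROGRESS, COMPLETED }

    private SeekStatus status = SeekStatus.NOT_STARTED;
    private CompletableFuture<Void> seekFuture;

    // A seek may only start from NOT_STARTED; otherwise it fails immediately.
    synchronized CompletableFuture<Void> seek() {
        if (status != SeekStatus.NOT_STARTED) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("a seek is already in progress"));
            return failed;
        }
        status = SeekStatus.IN_PROGRESS;
        seekFuture = new CompletableFuture<>();
        return seekFuture;
    }

    // The broker acknowledged the seek, but the caller's future is not done yet.
    synchronized void onSeekResponse() {
        if (status == SeekStatus.IN_PROGRESS) {
            status = SeekStatus.COMPLETED;
        }
    }

    // The connection is (re)established: reset the status, then complete the future.
    // (The PR completes it on an internal executor; this sketch does it inline.)
    synchronized void onConnectionOpened() {
        if (status == SeekStatus.COMPLETED) {
            status = SeekStatus.NOT_STARTED;
            seekFuture.complete(null);
        }
    }

    synchronized SeekStatus status() {
        return status;
    }

    public static void main(String[] args) {
        SeekStateSketch s = new SeekStateSketch();
        CompletableFuture<Void> f = s.seek();
        s.onSeekResponse();
        System.out.println(s.status() + " done=" + f.isDone());
        s.onConnectionOpened();
        System.out.println(s.status() + " done=" + f.isDone());
    }
}
```

In this model the seek future stays incomplete until the reconnection path runs, so a caller that waits on the future never observes a state updated only halfway by the seek.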
LGTM, awesome job @BewareMyPower
There seems to be something wrong with the multi-topics consumer seek. I'm investigating.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #22201 +/- ##
============================================
- Coverage 73.57% 73.56% -0.01%
+ Complexity 32624 32071 -553
============================================
Files 1877 1878 +1
Lines 139502 139663 +161
Branches 15299 15335 +36
============================================
+ Hits 102638 102750 +112
- Misses 28908 28931 +23
- Partials 7956 7982 +26
…seeking to latest (#22201)
…fter seeking by timestamp

### Motivation

After seeking by timestamp, `hasMessageAvailable()` and `readNext()` could return wrong values.

The 1st bug was a regression introduced by apache#22201, which modifies `startMessageId` to `seekMessageId` before a seek operation is done. However, the previous behavior was also a bug that only accidentally worked in this case. When seeking by timestamp, the `seekMessageId` is modified to `earliest`, which should not be compared with `lastMessageIdInBroker` because the actual start position is determined by the seek timestamp, not the `earliest` message id.

The 2nd bug was caused by apache#9652: when `startMessageIdInclusive()` is configured to create a reader, it could seek to the position of the latest message when `lastDequeuedMessageId` is `earliest` and `startMessageId` is `latest`.

### Modifications

Add a boolean flag `hasSoughtByTimestamp` to represent whether the last seek call accepted a timestamp. If it's true, don't take `startMessageId` into comparison with `lastMessageIdInBroker`; just compare the mark-delete position and the last position in the GetLastMessageId response. Add `testHasMessageAvailableAfterSeekTimestamp` to verify the change.

For the `readNext` call, if the reader has sought by timestamp, don't seek to the latest position in `hasMessageAvailable`. Modify `testReadMessageWithBatchingWithMessageInclusive` to verify the fix. However, this patch does not modify the existing behavior when `seek` is not called, because the inclusive reader relies on the implicit seek operation in `hasMessageAvailable`.
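The effect of the `hasSoughtByTimestamp` flag can be illustrated with a deliberately simplified model. Everything here is an assumption made for illustration: positions are plain `long` values rather than Pulsar message ids, `EARLIEST` is modeled as `-1`, the class and method signature are hypothetical, and the non-timestamp branch is only a guess at the shape of the comparison, not the client's actual logic.

```java
// Hypothetical, simplified model of the decision described above.
// Positions are plain longs; none of these names are the Pulsar client's API.
class HasMessageAvailableSketch {
    static final long EARLIEST = -1L; // assumed stand-in for the `earliest` message id

    static boolean hasMessageAvailable(boolean hasSoughtByTimestamp,
                                       long startPosition,
                                       boolean startInclusive,
                                       long markDeletePosition,
                                       long lastPosition) {
        if (hasSoughtByTimestamp) {
            // After a timestamp seek the start id is only a placeholder, so ignore it
            // and compare the two positions from the GetLastMessageId response.
            return lastPosition > markDeletePosition;
        }
        // Assumed shape of the ordinary comparison against the start position.
        return startInclusive
                ? lastPosition >= startPosition
                : lastPosition > startPosition;
    }

    public static void main(String[] args) {
        // The cursor is already at the last position (9), so nothing is left to read.
        boolean withoutFlag = hasMessageAvailable(false, EARLIEST, false, 9L, 9L);
        boolean withFlag = hasMessageAvailable(true, EARLIEST, false, 9L, 9L);
        System.out.println("without flag: " + withoutFlag);
        System.out.println("with flag: " + withFlag);
    }
}
```

In this model, comparing against the `earliest` placeholder reports a message as available even when the mark-delete position has already reached the last position, while the flagged branch does not.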
It's necessary to cherry-pick #20963 so that this fix can be applied cleanly.
…seeking to latest (apache#22201) (cherry picked from commit 95a53f3) (cherry picked from commit 318ff33)
…seeking to latest (apache#22201)
Documentation
- `doc`
- `doc-required`
- `doc-not-needed`
- `doc-complete`
Matching PR in forked repository
PR in forked repository: BewareMyPower#30