[FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader #100
Conversation
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
It works, I have tried it.
```java
                        stoppingOffset);
                recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
                finishSplitAtRecord(
                        tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits);
            }
            // Track this partition's record lag if it never appears before
            kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
```
```java
consumerRecords.partitions().forEach(trackTp -> {
    // Track this partition's record lag if it never appears before
    kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp);
});
```
We do not need to track the tp when consumerRecords is empty.
@LinMingQiang, thanks for the review.
I've changed it to track a tp only when there is a record for that tp.
Force-pushed from e9bcea2 to 3e07d38
Hey @dongwoo6kim, overall this seems okay from my side. But would it be possible to add an integration-test ITCase for this case?
Hello @morazow, I've added an ITCase for this case.
Thanks @dongwoo6kim, the tests look good from my side 👍 (Recently I faced a similar issue which may be related, when running batch mode with setting
Thanks for confirming @morazow,
Hey @dongwoo6kim, we created another issue for it. The solution seems to be similar, but let's discuss it again once this PR is merged.
Thanks @dongwoo6kim, looks good!
@dongwoo6kim thank you very much for your contribution. I have a couple of remarks.
If you don't have time to fix it, I can also take over, because we are also hitting this in production.
```java
List<ProducerRecord<String, Integer>> records =
        KafkaSourceTestEnv.getRecordsForTopic(transactionalTopic);
// Prepare records for executeAndVerify method
records.removeIf(record -> record.partition() > record.value());
```
Why is this necessary?
This is a preprocessing step before using the executeAndVerify method to verify the test result. This method expects the records from partition P to be an integer sequence from P to NUM_RECORDS_PER_PARTITION, so I deleted the records whose value is less than their partition number. A similar approach is used here.
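For readers skimming the thread, here is a small standalone sketch (not part of the PR) that shows what the `removeIf` predicate keeps; the partition count and the `NUM_RECORDS_PER_PARTITION` stand-in below are illustrative assumptions, not taken from the test env:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of records.removeIf(record -> record.partition() > record.value()):
// a record survives only if its value is >= its partition number, so partition P keeps the
// ascending sequence of values starting at P.
final class RemoveIfEffectSketch {
    public static void main(String[] args) {
        int numPartitions = 3; // illustrative
        int numRecordsPerPartition = 10; // illustrative stand-in for NUM_RECORDS_PER_PARTITION
        for (int partition = 0; partition < numPartitions; partition++) {
            List<Integer> kept = new ArrayList<>();
            for (int value = 0; value < numRecordsPerPartition; value++) {
                if (!(partition > value)) { // the removeIf predicate, negated
                    kept.add(value);
                }
            }
            System.out.println("partition " + partition + " keeps values " + kept);
        }
    }
}
```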
Could you change the comment to reflect how the data looks afterwards? I'm assuming this will preserve one record per partition?
I'm also curious why we need a specific data layout for this test to work. Or rephrased: what would happen if we retained all records originally generated? Wouldn't the test still assert similar things?
I've made two changes.
- Replaced `records.removeIf(record -> record.partition() > record.value())` with `KafkaSourceTestEnv.setupEarliestOffsets(transactionalTopic)`, as it serves a similar purpose.
- Added a comment explaining how the data looks after the setup.
> I'm assuming this will preserve one record per partition?

After the data modification, each partition p will contain records from p to NUM_RECORDS_PER_PARTITION (which is 10). For example, partition 1 has records 1 to 10, and partition 5 has records 5 to 10.
> What would happen if we retain all records originally generated?

If we retain all records, we need new assertion logic for the generated records. The main purpose of this data-modification setup is to reuse the `executeAndVerify` method. As you can see here, the `executeAndVerify` method expects the input data to be modified in this way. I intended to reuse the existing test util functions and follow the test code conventions, but if you think it causes unnecessary confusion, I can change the test code to use custom assertion logic.
If it's needed for verification, then all good. We should probably revise the setup then at a later point.
Force-pushed from bfcca76 to 2134218
@AHeise thanks for the feedback!
Thank you very much for the swift response and action. Changes look good to me. I will approve and merge once CI passes.
My only concern is that I don't fully understand why we need to modify the data in the test case. Shouldn't a transaction marker always be at the end? What did I miss?
Also, if possible, could you squash your commits and extend the commit message to include some of the information from the PR description? Basically, state the problem and then briefly summarize the solution. Your commit message and PR title are already very descriptive, so it's not necessary for merging; it would just be a nice-to-have.
Seems like the deleted ArchUnit rule was still needed. What was your intent when you deleted it?
Force-pushed from 2134218 to 9a48edc
…stopping condition on split reader

Problem: In batch mode, the Flink Kafka connector could hang when consuming transactional messages or reading from deleted records.

Solution: Use consumer.position() instead of the last record's offset to skip control and deleted messages, preventing the hang.
Force-pushed from 9a48edc to 1605d51
It was automatically deleted after running
@AHeise Thanks for the feedback. I've left some comments and made updates. Please have a look.
All changes LGTM. I'm running CI and will merge when it's green.
Awesome work, congrats on your first merged pull request!
Problem
When using the Flink Kafka connector in batch scenarios, consuming transactional messages can cause indefinite hanging.
This issue can be easily reproduced with the following steps.
- Set `scan.bounded.mode` to `latest-offset` and run a consumer using the Flink Kafka connector.
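A minimal sketch of that reproduction step, assuming a topic that already contains committed transactional records; the topic name, schema, format, and bootstrap address below are placeholders, not taken from the PR:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Bounded (batch) read of a topic containing committed transactional records.
// Before this fix, the job could hang instead of finishing at the latest offset.
public class BoundedKafkaReadSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql(
                "CREATE TABLE kafka_in (\n"
                        + "  id INT\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'transactional-topic',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.isolation.level' = 'read_committed',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'scan.bounded.mode' = 'latest-offset',\n"
                        + "  'format' = 'csv'\n"
                        + ")");
        tEnv.executeSql("SELECT * FROM kafka_in").print();
    }
}
```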
Cause

The previous stopping condition in the `KafkaPartitionSplitReader` compared the offset of the last record with the `stoppingOffset`. This approach works for streaming use cases and batch processing of non-transactional messages. However, in scenarios involving transactional messages, this is insufficient: control messages, which are not visible to clients, can occupy the entire range between the last record's offset and the `stoppingOffset`, which leads to indefinite blocking.
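To make the difference concrete, here is an illustrative sketch of the two checks; it is simplified, not the connector's actual code, and the method names are made up:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class StoppingConditionSketch {

    // Old-style check: based on the offset of the last record returned by poll(). If the offsets
    // between that record and stoppingOffset are occupied by transaction control markers (which
    // are never returned to clients), this never becomes true and the split never finishes.
    static boolean finishedByLastRecordOffset(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    // New-style check: consumer.position(tp) already points past control markers and deleted
    // records after a poll, so the split can finish even when no further data records exist.
    static boolean finishedByConsumerPosition(
            Consumer<byte[], byte[]> consumer, TopicPartition tp, long stoppingOffset) {
        return consumer.position(tp) >= stoppingOffset;
    }
}
```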
Workaround
I've modified the stopping condition to use `consumer.position(tp)`, which effectively skips any control messages present in the current poll, pointing directly to the next record's offset.

To handle edge cases, particularly when `properties.max.poll.records` is set to `1`, I've adjusted the fetch method to always check all assigned partitions, even if no records are returned in a poll.

Edge case example
Consider partition `0`, where offsets `13` and `14` are valid records and `15` is a control record. If `stoppingOffset` is set to 15 for partition `0` and `properties.max.poll.records` is configured to `1`, checking only partitions that return records would miss offset 15. By consistently reviewing all assigned partitions, the consumer's position jumps over the control record in the subsequent poll, allowing the system to escape.
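A simplified sketch of that idea, i.e. iterating over all assigned partitions after every poll instead of only the partitions that returned records; this is illustrative only and not the actual fetch() implementation in KafkaPartitionSplitReader:

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class CheckAllAssignedPartitionsSketch {

    // Returns the partitions whose consumer position has reached their stopping offset.
    // Because we iterate over consumer.assignment() instead of records.partitions(), a partition
    // can finish even if the poll returned no records for it (e.g. max.poll.records = 1 and only
    // a control marker remains before the stopping offset).
    static Set<TopicPartition> pollAndCollectFinished(
            Consumer<byte[], byte[]> consumer, Map<TopicPartition, Long> stoppingOffsets) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        // In the real split reader the polled records would be handed to the record emitter here.
        Set<TopicPartition> finished = new HashSet<>();
        for (TopicPartition tp : consumer.assignment()) {
            Long stoppingOffset = stoppingOffsets.get(tp);
            if (stoppingOffset != null && consumer.position(tp) >= stoppingOffset) {
                finished.add(tp);
            }
        }
        return finished;
    }
}
```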
Discussion

To address the metric issue in FLINK-33484, I think we need to make a wrapper class for `ConsumerRecord`, for example `ConsumerRecordWithOffsetJump`. And we may need a new `KafkaPartitionSplitReader` that implements `SplitReader<ConsumerRecordWithOffsetJump<byte[], byte[]>, KafkaPartitionSplit>`. So when a record is emitted, it should set the current offset not just to `record.offset() + 1` but to `record.offset() + record.jumpValue` here. `jumpValue` is typically 1, except for the last record of each poll, where it's calculated as `consumer.position() - lastRecord.offset()`.

If this sounds good to everyone, I'm happy to work on this.
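If it helps the discussion, a minimal sketch of what such a wrapper might look like; this class does not exist in the connector, and the names simply follow the proposal above:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical wrapper carrying how far the fetch offset should advance after this record is
// emitted: usually 1, but for the last record of a poll it would be
// consumer.position(tp) - lastRecord.offset(), so that trailing control markers are skipped.
final class ConsumerRecordWithOffsetJump<K, V> {
    private final ConsumerRecord<K, V> record;
    private final long jumpValue;

    ConsumerRecordWithOffsetJump(ConsumerRecord<K, V> record, long jumpValue) {
        this.record = record;
        this.jumpValue = jumpValue;
    }

    ConsumerRecord<K, V> record() {
        return record;
    }

    // Offset to store as the current position after emitting this record.
    long nextOffset() {
        return record.offset() + jumpValue;
    }
}
```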