-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Add Error Handlers to File IO and related IOs (TextIO, AvroIO) #29670
Add Error Handlers to File IO and related IOs (TextIO, AvroIO) #29670
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
* | ||
* <p>When using dynamic destinations, or when using a formatting function to format a record for | ||
* writing, it's possible for an individual record to be malformed, causing an exception. By | ||
* default, these exceptions are propagated to the runner, and are usually retried, though this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these exceptions are propagated to the runner
-> this means the bundle fails right? May be good to clearly mention that
@@ -1288,6 +1317,12 @@ public Write<DestinationT, UserT> withNoSpilling() { | |||
return toBuilder().setNoSpilling(true).build(); | |||
} | |||
|
|||
/** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have the documentation live here instead of WriteFiles? I think users will interact with FileIO more as the top-level transform.
* | ||
* <p>Error handling for records that are malformed can be handled by using {@link | ||
* TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in | ||
* {@link FileIO} for details on usage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We point to FileIO
for details on usage and examples, but for the withBadRecordErrorHandler
method definition below we point to WriteFiles
(same pattern in AvroIO)
I think it makes sense to accumulate any error handling documentation in one place to make it easier for users to find and devs to update in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes a bunch of sense, I'll normalize it
PCollectionTuple writeTuple = | ||
input.apply( | ||
"WritedUnshardedBundles", | ||
ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder, inputCoder)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be passing unwrittenRecordsTag
to WriteUnshardedTempFilesFn
here? it's referenced later in lines 543, 546
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was unecessary as it turns out, so I've removed it
|
||
// Utility method to format a record based on the dynamic destination. If the operation fails, and | ||
// is output to the bad record router, this returns null | ||
private @Nullable OutputT formatRecordWithErrorHandling( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
formatRecordWithErrorHandling
and getDestinationWithErrorHandling
have a similar function. They perform an operation on an element. If successful, the output of the operation is returned. Otherwise it gets routed with a custom error message and a default is returned instead.
If it makes sense, this can be implemented as a generic utility method that can be used across IOs (prob out of scope of this PR tho)
private void addErrorCollection(PCollectionTuple sourceTuple) { | ||
getBadRecordErrorHandler() | ||
.addErrorCollection( | ||
sourceTuple | ||
.get(BAD_RECORD_TAG) | ||
.setCoder(BadRecord.getCoder(sourceTuple.getPipeline()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also looks like something that can be a generic utility method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can't be entirely generic, as getBadRecordErrorHandler is specific to WriteFiles, but I agree that if I'm repeating this pattern a bunch I should look to simplify it
the flink precommit is known to be flaky |
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.