-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Fixing behavior of buffering in Create/Merge handles for invalid/wrong schema records #558
Conversation
747df20
to
66895c4
Compare
@bvaradar Can you please take a pass ? |
Optional recordMetadata = record.getData().getMetadata(); | ||
try { | ||
if (exception.isPresent()) { | ||
throw exception.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of throwing and catching exceptions which has some overhead, can you copy the catch block here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is that the number of such exceptions should be really low, ideally, this is the same piece of code that used to run before, throw and catch exceptions in error scenarios so would like to keep it the same ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 I think we should just do a exception.get() instanceof Throwable
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, done.
@@ -61,20 +59,30 @@ public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, | |||
this.hoodieTable = hoodieTable; | |||
} | |||
|
|||
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. | |||
static class BufferedPayload<T extends HoodieRecord> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you have a different name for this BufferedPayload class. We need to capture that this class encapsulates the result of converting HoodieRecord to Avro (e.g : HoodieAvroRecordGenResult)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, this can be a standalone class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, changed the name, but like to keep the class here since it's only used for InsertHandlers ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest looks good
@bvaradar If it looks good, can you or @vinothchandar please merge it ? |
66895c4
to
d5a7c20
Compare
@@ -61,20 +59,30 @@ public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, | |||
this.hoodieTable = hoodieTable; | |||
} | |||
|
|||
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. | |||
static class HoodieAvroRecordGenResult<T extends HoodieRecord> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to HoodieInsertValueGenResult
? meta question, same thing not needed for MergeHandle, since the buffering does not happen for update path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, this happens for merge handle as well, the name is misleading, renamed the commit.
Optional recordMetadata = record.getData().getMetadata(); | ||
try { | ||
if (exception.isPresent()) { | ||
throw exception.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 I think we should just do a exception.get() instanceof Throwable
instead
d5a7c20
to
13c0d9c
Compare
@vinothchandar addressed your comments |
build failing? |
13c0d9c
to
ee1dc44
Compare
In earlier versions of hoodie, the contract for a error record (a record with bad schema) was to throw an exception during getInsertValue() and then add this record to the failed records list, this prevents from failing the whole job for a single bad record.
In the latest release, this contract has changed. Due to the introduction of parallelizing read/write operations for Create/Merge handles, we are offloading the getInsertValue() to the reader thread to save time in the heavy operation and help in faster runtime. Since, getInsertValue() is called on the reader side, we throw an exception and fail the job even if a single row is with bad schema.
This PR fixes the code back to the original contract.