
Serializing the complete payload object instead of serializing just the GenericRecord in HoodieRecordConverter #495

Merged
merged 1 commit into from
Dec 4, 2018

Conversation

n3nash
Contributor

@n3nash n3nash commented Nov 5, 2018

Serializing only the GenericRecord in the payload causes the payload to lose information when spilled to disk and read back, hence this change serializes the whole payload instead.
Note: there is no data loss, only loss of context that may be present in a user's payload implementation.

I have used SerializationUtils (which uses Java serialization) to serde the payload. The side effect of this is that if the payload stores types that are only serializable by Kryo (by registering them with Spark's Kryo), this serialization will fail.
The other option is to use the Kryo serializer with JavaSerializer as the fallback when Kryo doesn't work. Working with Kryo is a little more involved, since it's not as simple as converting the payload to bytes; see here: https://github.com/EsotericSoftware/kryo
@vinothchandar @bvaradar Let me know your thoughts.
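The "context lost" concern above can be illustrated with plain JDK code. A minimal sketch, assuming a hypothetical payload class with an extra context field (`orderingVal`) beyond the record bytes; the `serialize`/`deserialize` helpers stand in for `SerializationUtils`:

```java
import java.io.*;
import java.util.Arrays;

// Hypothetical payload for illustration: carries the Avro record bytes plus
// extra per-record context (here, an ordering field) that would be lost if
// only the GenericRecord bytes were spilled to disk.
class ExamplePayload implements Serializable {
    private static final long serialVersionUID = 1L;
    final byte[] recordBytes;
    final long orderingVal; // "context" beyond the GenericRecord itself

    ExamplePayload(byte[] recordBytes, long orderingVal) {
        this.recordBytes = recordBytes;
        this.orderingVal = orderingVal;
    }
}

public class SerializePayloadDemo {
    // Minimal stand-ins for SerializationUtils.serialize/deserialize.
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ExamplePayload original = new ExamplePayload(new byte[]{1, 2, 3}, 42L);
        ExamplePayload restored = (ExamplePayload) deserialize(serialize(original));
        // The whole payload survives the round trip, including the context field.
        System.out.println(Arrays.equals(original.recordBytes, restored.recordBytes)); // true
        System.out.println(restored.orderingVal); // 42
    }
}
```

Serializing only `recordBytes` would drop `orderingVal` on the floor; serializing the whole object keeps it, at the cost of requiring every stored field to be Java-serializable.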

@n3nash n3nash force-pushed the hoodierecordconverter_fix branch 4 times, most recently from 8376852 to 825326c Compare November 5, 2018 04:51
@n3nash n3nash changed the title Serializing the complete payload object instead of serializing just the GenericRecord Serializing the complete payload object instead of serializing just the GenericRecord in HoodieRecordConverter Nov 5, 2018
@vinothchandar
Member

The side effect of this is that if the payload stores types which are serializable by Kryo (by registering them with Spark kryo), this serialization will fail.

Can you expand on this? We are forcing Kryo as the serializer in Hoodie anyway. How does spilling locally within an executor interplay with Kryo? I think this will be a problem if we serialize the generic record?

@@ -29,10 +30,16 @@
*/
public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload> {

private final Optional<GenericRecord> record;
// Store the GenericRecord converted to bytes - 1) Doesn't store schema hence memory efficient 2) Makes the payload
// serializable
Member

you mean "java serializable" correct?

Contributor Author

Done
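The memory-efficiency point in the code comment above (storing the record as schema-less bytes) can be illustrated with JDK-only code. This is an assumption-laden sketch: the `Rec` class and its fields are made up, and `DataOutputStream` stands in for Avro's binary encoding, which likewise writes no schema into the output:

```java
import java.io.*;

// Hypothetical record with two fields, used to compare the size of full Java
// serialization against a compact schema-less binary encoding of the values.
class Rec implements Serializable {
    private static final long serialVersionUID = 1L;
    final long id = 12345L;
    final String name = "alice";
}

public class EncodingSizeDemo {
    public static void main(String[] args) throws Exception {
        // Full Java serialization: embeds class name, field metadata, etc.
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
            oos.writeObject(new Rec());
        }

        // Schema-less encoding of just the values, in the spirit of Avro's
        // binary encoding (which also writes no schema into the bytes):
        // 8 bytes for the long + 2-byte length prefix + 5 bytes for "alice".
        ByteArrayOutputStream rawBytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(rawBytes)) {
            dos.writeLong(12345L);
            dos.writeUTF("alice");
        }

        System.out.println("java serialization: " + javaBytes.size() + " bytes");
        System.out.println("value-only encoding: " + rawBytes.size() + " bytes"); // 15 bytes
    }
}
```

The value-only encoding is a fraction of the size of the fully serialized object, which is the rationale for the payload holding `recordBytes` rather than the `GenericRecord` (record plus schema) in memory.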

@@ -70,7 +49,7 @@ public HoodieRecordConverter(Schema schema, String payloadClazz) {
Triple.of(Pair.of(hoodieRecord.getKey().getRecordKey(),
hoodieRecord.getKey().getPartitionPath()), Pair.of(currentLocation, newLocation), val);
return SerializationUtils.serialize(data);
} catch (IOException io) {
} catch (Exception io) {
Member

Exception e

Contributor Author

done

@@ -39,27 +33,12 @@
public class HoodieRecordConverter<V> implements
Member

this class is now effectively a static method? Can we clean up and get rid of the Converter hierarchy and just have two util methods?

Contributor Author

There's a StringConverter class, and this hierarchy allows for more converters if spillable is to be used elsewhere; let's keep it?

Member

Serialization will effectively subsume the need here, right? i.e., String is Serializable anyway. I don't think this is needed per se, if we have serializability requirements for keys too in the SpillableMap. How much work is it to remove this?

*/
protected final GenericRecord record;
protected final byte [] recordBytes;
Member

This means, there is no need to register the schemas in HoodieDeltaStreamer & Spark Source anymore?

@bvaradar can we make a call here?


public HoodieAvroPayload(Optional<GenericRecord> record) {
this.record = record;
try {
this.recordBytes = HoodieAvroUtils.avroToBytes(record.get());
Member

I see that we are not reusing the encoder/decoder in these methods. This can hurt performance.

Prior to the disk-based map, this was only used in tests. Can you please fix this? If true, it can generally speed up compaction as well.

Contributor Author

@n3nash n3nash Nov 26, 2018

Doing that for payloads is tricky, since we would have to keep the binaryEncoder as a member variable along with the outputStream. That would mean introducing another "non-thread-safe" method. Also, the BinaryEncoder itself is not thread safe: https://github.com/apache/avro/blob/release-1.7.7/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java#L153.
We could introduce this method and make use of it in the compaction step, since we are single-threaded there, but for HoodieAvroPayload use-cases maybe not, given people could init the Payload in a multi-threaded way.
@vinothchandar WDYT?

Member

We could have a static encoder cache per schema right? I think we should address this if the performance hit is large ..
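The static per-schema cache suggested above can be sketched with JDK primitives alone. In this sketch, `ByteArrayOutputStream` stands in for the non-thread-safe Avro `BinaryEncoder`, the schema name `example.TripRecord` is made up, and the class/method names are illustrative; the pattern is a `ConcurrentHashMap` keyed by schema with a `ThreadLocal` per entry, so each thread reuses its own encoder without synchronization:

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a "static encoder cache per schema" using only the JDK.
// A ThreadLocal per cache key gives each thread its own instance of a
// non-thread-safe resource (ByteArrayOutputStream here stands in for Avro's
// BinaryEncoder, which is also not thread safe), so repeated serialization
// calls on the same thread reuse one instance instead of re-allocating.
public class EncoderCacheDemo {
    private static final ConcurrentHashMap<String, ThreadLocal<ByteArrayOutputStream>>
        CACHE = new ConcurrentHashMap<>();

    // Look up (or lazily create) the per-thread resource for a schema name.
    static ByteArrayOutputStream forSchema(String schemaFullName) {
        return CACHE
            .computeIfAbsent(schemaFullName,
                k -> ThreadLocal.withInitial(ByteArrayOutputStream::new))
            .get();
    }

    public static void main(String[] args) {
        ByteArrayOutputStream a = forSchema("example.TripRecord");
        ByteArrayOutputStream b = forSchema("example.TripRecord");
        // Same thread + same schema -> same cached instance.
        System.out.println(a == b); // true
    }
}
```

This keeps the thread-safety concern raised above intact: each thread confines its own encoder, so no shared mutable state is touched on the serialization hot path.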

@n3nash n3nash force-pushed the hoodierecordconverter_fix branch from 825326c to 3ec25f2 Compare November 26, 2018 07:32
@n3nash
Contributor Author

n3nash commented Nov 26, 2018

@vinothchandar Addressed some comments, replied to others.

…he GenericRecord

Removing Converter hierarchy as we now depend purely on JavaSerialization and require the payload to be java serializable
@n3nash n3nash force-pushed the hoodierecordconverter_fix branch from 3ec25f2 to 1db73f4 Compare December 4, 2018 01:18
@n3nash
Contributor Author

n3nash commented Dec 4, 2018

Removed Converters and opened an issue for encoder performance here: #521
@vinothchandar @bvaradar
