Serializing the complete payload object instead of serializing just the GenericRecord in HoodieRecordConverter #495

Merged: 1 commit, Dec 4, 2018
@@ -30,8 +30,6 @@
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
-import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
-import com.uber.hoodie.common.util.collection.converter.StringConverter;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieUpsertException;
@@ -149,9 +147,7 @@ private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
// Load the new records in a map
logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
-config.getSpillableMapBasePath(), new StringConverter(),
-new HoodieRecordConverter(schema, config.getPayloadClass()),
-new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema));
+config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
@@ -17,6 +17,7 @@
package com.uber.hoodie.common.model;

import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Optional;
import org.apache.avro.Schema;
@@ -29,10 +30,16 @@
*/
public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload> {

-private final Optional<GenericRecord> record;
+// Store the GenericRecord converted to bytes - 1) Doesn't store schema hence memory efficient 2) Makes the payload
+// java serializable
+private final byte [] recordBytes;

public HoodieAvroPayload(Optional<GenericRecord> record) {
-this.record = record;
+try {
+this.recordBytes = HoodieAvroUtils.avroToBytes(record.get());
vinothchandar (Member) commented:

I see that we are not reusing the encoder/decoder in these methods. This can mess with performance.

Prior to the disk-based map, this was only used in tests. Can you please fix this? If so, it can generally speed up compaction as well.

n3nash (Contributor, Author) replied on Nov 26, 2018:

Doing that for payloads is tricky, since we would have to keep the BinaryEncoder as a member variable along with the OutputStream. That would mean introducing another "non-thread-safe" method. Also, the BinaryEncoder itself is not thread safe: https://github.com/apache/avro/blob/release-1.7.7/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java#L153
We could introduce such a method and use it in the compaction step, since we are single-threaded there, but perhaps not for general HoodieAvroPayload use cases, given that people could initialize the payload in a multi-threaded way.
@vinothchandar WDYT?

vinothchandar (Member) replied:

We could have a static encoder cache per schema, right? I think we should address this if the performance hit is large.
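For illustration, a minimal sketch of the per-schema encoder cache suggested above, using only stock Avro APIs. The class name and structure are hypothetical, not the code that was merged: it pairs a cached GenericDatumWriter per schema with a per-thread BinaryEncoder, since BinaryEncoder itself is not thread safe.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroEncoderCache {

  // One cached DatumWriter per schema, shared across threads.
  private static final Map<Schema, GenericDatumWriter<GenericRecord>> WRITERS =
      new ConcurrentHashMap<>();

  // BinaryEncoder is not thread safe, so each thread reuses its own instance
  // through EncoderFactory's "reuse" parameter.
  private static final ThreadLocal<BinaryEncoder> ENCODERS = new ThreadLocal<>();

  public static byte[] avroToBytes(GenericRecord record) throws IOException {
    Schema schema = record.getSchema();
    GenericDatumWriter<GenericRecord> writer =
        WRITERS.computeIfAbsent(schema, s -> new GenericDatumWriter<>(s));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Passing the previous encoder lets Avro reuse its internal buffer.
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, ENCODERS.get());
    ENCODERS.set(encoder);
    writer.write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }
}
```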

+} catch (IOException io) {
+throw new HoodieIOException("Cannot convert record to bytes", io);
+}
}

@Override
@@ -48,6 +55,7 @@ public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentVal

@Override
public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
+Optional<GenericRecord> record = Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
}
}
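The switch from holding a GenericRecord to holding recordBytes is what makes the payload work with plain Java serialization, since GenericData.Record does not implement java.io.Serializable. Below is a hedged round-trip sketch, assuming HoodieRecordPayload extends Serializable (which the disk-spilling change relies on); the demo class and toy schema are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Optional;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class PayloadSerializationDemo {

  public static void main(String[] args) throws Exception {
    // Toy schema with a single string field.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"key\",\"type\":\"string\"}]}");
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("key", "k1");

    // GenericData.Record is not java.io.Serializable; holding byte[] instead of
    // the record is what lets the payload survive this round trip.
    HoodieAvroPayload payload = new HoodieAvroPayload(Optional.of(rec));

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(payload);
    }
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      HoodieAvroPayload roundTripped = (HoodieAvroPayload) ois.readObject();
      System.out.println(roundTripped.getInsertValue(schema)); // Optional[{"key": "k1"}]
    }
  }
}
```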
@@ -23,8 +23,6 @@
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
import com.uber.hoodie.common.util.HoodieTimer;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
-import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
-import com.uber.hoodie.common.util.collection.converter.StringConverter;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Iterator;
@@ -71,7 +69,6 @@ public HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String>
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath,
-new StringConverter(), new HoodieRecordConverter(readerSchema, getPayloadClassFQN()),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema));
// Do the scan and merge
timer.startTimer();
@@ -16,8 +16,8 @@

package com.uber.hoodie.common.util.collection;

+import com.uber.hoodie.common.util.SerializationUtils;
import com.uber.hoodie.common.util.SpillableMapUtils;
-import com.uber.hoodie.common.util.collection.converter.Converter;
import com.uber.hoodie.common.util.collection.io.storage.SizeAwareDataOutputStream;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
@@ -26,6 +26,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.io.Serializable;
import java.net.InetAddress;
import java.util.AbstractMap;
import java.util.Collection;
@@ -45,15 +46,11 @@
* without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata.
* 2) Current position in the file NOTE : Only String.class type supported for Key
*/
-public final class DiskBasedMap<T, R> implements Map<T, R> {
+public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R> {

private static final Logger log = LogManager.getLogger(DiskBasedMap.class);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, ValueMetadata> valueMetadataMap;
-// Key converter to convert key type to bytes
-private final Converter<T> keyConverter;
-// Value converter to convert value type to bytes
-private final Converter<R> valueConverter;
// Read only file access to be able to seek to random positions to readFromDisk values
private RandomAccessFile readOnlyFileHandle;
// Write only OutputStream to be able to ONLY append to the file
@@ -67,17 +64,14 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
private String filePath;


-protected DiskBasedMap(String baseFilePath,
-Converter<T> keyConverter, Converter<R> valueConverter) throws IOException {
+protected DiskBasedMap(String baseFilePath) throws IOException {
this.valueMetadataMap = new HashMap<>();
File writeOnlyFileHandle = new File(baseFilePath, UUID.randomUUID().toString());
this.filePath = writeOnlyFileHandle.getPath();
initFile(writeOnlyFileHandle);
this.fileOutputStream = new FileOutputStream(writeOnlyFileHandle, true);
this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream);
this.filePosition = new AtomicLong(0L);
-this.keyConverter = keyConverter;
-this.valueConverter = valueConverter;
}

private void initFile(File writeOnlyFileHandle) throws IOException {
@@ -125,7 +119,7 @@ public void run() {
*/
public Iterator<R> iterator() {
return new LazyFileIterable(readOnlyFileHandle,
-valueMetadataMap, valueConverter).iterator();
+valueMetadataMap).iterator();
}

/**
@@ -162,7 +156,7 @@ public R get(Object key) {
return null;
}
try {
-return this.valueConverter.getData(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
+return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
entry.getOffsetOfValue(), entry.getSizeOfValue()));
} catch (IOException e) {
throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
@@ -172,12 +166,12 @@
@Override
public R put(T key, R value) {
try {
-byte[] val = this.valueConverter.getBytes(value);
+byte[] val = SerializationUtils.serialize(value);
Integer valueSize = val.length;
Long timestamp = new Date().getTime();
this.valueMetadataMap.put(key,
new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
-byte[] serializedKey = keyConverter.getBytes(key);
+byte[] serializedKey = SerializationUtils.serialize(key);
filePosition.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle,
new FileEntry(SpillableMapUtils.generateChecksum(val),
serializedKey.length, valueSize, serializedKey, val, timestamp)));
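Here the per-type Converter plumbing is replaced by a generic SerializationUtils. Its implementation is not part of this diff; the following is a minimal sketch consistent with the serialize/deserialize signatures used above, based on plain java.io object serialization (the real utility may use a different mechanism, such as Kryo).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public final class SerializationUtils {

  private SerializationUtils() {}

  // Serialize any Serializable object graph to bytes.
  public static byte[] serialize(Serializable obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    }
    return bos.toByteArray();
  }

  // Deserialize bytes back to the expected type; the caller supplies T.
  @SuppressWarnings("unchecked")
  public static <T> T deserialize(byte[] bytes) throws IOException {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (T) ois.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException("Could not resolve class while deserializing", e);
    }
  }
}
```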
@@ -18,9 +18,9 @@

import com.twitter.common.objectsize.ObjectSizeCalculator;
import com.uber.hoodie.common.util.SizeEstimator;
-import com.uber.hoodie.common.util.collection.converter.Converter;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,7 +39,7 @@
* trade-off: If the spill threshold is too high, the in-memory map may occupy more memory than is available, resulting
* in OOM. However, if the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
*/
-public class ExternalSpillableMap<T, R> implements Map<T, R> {
+public class ExternalSpillableMap<T extends Serializable, R extends Serializable> implements Map<T, R> {

// Find the actual estimated payload size after inserting N records
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
@@ -53,10 +53,6 @@ public class ExternalSpillableMap<T, R> implements Map<T, R> {
// TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and
// incorrect payload estimation
private final Double sizingFactorForInMemoryMap = 0.8;
-// Key converter to convert key type to bytes
-private final Converter<T> keyConverter;
-// Value converter to convert value type to bytes
-private final Converter<R> valueConverter;
// Size Estimator for key type
private final SizeEstimator<T> keySizeEstimator;
// Size Estimator for value type
@@ -69,15 +65,12 @@ private boolean shouldEstimatePayloadSize = true;
private boolean shouldEstimatePayloadSize = true;

public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath,
-Converter<T> keyConverter, Converter<R> valueConverter,
SizeEstimator<T> keySizeEstimator, SizeEstimator<R> valueSizeEstimator) throws IOException {
this.inMemoryMap = new HashMap<>();
-this.diskBasedMap = new DiskBasedMap<>(baseFilePath, keyConverter, valueConverter);
+this.diskBasedMap = new DiskBasedMap<>(baseFilePath);
this.maxInMemorySizeInBytes = (long) Math
.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
this.currentInMemoryMapSize = 0L;
-this.keyConverter = keyConverter;
-this.valueConverter = valueConverter;
this.keySizeEstimator = keySizeEstimator;
this.valueSizeEstimator = valueSizeEstimator;
}
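With the converters gone, the map only requires that keys and values implement java.io.Serializable. A hedged usage sketch of the simplified constructor follows; the value class, size threshold, and paths are illustrative, and the DefaultSizeEstimator import path and genericity are assumptions.

```java
import java.io.IOException;
import java.io.Serializable;
import com.uber.hoodie.common.util.DefaultSizeEstimator;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;

public class SpillableMapUsage {

  // Any Serializable value works now; no per-type Converter is needed.
  static class Payload implements Serializable {
    final String body;
    Payload(String body) { this.body = body; }
  }

  public static void main(String[] args) throws IOException {
    ExternalSpillableMap<String, Payload> map = new ExternalSpillableMap<>(
        16 * 1024 * 1024L,             // spill to disk beyond ~16 MB in memory
        "/tmp/spillable",              // base path for the spill file
        new DefaultSizeEstimator<>(),  // key size estimator
        new DefaultSizeEstimator<>()); // value size estimator

    map.put("key1", new Payload("value1"));
    System.out.println(map.get("key1").body);
  }
}
```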
@@ -16,8 +16,8 @@

package com.uber.hoodie.common.util.collection;

+import com.uber.hoodie.common.util.SerializationUtils;
import com.uber.hoodie.common.util.SpillableMapUtils;
-import com.uber.hoodie.common.util.collection.converter.Converter;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
@@ -37,20 +37,16 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
private final RandomAccessFile readOnlyFileHandle;
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, DiskBasedMap.ValueMetadata> inMemoryMetadataOfSpilledData;
-private final Converter<R> valueConverter;

-public LazyFileIterable(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map,
-Converter<R> valueConverter) {
+public LazyFileIterable(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map) {
this.readOnlyFileHandle = file;
this.inMemoryMetadataOfSpilledData = map;
-this.valueConverter = valueConverter;
}

@Override
public Iterator<R> iterator() {
try {
-return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData,
-valueConverter);
+return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData);
} catch (IOException io) {
throw new HoodieException("Unable to initialize iterator for file on disk", io);
}
@@ -61,14 +57,11 @@ public Iterator<R> iterator() {
*/
public class LazyFileIterator<T, R> implements Iterator<R> {

-private final Converter<R> valueConverter;
private RandomAccessFile readOnlyFileHandle;
private Iterator<Map.Entry<T, DiskBasedMap.ValueMetadata>> metadataIterator;

-public LazyFileIterator(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map,
-Converter<R> valueConverter) throws IOException {
+public LazyFileIterator(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map) throws IOException {
this.readOnlyFileHandle = file;
-this.valueConverter = valueConverter;
// sort the map in increasing order of offset of value so disk seek is only in one(forward) direction
this.metadataIterator = map
.entrySet()
@@ -88,7 +81,7 @@ public boolean hasNext() {
public R next() {
Map.Entry<T, DiskBasedMap.ValueMetadata> entry = this.metadataIterator.next();
try {
-return valueConverter.getData(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
+return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue()));
} catch (IOException e) {
throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e);
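The constructor above sorts the spilled-value metadata in increasing order of file offset so that next() reads the spill file in a single forward pass, avoiding backward disk seeks. A self-contained sketch of that pattern with a stand-in ValueMetadata (hypothetical names, not the merged code):

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class OffsetSortedIteration {

  // Minimal stand-in for DiskBasedMap.ValueMetadata: just the value's offset in the file.
  static class ValueMetadata {
    final long offsetOfValue;
    ValueMetadata(long offsetOfValue) { this.offsetOfValue = offsetOfValue; }
    long getOffsetOfValue() { return offsetOfValue; }
  }

  public static void main(String[] args) {
    Map<String, ValueMetadata> metadata = new HashMap<>();
    metadata.put("c", new ValueMetadata(2048));
    metadata.put("a", new ValueMetadata(0));
    metadata.put("b", new ValueMetadata(512));

    // Sort by offset so a file-backed iterator only ever seeks forward.
    Iterator<Map.Entry<String, ValueMetadata>> it = metadata.entrySet().stream()
        .sorted(Comparator.comparingLong(e -> e.getValue().getOffsetOfValue()))
        .iterator();

    while (it.hasNext()) {
      System.out.println(it.next().getKey()); // prints a, b, c
    }
  }
}
```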

This file was deleted.
