Skip to content

Commit

Permalink
[HUDI-6092] Reuse schema objects while deserializing log blocks (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
prashantwason authored and yihua committed May 17, 2023
1 parent 0157e4b commit 7dab4d8
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected HoodieAvroHFileWriter createWriter(
protected HoodieAvroFileReader createReader(
Configuration conf) throws Exception {
CacheConfig cacheConfig = new CacheConfig(conf);
return new HoodieAvroHFileReader(conf, getFilePath(), cacheConfig, getFilePath().getFileSystem(conf));
return new HoodieAvroHFileReader(conf, getFilePath(), cacheConfig, getFilePath().getFileSystem(conf), Option.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content)
}

public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock, byte[] content) throws IOException {
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(dataBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
return new RecordIterator(dataBlock.readerSchema, writerSchema, content);
return new RecordIterator(dataBlock.readerSchema, dataBlock.getSchemaFromHeader(), content);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
Expand Down Expand Up @@ -62,6 +63,9 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {

protected Schema readerSchema;

// Map of string schema to parsed schema.
private static ConcurrentHashMap<String, Schema> schemaMap = new ConcurrentHashMap<>();

/**
* NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
*/
Expand Down Expand Up @@ -194,6 +198,12 @@ protected Option<String> getRecordKey(HoodieRecord record) {
return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
}

protected Schema getSchemaFromHeader() {
String schemaStr = getLogBlockHeader().get(HeaderMetadataType.SCHEMA);
schemaMap.computeIfAbsent(schemaStr, (schemaString) -> new Schema.Parser().parse(schemaString));
return schemaMap.get(schemaStr);
}

/**
* Converts the given list to closable iterator.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,8 @@ protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException
protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] content, HoodieRecordType type) throws IOException {
checkState(readerSchema != null, "Reader's schema has to be non-null");

// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));

// Read the content
HoodieAvroHFileReader reader = new HoodieAvroHFileReader(null, pathForReader, content, Option.of(writerSchema));
HoodieAvroHFileReader reader = new HoodieAvroHFileReader(null, pathForReader, content, Option.of(getSchemaFromHeader()));
return unsafeCast(reader.getRecordIterator(readerSchema));
}

Expand All @@ -196,7 +193,8 @@ protected <T> ClosableIterator<HoodieRecord<T>> lookupRecords(List<String> keys,
Collections.sort(sortedKeys);

final HoodieAvroHFileReader reader =
new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf));
new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf),
Option.of(getSchemaFromHeader()));

// Get writer's schema from the header
final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public HoodieAvroHFileReader(Configuration hadoopConf, Path path, CacheConfig ca
Option.empty());
}

public HoodieAvroHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, hadoopConf), Option.empty());
public HoodieAvroHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig, FileSystem fs, Option<Schema> schemaOpt) throws IOException {
this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, hadoopConf), schemaOpt);
}

public HoodieAvroHFileReader(FileSystem fs, Path dummyPath, byte[] content, Option<Schema> schemaOpt) throws IOException {
Expand Down

0 comments on commit 7dab4d8

Please # to comment.