Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add compression codec configurations for HoodieParquetWriter. #604

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
public static final String PARQUET_COMPRESSION_RATIO = "hoodie.parquet.compression.ratio";
// Default compression ratio for parquet
public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec";
// Default compression codec for parquet
public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip";
public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
// Default compression ratio for log file to parquet, general 3x
public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
Expand Down Expand Up @@ -105,6 +108,11 @@ public Builder parquetCompressionRatio(double parquetCompressionRatio) {
return this;
}

public Builder parquetCompressionCodec(String parquetCompressionCodec) {
props.setProperty(PARQUET_COMPRESSION_CODEC, parquetCompressionCodec);
return this;
}

public Builder logFileToParquetCompressionRatio(double logFileToParquetCompressionRatio) {
props.setProperty(LOGFILE_TO_PARQUET_COMPRESSION_RATIO, String.valueOf(logFileToParquetCompressionRatio));
return this;
Expand All @@ -124,6 +132,8 @@ public HoodieStorageConfig build() {
LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES);
setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_RATIO),
PARQUET_COMPRESSION_RATIO, DEFAULT_STREAM_COMPRESSION_RATIO);
setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_CODEC),
PARQUET_COMPRESSION_CODEC, DEFAULT_PARQUET_COMPRESSION_CODEC);
setDefaultOnCondition(props, !props.containsKey(LOGFILE_TO_PARQUET_COMPRESSION_RATIO),
LOGFILE_TO_PARQUET_COMPRESSION_RATIO, DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO);
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Properties;
import javax.annotation.concurrent.Immutable;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.storage.StorageLevel;

/**
Expand Down Expand Up @@ -355,6 +356,10 @@ public double getParquetCompressionRatio() {
return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
}

public CompressionCodecName getParquetCompressionCodec() {
return CompressionCodecName.fromConf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC));
}

public double getLogFileToParquetCompressionRatio() {
return Double.valueOf(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class HoodieStorageWriterFactory {

Expand All @@ -47,7 +46,7 @@ R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(String c
new AvroSchemaConverter().convert(schema), schema, filter);

HoodieParquetConfig parquetConfig =
new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
new HoodieParquetConfig(writeSupport, config.getParquetCompressionCodec(),
config.getParquetBlockSize(), config.getParquetPageSize(),
config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(),
config.getParquetCompressionRatio());
Expand Down