diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java index 0403b62ebb465..9a2a3fb80c1fc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java @@ -44,6 +44,9 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { public static final String PARQUET_COMPRESSION_RATIO = "hoodie.parquet.compression.ratio"; // Default compression ratio for parquet public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1); + public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec"; + // Default compression codec for parquet + public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip"; public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio"; // Default compression ratio for log file to parquet, general 3x public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35); @@ -105,6 +108,11 @@ public Builder parquetCompressionRatio(double parquetCompressionRatio) { return this; } + public Builder parquetCompressionCodec(String parquetCompressionCodec) { + props.setProperty(PARQUET_COMPRESSION_CODEC, parquetCompressionCodec); + return this; + } + public Builder logFileToParquetCompressionRatio(double logFileToParquetCompressionRatio) { props.setProperty(LOGFILE_TO_PARQUET_COMPRESSION_RATIO, String.valueOf(logFileToParquetCompressionRatio)); return this; @@ -124,6 +132,8 @@ public HoodieStorageConfig build() { LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES); setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_RATIO), PARQUET_COMPRESSION_RATIO, DEFAULT_STREAM_COMPRESSION_RATIO); + setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_CODEC), + PARQUET_COMPRESSION_CODEC, DEFAULT_PARQUET_COMPRESSION_CODEC); setDefaultOnCondition(props, !props.containsKey(LOGFILE_TO_PARQUET_COMPRESSION_RATIO), LOGFILE_TO_PARQUET_COMPRESSION_RATIO, DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO); return config; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 07ddc9673ec21..d1a5e467fdafa 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Properties; import javax.annotation.concurrent.Immutable; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.spark.storage.StorageLevel; /** @@ -355,6 +356,10 @@ public double getParquetCompressionRatio() { return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO)); } + public CompressionCodecName getParquetCompressionCodec() { + return CompressionCodecName.fromConf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC)); + } + public double getLogFileToParquetCompressionRatio() { return Double.valueOf(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO)); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java index 2e83108e434bf..13874ca3623ba 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java @@ -26,7 +26,6 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroSchemaConverter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; public class HoodieStorageWriterFactory { @@ -47,7 +46,7 @@ R extends IndexedRecord> HoodieStorageWriter newParquetStorageWriter(String c new AvroSchemaConverter().convert(schema), schema, filter); HoodieParquetConfig parquetConfig = - new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, + new HoodieParquetConfig(writeSupport, config.getParquetCompressionCodec(), config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(), config.getParquetCompressionRatio());