From 450dc10103a83083cdc6cc3a6471c6d1b755617f Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Fri, 5 Jan 2018 14:06:18 -0800 Subject: [PATCH 1/2] Improving out-of-box experience for data source - Fixes #246 - Bump up default parallelism to 1500, to handle large upserts - Add docs on s3 configuration & tuning tips with tested Spark knobs - Fix bug to not duplicate hoodie metadata fields when input dataframe is another hoodie dataset - Improve speed of ROTablePathFilter by removing directory check - Move to spark-avro 4.0 to handle issue with nested fields with same name - Keep AvroConversionUtils in sync with spark-avro 4.0 --- docs/configurations.md | 44 ++++++++++++++++++- docs/s3_filesystem.md | 20 ++++++++- .../uber/hoodie/config/HoodieWriteConfig.java | 2 +- .../hoodie/common/util/HoodieAvroUtils.java | 11 ++++- .../hadoop/HoodieROTablePathFilter.java | 10 ++--- hoodie-spark/pom.xml | 2 +- .../com/uber/hoodie/AvroConversionUtils.scala | 25 +++++++++-- .../scala/com/uber/hoodie/DefaultSource.scala | 17 ++++--- 8 files changed, 112 insertions(+), 19 deletions(-) diff --git a/docs/configurations.md b/docs/configurations.md index bd4f4a27a282..f8c94669870f 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -7,6 +7,8 @@ toc: false summary: "Here we list all possible configurations and what they mean" --- +### Configuration + * [HoodieWriteConfig](#HoodieWriteConfig)
Top Level Config which is passed in when HoodieWriteClient is created. - [withPath](#withPath) (hoodie_base_path)
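To make the listing above concrete, here is a minimal Scala sketch of assembling such a top-level config. It is illustrative only: the builder methods shown follow the names in this listing (`newBuilder`, `withPath`, `forTable`, `withParallelism`, `build`), but exact signatures can vary between versions, and the base path and table name are placeholders.

```
import com.uber.hoodie.config.HoodieWriteConfig

// Sketch: build the top-level config that is passed in when a HoodieWriteClient
// (or the Spark datasource) is created. Path and table name are placeholders.
val writeConfig = HoodieWriteConfig.newBuilder()
  .withPath("s3a://my-bucket/hoodie/trips")   // hoodie.base.path
  .forTable("trips")                          // hoodie.table.name
  .withParallelism(1500, 1500)                // insert/upsert shuffle parallelism
  .build()
```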
@@ -152,4 +154,44 @@ summary: "Here we list all possible configurations and what they mean" `instant_time <= END_INSTANTTIME` are fetched out. -{% include callout.html content="Hoodie is a young project. A lot of pluggable interfaces and configurations to support diverse workloads need to be created. Get involved [here](https://github.com/uber/hoodie)" type="info" %} +### Tuning + +Writing data via Hoodie happens as a Spark job, and thus the general rules of Spark debugging apply here too. Below is a list of things to keep in mind if you are looking to improve performance or reliability. + + - **Right operations** : Use `bulkinsert` to load new data into a table, and thereafter use `upsert`/`insert`. The difference between them is that bulk insert uses a disk-based write path that scales to large inputs without needing to cache them. + - **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e. `withParallelism(1500)`) to ensure each Spark partition stays within the 2GB limit for inputs up to 500GB. Bump this up accordingly if you have larger inputs. + - **Off-heap memory** : Hoodie writes parquet files, and that needs a good amount of off-heap memory proportional to schema width. Consider setting something like `spark.yarn.executor.memoryOverhead` or `spark.yarn.driver.memoryOverhead` if you are running into such failures. + - **Spark Memory** : Typically, Hoodie needs to be able to read a single file into memory to perform merges or compactions, and thus the executor memory should be sufficient to accommodate this. In addition, Hoodie caches the input to be able to intelligently place data, so leaving some `spark.storage.memoryFraction` will generally help boost performance. + - **Sizing files** : Set `limitFileSize` above judiciously, to balance ingest/write latency vs the number of files & consequently the metadata overhead associated with it. + - **Timeseries/Log data** : Default configs are tuned for database/nosql changelogs where individual record sizes are large. Another very popular class of data is timeseries/event/log data, which tends to be more voluminous with a lot more records per partition. In such cases: + - Consider tuning the bloom filter accuracy via `.bloomFilterFPP()`/`.bloomFilterNumEntries()` to achieve your target index lookup time + - Consider making a key that is prefixed with the time of the event, which will enable range pruning & significantly speed up index lookup.
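To make the knobs above concrete, here is a minimal Scala sketch of a datasource write that applies them. It is illustrative rather than definitive: the `com.uber.hoodie` format name, paths and table name are assumptions, while the shuffle parallelism keys and `hoodie.table.name` come straight from `HoodieWriteConfig`, and Kryo is configured up front because the datasource rejects other serializers.

```
import org.apache.spark.sql.{SaveMode, SparkSession}

// Kryo is required by the hoodie datasource (see the check in DefaultSource).
val spark = SparkSession.builder()
  .appName("hoodie-ingest")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

// Illustrative input; any DataFrame with your record key/partition columns works.
val inputDf = spark.read.parquet("s3a://my-bucket/incoming/")

inputDf.write
  .format("com.uber.hoodie")                              // assumed datasource name
  .option("hoodie.table.name", "trips")
  .option("hoodie.upsert.shuffle.parallelism", "1500")    // see Input Parallelism above
  .option("hoodie.insert.shuffle.parallelism", "1500")
  .option("hoodie.bulkinsert.shuffle.parallelism", "1500")
  .mode(SaveMode.Append)
  .save("s3a://my-bucket/hoodie/trips")
```

The record key and partition path options expected by the datasource are left at their defaults here; set them to match your dataset.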
+ + Below is a full working production config + + ``` + spark.driver.extraClassPath /etc/hive/conf + spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof + spark.driver.maxResultSize 2g + spark.driver.memory 4g + spark.executor.cores 1 + spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof + spark.executor.id driver + spark.executor.instances 300 + spark.executor.memory 6g + spark.rdd.compress true + + spark.kryoserializer.buffer.max 512m + spark.serializer org.apache.spark.serializer.KryoSerializer + spark.shuffle.memoryFraction 0.2 + spark.shuffle.service.enabled true + spark.sql.hive.convertMetastoreParquet false + spark.storage.memoryFraction 0.6 + spark.submit.deployMode cluster + spark.task.cpus 1 + spark.task.maxFailures 4 + + spark.yarn.driver.memoryOverhead 1024 + spark.yarn.executor.memoryOverhead 3072 + spark.yarn.max.executor.failures 100 + ``` diff --git a/docs/s3_filesystem.md b/docs/s3_filesystem.md index adb1cefcb5a4..09d70e62530e 100644 --- a/docs/s3_filesystem.md +++ b/docs/s3_filesystem.md @@ -17,7 +17,9 @@ There are two configurations required for Hoodie-S3 compatibility: ### AWS Credentials -Add the required configs in your core-site.xml from where Hoodie can fetch them. Replace the `fs.defaultFS` with your S3 bucket name and Hoodie should be able to read/write from the bucket. +The simplest way to use Hoodie with S3 is to configure your `SparkSession` or `SparkContext` with S3 credentials. Hoodie will automatically pick these up and talk to S3. + +Alternatively, add the required configs in your core-site.xml from where Hoodie can fetch them. Replace the `fs.defaultFS` with your S3 bucket name and Hoodie should be able to read/write from the bucket. ``` @@ -51,6 +53,22 @@ Add the required configs in your core-site.xml from where Hoodie can fetch them. ``` +
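For the first option above (configuring the `SparkSession`/`SparkContext` directly), here is a minimal Scala sketch. It sets the standard `fs.s3a.*` Hadoop properties, the same ones the `HOODIE_ENV_` variables below map to; reading the credentials from `AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY` environment variables is purely illustrative.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hoodie-on-s3")
  .getOrCreate()

// Hand the S3 credentials to the Hadoop FileSystem layer that Hoodie uses underneath.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
```

The AWS libraries listed in the next section still need to be on the classpath for this to work.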
+Utilities such as hoodie-cli or the deltastreamer tool can pick up S3 credentials via environment variables prefixed with `HOODIE_ENV_`. For example, below is a bash snippet to set up +such variables so that the CLI can work on datasets stored in S3 + +``` +export HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key=$accessKey +export HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key=$secretKey +export HOODIE_ENV_fs_DOT_s3_DOT_awsAccessKeyId=$accessKey +export HOODIE_ENV_fs_DOT_s3_DOT_awsSecretAccessKey=$secretKey +export HOODIE_ENV_fs_DOT_s3n_DOT_awsAccessKeyId=$accessKey +export HOODIE_ENV_fs_DOT_s3n_DOT_awsSecretAccessKey=$secretKey +export HOODIE_ENV_fs_DOT_s3n_DOT_impl=org.apache.hadoop.fs.s3a.S3AFileSystem +``` + + + ### AWS Libs AWS Hadoop libraries to add to our classpath diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index e0fe99379e0d..ebec1dd7b818 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -41,7 +41,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String TABLE_NAME = "hoodie.table.name"; private static final String BASE_PATH_PROP = "hoodie.base.path"; private static final String AVRO_SCHEMA = "hoodie.avro.schema"; - private static final String DEFAULT_PARALLELISM = "200"; + private static final String DEFAULT_PARALLELISM = "1500"; private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism"; private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"; private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java index 09d1a2cab807..45b69e0b6a82 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java @@ -74,6 +74,13 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx return reader.read(null, decoder); } + public static boolean isMetadataField(String fieldName) { + return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName) + || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName) + || HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName) + || HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName) + || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName); + } /** * Adds the Hoodie metadata fields to the given schema @@ -98,7 +105,9 @@ public static Schema addMetadataFields(Schema schema) { parentFields.add(partitionPathField); parentFields.add(fileNameField); for (Schema.Field field : schema.getFields()) { - parentFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), null)); + if (!isMetadataField(field.name())) { + parentFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), null)); + } } Schema mergedSchema = Schema diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java index 44672ec4b645..158aa00391db 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java @@ -20,7 +20,6 @@ import com.uber.hoodie.common.model.HoodiePartitionMetadata; import
com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; -import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.DatasetNotFoundException; import com.uber.hoodie.exception.HoodieException; import java.io.Serializable; @@ -61,6 +60,9 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable { private HashSet nonHoodiePathCache; + private transient FileSystem fs; + + public HoodieROTablePathFilter() { hoodiePathCache = new HashMap<>(); nonHoodiePathCache = new HashSet<>(); @@ -79,7 +81,6 @@ private Path safeGetParentsParent(Path path) { return null; } - @Override public boolean accept(Path path) { @@ -88,9 +89,8 @@ public boolean accept(Path path) { } Path folder = null; try { - FileSystem fs = path.getFileSystem(FSUtils.prepareHadoopConf(new Configuration())); - if (fs.isDirectory(path)) { - return true; + if (fs == null) { + fs = path.getFileSystem(new Configuration()); } // Assumes path is a file diff --git a/hoodie-spark/pom.xml b/hoodie-spark/pom.xml index 3e8878ea9454..634dab06b676 100644 --- a/hoodie-spark/pom.xml +++ b/hoodie-spark/pom.xml @@ -142,7 +142,7 @@ com.databricks spark-avro_2.11 - 3.2.0 + 4.0.0 com.fasterxml.jackson.core diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala index 4312636fa258..e92043490690 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala @@ -43,6 +43,16 @@ object AvroConversionUtils { } } + def getNewRecordNamespace(elementDataType: DataType, + currentRecordNamespace: String, + elementName: String): String = { + + elementDataType match { + case StructType(_) => s"$currentRecordNamespace.$elementName" + case _ => currentRecordNamespace + } + } + def createConverterToAvro(dataType: DataType, structName: String, recordNamespace: String): (Any) => Any = { @@ -60,7 +70,10 @@ object AvroConversionUtils { case DateType => (item: Any) => if (item == null) null else item.asInstanceOf[Date].getTime case ArrayType(elementType, _) => - val elementConverter = createConverterToAvro(elementType, structName, recordNamespace) + val elementConverter = createConverterToAvro( + elementType, + structName, + getNewRecordNamespace(elementType, recordNamespace, structName)) (item: Any) => { if (item == null) { null @@ -77,7 +90,10 @@ object AvroConversionUtils { } } case MapType(StringType, valueType, _) => - val valueConverter = createConverterToAvro(valueType, structName, recordNamespace) + val valueConverter = createConverterToAvro( + valueType, + structName, + getNewRecordNamespace(valueType, recordNamespace, structName)) (item: Any) => { if (item == null) { null @@ -94,7 +110,10 @@ object AvroConversionUtils { val schema: Schema = SchemaConverters.convertStructToAvro( structType, builder, recordNamespace) val fieldConverters = structType.fields.map(field => - createConverterToAvro(field.dataType, field.name, recordNamespace)) + createConverterToAvro( + field.dataType, + field.name, + getNewRecordNamespace(field.dataType, recordNamespace, field.name))) (item: Any) => { if (item == null) { null diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala index dec9160e22d9..dd744da6d507 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala +++ 
b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala @@ -134,11 +134,16 @@ class DefaultSource extends RelationProvider df: DataFrame): BaseRelation = { val parameters = parametersWithWriteDefaults(optParams).toMap + val sparkContext = sqlContext.sparkContext val path = parameters.get("path") val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME) if (path.isEmpty || tblName.isEmpty) { throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.") } + val serializer = sparkContext.getConf.get("spark.serializer") + if (!serializer.equals("org.apache.spark.serializer.KryoSerializer")) { + throw new HoodieException(s"${serializer} serialization is not supported by hoodie. Please use kryo.") + } val storageType = parameters(STORAGE_TYPE_OPT_KEY) val operation = parameters(OPERATION_OPT_KEY) @@ -146,11 +151,12 @@ class DefaultSource extends RelationProvider // register classes & schemas val structName = s"${tblName.get}_record" val nameSpace = s"hoodie.${tblName.get}" - sqlContext.sparkContext.getConf.registerKryoClasses( + sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - sqlContext.sparkContext.getConf.registerAvroSchemas(schema) + sparkContext.getConf.registerAvroSchemas(schema) + log.info(s"Registered avro schema : ${schema.toString(true)}"); // Convert to RDD[HoodieRecord] val keyGenerator = DataSourceUtils.createKeyGenerator( @@ -167,7 +173,7 @@ class DefaultSource extends RelationProvider val basePath = new Path(parameters.get("path").get) - val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) var exists = fs.exists(basePath) // Handle various save modes @@ -190,12 +196,11 @@ class DefaultSource extends RelationProvider properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get); properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType); properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived"); - HoodieTableMetaClient.initializePathAsHoodieDataset( - sqlContext.sparkContext.hadoopConfiguration, path.get, properties); + HoodieTableMetaClient.initializePathAsHoodieDataset(sparkContext.hadoopConfiguration, path.get, properties); } // Create a HoodieWriteClient & issue the write. - val client = DataSourceUtils.createHoodieClient(new JavaSparkContext(sqlContext.sparkContext), + val client = DataSourceUtils.createHoodieClient(new JavaSparkContext(sparkContext), schema.toString, path.get, tblName.get, From 0e91883166508fab59aa1361f34f1d5a59a52631 Mon Sep 17 00:00:00 2001 From: vinothchandar Date: Sun, 10 Jun 2018 18:54:58 -0700 Subject: [PATCH 2/2] Fixing deps & serialization for RTView - hoodie-hadoop-mr now needs objectsize bundled - Also updated docs with additional tuning tips --- docs/configurations.md | 5 ++++- hoodie-hadoop-mr/pom.xml | 6 ++++++ .../hadoop/realtime/HoodieRealtimeFileSplit.java | 16 ++++++++-------- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/docs/configurations.md b/docs/configurations.md index f8c94669870f..4639fb2e5237 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -159,13 +159,16 @@ summary: "Here we list all possible configurations and what they mean" Writing data via Hoodie happens as a Spark job, and thus the general rules of Spark debugging apply here too.
Below is a list of things to keep in mind if you are looking to improve performance or reliability. - **Right operations** : Use `bulkinsert` to load new data into a table, and thereafter use `upsert`/`insert`. The difference between them is that bulk insert uses a disk-based write path that scales to large inputs without needing to cache them. - - **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e. `withParallelism(1500)`) to ensure each Spark partition stays within the 2GB limit for inputs up to 500GB. Bump this up accordingly if you have larger inputs. + - **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e. `withParallelism(1500)`) to ensure each Spark partition stays within the 2GB limit for inputs up to 500GB. Bump this up accordingly if you have larger inputs. We recommend setting the shuffle parallelism `hoodie.[insert|upsert|bulkinsert].shuffle.parallelism` such that it is at least input_data_size/500MB. - **Off-heap memory** : Hoodie writes parquet files, and that needs a good amount of off-heap memory proportional to schema width. Consider setting something like `spark.yarn.executor.memoryOverhead` or `spark.yarn.driver.memoryOverhead` if you are running into such failures. - **Spark Memory** : Typically, Hoodie needs to be able to read a single file into memory to perform merges or compactions, and thus the executor memory should be sufficient to accommodate this. In addition, Hoodie caches the input to be able to intelligently place data, so leaving some `spark.storage.memoryFraction` will generally help boost performance. - **Sizing files** : Set `limitFileSize` above judiciously, to balance ingest/write latency vs the number of files & consequently the metadata overhead associated with it. - **Timeseries/Log data** : Default configs are tuned for database/nosql changelogs where individual record sizes are large. Another very popular class of data is timeseries/event/log data, which tends to be more voluminous with a lot more records per partition. In such cases: - Consider tuning the bloom filter accuracy via `.bloomFilterFPP()`/`.bloomFilterNumEntries()` to achieve your target index lookup time - Consider making a key that is prefixed with the time of the event, which will enable range pruning & significantly speed up index lookup. + - **GC Tuning** : Please be sure to follow the garbage collection tuning tips from the Spark tuning guide to avoid OutOfMemory errors. + - [Must] Use the G1/CMS collector. Sample CMS flags to add to `spark.executor.extraJavaOptions`: `-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof` + - If it still keeps OOMing, reduce Spark memory conservatively: `spark.memory.fraction=0.2, spark.memory.storageFraction=0.2`, allowing it to spill rather than OOM.
(reliably slow vs crashing intermittently) Below is a full working production config diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml index 7bf618011ea9..eae2eb8d74e5 100644 --- a/hoodie-hadoop-mr/pom.xml +++ b/hoodie-hadoop-mr/pom.xml @@ -79,6 +79,11 @@ com.twitter parquet-avro + + com.twitter.common + objectsize + 0.0.12 + org.apache.avro avro @@ -114,6 +119,7 @@ com.uber.hoodie:hoodie-common com.twitter:parquet-avro + com.twitter.common:objectsize diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java index 5ba7545b713a..36f6a54b7280 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java @@ -63,22 +63,22 @@ public String getBasePath() { } private static void writeString(String str, DataOutput out) throws IOException { - byte[] pathBytes = str.getBytes(StandardCharsets.UTF_8); - out.writeInt(pathBytes.length); - out.write(pathBytes); + byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + out.writeInt(bytes.length); + out.write(bytes); } private static String readString(DataInput in) throws IOException { - byte[] pathBytes = new byte[in.readInt()]; - in.readFully(pathBytes); - return new String(pathBytes, StandardCharsets.UTF_8); + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + return new String(bytes, StandardCharsets.UTF_8); } @Override public void write(DataOutput out) throws IOException { super.write(out); - + writeString(basePath, out); writeString(maxCommitTime, out); out.writeInt(deltaFilePaths.size()); for (String logFilePath : deltaFilePaths) { @@ -89,7 +89,7 @@ public void write(DataOutput out) throws IOException { @Override public void readFields(DataInput in) throws IOException { super.readFields(in); - + basePath = readString(in); maxCommitTime = readString(in); int totalLogFiles = in.readInt(); deltaFilePaths = new ArrayList<>(totalLogFiles);
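The `writeString`/`readString` helpers above use a simple length-prefixed UTF-8 encoding for the fields the realtime split now serializes (basePath, maxCommitTime and the delta file paths). Below is a small standalone Scala sketch of that round trip using plain `java.io` streams; the values are placeholders and this is illustration only, not Hoodie API.

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

// Length-prefixed UTF-8, mirroring HoodieRealtimeFileSplit.writeString/readString.
def writeString(str: String, out: DataOutputStream): Unit = {
  val bytes = str.getBytes(StandardCharsets.UTF_8)
  out.writeInt(bytes.length)
  out.write(bytes)
}

def readString(in: DataInputStream): String = {
  val bytes = new Array[Byte](in.readInt())
  in.readFully(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

// Round-trip a couple of values such a split would carry (placeholders).
val buffer = new ByteArrayOutputStream()
val out = new DataOutputStream(buffer)
writeString("s3a://my-bucket/hoodie/trips", out)   // basePath
writeString("20180610185458", out)                 // maxCommitTime

val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
assert(readString(in) == "s3a://my-bucket/hoodie/trips")
assert(readString(in) == "20180610185458")
```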