From b9fe0e1d26c5aeb340b9808ad09de46e1dec4a7b Mon Sep 17 00:00:00 2001
From: Venki Korukanti
Date: Tue, 14 May 2024 16:23:03 -0700
Subject: [PATCH] [Kernel][Defaults] Handle legacy map types in Parquet files (#3097)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Description

Currently, Kernel's Parquet reader explicitly looks for a repeated group named `key_value` under the Parquet map type, but older Parquet writers could give the repeated group any name. Instead of looking up the element explicitly named `key_value`, fetch the first (and only) field of the map's group type. See [here](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps) for more details.
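To make the two layouts concrete, here is a minimal sketch (not part of this patch; the class name and inline schemas are invented for illustration) that uses parquet-mr's `MessageTypeParser` to build a standard map schema and a legacy one, and shows that fetching field 0 works for both, while a lookup by the name `key_value` only works for the former:

```java
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class LegacyMapSchemaExample {
  public static void main(String[] args) {
    // Standard 3-level layout: the repeated group is named "key_value".
    MessageType standard = MessageTypeParser.parseMessageType(
        "message standard {"
            + "  required group my_map (MAP) {"
            + "    repeated group key_value {"
            + "      required int32 key;"
            + "      optional binary value (UTF8);"
            + "    }"
            + "  }"
            + "}");

    // Legacy layout (e.g. old parquet-avro/parquet-thrift writers): same
    // structure, but the repeated group has an arbitrary name such as "map"
    // (older writers also annotated it with the deprecated MAP_KEY_VALUE).
    MessageType legacy = MessageTypeParser.parseMessageType(
        "message legacy {"
            + "  required group my_map (MAP) {"
            + "    repeated group map {"
            + "      required int32 key;"
            + "      optional binary value (UTF8);"
            + "    }"
            + "  }"
            + "}");

    for (MessageType schema : new MessageType[] {standard, legacy}) {
      GroupType mapType = schema.getType("my_map").asGroupType();
      // mapType.getType("key_value") throws for the legacy schema;
      // fetching the first (and only) field works for both layouts.
      GroupType keyValue = mapType.getType(0).asGroupType();
      System.out.println(schema.getName() + ": repeated group is named '"
          + keyValue.getName() + "'");
    }
  }
}
```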
## How was this patch tested?

The [test](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala#L29) and the sample file written by legacy writers are taken from Apache Spark™. Some columns in the test file (arrays with 2-level encoding, another legacy format) are currently not supported; I will follow up with a separate PR, as supporting them involves a bit of refactoring in `ArrayColumnReader`.
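For reference, the unsupported columns use the legacy 2-level list encoding, where the repeated field is the element itself rather than a wrapper group. A minimal sketch of the difference the follow-up has to handle (again illustrative only, assuming parquet-mr's schema API; the field names and `_tuple` suffix follow the backward-compatibility rules in the same LogicalTypes.md document):

```java
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class LegacyListSchemaExample {
  public static void main(String[] args) {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example {"
            // Standard 3-level list: LIST -> repeated group -> element.
            + "  optional group threeLevel (LIST) {"
            + "    repeated group list {"
            + "      optional binary element (UTF8);"
            + "    }"
            + "  }"
            // Legacy 2-level list (e.g. parquet-thrift): the repeated field
            // is the element itself; there is no inner element wrapper.
            + "  optional group twoLevel (LIST) {"
            + "    repeated binary twoLevel_tuple (UTF8);"
            + "  }"
            + "}");

    for (Type field : schema.getFields()) {
      GroupType listType = field.asGroupType();
      Type repeated = listType.getType(0);
      // ArrayColumnReader currently assumes the repeated field is a group
      // wrapping a single element field, which holds only for the 3-level
      // layout; repeated.asGroupType() would fail for the 2-level layout.
      System.out.println(listType.getName() + ": repeated field is "
          + (repeated.isPrimitive() ? "the element itself" : "a wrapper group"));
    }
  }
}
```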
---
 .../internal/parquet/ArrayColumnReader.java   |  4 +-
 .../internal/parquet/MapColumnReader.java     | 16 +++--
 .../parquet-thrift-compat.snappy.parquet      | Bin 0 -> 10550 bytes
 .../parquet/ParquetFileReaderSuite.scala      | 58 +++++++++++++++++-
 .../internal/parquet/ParquetSuiteBase.scala   | 34 +++++-----
 5 files changed, 89 insertions(+), 23 deletions(-)
 create mode 100644 kernel/kernel-defaults/src/test/resources/parquet/parquet-thrift-compat.snappy.parquet

diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java
index 5d111846f84..856001a2b0f 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java
@@ -77,8 +77,8 @@ private static Converter createElementConverter(
             ArrayType typeFromClient,
             GroupType typeFromFile) {
 
-        checkArgument(
-                typeFromFile.getFieldCount() == 1, "Expected exactly one field in the array type");
+        checkArgument(typeFromFile.getFieldCount() == 1,
+            "Expected exactly one field in the array type, but got: " + typeFromFile);
         GroupType repeatedGroup = typeFromFile.getType(0).asGroupType();
 
         // TODO: handle the legacy 2-level list physical format
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java
index b2b3a64ed29..03627ae861c 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java
@@ -24,6 +24,8 @@
 import io.delta.kernel.data.ColumnVector;
 import io.delta.kernel.types.MapType;
 
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+
 import io.delta.kernel.defaults.internal.data.vector.DefaultMapVector;
 import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter;
 
@@ -57,10 +59,16 @@ public ColumnVector getDataColumnVector(int batchSize) {
     }
 
     private static Converter[] createElementConverters(
-        int initialBatchSize,
-        MapType typeFromClient,
-        GroupType typeFromFile) {
-        final GroupType innerMapType = (GroupType) typeFromFile.getType("key_value");
+            int initialBatchSize,
+            MapType typeFromClient,
+            GroupType typeFromFile) {
+        // The repeated group can have any name. Newer Parquet versions use "key_value" as the
+        // name, but legacy versions may use an arbitrary name for the repeated group.
+        // See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for details
+        checkArgument(typeFromFile.getFieldCount() == 1,
+            "Expected exactly one repeated field in the map type, but got: " + typeFromFile);
+
+        GroupType innerMapType = typeFromFile.getType(0).asGroupType();
         Converter[] elemConverters = new Converter[2];
         elemConverters[0] = createConverter(
             initialBatchSize,
diff --git a/kernel/kernel-defaults/src/test/resources/parquet/parquet-thrift-compat.snappy.parquet b/kernel/kernel-defaults/src/test/resources/parquet/parquet-thrift-compat.snappy.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..837e4876eea6385092610bb59016c3a7b97e1c72
GIT binary patch
literal 10550
zcmeHNO>7&-72ctiG^rn3a=43St<GQ5yt4etcF+PPFyeaVeeb<_^Ua&LLr)LSL?l5HC4aXRga^OmZ&wIWh#(8_@rbho
zdZ(o{H!KEVL=kbE3UzJbBZ20D)&LF1ywH5m_CWJPLy%onBlIdFVhvUGs6mijRTEW(
zE*PqI@&?r)2vq%)Nb}qb{b!&XKMHzIKUC+mAhZynvBfV0p?w)z7qlp}1hnxMe*z@v
zhbho?U*dJ2qo0l;#lP_;em*#^>lQcOB*g#0@jfpR9~=*~iexC@Zw?Io{v$$!R?v?&
zl@|Y4`;e|8YEW?tiZm{qNDj}WQyBNLiQ$t|r^Zs~&gkUq
zNDAC(mRfO^z*i)yPe&tX^!!lVk)8D1wN
zoZ|1Jp9jZvoh*{T+2#kwk-*KwLfFmMfQHT{kh@EyS0*I|nO~q=GE*$>
zv?YajTR=SOYdO;$=|$fV62ZJZB8_)T32NQfu6VH@Ha*oycf+AI^fmDA;-3eQe+eQ6
zTK7kz{ps}Six)3lzrJxBH_C!E#}*s3RHyg;AVPzJ*Vl@>Kp@4Oby}jH*-=m3lX8f29jUK6&Ioaevn{z+$Jyf$=l_bREK8+H
zsU67|zl#B7d99v>X$JWgXuz0T4bnYNx3noT>WMhO)dY9QbP{@!+n@d8y-yw<`|ROw
z;NcGsUm*eigZDlm0bzUNfN)|Tn%xvdF^Q}c4;^}Dlmt41NKkkjl}I3rTvbO$lNt;R
zAZ_7M8-xL5@Ci*jzYYV>A(`QE{dpKbUVm?W{Rdkx@DfrS9=BeG0eHOq`qtJjJ|uzo
z=Iz_tcj58FAL9cSK(>9k8@{|vq@R#(Jd`mp;MTNJ=5&7uCLdbZ_*w3?8%H*7L<7QK
z8tDePMH<9VtLG5JcFpAqnc}K0br^i2ohzkcjzL*xqgE&{8ghPg(xdEK
z%&g97BNk81UW>aPrIpI2tUh~`y|jCRcbsXMw0Q4P+NkJQ)f8C&h;1HhBxRcnvykLx
z6oW!dpZAJBOUWboFdm}vl%@Y{sl1%mAS{4rhLEQ;BkDT91AzT}w=R#;ZP1-w;{5@L
zM~I0^!(wI`cn38Qu64tU+A3Q1P!5(2TP@k8MKFBH^v(?#V>s#%Xm0{0mo?oFumPU5
z7POU;HV<*POcs5YtA`B#DKqx#gZvaA5OOfo9(oi~{q2$vIxP4@JbfjP9F(7xkE#9y
z_m-lLw*&nRS(7SM_6;Kckf`Gc+o`7BB_@bxKZm+Y0H&8?m?r7jqu%$Ju*w$UK=(ZW
zs(KYszb`_{H5{z$BZ~Svk$+7vc~wQP
zzSWAv&~?}o8q@MN0YQ@sCOcq-jZ|?nq^lA`aeK1HB#o(=dh{zZ+FWO!8Z^6En#%1}UvBQ*F9(
z02pqJOifL8P!*Tq@8xd0bu$ApRh;$cZmmC`PS&)}m6HzU`*vx5A~I0ZI#fpbIptux
zf48>#qBU(-j&}#+vE3RU=&xyk9`D26yQJ?7oRp->4Y_
zzD{xQApYb%sMI?S_K)t;e)81pNkc7U1Dg9+`20h&ik>8TbyKv*%u4zVmL5Saad0pr
zHCbW$ZCEXskyETNc7H>ea;N*ncSx~PNbtGfz2VcxP`Qdgb)ayqbDVh4gzZ5w6;dV4U
zZa*q77fxQd9S`+-@&EK}t3>q!*|-{ViurDj#L0=2UXarxq;e#(niy>Yp+8ydw^B&M4U{SZztlpQ0RI=`PIkb+_DKJFU4}rSrJg+~Uap$a>p-{-V~}lB;RTaLU(o
z|7e@drRe=_Z1zgBP~%mr%X-#?LHw?qKh?j7aNCOkTym>9T<2t{
zUAGC*L1R+3B{40zEsZH)E>RuRq9)zf$kh2crbkuQnj#f2rfJEK%o={PBe`~8wUl|~
z#>1-D*`emOh;!lKUZ0tW?QOLxbul^o?x+<8c4=6hXqf`r+`=!gbAyR=K;cx*mZplc
zh1^mhZQM$-+fbvZ(S>wm

literal 0
HcmV?d00001

diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
index d88c36ed245..7306c2cd952 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
@@ -135,10 +135,66 @@ class ParquetFileReaderSuite extends AnyFunSuite
     checkAnswer(actResult1, expResult1)
 
     // File with multiple row-groups [0, 20000) where rowIndex = id
-    val filePath = getTestResourceFilePath("parquet/")
+    val filePath = getTestResourceFilePath("parquet/row_index_multiple_row_groups.parquet")
     val actResult2 = readParquetFilesUsingKernel(filePath, readSchema)
     val expResult2 = (0L until 20000L).map(i => TestRow(i, i))
 
     checkAnswer(actResult2, expResult2)
   }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // Test compatibility with Parquet legacy format files                                          //
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  // The test and the test file are copied from Spark's `ParquetThriftCompatibilitySuite`
+  test("read parquet file generated by parquet-thrift") {
+    val parquetFilePath = getTestResourceFilePath("parquet/parquet-thrift-compat.snappy.parquet")
+
+    val readSchema = new StructType()
+      .add("boolColumn", BooleanType.BOOLEAN)
+      .add("byteColumn", ByteType.BYTE)
+      .add("shortColumn", ShortType.SHORT)
+      .add("intColumn", IntegerType.INTEGER)
+      .add("longColumn", LongType.LONG)
+      .add("doubleColumn", DoubleType.DOUBLE)
+      // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
+      // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assumes
+      // Thrift `STRING`s are encoded using UTF-8.
+      .add("binaryColumn", StringType.STRING)
+      .add("stringColumn", StringType.STRING)
+      .add("enumColumn", StringType.STRING)
+      // the `maybe` prefix indicates nullable columns; the ones above are non-nullable
+      .add("maybeBoolColumn", BooleanType.BOOLEAN)
+      .add("maybeByteColumn", ByteType.BYTE)
+      .add("maybeShortColumn", ShortType.SHORT)
+      .add("maybeIntColumn", IntegerType.INTEGER)
+      .add("maybeLongColumn", LongType.LONG)
+      .add("maybeDoubleColumn", DoubleType.DOUBLE)
+      // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
+      // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assumes
+      // Thrift `STRING`s are encoded using UTF-8.
+      .add("maybeBinaryColumn", StringType.STRING)
+      .add("maybeStringColumn", StringType.STRING)
+      .add("maybeEnumColumn", StringType.STRING)
+      // TODO: not working - separate PR to handle 2-level legacy lists
+      // .add("stringsColumn", new ArrayType(StringType.STRING, true /* containsNull */))
+      // .add("intSetColumn", new ArrayType(IntegerType.INTEGER, true /* containsNull */))
+      .add("intToStringColumn",
+        new MapType(IntegerType.INTEGER, StringType.STRING, true /* valueContainsNull */))
+      // TODO: not working - separate PR to handle 2-level legacy lists
+      // .add("complexColumn", new MapType(
+      //   IntegerType.INTEGER,
+      //   new ArrayType(
+      //     new StructType()
+      //       .add("nestedIntsColumn", new ArrayType(IntegerType.INTEGER, true /* containsNull */))
+      //       .add("nestedStringColumn", StringType.STRING)
+      //       .add("stringColumn", StringType.STRING),
+      //     true /* containsNull */),
+      //   true /* valueContainsNull */))
+
+    assert(parquetFileRowCount(parquetFilePath) === 10)
+    checkAnswer(
+      readParquetFilesUsingKernel(parquetFilePath, readSchema), /* actual */
+      readParquetFilesUsingSpark(parquetFilePath, readSchema) /* expected */)
+  }
 }
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
index f3afde8094f..b6d18dc4996 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
@@ -207,27 +207,24 @@ trait ParquetSuiteBase extends TestUtils {
   }
 
   def readParquetUsingKernelAsColumnarBatches(
-      actualFileDir: String,
+      inputFileOrDir: String,
       readSchema: StructType,
       predicate: Optional[Predicate] = Optional.empty()): Seq[ColumnarBatch] = {
-    val parquetFiles = Files.list(Paths.get(actualFileDir))
-      .iterator().asScala
-      .map(_.toString)
-      .filter(path => path.endsWith(".parquet"))
-      .map(path => FileStatus.of(path, 0L, 0L))
+    val parquetFileList = parquetFiles(inputFileOrDir)
+      .map(FileStatus.of(_, 0, 0))
 
     val data = defaultEngine.getParquetHandler.readParquetFiles(
-      toCloseableIterator(parquetFiles.asJava),
+      toCloseableIterator(parquetFileList.asJava.iterator()),
      readSchema,
      predicate)
 
     data.asScala.toSeq
   }
 
-  def parquetFileCount(path: String): Long = parquetFiles(path).size
+  def parquetFileCount(fileOrDir: String): Long = parquetFiles(fileOrDir).size
 
-  def parquetFileRowCount(path: String): Long = {
-    val files = parquetFiles(path)
+  def parquetFileRowCount(fileOrDir: String): Long = {
+    val files = parquetFiles(fileOrDir)
 
     var rowCount = 0L
     files.foreach { file =>
@@ -238,12 +235,17 @@ trait ParquetSuiteBase extends TestUtils {
     rowCount
   }
 
-  def parquetFiles(path: String): Seq[String] = {
-    Files.list(Paths.get(path))
-      .iterator().asScala
-      .map(_.toString)
-      .filter(path => path.endsWith(".parquet"))
-      .toSeq
+  def parquetFiles(fileOrDir: String): Seq[String] = {
+    val fileOrDirPath = Paths.get(fileOrDir)
+    if (Files.isDirectory(fileOrDirPath)) {
+      Files.list(fileOrDirPath)
+        .iterator().asScala
+        .map(_.toString)
+        .filter(path => path.endsWith(".parquet"))
+        .toSeq
+    } else {
+      Seq(fileOrDir)
+    }
   }
 
   def footer(path: String): ParquetMetadata = {