diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java
index 5d111846f84..856001a2b0f 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java
@@ -77,8 +77,8 @@ private static Converter createElementConverter(
       ArrayType typeFromClient,
       GroupType typeFromFile) {
-    checkArgument(
-        typeFromFile.getFieldCount() == 1, "Expected exactly one field in the array type");
+    checkArgument(typeFromFile.getFieldCount() == 1,
+        "Expected exactly one field in the array type, but got: " + typeFromFile);
     GroupType repeatedGroup = typeFromFile.getType(0).asGroupType();
 
     // TODO: handle the legacy 2-level list physical format
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java
index b2b3a64ed29..03627ae861c 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java
@@ -24,6 +24,8 @@
 import io.delta.kernel.data.ColumnVector;
 import io.delta.kernel.types.MapType;
 
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+
 import io.delta.kernel.defaults.internal.data.vector.DefaultMapVector;
 import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter;
 
@@ -57,10 +59,16 @@ public ColumnVector getDataColumnVector(int batchSize) {
   }
 
   private static Converter[] createElementConverters(
-      int initialBatchSize,
-      MapType typeFromClient,
-      GroupType typeFromFile) {
-    final GroupType innerMapType = (GroupType) typeFromFile.getType("key_value");
+        int initialBatchSize,
+        MapType typeFromClient,
+        GroupType typeFromFile) {
+    // The repeated group can have any name. Recent Parquet versions use "key_value" as the
+    // name, but legacy writers can use an arbitrary name for the repeated group.
+    // See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for details.
+    checkArgument(typeFromFile.getFieldCount() == 1,
+        "Expected exactly one repeated field in the map type, but got: " + typeFromFile);
+
+    GroupType innerMapType = typeFromFile.getType(0).asGroupType();
     Converter[] elemConverters = new Converter[2];
     elemConverters[0] = createConverter(
         initialBatchSize,
diff --git a/kernel/kernel-defaults/src/test/resources/parquet/parquet-thrift-compat.snappy.parquet b/kernel/kernel-defaults/src/test/resources/parquet/parquet-thrift-compat.snappy.parquet
new file mode 100644
index 00000000000..837e4876eea
Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/parquet/parquet-thrift-compat.snappy.parquet differ
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
index d88c36ed245..7306c2cd952 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
@@ -135,10 +135,66 @@ class ParquetFileReaderSuite extends AnyFunSuite
     checkAnswer(actResult1, expResult1)
 
     // File with multiple row-groups [0, 20000) where rowIndex = id
-    val filePath = getTestResourceFilePath("parquet/")
+    val filePath = getTestResourceFilePath("parquet/row_index_multiple_row_groups.parquet")
     val actResult2 = readParquetFilesUsingKernel(filePath, readSchema)
     val expResult2 = (0L until 20000L).map(i => TestRow(i, i))
     checkAnswer(actResult2, expResult2)
   }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // Tests for compatibility with legacy Parquet format files                                     //
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  // This test and the test file are copied from Spark's `ParquetThriftCompatibilitySuite`.
+  test("read parquet file generated by parquet-thrift") {
+    val parquetFilePath = getTestResourceFilePath("parquet/parquet-thrift-compat.snappy.parquet")
+
+    val readSchema = new StructType()
+      .add("boolColumn", BooleanType.BOOLEAN)
+      .add("byteColumn", ByteType.BYTE)
+      .add("shortColumn", ShortType.SHORT)
+      .add("intColumn", IntegerType.INTEGER)
+      .add("longColumn", LongType.LONG)
+      .add("doubleColumn", DoubleType.DOUBLE)
+      // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
+      // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assumes
+      // Thrift `STRING`s are encoded using UTF-8.
+      .add("binaryColumn", StringType.STRING)
+      .add("stringColumn", StringType.STRING)
+      .add("enumColumn", StringType.STRING)
+      // The `maybe` prefix indicates nullable columns; the columns above are non-nullable.
+      .add("maybeBoolColumn", BooleanType.BOOLEAN)
+      .add("maybeByteColumn", ByteType.BYTE)
+      .add("maybeShortColumn", ShortType.SHORT)
+      .add("maybeIntColumn", IntegerType.INTEGER)
+      .add("maybeLongColumn", LongType.LONG)
+      .add("maybeDoubleColumn", DoubleType.DOUBLE)
+      // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
+      // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assumes
+      // Thrift `STRING`s are encoded using UTF-8.
+      .add("maybeBinaryColumn", StringType.STRING)
+      .add("maybeStringColumn", StringType.STRING)
+      .add("maybeEnumColumn", StringType.STRING)
+      // TODO: not working yet - a separate PR will handle legacy 2-level lists
+      // .add("stringsColumn", new ArrayType(StringType.STRING, true /* containsNull */))
+      // .add("intSetColumn", new ArrayType(IntegerType.INTEGER, true /* containsNull */))
+      .add("intToStringColumn",
+        new MapType(IntegerType.INTEGER, StringType.STRING, true /* valueContainsNull */))
+      // TODO: not working yet - a separate PR will handle legacy 2-level lists
+      // .add("complexColumn", new MapType(
+      //   IntegerType.INTEGER,
+      //   new ArrayType(
+      //     new StructType()
+      //       .add("nestedIntsColumn", new ArrayType(IntegerType.INTEGER, true /* containsNull */))
+      //       .add("nestedStringColumn", StringType.STRING)
+      //       .add("stringColumn", StringType.STRING),
+      //     true /* containsNull */),
+      //   true /* valueContainsNull */))
+
+    assert(parquetFileRowCount(parquetFilePath) === 10)
+    checkAnswer(
+      readParquetFilesUsingKernel(parquetFilePath, readSchema), /* actual */
+      readParquetFilesUsingSpark(parquetFilePath, readSchema) /* expected */)
+  }
 }
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
index f3afde8094f..b6d18dc4996 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
@@ -207,27 +207,24 @@ trait ParquetSuiteBase extends TestUtils {
   }
 
   def readParquetUsingKernelAsColumnarBatches(
-      actualFileDir: String,
+      inputFileOrDir: String,
       readSchema: StructType,
       predicate: Optional[Predicate] = Optional.empty()): Seq[ColumnarBatch] = {
-    val parquetFiles = Files.list(Paths.get(actualFileDir))
-      .iterator().asScala
-      .map(_.toString)
-      .filter(path => path.endsWith(".parquet"))
-      .map(path => FileStatus.of(path, 0L, 0L))
+    val parquetFileList = parquetFiles(inputFileOrDir)
+      .map(FileStatus.of(_, 0, 0))
 
     val data = defaultEngine.getParquetHandler.readParquetFiles(
-      toCloseableIterator(parquetFiles.asJava),
+      toCloseableIterator(parquetFileList.asJava.iterator()),
       readSchema,
       predicate)
 
     data.asScala.toSeq
   }
 
-  def parquetFileCount(path: String): Long = parquetFiles(path).size
+  def parquetFileCount(fileOrDir: String): Long = parquetFiles(fileOrDir).size
 
-  def parquetFileRowCount(path: String): Long = {
-    val files = parquetFiles(path)
+  def parquetFileRowCount(fileOrDir: String): Long = {
+    val files = parquetFiles(fileOrDir)
 
     var rowCount = 0L
     files.foreach { file =>
@@ -238,12 +235,17 @@ trait ParquetSuiteBase extends TestUtils {
     rowCount
   }
 
-  def parquetFiles(path: String): Seq[String] = {
-    Files.list(Paths.get(path))
-      .iterator().asScala
-      .map(_.toString)
-      .filter(path => path.endsWith(".parquet"))
-      .toSeq
+  def parquetFiles(fileOrDir: String): Seq[String] = {
+    val fileOrDirPath = Paths.get(fileOrDir)
+    if (Files.isDirectory(fileOrDirPath)) {
+      Files.list(fileOrDirPath)
+        .iterator().asScala
+        .map(_.toString)
+        .filter(path => path.endsWith(".parquet"))
+        .toSeq
+    } else {
+      Seq(fileOrDir)
+    }
   }
 
   def footer(path: String): ParquetMetadata = {