diff --git a/dataframe-arrow/api/dataframe-arrow.api b/dataframe-arrow/api/dataframe-arrow.api index 2d2e8f1c0f..e0bf301c31 100644 --- a/dataframe-arrow/api/dataframe-arrow.api +++ b/dataframe-arrow/api/dataframe-arrow.api @@ -35,6 +35,14 @@ public final class org/jetbrains/kotlinx/dataframe/io/ArrowReadingKt { public static synthetic fun readArrowIPC$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;Ljava/net/URL;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; public static synthetic fun readArrowIPC$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;Ljava/nio/channels/ReadableByteChannel;Lorg/apache/arrow/memory/RootAllocator;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; public static synthetic fun readArrowIPC$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[BLorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; + public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/io/File;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;J)Lorg/jetbrains/kotlinx/dataframe/DataFrame; + public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/lang/String;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;J)Lorg/jetbrains/kotlinx/dataframe/DataFrame; + public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/net/URL;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;J)Lorg/jetbrains/kotlinx/dataframe/DataFrame; + public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/nio/file/Path;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;J)Lorg/jetbrains/kotlinx/dataframe/DataFrame; + public static synthetic fun readParquet$default 
(Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/io/File;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;JILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; + public static synthetic fun readParquet$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/lang/String;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;JILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; + public static synthetic fun readParquet$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/net/URL;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;JILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; + public static synthetic fun readParquet$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[Ljava/nio/file/Path;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;JILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; public static final fun toDataFrame (Lorg/apache/arrow/vector/ipc/ArrowReader;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; public static synthetic fun toDataFrame$default (Lorg/apache/arrow/vector/ipc/ArrowReader;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame; } diff --git a/dataframe-arrow/build.gradle.kts b/dataframe-arrow/build.gradle.kts index 20d918ee09..f3b60b52a0 100644 --- a/dataframe-arrow/build.gradle.kts +++ b/dataframe-arrow/build.gradle.kts @@ -20,6 +20,7 @@ dependencies { implementation(libs.arrow.vector) implementation(libs.arrow.format) implementation(libs.arrow.memory) + implementation(libs.arrow.dataset) implementation(libs.commonsCompress) implementation(libs.kotlin.reflect) implementation(libs.kotlin.datetimeJvm) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt index 
e4f3f79cf2..fb31ec7018 100644
--- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt
+++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt
@@ -1,5 +1,6 @@
 package org.jetbrains.kotlinx.dataframe.io
 
+import org.apache.arrow.dataset.file.FileFormat
 import org.apache.arrow.memory.RootAllocator
 import org.apache.arrow.vector.ipc.ArrowReader
 import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
@@ -16,6 +17,7 @@ import java.nio.channels.Channels
 import java.nio.channels.ReadableByteChannel
 import java.nio.channels.SeekableByteChannel
 import java.nio.file.Files
+import java.nio.file.Path
 
 public class ArrowFeather : SupportedDataFrameFormat {
     override fun readDataFrame(stream: InputStream, header: List<String>): AnyFrame =
@@ -36,6 +38,8 @@ public class ArrowFeather : SupportedDataFrameFormat {
 
 private const val READ_ARROW_FEATHER = "readArrowFeather"
 
+internal const val ARROW_PARQUET_DEFAULT_BATCH_SIZE = 32768L
+
 private class DefaultReadArrowMethod(path: String?) :
     AbstractDefaultReadMethod(path, MethodArguments.EMPTY, READ_ARROW_FEATHER)
 
@@ -185,3 +189,78 @@ public fun DataFrame.Companion.readArrow(
  */
 public fun ArrowReader.toDataFrame(nullability: NullabilityOptions = NullabilityOptions.Infer): AnyFrame =
     DataFrame.Companion.readArrowImpl(this, nullability)
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [urls] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ *
+ * @param urls locations of the Parquet files; each one is handed on in its string form
+ * @param nullability nullability handling applied to the columns that are read, see [NullabilityOptions]
+ * @param batchSize batch size given to the Arrow Dataset scanner
+ */
+public fun DataFrame.Companion.readParquet(
+    vararg urls: URL,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame =
+    readArrowDatasetImpl(
+        urls.map {
+            it.toString()
+        }.toTypedArray(),
+        FileFormat.PARQUET,
+        nullability,
+        batchSize,
+    )
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [strUrls] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ *
+ * @param strUrls URIs, `http`/`https` URLs or plain paths of existing local files
+ * @param nullability nullability handling applied to the columns that are read, see [NullabilityOptions]
+ * @param batchSize batch size given to the Arrow Dataset scanner
+ */
+public fun DataFrame.Companion.readParquet(
+    vararg strUrls: String,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame = readArrowDatasetImpl(arrayOf(*strUrls), FileFormat.PARQUET, nullability, batchSize)
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [paths] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ *
+ * @param paths Parquet files to read; each path is converted to an absolute `file:` URI
+ * @param nullability nullability handling applied to the columns that are read, see [NullabilityOptions]
+ * @param batchSize batch size given to the Arrow Dataset scanner
+ */
+public fun DataFrame.Companion.readParquet(
+    vararg paths: Path,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame =
+    readArrowDatasetImpl(
+        // toUri() yields an absolute, escaped `file:` URI, so relative paths and paths with spaces work too
+        paths.map { it.toUri().toString() }.toTypedArray(),
+        FileFormat.PARQUET,
+        nullability,
+        batchSize,
+    )
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [files] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ *
+ * @param files Parquet files to read; each file is converted to an absolute `file:` URI
+ * @param nullability nullability handling applied to the columns that are read, see [NullabilityOptions]
+ * @param batchSize batch size given to the Arrow Dataset scanner
+ */
+public fun DataFrame.Companion.readParquet(
+    vararg files: File,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame =
+    readArrowDatasetImpl(
+        files.map {
+            it.toPath().toUri().toString()
+        }.toTypedArray(),
+        FileFormat.PARQUET,
+        nullability,
+        batchSize,
+    )
diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt
index f7c7eb9407..479ba02d7c 100644
--- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt
+++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt
@@ -6,6 +6,11 @@ import kotlinx.datetime.LocalTime
 import kotlinx.datetime.toKotlinLocalDate
 import kotlinx.datetime.toKotlinLocalDateTime
 import kotlinx.datetime.toKotlinLocalTime
+import org.apache.arrow.dataset.file.FileFormat
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory
+import org.apache.arrow.dataset.jni.DirectReservationListener
+import org.apache.arrow.dataset.jni.NativeMemoryPool
+import org.apache.arrow.dataset.scanner.ScanOptions
 import org.apache.arrow.memory.RootAllocator
 import org.apache.arrow.vector.BigIntVector
 import org.apache.arrow.vector.BitVector
@@ -59,10 +64,14 @@ import org.jetbrains.kotlinx.dataframe.api.emptyDataFrame
 import org.jetbrains.kotlinx.dataframe.api.getColumn
 import org.jetbrains.kotlinx.dataframe.api.toDataFrame
 import org.jetbrains.kotlinx.dataframe.impl.asList
+import java.io.File
 import java.math.BigDecimal
 import java.math.BigInteger
+import java.net.URI
 import java.nio.channels.ReadableByteChannel
 import java.nio.channels.SeekableByteChannel
+import java.nio.file.Files
+import java.nio.file.StandardCopyOption
 import kotlin.reflect.KType
 import kotlin.reflect.full.withNullability
 import kotlin.reflect.typeOf
@@ -414,3 +423,66 @@ internal fun DataFrame.Companion.readArrowImpl(
         return flattened.concatKeepingSchema()
     }
 }
+
+/**
+ * Turns every entry of [fileUris] into a location string for the Arrow Dataset reader:
+ * - `http:` and `https:` URLs are downloaded into a temporary `.parquet` file that is deleted on JVM exit,
+ *   and the URI of that file is used instead;
+ * - entries without a `file:` prefix that name an existing local file become absolute `file:` URIs;
+ * - all other entries are passed through unchanged.
+ */
+private fun resolveArrowDatasetUris(fileUris: Array<String>): Array<String> =
+    fileUris.map { location ->
+        when {
+            location.startsWith("http:", ignoreCase = true) || location.startsWith("https:", ignoreCase = true) -> {
+                val tempFile = File.createTempFile("kdf", ".parquet")
+                tempFile.deleteOnExit()
+                URI.create(location).toURL().openStream().use { input ->
+                    // createTempFile has already created an empty file, so the copy must be allowed to replace it
+                    Files.copy(input, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING)
+                }
+                tempFile.toPath().toUri().toString()
+            }
+
+            !location.startsWith("file:", ignoreCase = true) && File(location).exists() ->
+                // toUri() makes the path absolute and escapes characters that are not allowed in a URI
+                File(location).toPath().toUri().toString()
+
+            else -> location
+        }
+    }.toTypedArray()
+
+/**
+ * Read [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html) from [fileUris]
+ *
+ * @param fileUris locations of the files to read; resolved by [resolveArrowDatasetUris] first
+ * @param fileFormat format of the files, e.g. [FileFormat.PARQUET]
+ * @param nullability passed on to [readArrowImpl]
+ * @param batchSize batch size used to build the Arrow [ScanOptions]
+ * @return the data frame produced by [readArrowImpl] from the scanned batches
+ */
+internal fun DataFrame.Companion.readArrowDatasetImpl(
+    fileUris: Array<String>,
+    fileFormat: FileFormat,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+    batchSize: Long = ARROW_PARQUET_DEFAULT_BATCH_SIZE,
+): AnyFrame {
+    val scanOptions = ScanOptions(batchSize)
+    // Each `use` closes its resource after readArrowImpl has returned, innermost first
+    RootAllocator().use { allocator ->
+        FileSystemDatasetFactory(
+            allocator,
+            NativeMemoryPool.createListenable(DirectReservationListener.instance()),
+            fileFormat,
+            resolveArrowDatasetUris(fileUris),
+        ).use { datasetFactory ->
+            datasetFactory.finish().use { dataset ->
+                dataset.newScan(scanOptions).use { scanner ->
+                    scanner.scanBatches().use { reader ->
+                        return readArrowImpl(reader, nullability)
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt
index 78356f4fed..86cc63cfc2 100644
--- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt
+++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt
@@ -48,8 +48,10 @@ import org.junit.Test
 import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import java.io.File
+import java.net.URI
 import java.net.URL
 import java.nio.channels.Channels
+import java.nio.file.FileSystems
 import java.sql.DriverManager
 import java.util.Locale
 import kotlin.reflect.typeOf
@@ -653,4 +655,69 @@ internal class ArrowKtTest {
             DataFrame.readArrow(dbArrowReader) shouldBe expected
         }
     }
+
+    @Test
+    fun testReadParquetPath() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val dataFrame = DataFrame.readParquet(resourcePath)
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true,
+        )
+    }
+
+    @Test
+    fun testReadParquetFile() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val dataFrame = DataFrame.readParquet(resourcePath.toFile())
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true,
+        )
+    }
+
+    @Test
+    fun testReadParquetStringPath() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val dataFrame = DataFrame.readParquet("$resourcePath")
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true,
+        )
+    }
+
+    @Test
+    fun testReadParquetUrl() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val fileUrl = URI.create("file:$resourcePath").toURL()
+        val dataFrame = DataFrame.readParquet(fileUrl)
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true,
+        )
+    }
+
+    @Test
+    fun testReadMultipleParquetFiles() {
+        val resourceLocation = testResource("test.arrow.parquet").path
+        val resourcePath = FileSystems.getDefault().getPath(resourceLocation)
+        val dataFrame = DataFrame.readParquet(resourcePath, resourcePath, resourcePath)
+        dataFrame.rowsCount() shouldBe 900
+    }
 }
diff --git
a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt index 2ae83fb36f..12ef641b65 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt @@ -24,7 +24,12 @@ import java.time.LocalTime as JavaLocalTime * Assert that we have got the same data that was originally saved on example creation. * Example generation project is currently located at https://github.com/Kopilov/arrow_example */ -internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) { +internal fun assertEstimations( + exampleFrame: AnyFrame, + expectedNullable: Boolean, + hasNulls: Boolean, + fromParquet: Boolean = false, +) { /** * In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number */ @@ -142,16 +147,27 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate()) } - val datetimeCol = exampleFrame["date64"] as DataColumn - datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) - datetimeCol.forEachIndexed { i, element -> - assertValueOrNull( - rowNumber = iBatch(i), - actual = element, - expected = JavaLocalDateTime - .ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC) - .toKotlinLocalDateTime(), - ) + if (fromParquet) { + // parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time + val datetimeCol = exampleFrame["date64"] as DataColumn + datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) + datetimeCol.forEachIndexed { i, 
element -> + assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate()) + } + } else { + val datetimeCol = exampleFrame["date64"] as DataColumn + datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) + datetimeCol.forEachIndexed { i, element -> + assertValueOrNull( + rowNumber = iBatch(i), + actual = element, + expected = JavaLocalDateTime.ofEpochSecond( + iBatch(i).toLong() * 60 * 60 * 24 * 30, + 0, + ZoneOffset.UTC, + ).toKotlinLocalDateTime(), + ) + } } val timeSecCol = exampleFrame["time32_seconds"] as DataColumn diff --git a/dataframe-arrow/src/test/resources/test.arrow.parquet b/dataframe-arrow/src/test/resources/test.arrow.parquet new file mode 100644 index 0000000000..cf78b1c255 Binary files /dev/null and b/dataframe-arrow/src/test/resources/test.arrow.parquet differ diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 935572d7c5..0386a1afb6 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -45,7 +45,7 @@ junit-platform = "1.11.3" kotestAsserions = "6.0.0.M1" jsoup = "1.18.3" -arrow = "18.1.0" +arrow = "18.3.0" kodex = "0.4.4" simpleGit = "2.2.1" dependencyVersions = "0.52.0" @@ -54,7 +54,7 @@ shadow = "8.3.5" android-gradle-api = "7.3.1" # need to revise our tests to update ktor = "3.0.1" # needs jupyter compatibility with Kotlin 2.1 to update kotlin-compile-testing = "0.7.1" -duckdb = "1.1.3" +duckdb = "1.2.2.0" buildconfig = "5.6.7" benchmark = "0.4.12" @@ -124,6 +124,7 @@ arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" } arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" } arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" } +arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" } geotools-main = { 
module = "org.geotools:gt-main", version.ref = "geotools" } geotools-shapefile = { module = "org.geotools:gt-shapefile", version.ref = "geotools" }