Projection over partition values #370

Draft: wants to merge 4 commits into master
.scalafmt.conf (2 changes: 1 addition & 1 deletion)
@@ -1,4 +1,4 @@
-version = 3.8.3
+version = 3.8.6
 preset = default
 maxColumn = 120

@@ -239,11 +239,13 @@ object ParquetSource extends IOOps {
         partitionedDirectory => {
           val projectedSchemaOpt = projectedSchemaResolverOpt
             .map(implicit resolver => ParquetSchemaResolver.resolveSchema(partitionedDirectory.schema))

           val filteredPaths = Source.fromIterator(() => partitionedDirectory.paths.iterator)

           if (parallelism == 1) {
             filteredPaths.flatMapConcat(
               createPartitionedSource(
+                projectedSchemaResolverOpt,
                 projectedSchemaOpt,
                 columnProjections,
                 decoder,
@@ -255,6 +257,7 @@ object ParquetSource extends IOOps {
             filteredPaths.flatMapMerge(
               breadth = parallelism,
               createPartitionedSource(
+                projectedSchemaResolverOpt,
                 projectedSchemaOpt,
                 columnProjections,
                 decoder,
@@ -281,41 +284,49 @@ object ParquetSource extends IOOps {
     recordSource.map(decode)
   }

-  private def createPartitionedSource(
+  private def createPartitionedSource[T](
+      projectedSchemaResolverOpt: Option[ParquetSchemaResolver[T]],
       projectedSchemaOpt: Option[MessageType],
       columnProjections: Seq[ColumnProjection],
       decoder: ParquetRecordDecoder[?],
       metadataReader: MetadataReader,
       fallbackFilterCompat: => FilterCompat.Filter,
       options: ParquetReader.Options
   ): PartitionedPath => Source[RowParquetRecord, NotUsed] =
-    partitionedPath =>
-      createSource(
+    partitionedPath => {
+      val partitionViewOpt = Option(projectedSchemaResolverOpt.fold(partitionedPath.view) { implicit resolver =>
+        partitionedPath.view.filterPaths(ParquetSchemaResolver.findType[T](_).nonEmpty)
+      }).filter(_.nonEmpty)
+
+      val source = createSource(
         inputFile = partitionedPath.inputFile,
         projectedSchemaOpt = projectedSchemaOpt,
         columnProjections = columnProjections,
         filterCompat = partitionedPath.filterPredicateOpt.fold(fallbackFilterCompat)(FilterCompat.get),
         decoder = decoder,
         metadataReader = metadataReader,
         options = options
       )
-        .map(setPartitionValues(partitionedPath))
+
+      partitionViewOpt.fold(source)(partitionView => source.map(setPartitionValues(partitionView)))
+    }

   private def createSource(
       inputFile: InputFile,
       projectedSchemaOpt: Option[MessageType],
       columnProjections: Seq[ColumnProjection],
       filterCompat: FilterCompat.Filter,
       decoder: ParquetRecordDecoder[?],
       metadataReader: MetadataReader,
       options: ParquetReader.Options
   ) =
     Source
       .unfoldResource[RowParquetRecord, Iterator[RowParquetRecord] & Closeable](
-        ParquetIterator.factory(inputFile, projectedSchemaOpt, columnProjections, filterCompat, decoder, options),
-        iterator => if (iterator.hasNext) Option(iterator.next()) else None,
-        _.close()
+        create = ParquetIterator
+          .factory(inputFile, projectedSchemaOpt, columnProjections, filterCompat, metadataReader, options),
+        read = iterator => if (iterator.hasNext) Option(iterator.next()) else None,
+        close = _.close()
       )

-  private def setPartitionValues(partitionedPath: PartitionedPath)(record: RowParquetRecord) =
-    partitionedPath.partitions.foldLeft(record) { case (currentRecord, (columnPath, value)) =>
+  private def setPartitionValues(partitionView: PartitionView)(record: RowParquetRecord) =
+    partitionView.values.foldLeft(record) { case (currentRecord, (columnPath, value)) =>
      currentRecord.updated(columnPath, BinaryValue(value))
    }
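
To make the mechanism above concrete, here is a minimal, self-contained sketch, in plain Scala rather than the parquet4s API, of how a partition view filtered by the projected schema feeds setPartitionValues. PartitionViewSketch, RecordSketch, and filterView are hypothetical stand-ins for PartitionView, RowParquetRecord, and the filterPaths call.

object PartitionProjectionSketch extends App {
  // Simplified stand-in for PartitionView: partition column paths and their values.
  final case class PartitionViewSketch(values: Seq[(String, String)])
  // Simplified stand-in for RowParquetRecord.
  final case class RecordSketch(fields: Map[String, String]) {
    def updated(columnPath: String, value: String): RecordSketch =
      copy(fields = fields + (columnPath -> value))
  }

  // Mirrors the filterPaths step: keep only partition columns that survive the projection.
  def filterView(view: PartitionViewSketch, projectedPaths: Set[String]): PartitionViewSketch =
    PartitionViewSketch(view.values.filter { case (path, _) => projectedPaths.contains(path) })

  // Mirrors setPartitionValues: fold every remaining partition value into the record.
  def setPartitionValues(view: PartitionViewSketch)(record: RecordSketch): RecordSketch =
    view.values.foldLeft(record) { case (rec, (columnPath, value)) => rec.updated(columnPath, value) }

  val view      = PartitionViewSketch(Seq("a" -> "a1", "nested.b" -> "b1"))
  val projected = filterView(view, projectedPaths = Set("nested.b", "i"))
  println(setPartitionValues(projected)(RecordSketch(Map("i" -> "1"))))
  // prints RecordSketch(Map(i -> 1, nested.b -> b1)): column "a" was dropped by the projection
}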

@@ -18,8 +18,7 @@ class ParquetWriterAndSparkCompatibilityItSpec extends AnyFreeSpec with Matchers
     readFromTemp(testCase.typeTag) should contain theSameElementsAs testCase.data
   }

-  "Spark should be able to read file saved by ParquetWriter if the file contains" - {
+  "Spark should be able to read file saved by ParquetWriter if the file contains" -
     CompatibilityTestCases.cases(Writer, Spark).foreach(runTestCase)
-  }

 }
@@ -27,9 +27,8 @@ class SparkAndParquetReaderCompatibilityItSpec extends AnyFreeSpec with Matchers
     }
   }

-  "ParquetReader should be able to read file saved by Spark if the file contains" - {
+  "ParquetReader should be able to read file saved by Spark if the file contains" -
     CompatibilityTestCases.cases(Spark, Reader).foreach(runTestCase)
-  }

   "ParquetReader should read data partitioned by Spark" in {
     import sparkSession.implicits.*
@@ -46,7 +46,7 @@ abstract class TimeEncodingCompatibilityItSpec
     finally parquetIterable.close()
   }

-  "For time zone of" - {
+  "For time zone of" -
     timeZones.foreach { timeZone =>
       val data = TimePrimitives(timestamp = localDateTimeToTimestamp(newYearMidnight, timeZone), date = newYear)
       timeZone.getDisplayName - {
@@ -60,6 +60,5 @@ abstract class TimeEncodingCompatibilityItSpec
         }
       }
     }
-  }

 }
@@ -38,14 +38,13 @@ class TimeEncodingInt64NanosCompatibilityItSpec extends AnyFreeSpec with Matcher
     finally parquetIterable.close()
   }

-  "Parquet4s should read data written with time zone of" - {
+  "Parquet4s should read data written with time zone of" -
     timeZones.foreach { timeZone =>
       val data = TimePrimitives(timestamp = localDateTimeToTimestamp(newYearMidnight, timeZone), date = newYear)
       timeZone.getDisplayName in {
         writeWithParquet4S(data, timeZone)
         readWithParquet4S(timeZone) should be(data)
       }
     }
-  }

 }
@@ -18,8 +18,7 @@ class ParquetWriterAndSparkCompatibilityItSpec extends AnyFreeSpec with Matchers
     readFromTemp(testCase.typeTag) should contain theSameElementsAs testCase.data
   }

-  "Spark should be able to read file saved by ParquetWriter if the file contains" - {
+  "Spark should be able to read file saved by ParquetWriter if the file contains" -
     CompatibilityTestCases.cases(Writer, Spark).foreach(runTestCase)
-  }

 }
@@ -19,8 +19,7 @@ class SparkAndParquetReaderCompatibilityItSpec extends AnyFreeSpec with Matchers
     finally parquetIterable.close()
   }

-  "ParquetReader should be able to read file saved by Spark if the file contains" - {
+  "ParquetReader should be able to read file saved by Spark if the file contains" -
     CompatibilityTestCases.cases(Spark, Reader).foreach(runTestCase)
-  }

 }
@@ -46,7 +46,7 @@ abstract class TimeEncodingCompatibilityItSpec
     finally parquetIterable.close()
   }

-  "For time zone of" - {
+  "For time zone of" -
     timeZones.foreach { timeZone =>
       val data = TimePrimitives(timestamp = localDateTimeToTimestamp(newYearMidnight, timeZone), date = newYear)
       timeZone.getDisplayName - {
@@ -60,6 +60,5 @@ abstract class TimeEncodingCompatibilityItSpec
         }
       }
     }
-  }

 }
@@ -38,14 +38,13 @@ class TimeEncodingInt64NanosCompatibilityItSpec extends AnyFreeSpec with Matcher
     finally parquetIterable.close()
   }

-  "Parquet4s should read data written with time zone of" - {
+  "Parquet4s should read data written with time zone of" -
     timeZones.foreach { timeZone =>
       val data = TimePrimitives(timestamp = localDateTimeToTimestamp(newYearMidnight, timeZone), date = newYear)
       timeZone.getDisplayName in {
         writeWithParquet4S(data, timeZone)
         readWithParquet4S(timeZone) should be(data)
       }
     }
-  }

 }
@@ -9,6 +9,8 @@ class ParquetReaderItSpec extends AnyFreeSpec with Matchers with TestUtils with

   case class Partitioned(a: String, b: String, i: Int)
   case class I(i: Int)
+  case class NestedPartitioned(nested: Nested, i: Int)
+  case class Nested(b: String)

   before {
     clearTemp()
@@ -58,6 +60,75 @@ class ParquetReaderItSpec extends AnyFreeSpec with Matchers with TestUtils with
     finally partitioned.close()
   }

+  "Nested partition values should be set in the read record and the projection shall be applied" in {
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/nested.b=b1/file.parquet"), Seq(I(1)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/nested.b=b2/file.parquet"), Seq(I(2)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/nested.b=b1/file.parquet"), Seq(I(3)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/nested.b=b2/file.parquet"), Seq(I(4)))
+
+    val partitioned = ParquetReader.projectedAs[NestedPartitioned].read(tempPath)
+    try
+      partitioned.toSeq should contain theSameElementsAs
+        Seq(
+          NestedPartitioned(Nested(b = "b1"), i = 1),
+          NestedPartitioned(Nested(b = "b2"), i = 2),
+          NestedPartitioned(Nested(b = "b1"), i = 3),
+          NestedPartitioned(Nested(b = "b2"), i = 4)
+        )
+    finally partitioned.close()
+  }
+
+  "Projection shall be applied to partition values when reading generic records" in {
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/nested.b=b1/file.parquet"), Seq(I(1)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/nested.b=b2/file.parquet"), Seq(I(2)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/nested.b=b1/file.parquet"), Seq(I(3)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/nested.b=b2/file.parquet"), Seq(I(4)))
+
+    val schema = ParquetSchemaResolver.resolveSchema[NestedPartitioned]
+
+    val partitioned = ParquetReader.projectedGeneric(schema).read(tempPath)
+    try
+      partitioned.toSeq should contain theSameElementsAs
+        Seq(
+          RowParquetRecord("nested" -> RowParquetRecord("b" -> "b1".value), "i" -> 1.value),
+          RowParquetRecord("nested" -> RowParquetRecord("b" -> "b2".value), "i" -> 2.value),
+          RowParquetRecord("nested" -> RowParquetRecord("b" -> "b1".value), "i" -> 3.value),
+          RowParquetRecord("nested" -> RowParquetRecord("b" -> "b2".value), "i" -> 4.value)
+        )
+    finally partitioned.close()
+  }
+
+  "Partition values shall be completely skipped if the projection doesn't include them" in {
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b1/file.parquet"), Seq(I(1)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b2/file.parquet"), Seq(I(2)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/b=b1/file.parquet"), Seq(I(3)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/b=b2/file.parquet"), Seq(I(4)))
+
+    val results = ParquetReader.projectedAs[I].read(tempPath)
+    try
+      results.toSeq should contain theSameElementsAs Seq(I(i = 1), I(i = 2), I(i = 3), I(i = 4))
+    finally results.close()
+  }
+
+  "Partition values shall be completely skipped if a projection with generic records doesn't include them" in {
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b1/file.parquet"), Seq(I(1)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b2/file.parquet"), Seq(I(2)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/b=b1/file.parquet"), Seq(I(3)))
+    ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/b=b2/file.parquet"), Seq(I(4)))
+
+    val schema = ParquetSchemaResolver.resolveSchema[I]
+
+    val results = ParquetReader.projectedGeneric(schema).read(tempPath)
+    try
+      results.toSeq should contain theSameElementsAs Seq(
+        RowParquetRecord("i" -> 1.value),
+        RowParquetRecord("i" -> 2.value),
+        RowParquetRecord("i" -> 3.value),
+        RowParquetRecord("i" -> 4.value)
+      )
+    finally results.close()
+  }
+
   "Partitions should be filtered" in {
     ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b1/file.parquet"), Seq(I(1)))
     ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b2/file.parquet"), Seq(I(2)))
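
For context, the directory layout used throughout these tests follows hive-style partitioning, where each directory name encodes one partition column. The sketch below is illustrative only; parsePartition is a hypothetical helper, not parquet4s API.

// Hypothetical helper: splits a hive-style partition directory name into
// a column path and a value.
def parsePartition(dirName: String): (String, String) =
  dirName.split("=", 2) match {
    case Array(columnPath, value) => columnPath -> value
    case _                        => sys.error(s"not a partition directory: $dirName")
  }

parsePartition("a=a1")        // ("a", "a1")
parsePartition("nested.b=b1") // ("nested.b", "b1"): the dotted path addresses
                              // field `b` of the `nested` group in the read record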
@@ -26,8 +26,7 @@ class ParquetWriterAndParquetReaderCompatibilityItSpec
     finally parquetIterable.close()
   }

-  "Spark should be able to read file saved by ParquetWriter if the file contains" - {
+  "Spark should be able to read file saved by ParquetWriter if the file contains" -
     CompatibilityTestCases.cases(Writer, Reader).foreach(runTestCase)
-  }

 }
@@ -3,6 +3,7 @@ package com.github.mjakubowski84.parquet4s
 import com.github.mjakubowski84.parquet4s.TypedSchemaDef as TSD
 import org.apache.parquet.schema.*
 import org.slf4j.LoggerFactory
+import scala.util.Try
 import shapeless.*
 import shapeless.labelled.*

@@ -44,7 +45,7 @@ object ParquetSchemaResolver {

   trait SchemaVisitor[V] extends Cursor.Visitor[TypedSchemaDefInvoker[V], Option[Type]] {
     override def onCompleted(cursor: Cursor, invoker: TypedSchemaDefInvoker[V]): Option[Type] =
-      throw new UnsupportedOperationException("Schema resolution cannot complete before all fields are processed.")
+      Option(invoker())
   }

   private val logger = LoggerFactory.getLogger(this.getClass)
@@ -62,6 +63,22 @@ object ParquetSchemaResolver {
   def resolveSchema[T](implicit g: ParquetSchemaResolver[T]): MessageType =
     Message(g.schemaName, g.resolveSchema(Cursor.simple)*)

+  /** Finds a type at the given path
+    */
+  def findType[T](columnPath: ColumnPath)(implicit g: ParquetSchemaResolver[T]): Option[Type] =
+    leafType(
+      g.resolveSchema(Cursor.following(columnPath)).headOption,
+      if (columnPath.isEmpty) Seq.empty else columnPath.elements.tail
+    )
+
+  private def leafType(typeOpt: Option[Type], pathElements: Seq[String]): Option[Type] =
+    typeOpt match {
+      case Some(group: GroupType) if pathElements.nonEmpty =>
+        leafType(Try(group.getType(pathElements.head)).toOption, pathElements.tail)
+      case _ =>
+        typeOpt
+    }
+
   implicit val hnil: ParquetSchemaResolver[HNil] = _ => List.empty

   implicit def hcons[K <: Symbol, V, T <: HList](implicit
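
A hedged usage sketch of the findType helper added above, assuming parquet4s on the classpath, an implicitly derived ParquetSchemaResolver for the case class, and ColumnPath's string constructor; the commented results are expectations, not verified output.

case class Nested(b: String)
case class NestedPartitioned(nested: Nested, i: Int)

// Walks the projected schema of NestedPartitioned down the dotted path.
val bType = ParquetSchemaResolver.findType[NestedPartitioned](ColumnPath("nested.b"))
// expected: Some(...) holding the Parquet type of field `b` inside group `nested`

// A path absent from the projection resolves to None, which is what lets
// createPartitionedSource filter that partition column out of the view.
val aType = ParquetSchemaResolver.findType[NestedPartitioned](ColumnPath("a"))
// expected: None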
@@ -3,6 +3,7 @@ package com.github.mjakubowski84.parquet4s
 import com.github.mjakubowski84.parquet4s.TypedSchemaDef as TSD
 import org.apache.parquet.schema.*
 import org.slf4j.LoggerFactory
+import scala.util.Try
 import shapeless.*
 import shapeless.labelled.*

@@ -44,7 +45,7 @@ object ParquetSchemaResolver {

   trait SchemaVisitor[V] extends Cursor.Visitor[TypedSchemaDefInvoker[V], Option[Type]] {
     override def onCompleted(cursor: Cursor, invoker: TypedSchemaDefInvoker[V]): Option[Type] =
-      throw new UnsupportedOperationException("Schema resolution cannot complete before all fields are processed.")
+      Option(invoker())
   }

   private val logger = LoggerFactory.getLogger(this.getClass)
@@ -62,6 +63,22 @@ object ParquetSchemaResolver {
   def resolveSchema[T](implicit g: ParquetSchemaResolver[T]): MessageType =
     Message(g.schemaName, g.resolveSchema(Cursor.simple)*)

+  /** Finds a type at the given path
+    */
+  def findType[T](columnPath: ColumnPath)(implicit g: ParquetSchemaResolver[T]): Option[Type] =
+    leafType(
+      g.resolveSchema(Cursor.following(columnPath)).headOption,
+      if (columnPath.isEmpty) Seq.empty else columnPath.elements.tail
+    )
+
+  private def leafType(typeOpt: Option[Type], pathElements: Seq[String]): Option[Type] =
+    typeOpt match {
+      case Some(group: GroupType) if pathElements.nonEmpty =>
+        leafType(Try(group.getType(pathElements.head)).toOption, pathElements.tail)
+      case _ =>
+        typeOpt
+    }
+
   implicit val hnil: ParquetSchemaResolver[HNil] = _ => List.empty

   implicit def hcons[K <: Symbol, V, T <: HList](implicit