From 8ce47c8df57552496a6f861f2cabac243223da65 Mon Sep 17 00:00:00 2001 From: Marcin Jakubowski Date: Fri, 17 Jan 2025 16:40:01 +0100 Subject: [PATCH 1/4] Projection over partition values - core module --- .../parquet4s/ParquetSource.scala | 7 +- .../parquet4s/ParquetReaderItSpec.scala | 71 +++++++++++++++++++ .../parquet4s/ParquetSchemaResolver.scala | 7 +- .../parquet4s/ParquetSchemaResolver.scala | 7 +- .../parquet4s/ParquetSchemaResolver.scala | 16 ++++- .../parquet4s/ParquetReader.scala | 43 ++++++----- .../parquet4s/ParquetRecord.scala | 6 +- .../parquet4s/PartitionFilter.scala | 22 ++++++ .../parquet4s/stats/LazyDelegateStats.scala | 2 +- 9 files changed, 152 insertions(+), 29 deletions(-) diff --git a/akkaPekko/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetSource.scala b/akkaPekko/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetSource.scala index a42e7c6a..f917bb13 100644 --- a/akkaPekko/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetSource.scala +++ b/akkaPekko/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetSource.scala @@ -309,9 +309,10 @@ object ParquetSource extends IOOps { ) = Source .unfoldResource[RowParquetRecord, Iterator[RowParquetRecord] & Closeable]( - ParquetIterator.factory(inputFile, projectedSchemaOpt, columnProjections, filterCompat, decoder, options), - iterator => if (iterator.hasNext) Option(iterator.next()) else None, - _.close() + create = + ParquetIterator.factory(inputFile, projectedSchemaOpt, columnProjections, filterCompat, decoder, options), + read = iterator => if (iterator.hasNext) Option(iterator.next()) else None, + close = _.close() ) private def setPartitionValues(partitionedPath: PartitionedPath)(record: RowParquetRecord) = diff --git a/core/src/it/scala/com/github/mjakubowski84/parquet4s/ParquetReaderItSpec.scala b/core/src/it/scala/com/github/mjakubowski84/parquet4s/ParquetReaderItSpec.scala index 2bca2e3a..8307e0ff 100644 --- a/core/src/it/scala/com/github/mjakubowski84/parquet4s/ParquetReaderItSpec.scala +++ b/core/src/it/scala/com/github/mjakubowski84/parquet4s/ParquetReaderItSpec.scala @@ -9,6 +9,8 @@ class ParquetReaderItSpec extends AnyFreeSpec with Matchers with TestUtils with case class Partitioned(a: String, b: String, i: Int) case class I(i: Int) + case class NestedPartitioned(nested: Nested, i: Int) + case class Nested(b: String) before { clearTemp() @@ -58,6 +60,75 @@ class ParquetReaderItSpec extends AnyFreeSpec with Matchers with TestUtils with finally partitioned.close() } + "Nested partition values should be set in read record and the projection shall be applied" in { + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/nested.b=b1/file.parquet"), Seq(I(1))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/nested.b=b2/file.parquet"), Seq(I(2))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/nested.b=b1/file.parquet"), Seq(I(3))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/nested.b=b2/file.parquet"), Seq(I(4))) + + val partitioned = ParquetReader.projectedAs[NestedPartitioned].read(tempPath) + try + partitioned.toSeq should contain theSameElementsAs + Seq( + NestedPartitioned(Nested(b = "b1"), i = 1), + NestedPartitioned(Nested(b = "b2"), i = 2), + NestedPartitioned(Nested(b = "b1"), i = 3), + NestedPartitioned(Nested(b = "b2"), i = 4) + ) + finally partitioned.close() + } + + "Projection shall be applied to partition values when reading generic records" in { + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/nested.b=b1/file.parquet"), Seq(I(1))) + 
ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/nested.b=b2/file.parquet"), Seq(I(2))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/nested.b=b1/file.parquet"), Seq(I(3))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/nested.b=b2/file.parquet"), Seq(I(4))) + + val schema = ParquetSchemaResolver.resolveSchema[NestedPartitioned] + + val partitioned = ParquetReader.projectedGeneric(schema).read(tempPath) + try + partitioned.toSeq should contain theSameElementsAs + Seq( + RowParquetRecord("nested" -> RowParquetRecord("b" -> "b1".value), "i" -> 1.value), + RowParquetRecord("nested" -> RowParquetRecord("b" -> "b2".value), "i" -> 2.value), + RowParquetRecord("nested" -> RowParquetRecord("b" -> "b1".value), "i" -> 3.value), + RowParquetRecord("nested" -> RowParquetRecord("b" -> "b2".value), "i" -> 4.value) + ) + finally partitioned.close() + } + + "Partition values shall be completyly skipped if projection doesn't include them" in { + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b1/file.parquet"), Seq(I(1))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b2/file.parquet"), Seq(I(2))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/b=b1/file.parquet"), Seq(I(3))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/b=b2/file.parquet"), Seq(I(4))) + + val results = ParquetReader.projectedAs[I].read(tempPath) + try + results.toSeq should contain theSameElementsAs Seq(I(i = 1), I(i = 2), I(i = 3), I(i = 4)) + finally results.close() + } + + "Partition values shall be completyly skipped if projection with generic records doesn't include them" in { + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b1/file.parquet"), Seq(I(1))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b2/file.parquet"), Seq(I(2))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/b=b1/file.parquet"), Seq(I(3))) + ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a2/b=b2/file.parquet"), Seq(I(4))) + + val schema = ParquetSchemaResolver.resolveSchema[I] + + val results = ParquetReader.projectedGeneric(schema).read(tempPath) + try + results.toSeq should contain theSameElementsAs Seq( + RowParquetRecord("i" -> 1.value), + RowParquetRecord("i" -> 2.value), + RowParquetRecord("i" -> 3.value), + RowParquetRecord("i" -> 4.value) + ) + finally results.close() + } + "Partitions should be filtered" in { ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b1/file.parquet"), Seq(I(1))) ParquetWriter.of[I].writeAndClose(Path(tempPath, "a=a1/b=b2/file.parquet"), Seq(I(2))) diff --git a/core/src/main/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala b/core/src/main/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala index 92eb578a..c97c69ab 100644 --- a/core/src/main/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala +++ b/core/src/main/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala @@ -44,7 +44,7 @@ object ParquetSchemaResolver { trait SchemaVisitor[V] extends Cursor.Visitor[TypedSchemaDefInvoker[V], Option[Type]] { override def onCompleted(cursor: Cursor, invoker: TypedSchemaDefInvoker[V]): Option[Type] = - throw new UnsupportedOperationException("Schema resolution cannot complete before all fields are processed.") + Option(invoker.schema(cursor.path.elements.last)) } private val logger = LoggerFactory.getLogger(this.getClass) @@ -62,6 +62,11 @@ object ParquetSchemaResolver { def resolveSchema[T](implicit g: ParquetSchemaResolver[T]): MessageType = 
Message(g.schemaName, g.resolveSchema(Cursor.simple)*) + /** Finds a type at the given path + */ + def findType[T](columnPath: ColumnPath)(implicit g: ParquetSchemaResolver[T]): Option[Type] = + g.resolveSchema(Cursor.following(columnPath)).headOption + implicit val hnil: ParquetSchemaResolver[HNil] = _ => List.empty implicit def hcons[K <: Symbol, V, T <: HList](implicit diff --git a/core/src/main/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala b/core/src/main/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala index 92eb578a..c97c69ab 100644 --- a/core/src/main/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala +++ b/core/src/main/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala @@ -44,7 +44,7 @@ object ParquetSchemaResolver { trait SchemaVisitor[V] extends Cursor.Visitor[TypedSchemaDefInvoker[V], Option[Type]] { override def onCompleted(cursor: Cursor, invoker: TypedSchemaDefInvoker[V]): Option[Type] = - throw new UnsupportedOperationException("Schema resolution cannot complete before all fields are processed.") + Option(invoker.schema(cursor.path.elements.last)) } private val logger = LoggerFactory.getLogger(this.getClass) @@ -62,6 +62,11 @@ object ParquetSchemaResolver { def resolveSchema[T](implicit g: ParquetSchemaResolver[T]): MessageType = Message(g.schemaName, g.resolveSchema(Cursor.simple)*) + /** Finds a type at the given path + */ + def findType[T](columnPath: ColumnPath)(implicit g: ParquetSchemaResolver[T]): Option[Type] = + g.resolveSchema(Cursor.following(columnPath)).headOption + implicit val hnil: ParquetSchemaResolver[HNil] = _ => List.empty implicit def hcons[K <: Symbol, V, T <: HList](implicit diff --git a/core/src/main/scala-3/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala b/core/src/main/scala-3/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala index 60cdf09b..63a18b36 100644 --- a/core/src/main/scala-3/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala +++ b/core/src/main/scala-3/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala @@ -38,9 +38,7 @@ object ParquetSchemaResolver: final abstract private[ParquetSchemaResolver] class Fields[Labels <: Tuple, Values <: Tuple] - trait SchemaVisitor[V] extends Cursor.Visitor[String, Option[Type]]: - override def onCompleted(cursor: Cursor, fieldName: String): Option[Type] = - throw new UnsupportedOperationException("Schema resolution cannot complete before all fields are processed.") + trait SchemaVisitor[V] extends Cursor.Visitor[String, Option[Type]] private val logger = LoggerFactory.getLogger(this.getClass) @@ -57,6 +55,11 @@ object ParquetSchemaResolver: def resolveSchema[T](using g: ParquetSchemaResolver[T]): MessageType = Message(g.schemaName, g.resolveSchema(Cursor.simple): _*) + /** Finds a type at the given path + */ + def findType[T](columnPath: ColumnPath)(using g: ParquetSchemaResolver[T]): Option[Type] = + g.resolveSchema(Cursor.following(columnPath)).headOption + given ParquetSchemaResolver[Fields[EmptyTuple, EmptyTuple]] with def resolveSchema(cursor: Cursor): List[Type] = List.empty @@ -93,6 +96,9 @@ object ParquetSchemaResolver: def onActive(cursor: Cursor, fieldName: String): Option[Type] = Option(summon[TSD[V]](fieldName)) + def onCompleted(cursor: Cursor, fieldName: String): Option[Type] = + Option(summon[TSD[V]](fieldName)) + /** Purpose of productSchemaVisitor is to filter product fields so that those that are used for partitioning are not * present in 
the final schema. It is only applied to products that are not nested in Options and collections - as * optional fields and elements of collections are not valid for partitioning. @@ -110,4 +116,8 @@ object ParquetSchemaResolver: case schema => Option(schema(fieldName)) + def onCompleted(cursor: Cursor, fieldName: String): Option[Type] = + // TODO shall we resolve member fields, too? + Option(summon[TSD[V]](fieldName)) + end ParquetSchemaResolver diff --git a/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetReader.scala b/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetReader.scala index 14cc7b8e..ac48b216 100644 --- a/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetReader.scala +++ b/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetReader.scala @@ -126,18 +126,27 @@ object ParquetReader extends IOOps { throw exception case Right(partitionedDirectory) => val projectedSchemaOpt = projectedSchemaResolverOpt.map(implicit resolver => - ParquetSchemaResolver.resolveSchema(partitionedDirectory.schema) + ParquetSchemaResolver.resolveSchema[T](partitionedDirectory.schema) ) + lazy val fallbackFilterCompat = filter.toNonPredicateFilterCompat + val iterables = partitionedDirectory.paths.map { partitionedPath => - singleIterable( + val partitionViewOpt = Option(projectedSchemaResolverOpt.fold(partitionedPath.view) { implicit resolver => + partitionedPath.view.filterPaths(ParquetSchemaResolver.findType[T](_).nonEmpty) + }).filter(_.nonEmpty) + + val iterable = singleIterable( inputFile = partitionedPath.inputFile, valueCodecConfiguration = valueCodecConfiguration, projectedSchemaOpt = projectedSchemaOpt, filterCompat = partitionedPath.filterPredicateOpt.fold(fallbackFilterCompat)(FilterCompat.get), - partitionViewOpt = Option(partitionedPath.view), + partitionViewOpt = partitionViewOpt, readerOptions = readerOptions - ).appendTransformation(setPartitionValues(partitionedPath)) + ) + partitionViewOpt.fold(iterable)(partitionView => + iterable.appendTransformation(setPartitionValues(partitionView)) + ) } new CompoundParquetIterable[T](iterables) } @@ -161,11 +170,11 @@ object ParquetReader extends IOOps { ) } - private def setPartitionValues(partitionedPath: PartitionedPath)( + private def setPartitionValues(partitionView: PartitionView)( record: RowParquetRecord ): Iterable[RowParquetRecord] = Option( - partitionedPath.partitions.foldLeft(record) { case (currentRecord, (columnPath, value)) => + partitionView.values.foldLeft(record) { case (currentRecord, (columnPath, value)) => currentRecord.updated(columnPath, BinaryValue(value)) } ) @@ -275,25 +284,25 @@ object ParquetReader extends IOOps { projectedSchemaResolverOpt = Option(RowParquetRecord.genericParquetSchemaResolver(projectedSchema)) ) - // format: off /** Creates [[Builder]] of Parquet reader returning projected generic records. Due to projection, reader does * not attempt to read all existing columns of the file but applies enforced projection schema. Besides simple - * projection one can use aliases and extract values from nested fields - in a way similar to SQL. - *
+ * projection one can use aliases and extract values from nested fields - in a way similar to SQL.
* @example
-    *   <pre>
-    *projectedGeneric(
-    *  Col("foo").as[Int], // selects Int column "foo"
-    *  Col("bar.baz".as[String]), // selects String field "bar.baz", creates column "baz" wih a value of "baz"
-    *  Col("bar.baz".as[String].alias("bar_baz")) // selects String field "bar.baz", creates column "bar_baz" wih a value of "baz"
-    *)
-    *   </pre>
+ * ```scala + * projectedGeneric( + * // selects Int column "foo" + * Col("foo").as[Int], + * // selects String field "bar.baz", creates column "baz" wih a value of "baz" + * Col("bar.baz".as[String]), + * // selects String field "bar.baz", creates column "bar_baz" wih a value of "baz" + * Col("bar.baz".as[String].alias("bar_baz")) + * ) + * ``` * @param col * first column projection * @param cols * next column projections */ - // format: on def projectedGeneric(col: TypedColumnPath[?], cols: TypedColumnPath[?]*): Builder[RowParquetRecord] = { val (fields, columnProjections) = (col +: cols.toVector).zipWithIndex diff --git a/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetRecord.scala b/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetRecord.scala index 0c7f28ff..fcb4e5fa 100644 --- a/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetRecord.scala +++ b/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetRecord.scala @@ -141,13 +141,13 @@ object RowParquetRecord { new ParquetSchemaResolver[RowParquetRecord] { override def schemaName: Option[String] = Option(message.getName) override def resolveSchema(cursor: Cursor): List[Type] = - skipFields(cursor, message.getFields.asScala.toList) + applyCursor(cursor, message.getFields.asScala.toList) - private def skipFields(cursor: Cursor, fields: List[Type]): List[Type] = + private def applyCursor(cursor: Cursor, fields: List[Type]): List[Type] = fields.flatMap { case groupField: GroupType if groupField.getLogicalTypeAnnotation == null => cursor.advanceByFieldName(groupField.getName).flatMap { newCursor => - val fields = skipFields(newCursor, groupField.getFields.asScala.toList) + val fields = applyCursor(newCursor, groupField.getFields.asScala.toList) if (fields.isEmpty) None else Some( diff --git a/core/src/main/scala/com/github/mjakubowski84/parquet4s/PartitionFilter.scala b/core/src/main/scala/com/github/mjakubowski84/parquet4s/PartitionFilter.scala index a5eb84ac..fbf25cc3 100644 --- a/core/src/main/scala/com/github/mjakubowski84/parquet4s/PartitionFilter.scala +++ b/core/src/main/scala/com/github/mjakubowski84/parquet4s/PartitionFilter.scala @@ -128,6 +128,28 @@ class PartitionView(partitions: List[(ColumnPath, Binary)]) { * a value of given partion field */ def value(columnPath: ColumnPath): Option[Binary] = partitionMap.get(columnPath) + + /** @return + * All partition values + */ + def values: List[(ColumnPath, Binary)] = partitions + + /** Removes partitions whose paths do not match the given predicate. 
+ * @return + * a new instance with partitions that match the given predicate + */ + def filterPaths(predicate: ColumnPath => Boolean): PartitionView = + new PartitionView(partitions.filter { case (columnPath, _) => predicate(columnPath) }) + + /** @return + * true if view doesn't contain any partitions + */ + def isEmpty: Boolean = partitions.isEmpty + + /** @return + * true if view contains at least single partition + */ + def nonEmpty: Boolean = partitions.nonEmpty } private class PartitionedPathImpl( diff --git a/core/src/main/scala/com/github/mjakubowski84/parquet4s/stats/LazyDelegateStats.scala b/core/src/main/scala/com/github/mjakubowski84/parquet4s/stats/LazyDelegateStats.scala index 52b3b9b0..245b49cb 100644 --- a/core/src/main/scala/com/github/mjakubowski84/parquet4s/stats/LazyDelegateStats.scala +++ b/core/src/main/scala/com/github/mjakubowski84/parquet4s/stats/LazyDelegateStats.scala @@ -20,7 +20,7 @@ private[parquet4s] class LazyDelegateStats( else new FilteredFileStats(inputFile, vcc, projectionSchemaOpt, filter) partitionViewOpt match { - case Some(partitionView) if partitionView.schema.nonEmpty => + case Some(partitionView) if partitionView.nonEmpty => new PartitionedFileStats(fileStats, partitionView) case _ => fileStats From 0c08842ec9051be7bc3fb8893742c1c8fd824927 Mon Sep 17 00:00:00 2001 From: Marcin Jakubowski Date: Mon, 20 Jan 2025 19:24:28 +0100 Subject: [PATCH 2/4] Projection over partition values - akka and fs2 module --- .../parquet4s/ParquetSource.scala | 32 ++++++++++++------- .../parquet4s/parquet/reader.scala | 14 ++++---- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/akkaPekko/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetSource.scala b/akkaPekko/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetSource.scala index f917bb13..30ae6798 100644 --- a/akkaPekko/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetSource.scala +++ b/akkaPekko/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetSource.scala @@ -239,11 +239,13 @@ object ParquetSource extends IOOps { partitionedDirectory => { val projectedSchemaOpt = projectedSchemaResolverOpt .map(implicit resolver => ParquetSchemaResolver.resolveSchema(partitionedDirectory.schema)) + val filteredPaths = Source.fromIterator(() => partitionedDirectory.paths.iterator) if (parallelism == 1) { filteredPaths.flatMapConcat( createPartitionedSource( + projectedSchemaResolverOpt, projectedSchemaOpt, columnProjections, decoder, @@ -255,6 +257,7 @@ object ParquetSource extends IOOps { filteredPaths.flatMapMerge( breadth = parallelism, createPartitionedSource( + projectedSchemaResolverOpt, projectedSchemaOpt, columnProjections, decoder, @@ -281,42 +284,49 @@ object ParquetSource extends IOOps { recordSource.map(decode) } - private def createPartitionedSource( + private def createPartitionedSource[T]( + projectedSchemaResolverOpt: Option[ParquetSchemaResolver[T]], projectedSchemaOpt: Option[MessageType], columnProjections: Seq[ColumnProjection], - decoder: ParquetRecordDecoder[?], + metadataReader: MetadataReader, fallbackFilterCompat: => FilterCompat.Filter, options: ParquetReader.Options ): PartitionedPath => Source[RowParquetRecord, NotUsed] = - partitionedPath => - createSource( + partitionedPath => { + val partitionViewOpt = Option(projectedSchemaResolverOpt.fold(partitionedPath.view) { implicit resolver => + partitionedPath.view.filterPaths(ParquetSchemaResolver.findType[T](_).nonEmpty) + }).filter(_.nonEmpty) + + val source = createSource( inputFile = partitionedPath.inputFile, 
projectedSchemaOpt = projectedSchemaOpt, columnProjections = columnProjections, filterCompat = partitionedPath.filterPredicateOpt.fold(fallbackFilterCompat)(FilterCompat.get), - decoder = decoder, + metadataReader = metadataReader, options = options ) - .map(setPartitionValues(partitionedPath)) + + partitionViewOpt.fold(source)(partitionView => source.map(setPartitionValues(partitionView))) + } private def createSource( inputFile: InputFile, projectedSchemaOpt: Option[MessageType], columnProjections: Seq[ColumnProjection], filterCompat: FilterCompat.Filter, - decoder: ParquetRecordDecoder[?], + metadataReader: MetadataReader, options: ParquetReader.Options ) = Source .unfoldResource[RowParquetRecord, Iterator[RowParquetRecord] & Closeable]( - create = - ParquetIterator.factory(inputFile, projectedSchemaOpt, columnProjections, filterCompat, decoder, options), + create = ParquetIterator + .factory(inputFile, projectedSchemaOpt, columnProjections, filterCompat, metadataReader, options), read = iterator => if (iterator.hasNext) Option(iterator.next()) else None, close = _.close() ) - private def setPartitionValues(partitionedPath: PartitionedPath)(record: RowParquetRecord) = - partitionedPath.partitions.foldLeft(record) { case (currentRecord, (columnPath, value)) => + private def setPartitionValues(partitionView: PartitionView)(record: RowParquetRecord) = + partitionView.values.foldLeft(record) { case (currentRecord, (columnPath, value)) => currentRecord.updated(columnPath, BinaryValue(value)) } diff --git a/fs2/src/main/scala/com/github/mjakubowski84/parquet4s/parquet/reader.scala b/fs2/src/main/scala/com/github/mjakubowski84/parquet4s/parquet/reader.scala index e34fd2ef..497f603c 100644 --- a/fs2/src/main/scala/com/github/mjakubowski84/parquet4s/parquet/reader.scala +++ b/fs2/src/main/scala/com/github/mjakubowski84/parquet4s/parquet/reader.scala @@ -348,6 +348,10 @@ object reader { F.catchNonFatal(FilterCompat.get(pathFilterPredicate)) ) ) + partitionViewOpt <- Stream.eval(F.delay(Option(projectedSchemaResolverOpt.fold(partitionedPath.view) { + implicit resolver => + partitionedPath.view.filterPaths(ParquetSchemaResolver.findType[T](_).nonEmpty) + }).filter(_.nonEmpty))) parquetIterator <- Stream.resource( parquetIteratorResource( inputFile = partitionedPath.inputFile, @@ -358,7 +362,7 @@ object reader { options = options ) ) - } yield partitionedReaderStream[F](parquetIterator, partitionedPath, chunkSize) + } yield partitionedReaderStream[F](parquetIterator, partitionViewOpt, chunkSize) } private def readSingleFile[F[_], T]( @@ -390,20 +394,18 @@ object reader { private def partitionedReaderStream[F[_]]( parquetIterator: Iterator[RowParquetRecord], - partitionedPath: PartitionedPath, + partitionViewOpt: Option[PartitionView], chunkSize: Int )(implicit F: Sync[F]): Stream[F, RowParquetRecord] = { val stream = Stream.fromBlockingIterator[F](parquetIterator, chunkSize) - if (partitionedPath.partitions.nonEmpty) { + partitionViewOpt.fold(stream) { partitionView => stream.evalMapChunk { record => - partitionedPath.partitions.foldLeft(F.pure(record)) { case (f, (columnPath, value)) => + partitionView.values.foldLeft(F.pure(record)) { case (f, (columnPath, value)) => f.flatMap { r => F.catchNonFatal(r.updated(columnPath, BinaryValue(value))) } } } - } else { - stream } } From 7203584b698a1d52cb32eafb8c9b9174864d626e Mon Sep 17 00:00:00 2001 From: Marcin Jakubowski Date: Fri, 21 Feb 2025 16:34:58 +0100 Subject: [PATCH 3/4] findType implementation --- .scalafmt.conf | 2 +- 
.../parquet4s/ParquetSchemaResolver.scala | 16 ++++++++-- .../parquet4s/ParquetSchemaResolver.scala | 16 ++++++++-- .../parquet4s/ParquetSchemaResolver.scala | 15 +++++++-- .../parquet4s/ParquetSchemaResolverSpec.scala | 31 ++++++++++++++++++- 5 files changed, 72 insertions(+), 8 deletions(-) diff --git a/.scalafmt.conf b/.scalafmt.conf index b01b7f30..986b0149 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = 3.8.3 +version = 3.8.6 preset = default maxColumn = 120 diff --git a/core/src/main/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala b/core/src/main/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala index c97c69ab..67104658 100644 --- a/core/src/main/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala +++ b/core/src/main/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala @@ -3,6 +3,7 @@ package com.github.mjakubowski84.parquet4s import com.github.mjakubowski84.parquet4s.TypedSchemaDef as TSD import org.apache.parquet.schema.* import org.slf4j.LoggerFactory +import scala.util.Try import shapeless.* import shapeless.labelled.* @@ -44,7 +45,7 @@ object ParquetSchemaResolver { trait SchemaVisitor[V] extends Cursor.Visitor[TypedSchemaDefInvoker[V], Option[Type]] { override def onCompleted(cursor: Cursor, invoker: TypedSchemaDefInvoker[V]): Option[Type] = - Option(invoker.schema(cursor.path.elements.last)) + Option(invoker()) } private val logger = LoggerFactory.getLogger(this.getClass) @@ -65,7 +66,18 @@ object ParquetSchemaResolver { /** Finds a type at the given path */ def findType[T](columnPath: ColumnPath)(implicit g: ParquetSchemaResolver[T]): Option[Type] = - g.resolveSchema(Cursor.following(columnPath)).headOption + leafType( + g.resolveSchema(Cursor.following(columnPath)).headOption, + if (columnPath.isEmpty) Seq.empty else columnPath.elements.tail + ) + + private def leafType(typeOpt: Option[Type], pathElements: Seq[String]): Option[Type] = + typeOpt match { + case Some(group: GroupType) if pathElements.nonEmpty => + leafType(Try(group.getType(pathElements.head)).toOption, pathElements.tail) + case _ => + typeOpt + } implicit val hnil: ParquetSchemaResolver[HNil] = _ => List.empty diff --git a/core/src/main/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala b/core/src/main/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala index c97c69ab..67104658 100644 --- a/core/src/main/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala +++ b/core/src/main/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala @@ -3,6 +3,7 @@ package com.github.mjakubowski84.parquet4s import com.github.mjakubowski84.parquet4s.TypedSchemaDef as TSD import org.apache.parquet.schema.* import org.slf4j.LoggerFactory +import scala.util.Try import shapeless.* import shapeless.labelled.* @@ -44,7 +45,7 @@ object ParquetSchemaResolver { trait SchemaVisitor[V] extends Cursor.Visitor[TypedSchemaDefInvoker[V], Option[Type]] { override def onCompleted(cursor: Cursor, invoker: TypedSchemaDefInvoker[V]): Option[Type] = - Option(invoker.schema(cursor.path.elements.last)) + Option(invoker()) } private val logger = LoggerFactory.getLogger(this.getClass) @@ -65,7 +66,18 @@ object ParquetSchemaResolver { /** Finds a type at the given path */ def findType[T](columnPath: ColumnPath)(implicit g: ParquetSchemaResolver[T]): Option[Type] = - g.resolveSchema(Cursor.following(columnPath)).headOption + leafType( + 
g.resolveSchema(Cursor.following(columnPath)).headOption, + if (columnPath.isEmpty) Seq.empty else columnPath.elements.tail + ) + + private def leafType(typeOpt: Option[Type], pathElements: Seq[String]): Option[Type] = + typeOpt match { + case Some(group: GroupType) if pathElements.nonEmpty => + leafType(Try(group.getType(pathElements.head)).toOption, pathElements.tail) + case _ => + typeOpt + } implicit val hnil: ParquetSchemaResolver[HNil] = _ => List.empty diff --git a/core/src/main/scala-3/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala b/core/src/main/scala-3/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala index 63a18b36..7e770cb2 100644 --- a/core/src/main/scala-3/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala +++ b/core/src/main/scala-3/com/github/mjakubowski84/parquet4s/ParquetSchemaResolver.scala @@ -9,6 +9,7 @@ import scala.deriving.Mirror import scala.language.higherKinds import scala.reflect.ClassTag import scala.util.NotGiven +import scala.util.Try /** Type class that allows to build schema of Parquet file out from regular Scala type, typically case class. * @tparam T @@ -58,7 +59,18 @@ object ParquetSchemaResolver: /** Finds a type at the given path */ def findType[T](columnPath: ColumnPath)(using g: ParquetSchemaResolver[T]): Option[Type] = - g.resolveSchema(Cursor.following(columnPath)).headOption + leafType( + g.resolveSchema(Cursor.following(columnPath)).headOption, + if (columnPath.isEmpty) Seq.empty else columnPath.elements.tail + ) + + private def leafType(typeOpt: Option[Type], pathElements: Seq[String]): Option[Type] = + typeOpt match { + case Some(group: GroupType) if pathElements.nonEmpty => + leafType(Try(group.getType(pathElements.head)).toOption, pathElements.tail) + case _ => + typeOpt + } given ParquetSchemaResolver[Fields[EmptyTuple, EmptyTuple]] with def resolveSchema(cursor: Cursor): List[Type] = List.empty @@ -117,7 +129,6 @@ object ParquetSchemaResolver: Option(schema(fieldName)) def onCompleted(cursor: Cursor, fieldName: String): Option[Type] = - // TODO shall we resolve member fields, too? 
Option(summon[TSD[V]](fieldName)) end ParquetSchemaResolver diff --git a/core/src/test/scala/com/github/mjakubowski84/parquet4s/ParquetSchemaResolverSpec.scala b/core/src/test/scala/com/github/mjakubowski84/parquet4s/ParquetSchemaResolverSpec.scala index 2d7cecc4..f261d0c2 100644 --- a/core/src/test/scala/com/github/mjakubowski84/parquet4s/ParquetSchemaResolverSpec.scala +++ b/core/src/test/scala/com/github/mjakubowski84/parquet4s/ParquetSchemaResolverSpec.scala @@ -1,7 +1,7 @@ package com.github.mjakubowski84.parquet4s import com.github.mjakubowski84.parquet4s.LogicalTypes.* -import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.resolveSchema +import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.{resolveSchema, findType} import com.github.mjakubowski84.parquet4s.TestCases.* import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.* import org.apache.parquet.schema.Type.Repetition.* @@ -269,4 +269,33 @@ class ParquetSchemaResolverSpec extends AnyFlatSpec with Matchers { resolved.getName should be(Message.DefaultName) } + "findType" should "find a field" in { + findType[Primitives](Col("int")) should be(Some(Types.primitive(INT32, REQUIRED).as(Int32Type).named("int"))) + } + + it should "process empty class" in { + findType[Empty](Col("")) should be(None) + findType[Empty](Col("invalid")) should be(None) + } + + it should "handle invalid paths" in { + findType[Primitives](Col("invalid")) should be(None) + findType[Primitives](Col("")) should be(None) + } + + it should "find a nested field" in { + findType[ContainsNestedClass](Col("nested.int")) should be(Some(Types.required(INT32).as(Int32Type).named("int"))) + } + + it should "provide complex type result" in { + findType[ContainsNestedClass](Col("nested")) should be( + Some( + Types + .optionalGroup() + .addField(Types.required(INT32).as(Int32Type).named("int")) + .named("nested") + ) + ) + } + } From 4604e844f32d53c3e42db33649f03440ec395af1 Mon Sep 17 00:00:00 2001 From: Marcin Jakubowski Date: Sat, 22 Feb 2025 10:15:24 +0100 Subject: [PATCH 4/4] formatting --- .../ParquetWriterAndSparkCompatibilityItSpec.scala | 3 +-- .../SparkAndParquetReaderCompatibilityItSpec.scala | 3 +-- .../parquet4s/TimeEncodingCompatibilityItSpec.scala | 3 +-- .../TimeEncodingInt64NanosCompatibilityItSpec.scala | 3 +-- .../ParquetWriterAndSparkCompatibilityItSpec.scala | 3 +-- .../SparkAndParquetReaderCompatibilityItSpec.scala | 3 +-- .../parquet4s/TimeEncodingCompatibilityItSpec.scala | 3 +-- .../TimeEncodingInt64NanosCompatibilityItSpec.scala | 3 +-- ...tWriterAndParquetReaderCompatibilityItSpec.scala | 3 +-- .../com/github/mjakubowski84/parquet4s/IOOps.scala | 13 ++++++------- .../github/mjakubowski84/parquet4s/IOOpsSpec.scala | 6 ++---- .../parquet4s/ParquetRecordDecoderSpec.scala | 3 +-- .../mjakubowski84/parquet4s/parquet/IoSpec.scala | 6 ++---- project/Compilation.scala | 3 +-- .../mjakubowski84/parquet4s/s3/S3ItSpec.scala | 3 +-- .../parquet4s/Parquet4sScalaPBSpec.scala | 6 ++---- 16 files changed, 24 insertions(+), 43 deletions(-) diff --git a/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetWriterAndSparkCompatibilityItSpec.scala b/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetWriterAndSparkCompatibilityItSpec.scala index 5d988963..96ee5eef 100644 --- a/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetWriterAndSparkCompatibilityItSpec.scala +++ b/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/ParquetWriterAndSparkCompatibilityItSpec.scala @@ -18,8 +18,7 @@ class 
ParquetWriterAndSparkCompatibilityItSpec extends AnyFreeSpec with Matchers readFromTemp(testCase.typeTag) should contain theSameElementsAs testCase.data } - "Spark should be able to read file saved by ParquetWriter if the file contains" - { + "Spark should be able to read file saved by ParquetWriter if the file contains" - CompatibilityTestCases.cases(Writer, Spark).foreach(runTestCase) - } } diff --git a/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/SparkAndParquetReaderCompatibilityItSpec.scala b/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/SparkAndParquetReaderCompatibilityItSpec.scala index 1731431f..be3b51f5 100644 --- a/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/SparkAndParquetReaderCompatibilityItSpec.scala +++ b/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/SparkAndParquetReaderCompatibilityItSpec.scala @@ -27,9 +27,8 @@ class SparkAndParquetReaderCompatibilityItSpec extends AnyFreeSpec with Matchers } } - "ParquetReader should be able to read file saved by Spark if the file contains" - { + "ParquetReader should be able to read file saved by Spark if the file contains" - CompatibilityTestCases.cases(Spark, Reader).foreach(runTestCase) - } "ParquetReader should read data partitioned by Spark" in { import sparkSession.implicits.* diff --git a/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/TimeEncodingCompatibilityItSpec.scala b/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/TimeEncodingCompatibilityItSpec.scala index 8df593a4..af4c37f5 100644 --- a/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/TimeEncodingCompatibilityItSpec.scala +++ b/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/TimeEncodingCompatibilityItSpec.scala @@ -46,7 +46,7 @@ abstract class TimeEncodingCompatibilityItSpec finally parquetIterable.close() } - "For time zone of" - { + "For time zone of" - timeZones.foreach { timeZone => val data = TimePrimitives(timestamp = localDateTimeToTimestamp(newYearMidnight, timeZone), date = newYear) timeZone.getDisplayName - { @@ -60,6 +60,5 @@ abstract class TimeEncodingCompatibilityItSpec } } } - } } diff --git a/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/TimeEncodingInt64NanosCompatibilityItSpec.scala b/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/TimeEncodingInt64NanosCompatibilityItSpec.scala index ce5369f0..e7f97fdb 100644 --- a/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/TimeEncodingInt64NanosCompatibilityItSpec.scala +++ b/core/src/it/scala-2.12/com/github/mjakubowski84/parquet4s/TimeEncodingInt64NanosCompatibilityItSpec.scala @@ -38,7 +38,7 @@ class TimeEncodingInt64NanosCompatibilityItSpec extends AnyFreeSpec with Matcher finally parquetIterable.close() } - "Parquet4s should read data written with time zone of" - { + "Parquet4s should read data written with time zone of" - timeZones.foreach { timeZone => val data = TimePrimitives(timestamp = localDateTimeToTimestamp(newYearMidnight, timeZone), date = newYear) timeZone.getDisplayName in { @@ -46,6 +46,5 @@ class TimeEncodingInt64NanosCompatibilityItSpec extends AnyFreeSpec with Matcher readWithParquet4S(timeZone) should be(data) } } - } } diff --git a/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetWriterAndSparkCompatibilityItSpec.scala b/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetWriterAndSparkCompatibilityItSpec.scala index 5d988963..96ee5eef 100644 --- 
a/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetWriterAndSparkCompatibilityItSpec.scala +++ b/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/ParquetWriterAndSparkCompatibilityItSpec.scala @@ -18,8 +18,7 @@ class ParquetWriterAndSparkCompatibilityItSpec extends AnyFreeSpec with Matchers readFromTemp(testCase.typeTag) should contain theSameElementsAs testCase.data } - "Spark should be able to read file saved by ParquetWriter if the file contains" - { + "Spark should be able to read file saved by ParquetWriter if the file contains" - CompatibilityTestCases.cases(Writer, Spark).foreach(runTestCase) - } } diff --git a/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/SparkAndParquetReaderCompatibilityItSpec.scala b/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/SparkAndParquetReaderCompatibilityItSpec.scala index f1e345bd..fe034ff8 100644 --- a/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/SparkAndParquetReaderCompatibilityItSpec.scala +++ b/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/SparkAndParquetReaderCompatibilityItSpec.scala @@ -19,8 +19,7 @@ class SparkAndParquetReaderCompatibilityItSpec extends AnyFreeSpec with Matchers finally parquetIterable.close() } - "ParquetReader should be able to read file saved by Spark if the file contains" - { + "ParquetReader should be able to read file saved by Spark if the file contains" - CompatibilityTestCases.cases(Spark, Reader).foreach(runTestCase) - } } diff --git a/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/TimeEncodingCompatibilityItSpec.scala b/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/TimeEncodingCompatibilityItSpec.scala index 8df593a4..af4c37f5 100644 --- a/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/TimeEncodingCompatibilityItSpec.scala +++ b/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/TimeEncodingCompatibilityItSpec.scala @@ -46,7 +46,7 @@ abstract class TimeEncodingCompatibilityItSpec finally parquetIterable.close() } - "For time zone of" - { + "For time zone of" - timeZones.foreach { timeZone => val data = TimePrimitives(timestamp = localDateTimeToTimestamp(newYearMidnight, timeZone), date = newYear) timeZone.getDisplayName - { @@ -60,6 +60,5 @@ abstract class TimeEncodingCompatibilityItSpec } } } - } } diff --git a/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/TimeEncodingInt64NanosCompatibilityItSpec.scala b/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/TimeEncodingInt64NanosCompatibilityItSpec.scala index ce5369f0..e7f97fdb 100644 --- a/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/TimeEncodingInt64NanosCompatibilityItSpec.scala +++ b/core/src/it/scala-2.13/com/github/mjakubowski84/parquet4s/TimeEncodingInt64NanosCompatibilityItSpec.scala @@ -38,7 +38,7 @@ class TimeEncodingInt64NanosCompatibilityItSpec extends AnyFreeSpec with Matcher finally parquetIterable.close() } - "Parquet4s should read data written with time zone of" - { + "Parquet4s should read data written with time zone of" - timeZones.foreach { timeZone => val data = TimePrimitives(timestamp = localDateTimeToTimestamp(newYearMidnight, timeZone), date = newYear) timeZone.getDisplayName in { @@ -46,6 +46,5 @@ class TimeEncodingInt64NanosCompatibilityItSpec extends AnyFreeSpec with Matcher readWithParquet4S(timeZone) should be(data) } } - } } diff --git a/core/src/it/scala/com/github/mjakubowski84/parquet4s/ParquetWriterAndParquetReaderCompatibilityItSpec.scala 
b/core/src/it/scala/com/github/mjakubowski84/parquet4s/ParquetWriterAndParquetReaderCompatibilityItSpec.scala index cd038c1d..cae8cc18 100644 --- a/core/src/it/scala/com/github/mjakubowski84/parquet4s/ParquetWriterAndParquetReaderCompatibilityItSpec.scala +++ b/core/src/it/scala/com/github/mjakubowski84/parquet4s/ParquetWriterAndParquetReaderCompatibilityItSpec.scala @@ -26,8 +26,7 @@ class ParquetWriterAndParquetReaderCompatibilityItSpec finally parquetIterable.close() } - "Spark should be able to read file saved by ParquetWriter if the file contains" - { + "Spark should be able to read file saved by ParquetWriter if the file contains" - CompatibilityTestCases.cases(Writer, Reader).foreach(runTestCase) - } } diff --git a/core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala b/core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala index 220a0c39..64118680 100644 --- a/core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala +++ b/core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala @@ -150,13 +150,12 @@ trait IOOps { // non-empty node dir // early removes paths which do not match filter predicate filterPredicateOpt - .fold(partitionedDirs.map { case (path, partition) => path -> (partitions :+ partition) }) { - filterPredicate => - PartitionFilter.filterPartitionPaths( - filterPredicate = filterPredicate, - commonPartitions = partitions, - partitionedPaths = partitionedDirs - ) + .fold(partitionedDirs.map { case (path, partition) => path -> (partitions :+ partition) }) { filterPredicate => + PartitionFilter.filterPartitionPaths( + filterPredicate = filterPredicate, + commonPartitions = partitions, + partitionedPaths = partitionedDirs + ) } .map { case (subPath, partitions) => listPartitionedDirectory(fs, configuration, subPath, partitions, filterPredicateOpt) diff --git a/core/src/test/scala/com/github/mjakubowski84/parquet4s/IOOpsSpec.scala b/core/src/test/scala/com/github/mjakubowski84/parquet4s/IOOpsSpec.scala index be053a21..b37922a9 100644 --- a/core/src/test/scala/com/github/mjakubowski84/parquet4s/IOOpsSpec.scala +++ b/core/src/test/scala/com/github/mjakubowski84/parquet4s/IOOpsSpec.scala @@ -6,15 +6,14 @@ import org.scalatest.matchers.should.Matchers class IOOpsSpec extends AnyFlatSpec with Matchers with Inside with PartitionTestUtils { - "PartitionRegexp" should "match valid partition names and values" in { + "PartitionRegexp" should "match valid partition names and values" in forAll(ValidPartitionsTable) { case (name, value) => inside(s"$name=$value") { case IOOps.PartitionRegexp(`name`, `value`) => succeed } } - } - it should "not match invalid partition names and values" in { + it should "not match invalid partition names and values" in forAll(InvalidPartitionsTable) { case (name, value) => s"$name=$value" match { case IOOps.PartitionRegexp(capturedName, capturedValue) => @@ -26,6 +25,5 @@ class IOOpsSpec extends AnyFlatSpec with Matchers with Inside with PartitionTest succeed } } - } } diff --git a/core/src/test/scala/com/github/mjakubowski84/parquet4s/ParquetRecordDecoderSpec.scala b/core/src/test/scala/com/github/mjakubowski84/parquet4s/ParquetRecordDecoderSpec.scala index e5a276fd..bbc09ca4 100644 --- a/core/src/test/scala/com/github/mjakubowski84/parquet4s/ParquetRecordDecoderSpec.scala +++ b/core/src/test/scala/com/github/mjakubowski84/parquet4s/ParquetRecordDecoderSpec.scala @@ -249,9 +249,8 @@ class ParquetRecordDecoderSpec extends AnyFlatSpec with Matchers { } it should "throw exception when encountered null-value as a 
key" in { - a[DecodingException] should be thrownBy { + a[DecodingException] should be thrownBy decode[ContainsMapOfPrimitives](RowParquetRecord("map" -> MapParquetRecord(NullValue -> 1.value))) - } } it should "decode record containing map of optional primitives" in { diff --git a/fs2/src/test/scala/com/github/mjakubowski84/parquet4s/parquet/IoSpec.scala b/fs2/src/test/scala/com/github/mjakubowski84/parquet4s/parquet/IoSpec.scala index 8e3fbb0b..9b29b479 100644 --- a/fs2/src/test/scala/com/github/mjakubowski84/parquet4s/parquet/IoSpec.scala +++ b/fs2/src/test/scala/com/github/mjakubowski84/parquet4s/parquet/IoSpec.scala @@ -7,15 +7,14 @@ import org.scalatest.matchers.should.Matchers class IoSpec extends AnyFlatSpec with Matchers with Inside with PartitionTestUtils { - "PartitionRegexp" should "match valid partition names and values" in { + "PartitionRegexp" should "match valid partition names and values" in forAll(ValidPartitionsTable) { case (name, value) => inside(s"$name=$value") { case io.PartitionRegexp(`name`, `value`) => succeed } } - } - it should "not match invalid partition names and values" in { + it should "not match invalid partition names and values" in forAll(InvalidPartitionsTable) { case (name, value) => s"$name=$value" match { case io.PartitionRegexp(capturedName, capturedValue) => @@ -27,6 +26,5 @@ class IoSpec extends AnyFlatSpec with Matchers with Inside with PartitionTestUti succeed } } - } } diff --git a/project/Compilation.scala b/project/Compilation.scala index 9baa4c3c..e1544ea3 100644 --- a/project/Compilation.scala +++ b/project/Compilation.scala @@ -4,7 +4,7 @@ import sbt.CrossVersion object Compilation { lazy val compilationSettings = Seq( - scalacOptions ++= { + scalacOptions ++= Seq( "-encoding", "UTF-8", @@ -60,7 +60,6 @@ object Compilation { ) } } - } ) } diff --git a/s3Test/src/it/scala/com/github/mjakubowski84/parquet4s/s3/S3ItSpec.scala b/s3Test/src/it/scala/com/github/mjakubowski84/parquet4s/s3/S3ItSpec.scala index 33021ddf..7f959552 100644 --- a/s3Test/src/it/scala/com/github/mjakubowski84/parquet4s/s3/S3ItSpec.scala +++ b/s3Test/src/it/scala/com/github/mjakubowski84/parquet4s/s3/S3ItSpec.scala @@ -28,7 +28,7 @@ class S3ItSpec extends AnyFlatSpec with Matchers with TestContainerForAll { override def afterContainersStart(containers: LocalStackV2Container): Unit = containers.execInContainer("awslocal", "s3api", "create-bucket", "--bucket", bucket) - "Parquet4s" should "write and read data to/from S3" in { + "Parquet4s" should "write and read data to/from S3" in withContainers { s3Container => val configuration = new Configuration() @@ -43,6 +43,5 @@ class S3ItSpec extends AnyFlatSpec with Matchers with TestContainerForAll { _.toSeq should be(data) } } - } } diff --git a/scalapb/src/test/scala/com/github/mjakubowski84/parquet4s/Parquet4sScalaPBSpec.scala b/scalapb/src/test/scala/com/github/mjakubowski84/parquet4s/Parquet4sScalaPBSpec.scala index efdd181f..5ec02d7c 100644 --- a/scalapb/src/test/scala/com/github/mjakubowski84/parquet4s/Parquet4sScalaPBSpec.scala +++ b/scalapb/src/test/scala/com/github/mjakubowski84/parquet4s/Parquet4sScalaPBSpec.scala @@ -26,13 +26,11 @@ class Parquet4sScalaPBSpec extends AnyFlatSpec with Matchers { testWithData(i => Data(abc = Data.ABC.fromValue(i % 3))) } - it should "work with message types" in { + it should "work with message types" in testWithData(i => Data(inner = Some(Data.Inner(i.toString)))) - } - it should "work with unrecognized enum values" in { + it should "work with unrecognized enum values" in 
testWithData(i => Data(abc = Data.ABC.fromValue(i % 5))) - } it should "work with map types" in { testWithData(i => Data(map = Map("original" -> i, "doubled" -> 2 * i)))
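Taken together, the four patches make partition-value injection respect the projected schema: a partition column is written into the record only when the projection includes it, and is skipped entirely otherwise. A minimal usage sketch of the behavior exercised by the new integration tests — the base path is illustrative, while the writer/reader calls mirror those in ParquetReaderItSpec:

```scala
import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, Path}

object PartitionProjectionExample extends App {
  case class I(i: Int)                      // projection without the partition column
  case class Partitioned(a: String, i: Int) // projection including partition column "a"

  val base = Path("file:///tmp/projection-example") // illustrative location

  // Write one file per partition; the directory name carries the partition value.
  ParquetWriter.of[I].writeAndClose(Path(base, "a=a1/file.parquet"), Seq(I(1)))
  ParquetWriter.of[I].writeAndClose(Path(base, "a=a2/file.parquet"), Seq(I(2)))

  // Projection includes "a", so the partition value is set on each read record.
  val withPartition = ParquetReader.projectedAs[Partitioned].read(base)
  try withPartition.foreach(println) // Partitioned(a1,1), Partitioned(a2,2)
  finally withPartition.close()

  // Projection omits "a"; after this change the partition value is skipped entirely.
  val withoutPartition = ParquetReader.projectedAs[I].read(base)
  try withoutPartition.foreach(println) // I(1), I(2)
  finally withoutPartition.close()
}
```

Internally, this is driven by the new `ParquetSchemaResolver.findType`, which resolves a (possibly nested) column path such as `nested.b` against the projected schema; `PartitionView.filterPaths` then drops partition columns for which no type is found.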