[Kernel][Parquet Writer] Fix an issue with writing decimal as binary (#3233)

## Description
The lookup table of the maximum number of bytes needed to write a decimal of a given precision to Parquet as FIXED_LEN_BYTE_ARRAY is off by one: entries are generated for precisions 1 through 38, but the table is indexed by precision directly, so each lookup returns the byte count for precision + 1 and precision 38 falls out of bounds.

Resolved #3152
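
For illustration, here is a minimal standalone sketch (the class and method names are hypothetical, not code from this repository) of how the byte-count table from ParquetSchemaUtils is built, and why generating entries from precision 1 while indexing by precision breaks:

```java
import java.util.ArrayList;
import java.util.List;

public class MaxBytesTableSketch {
  // Builds the table from the loop in ParquetSchemaUtils: entry i is the
  // smallest signed two's-complement width (in bytes) that can hold any
  // unscaled value with i digits, i.e. the smallest numBytes such that
  // 2^(8 * numBytes - 1) > 10^i.
  static List<Integer> build(int startPrecision) {
    List<Integer> maxBytesPerPrecision = new ArrayList<>();
    for (int i = startPrecision; i <= 38; i++) {
      int numBytes = 1;
      while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, i)) {
        numBytes += 1;
      }
      maxBytesPerPrecision.add(numBytes);
    }
    return maxBytesPerPrecision;
  }

  public static void main(String[] args) {
    List<Integer> buggy = build(1); // old code: 38 entries for precisions 1..38
    List<Integer> fixed = build(0); // new code: 39 entries for precisions 0..38
    // Indexed by precision, the old table returns the width for precision + 1...
    System.out.println(buggy.get(9)); // 5 (the width for precision 10)
    System.out.println(fixed.get(9)); // 4 (correct: 10^9 - 1 fits in 4 bytes)
    // ...and precision 38 is out of bounds entirely.
    System.out.println(fixed.get(38)); // 16
    // buggy.get(38) would throw IndexOutOfBoundsException.
  }
}
```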

## How was this patch tested?
Added unit tests that read and write decimals with various precisions and scales.
vkorukanti authored Jun 7, 2024
1 parent 273f988 commit 5289a5e
Showing 8 changed files with 90 additions and 6 deletions.
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1717778521300,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"3","numOutputBytes":"9126"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.3.0-SNAPSHOT","txnId":"5e3bfa16-cf0f-4d40-ad7d-b6426a6b4b7a"}}
{"metaData":{"id":"7f750aff-9bf2-4e52-bfce-39811932da26","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"decimal_4_0\",\"type\":\"decimal(4,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_7_0\",\"type\":\"decimal(7,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_7_6\",\"type\":\"decimal(7,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_12_0\",\"type\":\"decimal(12,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_12_6\",\"type\":\"decimal(12,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_15_0\",\"type\":\"decimal(15,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_15_6\",\"type\":\"decimal(15,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_15_12\",\"type\":\"decimal(15,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_18_0\",\"type\":\"decimal(18,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_18_6\",\"type\":\"decimal(18,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_18_12\",\"type\":\"decimal(18,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_0\",\"type\":\"decimal(25,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_6\",\"type\":\"decimal(25,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_12\",\"type\":\"decimal(25,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_18\",\"type\":\"decimal(25,18)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_25_24\",\"type\":\"decimal(25,24)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_0\",\"type\":\"decimal(35,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_6\",\"type\":\"decimal(35,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_12\",\"type\":\"decimal(35,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_18\",\"type\":\"decimal(35,18)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_24\",\"type\":\"decimal(35,24)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_35_30\",\"type\":\"decimal(35,30)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_0\",\"type\":\"decimal(38,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_6\",\"type\":\"decimal(38,6)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_12\",\"type\":\"decimal(38,12)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_18\",\"type\":\"decimal(38,18)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_24\",\"type\":\"decimal(38,24)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_30\",\"type\":\"decimal(38,30)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_38_36\",\"type\":\"decimal(38,36)\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1717778519308}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00000-bb4b3e59-ddb9-4d26-beaf-de9554e14517-c000.snappy.parquet","partitionValues":{},"size":9126,"modificationTime":1717778521237,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"decimal_4_0\":-13,\"decimal_7_0\":0,\"decimal_12_0\":0,\"decimal_12_6\":-0.000098,\"decimal_15_0\":-157,\"decimal_15_6\":-3.346000,\"decimal_15_12\":-0.002162000000,\"decimal_18_0\":0,\"decimal_18_6\":-22641.000000,\"decimal_18_12\":-5.190000000000,\"decimal_25_0\":0,\"decimal_25_6\":-0.000013,\"decimal_25_12\":-3.1661E-8,\"decimal_25_18\":-24199.000000000000000000,\"decimal_35_0\":0,\"decimal_35_6\":-0.000161,\"decimal_35_12\":-2.59176E-7,\"decimal_35_18\":-1.36744000E-10,\"decimal_35_24\":-22827907.000000000000000000000000,\"decimal_35_30\":-32805.309000000000000000000000000000,\"decimal_38_0\":-17,\"decimal_38_6\":-0.027994,\"decimal_38_12\":-0.000024695819,\"decimal_38_18\":-4.614771000E-9,\"decimal_38_24\":-9.718032000000E-12,\"decimal_38_30\":-2.6626087000000000E-14,\"decimal_38_36\":-2.9546424000000000000E-17},\"maxValues\":{\"decimal_4_0\":4,\"decimal_7_0\":0,\"decimal_12_0\":0,\"decimal_12_6\":0.000062,\"decimal_15_0\":481,\"decimal_15_6\":3.302000,\"decimal_15_12\":0.001469000000,\"decimal_18_0\":0,\"decimal_18_6\":7998.000000,\"decimal_18_12\":10.994000000000,\"decimal_25_0\":0,\"decimal_25_6\":0.000021,\"decimal_25_12\":5.925E-9,\"decimal_25_18\":234942.000000000000000000,\"decimal_35_0\":0,\"decimal_35_6\":0.000161,\"decimal_35_12\":1.65519E-7,\"decimal_35_18\":1.52896000E-10,\"decimal_35_24\":14797356.000000000000000000000000,\"decimal_35_30\":8083.687000000000000000000000000000,\"decimal_38_0\":26,\"decimal_38_6\":0.021882,\"decimal_38_12\":0.000032950993,\"decimal_38_18\":1.2783803000E-8,\"decimal_38_24\":2.395564000000E-12,\"decimal_38_30\":2.9414203000000000E-14,\"decimal_38_36\":3.241836000000000000E-18},\"nullCount\":{\"decimal_4_0\":1,\"decimal_7_0\":1,\"decimal_7_6\":3,\"decimal_12_0\":1,\"decimal_12_6\":1,\"decimal_15_0\":1,\"decimal_15_6\":1,\"decimal_15_12\":1,\"decimal_18_0\":1,\"decimal_18_6\":1,\"decimal_18_12\":1,\"decimal_25_0\":1,\"decimal_25_6\":1,\"decimal_25_12\":1,\"decimal_25_18\":1,\"decimal_25_24\":3,\"decimal_35_0\":1,\"decimal_35_6\":1,\"decimal_35_12\":1,\"decimal_35_18\":1,\"decimal_35_24\":1,\"decimal_35_30\":1,\"decimal_38_0\":1,\"decimal_38_6\":1,\"decimal_38_12\":1,\"decimal_38_18\":1,\"decimal_38_24\":1,\"decimal_38_30\":1,\"decimal_38_36\":1}}"}}
@@ -17,12 +17,13 @@
package io.delta.golden

import java.io.File
import java.math.{BigDecimal => JBigDecimal}
import java.math.{BigInteger, BigDecimal => JBigDecimal}
import java.sql.Timestamp
import java.time.ZoneOffset.UTC
import java.time.LocalDateTime
import java.util.{Locale, TimeZone}
import java.util.{Locale, Random, TimeZone}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.implicitConversions

@@ -1234,6 +1235,53 @@ class GoldenTables extends QueryTest with SharedSparkSession {
}
}

generateGoldenTable("decimal-various-scale-precision") { tablePath =>
val fields = ArrayBuffer[StructField]()
Seq(0, 4, 7, 12, 15, 18, 25, 35, 38).foreach { precision =>
Seq.range(start = 0, end = precision, step = 6).foreach { scale =>
fields.append(
StructField(s"decimal_${precision}_${scale}", DecimalType(precision, scale)))
}
}

val schema = StructType(fields)

val random = new Random(27 /* seed */)
def generateRandomBigDecimal(precision: Int, scale: Int): JBigDecimal = {
// Generate a random non-negative unscaled value of at most `precision` bits;
// a value of at most p bits always fits in p decimal digits
val unscaledValue = new BigInteger(precision, random)

// Create a BigDecimal with the unscaled value and the specified scale
new JBigDecimal(unscaledValue, scale)
}

val rows = ArrayBuffer[Row]()
Seq.range(start = 0, end = 3).foreach { i =>
val rowValues = ArrayBuffer[BigDecimal]()
Seq(0, 4, 7, 12, 15, 18, 25, 35, 38).foreach { precision =>
Seq.range(start = 0, end = precision, step = 6).foreach { scale =>
i match {
case 0 =>
rowValues.append(null)
case 1 =>
// Generate a positive random BigDecimal with the specified precision and scale
rowValues.append(generateRandomBigDecimal(precision, scale))
case 2 =>
// Generate a negative random BigDecimal with the specified precision and scale
rowValues.append(generateRandomBigDecimal(precision, scale).negate())
}
}
}
rows.append(Row(rowValues: _*))
}

spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
.repartition(1)
.write
.format("delta")
.save(tablePath)
}
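
The generator above builds each value from an unscaled integer and a scale. A quick illustration of that construction in Java (the values are made up for the example):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class UnscaledValueDemo {
  public static void main(String[] args) {
    // A BigDecimal is an unscaled integer paired with a scale:
    // value = unscaledValue * 10^(-scale).
    BigDecimal d = new BigDecimal(new BigInteger("12345"), 3);
    System.out.println(d);             // 12.345
    System.out.println(d.precision()); // 5 (digits in the unscaled value)
    System.out.println(d.scale());     // 3 (digits after the decimal point)
  }
}
```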

for (parquetFormat <- Seq("v1", "v2")) {
// PARQUET_1_0 doesn't support dictionary encoding for FIXED_LEN_BYTE_ARRAY (only PARQUET_2_0)
generateGoldenTable(s"parquet-decimal-dictionaries-$parquetFormat") { tablePath =>
@@ -20,8 +20,10 @@
import static java.lang.String.format;

import org.apache.parquet.schema.*;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.Type.Repetition;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
@@ -54,7 +56,7 @@ class ParquetSchemaUtils {

static {
List<Integer> maxBytesPerPrecision = new ArrayList<>();
for (int i = 1; i <= 38; i++) {
for (int i = 0; i <= 38; i++) {
int numBytes = 1;
while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, i)) {
numBytes += 1;
@@ -205,18 +207,21 @@ private static Type toParquetType(
DecimalType decimalType = (DecimalType) dataType;
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
// DecimalType constructor already has checks to make sure the precision and scale are
// within the valid range. No need to check them again.

DecimalLogicalTypeAnnotation decimalAnnotation = decimalType(scale, precision);
if (precision <= DECIMAL_MAX_DIGITS_IN_INT) {
type = primitive(INT32, repetition)
.as(LogicalTypeAnnotation.decimalType(scale, precision))
.as(decimalAnnotation)
.named(name);
} else if (precision <= DECIMAL_MAX_DIGITS_IN_LONG) {
type = primitive(INT64, repetition)
.as(LogicalTypeAnnotation.decimalType(scale, precision))
.as(decimalAnnotation)
.named(name);
} else {
type = primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(LogicalTypeAnnotation.decimalType(scale, precision))
.as(decimalAnnotation)
.length(MAX_BYTES_PER_PRECISION.get(precision))
.named(name);
}
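
The hunk above keeps the existing mapping rule, restated here as a standalone sketch (the method names are hypothetical; the limits 9 and 18 are the Parquet spec's maximum decimal digit counts for INT32 and INT64, matching DECIMAL_MAX_DIGITS_IN_INT and DECIMAL_MAX_DIGITS_IN_LONG):

```java
// Physical Parquet type chosen for decimal(precision, scale).
static String physicalTypeFor(int precision) {
  if (precision <= 9) {
    return "INT32"; // up to 9 digits fit a signed 32-bit integer
  } else if (precision <= 18) {
    return "INT64"; // up to 18 digits fit a signed 64-bit integer
  } else {
    // Fixed-width binary sized by the (now correctly indexed) lookup table,
    // e.g. FIXED_LEN_BYTE_ARRAY(16) for decimal(38, s).
    return "FIXED_LEN_BYTE_ARRAY(" + maxBytesPerPrecision(precision) + ")";
  }
}

// Smallest signed two's-complement width holding any `precision`-digit value;
// equivalent to an entry of the fixed MAX_BYTES_PER_PRECISION table.
static int maxBytesPerPrecision(int precision) {
  int numBytes = 1;
  while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
    numBytes += 1;
  }
  return numBytes;
}
```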
@@ -184,6 +184,18 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
}
}

test(s"end to end: reading decimal-various-scale-precision") {
val tablePath = goldenTablePath("decimal-various-scale-precision")
val expResults = spark.sql(s"SELECT * FROM delta.`$tablePath`")
.collect()
.map(TestRow(_))

checkTable(
path = tablePath,
expectedAnswer = expResults
)
}

//////////////////////////////////////////////////////////////////////////////////
// Table/Snapshot tests
//////////////////////////////////////////////////////////////////////////////////
@@ -153,6 +153,21 @@ class ParquetFileWriterSuite extends AnyFunSuite
),
4 // how many columns have the stats collected from given list above
)
},
// Decimal types with various precision and scales
Seq((10000, 1)).map {
case (targetFileSize, expParquetFileCount) =>
(
"write decimal various scales and precision (with stats)", // test name
"decimal-various-scale-precision",
targetFileSize,
expParquetFileCount,
3, /* expected number of rows written to Parquet files */
Option.empty[Predicate], // predicate for filtering what rows to write to parquet files
leafLevelPrimitiveColumns(
Seq.empty, tableSchema(goldenTablePath("decimal-various-scale-precision"))),
29 // how many columns have the stats collected from given list above
)
}
).flatten.foreach {
case (name, input, fileSize, expFileCount, expRowCount, predicate, statsCols, expStatsColCnt) =>
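
The expected count of 29 stats columns follows from how GoldenTables builds the schema: for each precision in {4, 7, 12, 15, 18, 25, 35, 38} there is one column per scale in 0, 6, 12, ... below the precision (precision 0 contributes no columns since its scale range is empty). A quick check of that arithmetic:

```java
public class StatsColumnCount {
  public static void main(String[] args) {
    int total = 0;
    for (int p : new int[] {4, 7, 12, 15, 18, 25, 35, 38}) {
      total += (p + 5) / 6; // number of scales in [0, p) stepping by 6
    }
    System.out.println(total); // 1 + 2 + 2 + 3 + 3 + 5 + 6 + 7 = 29
  }
}
```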
