Add Decimal support for In, InSet, AtLeastNNonNulls, GetArrayItem, GetStructField, and GenerateExec #1410

Merged (4 commits) on Dec 16, 2020
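For context, a minimal sketch of the kind of query these changes keep on the GPU: an IN predicate over a DECIMAL column. The `spark` session and the RAPIDS plugin configuration are assumptions here, not part of this PR.

# Sketch only: assumes an existing SparkSession `spark` launched with the
# RAPIDS Accelerator enabled (e.g. spark.plugins=com.nvidia.spark.SQLPlugin).
from decimal import Decimal

df = spark.createDataFrame(
    [(Decimal("1.23"),), (Decimal("4.56"),)], schema="a decimal(10,2)")
# With this PR the In/InSet evaluation behind isin() can stay on the GPU
# instead of falling back to the CPU for decimal inputs.
df.filter(df.a.isin(Decimal("1.23"), Decimal("9.99"))).show()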
40 changes: 20 additions & 20 deletions docs/supported_ops.md
@@ -233,11 +233,11 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (Only literal arrays and the output of the array function are supported; missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (Only literal arrays and the output of the array function are supported; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -1601,13 +1601,13 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -4925,7 +4925,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -4963,13 +4963,13 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -5191,7 +5191,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td> </td>
</tr>
<tr>
@@ -5206,13 +5206,13 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -5893,7 +5893,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -5914,7 +5914,7 @@ Accelerator support is described below.
<td><em>PS (Literal value only)</em></td>
<td><em>PS* (Literal value only)</em></td>
<td><em>PS (Literal value only)</em></td>
<td><b>NS</b></td>
<td><em>PS* (Literal value only)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -6025,7 +6025,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/array_test.py
@@ -33,7 +33,8 @@ def test_array_index(data_gen):
'a[null]',
'a[3]',
'a[50]',
'a[-1]'))
'a[-1]'),
conf=allow_negative_scale_of_decimal_conf)

# Once we support arrays as literals then we can support a[null] for
# all array gens. See test_array_index for more info
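The added conf line above lets the index test run over decimal element types too. As a rough sketch, the operation being exercised is GetArrayItem over an array<decimal> column (an existing `spark` session is assumed):

# Sketch: indexing into an array<decimal> column; GetArrayItem is one of the
# expressions this PR enables for decimals. Assumes an existing `spark` session.
df = spark.sql(
    "SELECT array(CAST(1.10 AS DECIMAL(5,2)), CAST(2.20 AS DECIMAL(5,2))) AS a")
df.selectExpr("a[0]", "a[1]", "a[50]").show()  # out-of-range indexes yield NULL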
43 changes: 27 additions & 16 deletions integration_tests/src/main/python/cmp_test.py
@@ -105,39 +105,44 @@ def test_gte(data_gen):
f.col('b') >= f.lit(None).cast(data_type),
f.col('a') >= f.col('b')), conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_isnull(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(
f.isnull(f.col('a'))))
f.isnull(f.col('a'))),
conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', [FloatGen(), DoubleGen()], ids=idfn)
def test_isnan(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(
f.isnan(f.col('a'))))

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_dropna_any(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).dropna())
lambda spark : binary_op_df(spark, data_gen).dropna(),
conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_dropna_all(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).dropna(how='all'))
lambda spark : binary_op_df(spark, data_gen).dropna(how='all'),
conf=allow_negative_scale_of_decimal_conf)

#dropna is really a filter along with a test for null, but let's do an explicit filter test too
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_filter(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')))
lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')),
conf=allow_negative_scale_of_decimal_conf)

# coalesce batch happens after a filter, but only if something else happens on the GPU after that
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_filter_with_project(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'))
lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'),
conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('expr', [f.lit(True), f.lit(False), f.lit(None).cast('boolean')], ids=idfn)
def test_filter_with_lit(expr):
@@ -146,21 +151,27 @@ def test_filter_with_lit(expr):

# Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
# This is to test entries under that value.
@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
def test_in(data_gen):
# nulls are not supported for in on the GPU yet
num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) - 1
scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))
# we have to make the scalars in a session so negative scales in decimals are supported
scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))),
conf=allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)),
conf=allow_negative_scale_of_decimal_conf)

# Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
# This is to test entries over that value.
@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
def test_in_set(data_gen):
# nulls are not supported for in on the GPU yet
num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) + 1
scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))
# we have to make the scalars in a session so negative scales in decimals are supported
scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))),
conf=allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)),
conf=allow_negative_scale_of_decimal_conf)
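The tests above all pass `allow_negative_scale_of_decimal_conf`. Its definition is not part of this diff; presumably it enables Spark's legacy negative-scale decimal support, along these lines:

# Assumed definition (lives in the integration-test harness, not this diff):
# decimal types like DECIMAL(5,-2) are only legal when the legacy flag is on.
allow_negative_scale_of_decimal_conf = {
    'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}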

13 changes: 9 additions & 4 deletions integration_tests/src/main/python/data_gen.py
@@ -589,11 +589,15 @@ def gen_df(spark, data_gen, length=2048, seed=0):
data = [src.gen() for index in range(0, length)]
return spark.createDataFrame(data, src.data_type)

def _mark_as_lit(data):
def _mark_as_lit(data, data_type = None):
# Sadly you cannot create a literal from just an array in pyspark
if isinstance(data, list):
return f.array([_mark_as_lit(x) for x in data])
return f.lit(data)
if data_type is None:
return f.lit(data)
else:
# lit does not take a data type so we might have to cast it
return f.lit(data).cast(data_type)

def _gen_scalars_common(data_gen, count, seed=0):
if isinstance(data_gen, list):
@@ -614,7 +618,8 @@ def gen_scalars(data_gen, count, seed=0, force_no_nulls=False):
if force_no_nulls:
assert(not isinstance(data_gen, NullGen))
src = _gen_scalars_common(data_gen, count, seed=seed)
return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls)) for i in range(0, count))
data_type = src.data_type
return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count))

def gen_scalar(data_gen, seed=0, force_no_nulls=False):
"""Generate a single scalar value."""
@@ -761,7 +766,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):

boolean_gens = [boolean_gen]

single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens]
single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + decimal_gens + [null_gen]]

# Be careful to not make these too large or data generation takes forever
# This is only a few nested array gens, because nesting can be very deep
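The `_mark_as_lit` change above matters because `pyspark.sql.functions.lit` infers a decimal type from the value itself, which can disagree with the generator's declared precision and scale. A small sketch of the pattern the new code applies:

# Sketch: lit() infers a narrow type such as DECIMAL(2,1) from the value, so
# without the cast the literal may not match the intended DECIMAL(10,2).
from decimal import Decimal
import pyspark.sql.functions as f
from pyspark.sql.types import DecimalType

lit_col = f.lit(Decimal("1.5")).cast(DecimalType(precision=10, scale=2))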
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/generate_expr_test.py
@@ -29,7 +29,9 @@ def four_op_df(spark, gen, length=2048, seed=0):
('d', gen)], nullable=False), length=length, seed=seed)

all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
FloatGen(), DoubleGen(), BooleanGen(), DateGen(), TimestampGen()]
FloatGen(), DoubleGen(), BooleanGen(), DateGen(), TimestampGen(),
decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision,
decimal_gen_64bit]

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
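Adding the decimal generators to `all_gen` above exercises GenerateExec with decimal inputs. A minimal sketch of the query shape involved (an existing `spark` session with the plugin is assumed):

# Sketch: explode() is planned as GenerateExec, which this PR extends to
# decimal and nested-null types. Assumes an existing `spark` session.
import pyspark.sql.functions as f

df = spark.sql(
    "SELECT array(CAST(1.1 AS DECIMAL(5,2)), CAST(2.2 AS DECIMAL(5,2))) AS a")
df.select(f.explode(f.col("a")).alias("v")).show()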
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/struct_test.py
@@ -23,7 +23,8 @@
@pytest.mark.parametrize('data_gen', [StructGen([["first", boolean_gen], ["second", byte_gen], ["third", float_gen]]),
StructGen([["first", short_gen], ["second", int_gen], ["third", long_gen]]),
StructGen([["first", double_gen], ["second", date_gen], ["third", timestamp_gen]]),
StructGen([["first", string_gen], ["second", ArrayGen(byte_gen)], ["third", simple_string_to_string_map_gen]])], ids=idfn)
StructGen([["first", string_gen], ["second", ArrayGen(byte_gen)], ["third", simple_string_to_string_map_gen]]),
StructGen([["first", decimal_gen_default], ["second", decimal_gen_scale_precision], ["third", decimal_gen_same_scale_precision]])], ids=idfn)
def test_struct_get_item(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr(
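The new StructGen case above drives GetStructField over decimal members. Written by hand, the equivalent query looks roughly like this (an existing `spark` session is assumed):

# Sketch: reading a decimal field out of a struct (GetStructField), which
# this PR enables on the GPU. Assumes an existing `spark` session.
df = spark.sql(
    "SELECT named_struct('first', CAST(1.23 AS DECIMAL(5,2))) AS s")
df.selectExpr("s.first").show()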
15 changes: 12 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala
@@ -80,9 +80,18 @@ case class GpuInSet(
ColumnVector.timestampMicroSecondsFromBoxedLongs(timestamps:_*)
case StringType =>
val strings = values.asInstanceOf[Seq[UTF8String]]
val builder = HostColumnVector.builder(DType.STRING, strings.size)
strings.foreach(s => builder.appendUTF8String(s.getBytes))
builder.buildAndPutOnDevice()
withResource(HostColumnVector.builder(DType.STRING, strings.size)) { builder =>
strings.foreach(s => builder.appendUTF8String(s.getBytes))
builder.buildAndPutOnDevice()
}
case t: DecimalType =>
val decs = values.asInstanceOf[Seq[Decimal]]
// When we support DECIMAL32 this will need to change to support that
withResource(HostColumnVector.builder(DType.create(DType.DTypeEnum.DECIMAL64, - t.scale),
decs.size)) { builder =>
decs.foreach(d => builder.appendUnscaledDecimal(d.toUnscaledLong))
builder.buildAndPutOnDevice()
}
case _ =>
throw new UnsupportedOperationException(s"Unsupported list type: ${child.dataType}")
}
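The decimal branch above stores each value as an unscaled long, with the scale negated to match cudf's fixed-point convention (a DECIMAL64 column holds unscaled * 10^(-scale), recording the exponent as -scale). A small Python illustration of the unscaled round trip:

# Illustration of the encoding behind appendUnscaledDecimal above:
# Decimal("12.34") has scale 2 and unscaled value 1234.
from decimal import Decimal

d = Decimal("12.34")
_, _, exponent = d.as_tuple()          # exponent == -2, i.e. scale 2
unscaled = int(d.scaleb(-exponent))    # 1234
assert Decimal(unscaled).scaleb(exponent) == d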
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -969,7 +969,7 @@ object GpuOverrides {
"Checks if number of non null/Nan values is greater than a given value",
ExprChecks.projectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
repeatingParamCheck = Some(RepeatingParamCheck("input",
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.MAP + TypeSig.ARRAY +
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.MAP + TypeSig.ARRAY +
TypeSig.STRUCT).nested(),
TypeSig.all))),
(a, conf, p, r) => new ExprMeta[AtLeastNNonNulls](a, conf, p, r) {
@@ -1466,8 +1466,10 @@ object GpuOverrides {
expr[In](
"IN operator",
ExprChecks.projectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
Seq(ParamCheck("value", TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all)),
Some(RepeatingParamCheck("list", TypeSig.commonCudfTypes.withAllLit(), TypeSig.all))),
Seq(ParamCheck("value", TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
TypeSig.all)),
Some(RepeatingParamCheck("list", (TypeSig.commonCudfTypes + TypeSig.DECIMAL).withAllLit(),
TypeSig.all))),
(in, conf, p, r) => new ExprMeta[In](in, conf, p, r) {
override def tagExprForGpu(): Unit = {
val unaliased = in.list.map(extractLit)
@@ -1485,7 +1487,7 @@ object GpuOverrides {
expr[InSet](
"INSET operator",
ExprChecks.unaryProjectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(in, conf, p, r) => new ExprMeta[InSet](in, conf, p, r) {
override def tagExprForGpu(): Unit = {
if (in.hset.contains(null)) {
@@ -1864,10 +1866,11 @@ object GpuOverrides {
expr[GetStructField](
"Gets the named field of the struct",
ExprChecks.unaryProjectNotLambda(
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP + TypeSig.NULL +
TypeSig.DECIMAL).nested(),
TypeSig.all,
TypeSig.STRUCT.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY +
TypeSig.STRUCT + TypeSig.MAP),
TypeSig.STRUCT + TypeSig.MAP + TypeSig.NULL + TypeSig.DECIMAL),
TypeSig.STRUCT.nested(TypeSig.all)),
(expr, conf, p, r) => new UnaryExprMeta[GetStructField](expr, conf, p, r) {
override def convertToGpu(arr: Expression): GpuExpression =
@@ -1876,10 +1879,11 @@ object GpuOverrides {
expr[GetArrayItem](
"Gets the field at `ordinal` in the Array",
ExprChecks.binaryProjectNotLambda(
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL).nested(),
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL +
TypeSig.DECIMAL + TypeSig.MAP).nested(),
TypeSig.all,
("array", TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY +
TypeSig.STRUCT + TypeSig.NULL),
TypeSig.STRUCT + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.MAP),
TypeSig.ARRAY.nested(TypeSig.all)),
("ordinal", TypeSig.lit(TypeEnum.INT), TypeSig.INT)),
(in, conf, p, r) => new GpuGetArrayItemMeta(in, conf, p, r)),
@@ -2165,10 +2169,10 @@ object GpuOverrides {
exec[GenerateExec] (
"The backend for operations that generate more output rows than input rows like explode",
ExecChecks(
TypeSig.commonCudfTypes
TypeSig.commonCudfTypes + TypeSig.DECIMAL
.withPsNote(TypeEnum.ARRAY,
"Only literal arrays and the output of the array function are supported")
.nested(TypeSig.commonCudfTypes),
.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
TypeSig.all),
(gen, conf, p, r) => new GpuGenerateExecSparkPlanMeta(gen, conf, p, r)),
exec[ProjectExec](
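A practical way to confirm the TypeSig changes above take effect is to look for Gpu-prefixed operators in the physical plan. A hedged sketch; the exact operator names follow the plugin's usual naming convention, assumed here:

# Sketch: with the plugin enabled, a decimal IN filter should show GPU
# operators (e.g. GpuFilter) in explain() output rather than a CPU fallback.
from decimal import Decimal

df = spark.createDataFrame([(Decimal("1.23"),)], schema="a decimal(10,2)")
df.filter(df.a.isin(Decimal("1.23"), Decimal("4.56"))).explain()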