diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 7b7bb80c30b..d1a4f0797d7 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -233,11 +233,11 @@ Accelerator supports are described below.
 S
 S*
 S
+S*
 NS
 NS
 NS
-NS
-PS* (Only literal arrays and the output of the array function are supported; missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)
+PS* (Only literal arrays and the output of the array function are supported; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)
 NS
 NS
 NS
@@ -1601,13 +1601,13 @@ Accelerator support is described below.
 S
 S*
 S
-NS
+S*
 S
 NS
 NS
-PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)
-PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)
-PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
 NS
@@ -4925,7 +4925,7 @@ Accelerator support is described below.
-PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
@@ -4963,13 +4963,13 @@ Accelerator support is described below.
 S
 S*
 S
-NS
+S*
 S
 NS
 NS
-PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)
-NS
-PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
 NS
@@ -5191,7 +5191,7 @@ Accelerator support is described below.
-PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
@@ -5206,13 +5206,13 @@ Accelerator support is described below.
 S
 S*
 S
+S*
+S
 NS
 NS
-NS
-NS
-PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)
-PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)
-PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
+PS* (missing nested BINARY, CALENDAR, UDT)
 NS
@@ -5893,7 +5893,7 @@ Accelerator support is described below.
 S
 S*
 S
-NS
+S*
 S
 NS
 NS
@@ -5914,7 +5914,7 @@ Accelerator support is described below.
 PS (Literal value only)
 PS* (Literal value only)
 PS (Literal value only)
-NS
+PS* (Literal value only)
 NS
 NS
 NS
@@ -6025,7 +6025,7 @@ Accelerator support is described below.
 S
 S*
 S
-NS
+S*
 S
 NS
 NS
diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py
index 0603e867f74..309de60ae85 100644
--- a/integration_tests/src/main/python/array_test.py
+++ b/integration_tests/src/main/python/array_test.py
@@ -33,7 +33,8 @@ def test_array_index(data_gen):
         'a[null]',
         'a[3]',
         'a[50]',
-        'a[-1]'))
+        'a[-1]'),
+        conf=allow_negative_scale_of_decimal_conf)

 # Once we support arrays as literals then we can support a[null] for
 # all array gens. See test_array_index for more info
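Not part of the patch: conf=allow_negative_scale_of_decimal_conf is threaded through this and the following test files because the decimal data generators include types with negative scale, which Spark 3.x rejects unless spark.sql.legacy.allowNegativeScaleOfDecimal is enabled. A minimal standalone PySpark sketch of that behavior, assuming only that the conf dict sets the legacy flag:

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructField, StructType

# Assumed equivalent of what allow_negative_scale_of_decimal_conf enables in data_gen.py.
spark = (SparkSession.builder
    .config('spark.sql.legacy.allowNegativeScaleOfDecimal', 'true')
    .getOrCreate())

# DecimalType(7, -3) holds multiples of 1000; without the legacy conf Spark 3.x
# refuses to create a negative-scale decimal type at all.
schema = StructType([StructField('a', DecimalType(7, -3))])
spark.createDataFrame([(Decimal('123000'),)], schema).selectExpr('a * 2').show()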
diff --git a/integration_tests/src/main/python/cmp_test.py b/integration_tests/src/main/python/cmp_test.py
index dc9c6893394..f5ff5c9cff7 100644
--- a/integration_tests/src/main/python/cmp_test.py
+++ b/integration_tests/src/main/python/cmp_test.py
@@ -105,11 +105,12 @@ def test_gte(data_gen):
                 f.col('b') >= f.lit(None).cast(data_type),
                 f.col('a') >= f.col('b')),
             conf=allow_negative_scale_of_decimal_conf)
-@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
+@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
 def test_isnull(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : unary_op_df(spark, data_gen).select(
-                f.isnull(f.col('a'))))
+                f.isnull(f.col('a'))),
+            conf=allow_negative_scale_of_decimal_conf)

 @pytest.mark.parametrize('data_gen', [FloatGen(), DoubleGen()], ids=idfn)
 def test_isnan(data_gen):
@@ -117,27 +118,31 @@ def test_isnan(data_gen):
         lambda spark : unary_op_df(spark, data_gen).select(
             f.isnan(f.col('a'))))

-@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
+@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
 def test_dropna_any(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : binary_op_df(spark, data_gen).dropna())
+            lambda spark : binary_op_df(spark, data_gen).dropna(),
+            conf=allow_negative_scale_of_decimal_conf)

-@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
+@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
 def test_dropna_all(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : binary_op_df(spark, data_gen).dropna(how='all'))
+            lambda spark : binary_op_df(spark, data_gen).dropna(how='all'),
+            conf=allow_negative_scale_of_decimal_conf)

 #dropna is really a filter along with a test for null, but lets do an explicit filter test too
-@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
+@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
 def test_filter(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')))
+            lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')),
+            conf=allow_negative_scale_of_decimal_conf)

 # coalesce batch happens after a filter, but only if something else happens on the GPU after that
-@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
+@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
 def test_filter_with_project(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'))
+            lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'),
+            conf=allow_negative_scale_of_decimal_conf)

 @pytest.mark.parametrize('expr', [f.lit(True), f.lit(False), f.lit(None).cast('boolean')], ids=idfn)
 def test_filter_with_lit(expr):
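Not part of the patch: the next hunk updates test_in and test_in_set, which deliberately sit on either side of spark.sql.optimizer.inSetConversionThreshold; short isin() lists are planned as an In expression, while longer ones are converted to InSet. A rough standalone sketch of how to see the two plans:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
threshold = int(spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))

df = spark.range(100).selectExpr('id as a')
# One entry under the threshold keeps the In expression, one over it becomes InSet.
df.filter(df.a.isin(list(range(threshold - 1)))).explain()
df.filter(df.a.isin(list(range(threshold + 1)))).explain()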
@@ -146,21 +151,27 @@ def test_filter_with_lit(expr):

 # Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
 # This is to test entries under that value.
-@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
+@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
 def test_in(data_gen):
     # nulls are not supported for in on the GPU yet
     num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) - 1
-    scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))
+    # we have to make the scalars in a session so negative scales in decimals are supported
+    scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))),
+            conf=allow_negative_scale_of_decimal_conf)
     assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))
+            lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)),
+            conf=allow_negative_scale_of_decimal_conf)

 # Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
 # This is to test entries over that value.
-@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
+@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
 def test_in_set(data_gen):
     # nulls are not supported for in on the GPU yet
     num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) + 1
-    scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))
+    # we have to make the scalars in a session so negative scales in decimals are supported
+    scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))),
+            conf=allow_negative_scale_of_decimal_conf)
     assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))
+            lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)),
+            conf=allow_negative_scale_of_decimal_conf)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index f3b27bb4b9f..655ede78500 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -589,11 +589,15 @@ def gen_df(spark, data_gen, length=2048, seed=0):
     data = [src.gen() for index in range(0, length)]
     return spark.createDataFrame(data, src.data_type)

-def _mark_as_lit(data):
+def _mark_as_lit(data, data_type = None):
     # Sadly you cannot create a literal from just an array in pyspark
     if isinstance(data, list):
         return f.array([_mark_as_lit(x) for x in data])
-    return f.lit(data)
+    if data_type is None:
+        return f.lit(data)
+    else:
+        # lit does not take a data type so we might have to cast it
+        return f.lit(data).cast(data_type)

 def _gen_scalars_common(data_gen, count, seed=0):
     if isinstance(data_gen, list):
@@ -614,7 +618,8 @@ def gen_scalars(data_gen, count, seed=0, force_no_nulls=False):
     if force_no_nulls:
         assert(not isinstance(data_gen, NullGen))
     src = _gen_scalars_common(data_gen, count, seed=seed)
-    return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls)) for i in range(0, count))
+    data_type = src.data_type
+    return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count))

 def gen_scalar(data_gen, seed=0, force_no_nulls=False):
     """Generate a single scalar value."""
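Not part of the patch: the new data_type argument matters for decimals because pyspark's f.lit infers the decimal's precision and scale from the value itself, which generally differs from the generator's declared DecimalType; casting restores the declared type. A small illustrative sketch:

from decimal import Decimal
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import DecimalType

spark = SparkSession.builder.getOrCreate()

# f.lit infers decimal(3,1) here from the value, not from the data generator's type.
inferred = f.lit(Decimal('12.3'))
# Casting is what _mark_as_lit(data, data_type) now does, keeping the scalar's
# precision and scale in step with the column it will be compared against.
declared = f.lit(Decimal('12.3')).cast(DecimalType(7, 3))

spark.range(1).select(inferred.alias('i'), declared.alias('d')).printSchema()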
@@ -761,7 +766,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):

 boolean_gens = [boolean_gen]

-single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens]
+single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + decimal_gens + [null_gen]]

 # Be careful to not make these too large of data generation takes for ever
 # This is only a few nested array gens, because nesting can be very deep
diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py
index dc01835baea..b46de93582e 100644
--- a/integration_tests/src/main/python/generate_expr_test.py
+++ b/integration_tests/src/main/python/generate_expr_test.py
@@ -29,7 +29,9 @@ def four_op_df(spark, gen, length=2048, seed=0):
             ('d', gen)], nullable=False), length=length, seed=seed)

 all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
-        FloatGen(), DoubleGen(), BooleanGen(), DateGen(), TimestampGen()]
+        FloatGen(), DoubleGen(), BooleanGen(), DateGen(), TimestampGen(),
+        decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision,
+        decimal_gen_64bit]

 #sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
 @ignore_order(local=True)
diff --git a/integration_tests/src/main/python/struct_test.py b/integration_tests/src/main/python/struct_test.py
index e53f3214df1..604572c0f6e 100644
--- a/integration_tests/src/main/python/struct_test.py
+++ b/integration_tests/src/main/python/struct_test.py
@@ -23,7 +23,8 @@
 @pytest.mark.parametrize('data_gen', [StructGen([["first", boolean_gen], ["second", byte_gen], ["third", float_gen]]),
     StructGen([["first", short_gen], ["second", int_gen], ["third", long_gen]]),
     StructGen([["first", double_gen], ["second", date_gen], ["third", timestamp_gen]]),
-    StructGen([["first", string_gen], ["second", ArrayGen(byte_gen)], ["third", simple_string_to_string_map_gen]])], ids=idfn)
+    StructGen([["first", string_gen], ["second", ArrayGen(byte_gen)], ["third", simple_string_to_string_map_gen]]),
+    StructGen([["first", decimal_gen_default], ["second", decimal_gen_scale_precision], ["third", decimal_gen_same_scale_precision]])], ids=idfn)
 def test_struct_get_item(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : unary_op_df(spark, data_gen).selectExpr(
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala
index a4de80295af..31c93290d43 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala
@@ -80,9 +80,18 @@ case class GpuInSet(
         ColumnVector.timestampMicroSecondsFromBoxedLongs(timestamps:_*)
       case StringType =>
         val strings = values.asInstanceOf[Seq[UTF8String]]
-        val builder = HostColumnVector.builder(DType.STRING, strings.size)
-        strings.foreach(s => builder.appendUTF8String(s.getBytes))
-        builder.buildAndPutOnDevice()
+        withResource(HostColumnVector.builder(DType.STRING, strings.size)) { builder =>
+          strings.foreach(s => builder.appendUTF8String(s.getBytes))
+          builder.buildAndPutOnDevice()
+        }
+      case t: DecimalType =>
+        val decs = values.asInstanceOf[Seq[Decimal]]
+        // When we support DECIMAL32 this will need to change to support that
+        withResource(HostColumnVector.builder(DType.create(DType.DTypeEnum.DECIMAL64, -t.scale),
+            decs.size)) { builder =>
+          decs.foreach(d => builder.appendUnscaledDecimal(d.toUnscaledLong))
+          builder.buildAndPutOnDevice()
+        }
       case _ =>
         throw new UnsupportedOperationException(s"Unsupported list type: ${child.dataType}")
     }
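Not part of the patch: the new DecimalType branch in GpuInSet builds the cudf column from unscaled longs, and the cudf DECIMAL64 scale is the negation of Spark's DecimalType scale (hence -t.scale). A tiny Python sketch of that unscaled-value bookkeeping, with no cudf involved:

from decimal import Decimal

def to_unscaled_long(value, spark_scale):
    # Rough analogue of Spark's Decimal.toUnscaledLong: shift the decimal point
    # right by spark_scale digits and keep the integer.
    return int(value.scaleb(spark_scale))

def from_unscaled_long(unscaled, spark_scale):
    # A cudf DECIMAL64 column would carry scale -spark_scale, so reconstructing the
    # value multiplies the stored integer by 10 ** (-spark_scale).
    return Decimal(unscaled).scaleb(-spark_scale)

v = Decimal('12.345')                 # think DecimalType(precision=5, scale=3)
assert to_unscaled_long(v, 3) == 12345
assert from_unscaled_long(12345, 3) == v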
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index b0aff542ff5..170437a4dfc 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -969,7 +969,7 @@ object GpuOverrides {
       "Checks if number of non null/Nan values is greater than a given value",
       ExprChecks.projectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
         repeatingParamCheck = Some(RepeatingParamCheck("input",
-          (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.MAP + TypeSig.ARRAY +
+          (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.MAP + TypeSig.ARRAY +
               TypeSig.STRUCT).nested(),
           TypeSig.all))),
       (a, conf, p, r) => new ExprMeta[AtLeastNNonNulls](a, conf, p, r) {
@@ -1466,8 +1466,10 @@ object GpuOverrides {
     expr[In](
       "IN operator",
       ExprChecks.projectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
-        Seq(ParamCheck("value", TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all)),
-        Some(RepeatingParamCheck("list", TypeSig.commonCudfTypes.withAllLit(), TypeSig.all))),
+        Seq(ParamCheck("value", TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+          TypeSig.all)),
+        Some(RepeatingParamCheck("list", (TypeSig.commonCudfTypes + TypeSig.DECIMAL).withAllLit(),
+          TypeSig.all))),
       (in, conf, p, r) => new ExprMeta[In](in, conf, p, r) {
         override def tagExprForGpu(): Unit = {
           val unaliased = in.list.map(extractLit)
@@ -1485,7 +1487,7 @@ object GpuOverrides {
     expr[InSet](
       "INSET operator",
       ExprChecks.unaryProjectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
-        TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
       (in, conf, p, r) => new ExprMeta[InSet](in, conf, p, r) {
         override def tagExprForGpu(): Unit = {
           if (in.hset.contains(null)) {
@@ -1864,10 +1866,11 @@ object GpuOverrides {
     expr[GetStructField](
       "Gets the named field of the struct",
       ExprChecks.unaryProjectNotLambda(
-        (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
+        (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP + TypeSig.NULL +
+            TypeSig.DECIMAL).nested(),
         TypeSig.all,
         TypeSig.STRUCT.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY +
-            TypeSig.STRUCT + TypeSig.MAP),
+            TypeSig.STRUCT + TypeSig.MAP + TypeSig.NULL + TypeSig.DECIMAL),
         TypeSig.STRUCT.nested(TypeSig.all)),
       (expr, conf, p, r) => new UnaryExprMeta[GetStructField](expr, conf, p, r) {
         override def convertToGpu(arr: Expression): GpuExpression =
@@ -1876,10 +1879,11 @@ object GpuOverrides {
     expr[GetArrayItem](
       "Gets the field at `ordinal` in the Array",
       ExprChecks.binaryProjectNotLambda(
-        (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL).nested(),
+        (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL +
+            TypeSig.DECIMAL + TypeSig.MAP).nested(),
         TypeSig.all,
         ("array", TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY +
-            TypeSig.STRUCT + TypeSig.NULL),
+            TypeSig.STRUCT + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.MAP),
             TypeSig.ARRAY.nested(TypeSig.all)),
         ("ordinal", TypeSig.lit(TypeEnum.INT), TypeSig.INT)),
       (in, conf, p, r) => new GpuGetArrayItemMeta(in, conf, p, r)),
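Not part of the patch: with DECIMAL, NULL, and MAP added to the nested type signatures for GetStructField and GetArrayItem, pulling a decimal out of a struct field or array element no longer forces those expressions off the GPU. A representative query shape in plain PySpark, not taken from the tests:

from decimal import Decimal
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [([Decimal('1.10'), Decimal('2.20')], (Decimal('3.30'), 'x'))],
    'arr array<decimal(4,2)>, st struct<d: decimal(4,2), s: string>')

# arr[0] is a GetArrayItem and st.d is a GetStructField, the two expressions whose
# type checks were widened above.
df.selectExpr('arr[0]', 'st.d').show()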
@@ -2165,10 +2169,10 @@ object GpuOverrides {
     exec[GenerateExec] (
       "The backend for operations that generate more output rows than input rows like explode",
       ExecChecks(
-        TypeSig.commonCudfTypes
+        TypeSig.commonCudfTypes + TypeSig.DECIMAL
           .withPsNote(TypeEnum.ARRAY,
             "Only literal arrays and the output of the array function are supported")
-          .nested(TypeSig.commonCudfTypes),
+          .nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
         TypeSig.all),
       (gen, conf, p, r) => new GpuGenerateExecSparkPlanMeta(gen, conf, p, r)),
     exec[ProjectExec](
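Not part of the patch: GenerateExec is the physical plan node behind explode, so widening its ExecChecks is what lets the new decimal generators in generate_expr_test.py run on the GPU. A minimal PySpark sketch of the shape being exercised:

from decimal import Decimal
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, [Decimal('0.10'), Decimal('0.20')])],
    'id int, vals array<decimal(4,2)>')

# explode is planned as GenerateExec; each decimal array element becomes its own row.
df.select('id', f.explode('vals').alias('v')).show()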