From 258d4558000e2165e4452c6b9e56a4c342fb28a3 Mon Sep 17 00:00:00 2001
From: sperlingxx
Date: Mon, 21 Mar 2022 14:24:05 +0800
Subject: [PATCH 1/2] fix

Signed-off-by: sperlingxx
---
 .../src/main/python/hash_aggregate_test.py    | 19 +++----------------
 .../nvidia/spark/rapids/GpuOverrides.scala    |  4 ++--
 .../spark/sql/rapids/AggregateFunctions.scala |  6 +++---
 3 files changed, 8 insertions(+), 21 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index c59225ddc09..379ec208acc 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -588,7 +588,8 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
 
 _repeat_agg_column_for_collect_set_op = [
     RepeatSeqGen(all_basic_struct_gen, length=15),
-    RepeatSeqGen(StructGen([['child0', all_basic_struct_gen]]), length=15)]
+    RepeatSeqGen(StructGen([
+        ['c0', all_basic_struct_gen], ['c1', int_gen]]), length=15)]
 
 _gen_data_for_collect_set_op_for_unique_group_by_key = [[
     ('a', LongRangeGen()),
@@ -656,25 +657,11 @@ def test_hash_groupby_collect_set(data_gen):
 @ignore_order(local=True)
 @incompat
 @pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op, ids=idfn)
-@pytest.mark.xfail(reason="the result order from collect-set can not be ensured for CPU and GPU."
-                          " We need to enable this after SortArray has supported on nested types."
-                          " See https://github.com/NVIDIA/spark-rapids/issues/3715")
 def test_hash_groupby_collect_set_on_nested_type(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: gen_df(spark, data_gen, length=100)
             .groupby('a')
-            .agg(f.sort_array(f.collect_set('b')), f.count('b')))
-
-# After https://github.com/NVIDIA/spark-rapids/issues/3715 is fixed, we should remove this test case
-@approximate_float
-@ignore_order(local=True)
-@incompat
-@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_for_unique_group_by_key, ids=idfn)
-def test_hash_groupby_collect_set_on_nested_type_for_unique_group_by(data_gen):
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: gen_df(spark, data_gen, length=100)
-            .groupby('a')
-            .agg(f.collect_set('b')))
+            .agg(f.sort_array(f.collect_set('b'))))
 
 @approximate_float
 @ignore_order(local=True)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index a94390349c2..00ac7ffa878 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3203,8 +3203,8 @@ object GpuOverrides extends Logging {
     expr[CollectSet](
       "Collect a set of unique elements, not supported in reduction",
       // GpuCollectSet is not yet supported in Reduction context.
-      // Compared to CollectList, StructType is NOT in GpuCollectSet because underlying
-      // method drop_list_duplicates doesn't support nested types.
+      // Compared to CollectList, ArrayType and MapType are NOT supported in GpuCollectSet
+      // because underlying cuDF operator drop_list_duplicates doesn't support LIST type.
       ExprChecks.aggNotReduction(
         TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
           TypeSig.NULL + TypeSig.STRUCT),
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
index 1e916ac1fc3..b7bf51b7caa 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.rapids
 
 import ai.rapids.cudf
-import ai.rapids.cudf.{Aggregation128Utils, BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByScanAggregation, NullPolicy, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, Scalar, ScanAggregation}
+import ai.rapids.cudf.{Aggregation128Utils, BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByScanAggregation, NaNEquality, NullEquality, NullPolicy, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, Scalar, ScanAggregation}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.shims.{GpuDeterministicFirstLastCollectShim, ShimExpression, ShimUnaryExpression}
 
@@ -372,7 +372,7 @@ class CudfCollectSet(override val dataType: DataType) extends CudfAggregate {
   override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = _ =>
     throw new UnsupportedOperationException("CollectSet is not yet supported in reduction")
   override lazy val groupByAggregate: GroupByAggregation =
-    GroupByAggregation.collectSet()
+    GroupByAggregation.collectSet(NullPolicy.EXCLUDE, NullEquality.EQUAL, NaNEquality.UNEQUAL)
   override val name: String = "CudfCollectSet"
 }
 
@@ -380,7 +380,7 @@ class CudfMergeSets(override val dataType: DataType) extends CudfAggregate {
   override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = _ =>
     throw new UnsupportedOperationException("CudfMergeSets is not yet supported in reduction")
   override lazy val groupByAggregate: GroupByAggregation =
-    GroupByAggregation.mergeSets()
+    GroupByAggregation.mergeSets(NullEquality.EQUAL, NaNEquality.UNEQUAL)
   override val name: String = "CudfMergeSets"
 }

From 0f1ab85f4d5af18706528f5a884366b644f114ec Mon Sep 17 00:00:00 2001
From: sperlingxx
Date: Mon, 21 Mar 2022 15:34:25 +0800
Subject: [PATCH 2/2] update

---
 integration_tests/src/main/python/hash_aggregate_test.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 379ec208acc..ccb5d605fbb 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -580,6 +580,7 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
 _repeat_agg_column_for_collect_list_op = [
     RepeatSeqGen(ArrayGen(int_gen), length=15),
     RepeatSeqGen(all_basic_struct_gen, length=15),
+    RepeatSeqGen(StructGen([['c0', all_basic_struct_gen]]), length=15),
    RepeatSeqGen(simple_string_to_string_map_gen, length=15)]
 
 _gen_data_for_collect_list_op = _full_gen_data_for_collect_op + [[
@@ -591,10 +592,6 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
     RepeatSeqGen(StructGen([
         ['c0', all_basic_struct_gen], ['c1', int_gen]]), length=15)]
 
-_gen_data_for_collect_set_op_for_unique_group_by_key = [[
-    ('a', LongRangeGen()),
-    ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op]
-
 _gen_data_for_collect_set_op = [[
     ('a', RepeatSeqGen(LongGen(), length=20)),
     ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op]
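
Note on the test change: collect_set guarantees nothing about the order of elements
inside each result array, so CPU and GPU output can only be compared once the arrays
are canonicalized. That is why the updated test wraps collect_set in sort_array, and
why the xfail marker and the unique-group-by-key fallback test could be dropped.
Below is a minimal standalone sketch of the same pattern; the session setup, data,
and column names are illustrative only, not part of the patch:

    # Sketch of the canonicalization pattern used by
    # test_hash_groupby_collect_set_on_nested_type. Everything here
    # (session, data, names) is illustrative, not taken from the patch.
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as f

    spark = SparkSession.builder.appName("collect_set_sketch").getOrCreate()

    # A small grouped dataset with struct values, including duplicates.
    df = spark.createDataFrame(
        [(1, (1, "a")), (1, (1, "a")), (1, (2, "b")), (2, (3, "c"))],
        "a INT, b STRUCT<c0: INT, c1: STRING>")

    # collect_set deduplicates, but emits elements in no particular order;
    # sort_array makes the result deterministic, so two runs (or the CPU
    # and GPU plans) can be compared for equality.
    result = df.groupby("a").agg(f.sort_array(f.collect_set("b")).alias("bs"))
    result.show(truncate=False)

On the Scala side, passing NullPolicy.EXCLUDE, NullEquality.EQUAL, and
NaNEquality.UNEQUAL explicitly, rather than relying on cuDF defaults, pins down how
nulls and NaNs are deduplicated when sets are built by CudfCollectSet and merged by
CudfMergeSets.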