diff --git a/jenkins/deploy.sh b/jenkins/deploy.sh index 61dda1d367b..a748722ec70 100755 --- a/jenkins/deploy.sh +++ b/jenkins/deploy.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,6 +36,8 @@ SIGN_FILE=$1 DATABRICKS=$2 VERSIONS_BUILT=$3 +export M2DIR=${M2DIR:-"$WORKSPACE/.m2"} + ###### Build the path of jar(s) to be deployed ###### cd $WORKSPACE @@ -96,3 +98,17 @@ TOOL_DOC_JARS="-Dsources=${TOOL_FPATH}-sources.jar -Djavadoc=${TOOL_FPATH}-javad $DEPLOY_CMD -Durl=$SERVER_URL -DrepositoryId=$SERVER_ID \ $TOOL_DOC_JARS \ -Dfile=$TOOL_FPATH.jar -DpomFile=${TOOL_PL}/pom.xml + +###### Deploy Spark 2.x explain meta jar ###### +SPARK2_PL=${SPARK2_PL:-"spark2-sql-plugin"} +SPARK2_ART_ID=`mvn help:evaluate -q -pl $SPARK2_PL -Dexpression=project.artifactId -DforceStdout -Dbuildver=24X` +SPARK2_ART_VER=`mvn help:evaluate -q -pl $SPARK2_PL -Dexpression=project.version -DforceStdout -Dbuildver=24X` +SPARK2_FPATH="$M2DIR/repository/com/nvidia/$SPARK2_ART_ID/$SPARK2_ART_VER/$SPARK2_ART_ID-$SPARK2_ART_VER" +SPARK2_DOC_JARS="-Dsources=${SPARK2_FPATH}-sources.jar -Djavadoc=${SPARK2_FPATH}-javadoc.jar" +# a bit ugly, but just hardcode to spark24 for now since it is the only version supported +SPARK2_CLASSIFIER='spark24' +SPARK2_CLASSIFIER_JAR="${SPARK2_FPATH}-${SPARK2_CLASSIFIER}.jar" +$DEPLOY_CMD -Durl=$SERVER_URL -DrepositoryId=$SERVER_ID \ + $SPARK2_DOC_JARS \ + -Dclassifier=$SPARK2_CLASSIFIER \ + -Dfile=$SPARK2_CLASSIFIER_JAR -DpomFile=${SPARK2_PL}/pom.xml diff --git a/scripts/rundiffspark2.sh b/scripts/rundiffspark2.sh index 4dae595d630..19955e3afdc 100755 --- a/scripts/rundiffspark2.sh +++ b/scripts/rundiffspark2.sh @@ -25,44 +25,59 @@ # just using interface, and we don't really expect them to use it on 2.x so just skip diffing # ../spark2-sql-plugin/src/main/java/com/nvidia/spark/RapidsUDF.java +# If this script fails, then a developer should do something like: +# 1. Look at each file that the script reports a diff for +# 2. Look at the commits for that file in the sql-plugin module and see what changed dealing +# with the metadata. +# 3. Update the corresponding spark2-sql-plugin file to pick up the meta changes if necessary. +# 4. Rerun the diff script: cd scripts && ./rundiffspark2.sh +# 5. If there are still diffs, update the diff file in spark2diffs/ corresponding to the +# changed file. +# Generally the way to do this is to find the commands below for the file that changed +# and run them manually. If the diff looks ok, then just copy the .newdiff file over +# the diff file in the spark2diffs directory. 
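As a non-authoritative illustration of the manual update flow described in the comment block above, refreshing a single checked-in baseline might look like the sketch below. The diff command and paths are copied from the script's GpuHiveOverrides check further down; the final cp is the "copy the .newdiff over the diff file in spark2diffs" step and is not part of this patch.

    # Sketch only: manually refresh one checked-in diff after a sql-plugin metadata change.
    cd scripts
    tmp_dir=$(mktemp -d -t spark2diff-XXXXXXXXXX)

    # Re-run the same comparison the script performs for GpuHiveOverrides.scala.
    diff ../sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala \
      ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala \
      > $tmp_dir/GpuHiveOverrides.newdiff || true

    # Inspect the remaining differences; if they are expected, record them as the new baseline.
    diff spark2diffs/GpuHiveOverrides.diff $tmp_dir/GpuHiveOverrides.newdiff
    cp $tmp_dir/GpuHiveOverrides.newdiff spark2diffs/GpuHiveOverrides.diff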
+# + +set -e + echo "Done running Diffs of spark2 files" tmp_dir=$(mktemp -d -t spark2diff-XXXXXXXXXX) echo "Using temporary directory: $tmp_dir" -diff ../sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala > $tmp_dir/GpuHiveOverrides.newdiff +diff ../sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala > $tmp_dir/GpuHiveOverrides.newdiff || true if [[ $(diff spark2diffs/GpuHiveOverrides.diff $tmp_dir/GpuHiveOverrides.newdiff) ]]; then echo "check diff for ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala" fi sed -n '/class GpuBroadcastNestedLoopJoinMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinMeta.scala > $tmp_dir/GpuBroadcastNestedLoopJoinMeta_new.out sed -n '/class GpuBroadcastNestedLoopJoinMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala > $tmp_dir/GpuBroadcastNestedLoopJoinMeta_old.out -diff $tmp_dir/GpuBroadcastNestedLoopJoinMeta_new.out $tmp_dir/GpuBroadcastNestedLoopJoinMeta_old.out > $tmp_dir/GpuBroadcastNestedLoopJoinMeta.newdiff +diff $tmp_dir/GpuBroadcastNestedLoopJoinMeta_new.out $tmp_dir/GpuBroadcastNestedLoopJoinMeta_old.out > $tmp_dir/GpuBroadcastNestedLoopJoinMeta.newdiff || true diff -c spark2diffs/GpuBroadcastNestedLoopJoinMeta.diff $tmp_dir/GpuBroadcastNestedLoopJoinMeta.newdiff sed -n '/object JoinTypeChecks/,/def extractTopLevelAttributes/{/def extractTopLevelAttributes/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala > $tmp_dir/GpuHashJoin_new.out sed -n '/object JoinTypeChecks/,/def extractTopLevelAttributes/{/def extractTopLevelAttributes/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala > $tmp_dir/GpuHashJoin_old.out -diff $tmp_dir/GpuHashJoin_new.out $tmp_dir/GpuHashJoin_old.out > $tmp_dir/GpuHashJoin.newdiff +diff $tmp_dir/GpuHashJoin_new.out $tmp_dir/GpuHashJoin_old.out > $tmp_dir/GpuHashJoin.newdiff || true diff -c spark2diffs/GpuHashJoin.diff $tmp_dir/GpuHashJoin.newdiff sed -n '/class GpuShuffleMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleMeta.scala > $tmp_dir/GpuShuffleMeta_new.out sed -n '/class GpuShuffleMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala > $tmp_dir/GpuShuffleMeta_old.out -diff $tmp_dir/GpuShuffleMeta_new.out $tmp_dir/GpuShuffleMeta_old.out > $tmp_dir/GpuShuffleMeta.newdiff +diff $tmp_dir/GpuShuffleMeta_new.out $tmp_dir/GpuShuffleMeta_old.out > $tmp_dir/GpuShuffleMeta.newdiff || true diff -c spark2diffs/GpuShuffleMeta.diff $tmp_dir/GpuShuffleMeta.newdiff sed -n '/class GpuBroadcastMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExecMeta.scala > $tmp_dir/GpuBroadcastExchangeExecMeta_new.out sed -n '/class GpuBroadcastMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' 
../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala > $tmp_dir/GpuBroadcastExchangeExecMeta_old.out -diff $tmp_dir/GpuBroadcastExchangeExecMeta_new.out $tmp_dir/GpuBroadcastExchangeExecMeta_old.out > $tmp_dir/GpuBroadcastExchangeExecMeta.newdiff +diff $tmp_dir/GpuBroadcastExchangeExecMeta_new.out $tmp_dir/GpuBroadcastExchangeExecMeta_old.out > $tmp_dir/GpuBroadcastExchangeExecMeta.newdiff || true diff -c spark2diffs/GpuBroadcastExchangeExecMeta.diff $tmp_dir/GpuBroadcastExchangeExecMeta.newdiff sed -n '/abstract class UnixTimeExprMeta/,/sealed trait TimeParserPolicy/{/sealed trait TimeParserPolicy/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressionsMeta.scala > $tmp_dir/UnixTimeExprMeta_new.out sed -n '/abstract class UnixTimeExprMeta/,/sealed trait TimeParserPolicy/{/sealed trait TimeParserPolicy/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala > $tmp_dir/UnixTimeExprMeta_old.out -diff $tmp_dir/UnixTimeExprMeta_new.out $tmp_dir/UnixTimeExprMeta_old.out > $tmp_dir/UnixTimeExprMeta.newdiff +diff $tmp_dir/UnixTimeExprMeta_new.out $tmp_dir/UnixTimeExprMeta_old.out > $tmp_dir/UnixTimeExprMeta.newdiff || true diff -c spark2diffs/UnixTimeExprMeta.diff $tmp_dir/UnixTimeExprMeta.newdiff sed -n '/object GpuToTimestamp/,/abstract class UnixTimeExprMeta/{/abstract class UnixTimeExprMeta/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressionsMeta.scala > $tmp_dir/GpuToTimestamp_new.out sed -n '/object GpuToTimestamp/,/val REMOVE_WHITESPACE_FROM_MONTH_DAY/{/val REMOVE_WHITESPACE_FROM_MONTH_DAY/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala > $tmp_dir/GpuToTimestamp_old.out -diff $tmp_dir/GpuToTimestamp_new.out $tmp_dir/GpuToTimestamp_old.out > $tmp_dir/GpuToTimestamp.newdiff +diff $tmp_dir/GpuToTimestamp_new.out $tmp_dir/GpuToTimestamp_old.out > $tmp_dir/GpuToTimestamp.newdiff || true diff -c spark2diffs/GpuToTimestamp.diff $tmp_dir/GpuToTimestamp.newdiff sed -n '/case class ParseFormatMeta/,/case class RegexReplace/p' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressionsMeta.scala > $tmp_dir/datemisc_new.out @@ -71,17 +86,17 @@ diff -c $tmp_dir/datemisc_new.out $tmp_dir/datemisc_old.out sed -n '/class GpuRLikeMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringMeta.scala > $tmp_dir/GpuRLikeMeta_new.out sed -n '/class GpuRLikeMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala > $tmp_dir/GpuRLikeMeta_old.out -diff $tmp_dir/GpuRLikeMeta_new.out $tmp_dir/GpuRLikeMeta_old.out > $tmp_dir/GpuRLikeMeta.newdiff +diff $tmp_dir/GpuRLikeMeta_new.out $tmp_dir/GpuRLikeMeta_old.out > $tmp_dir/GpuRLikeMeta.newdiff || true diff -c spark2diffs/GpuRLikeMeta.diff $tmp_dir/GpuRLikeMeta.newdiff sed -n '/class GpuRegExpExtractMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringMeta.scala > $tmp_dir/GpuRegExpExtractMeta_new.out sed -n '/class GpuRegExpExtractMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala > $tmp_dir/GpuRegExpExtractMeta_old.out -diff $tmp_dir/GpuRegExpExtractMeta_new.out $tmp_dir/GpuRegExpExtractMeta_old.out > $tmp_dir/GpuRegExpExtractMeta.newdiff +diff $tmp_dir/GpuRegExpExtractMeta_new.out 
$tmp_dir/GpuRegExpExtractMeta_old.out > $tmp_dir/GpuRegExpExtractMeta.newdiff || true diff -c spark2diffs/GpuRegExpExtractMeta.diff $tmp_dir/GpuRegExpExtractMeta.newdiff sed -n '/class SubstringIndexMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringMeta.scala > $tmp_dir/SubstringIndexMeta_new.out sed -n '/class SubstringIndexMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala > $tmp_dir/SubstringIndexMeta_old.out -diff $tmp_dir/SubstringIndexMeta_new.out $tmp_dir/SubstringIndexMeta_old.out > $tmp_dir/SubstringIndexMeta.newdiff +diff $tmp_dir/SubstringIndexMeta_new.out $tmp_dir/SubstringIndexMeta_old.out > $tmp_dir/SubstringIndexMeta.newdiff || true diff -c spark2diffs/SubstringIndexMeta.diff $tmp_dir/SubstringIndexMeta.newdiff sed -n '/object CudfRegexp/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringMeta.scala > $tmp_dir/CudfRegexp_new.out @@ -94,37 +109,37 @@ diff -c $tmp_dir/GpuSubstringIndex_new.out $tmp_dir/GpuSubstringIndex_old.out > sed -n '/class GpuStringSplitMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringMeta.scala > $tmp_dir/GpuStringSplitMeta_new.out sed -n '/class GpuStringSplitMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala > $tmp_dir/GpuStringSplitMeta_old.out -diff $tmp_dir/GpuStringSplitMeta_new.out $tmp_dir/GpuStringSplitMeta_old.out > $tmp_dir/GpuStringSplitMeta.newdiff +diff $tmp_dir/GpuStringSplitMeta_new.out $tmp_dir/GpuStringSplitMeta_old.out > $tmp_dir/GpuStringSplitMeta.newdiff || true diff -c spark2diffs/GpuStringSplitMeta.diff $tmp_dir/GpuStringSplitMeta.newdiff sed -n '/object GpuOrcFileFormat/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala > $tmp_dir/GpuOrcFileFormat_new.out sed -n '/object GpuOrcFileFormat/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala > $tmp_dir/GpuOrcFileFormat_old.out -diff $tmp_dir/GpuOrcFileFormat_new.out $tmp_dir/GpuOrcFileFormat_old.out > $tmp_dir/GpuOrcFileFormat.newdiff +diff $tmp_dir/GpuOrcFileFormat_new.out $tmp_dir/GpuOrcFileFormat_old.out > $tmp_dir/GpuOrcFileFormat.newdiff || true diff -c spark2diffs/GpuOrcFileFormat.diff $tmp_dir/GpuOrcFileFormat.newdiff sed -n '/class GpuSequenceMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala > $tmp_dir/GpuSequenceMeta_new.out sed -n '/class GpuSequenceMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala > $tmp_dir/GpuSequenceMeta_old.out -diff $tmp_dir/GpuSequenceMeta_new.out $tmp_dir/GpuSequenceMeta_old.out > $tmp_dir/GpuSequenceMeta.newdiff +diff $tmp_dir/GpuSequenceMeta_new.out $tmp_dir/GpuSequenceMeta_old.out > $tmp_dir/GpuSequenceMeta.newdiff || true diff -c spark2diffs/GpuSequenceMeta.diff $tmp_dir/GpuSequenceMeta.newdiff sed -n '/object GpuDataSource/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSource.scala > $tmp_dir/GpuDataSource_new.out sed -n '/object GpuDataSource/,/val GLOB_PATHS_KEY/{/val GLOB_PATHS_KEY/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSource.scala > $tmp_dir/GpuDataSource_old.out -diff $tmp_dir/GpuDataSource_new.out 
$tmp_dir/GpuDataSource_old.out > $tmp_dir/GpuDataSource.newdiff +diff $tmp_dir/GpuDataSource_new.out $tmp_dir/GpuDataSource_old.out > $tmp_dir/GpuDataSource.newdiff || true diff -c spark2diffs/GpuDataSource.diff $tmp_dir/GpuDataSource.newdiff sed -n '/class GpuGetArrayItemMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala > $tmp_dir/GpuGetArrayItemMeta_new.out sed -n '/class GpuGetArrayItemMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala > $tmp_dir/GpuGetArrayItemMeta_old.out -diff $tmp_dir/GpuGetArrayItemMeta_new.out $tmp_dir/GpuGetArrayItemMeta_old.out > $tmp_dir/GpuGetArrayItemMeta.newdiff +diff $tmp_dir/GpuGetArrayItemMeta_new.out $tmp_dir/GpuGetArrayItemMeta_old.out > $tmp_dir/GpuGetArrayItemMeta.newdiff || true diff -c spark2diffs/GpuGetArrayItemMeta.diff $tmp_dir/GpuGetArrayItemMeta.newdiff sed -n '/class GpuGetMapValueMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala > $tmp_dir/GpuGetMapValueMeta_new.out sed -n '/class GpuGetMapValueMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala > $tmp_dir/GpuGetMapValueMeta_old.out -diff $tmp_dir/GpuGetMapValueMeta_new.out $tmp_dir/GpuGetMapValueMeta_old.out > $tmp_dir/GpuGetMapValueMeta.newdiff +diff $tmp_dir/GpuGetMapValueMeta_new.out $tmp_dir/GpuGetMapValueMeta_old.out > $tmp_dir/GpuGetMapValueMeta.newdiff || true diff -c spark2diffs/GpuGetMapValueMeta.diff $tmp_dir/GpuGetMapValueMeta.newdiff sed -n '/abstract class ScalaUDFMetaBase/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDFMeta.scala> $tmp_dir/ScalaUDFMetaBase_new.out sed -n '/abstract class ScalaUDFMetaBase/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala > $tmp_dir/ScalaUDFMetaBase_old.out -diff $tmp_dir/ScalaUDFMetaBase_new.out $tmp_dir/ScalaUDFMetaBase_old.out > $tmp_dir/ScalaUDFMetaBase.newdiff +diff $tmp_dir/ScalaUDFMetaBase_new.out $tmp_dir/ScalaUDFMetaBase_old.out > $tmp_dir/ScalaUDFMetaBase.newdiff || true diff -c spark2diffs/ScalaUDFMetaBase.diff $tmp_dir/ScalaUDFMetaBase.newdiff sed -n '/object GpuScalaUDF/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDFMeta.scala > $tmp_dir/GpuScalaUDF_new.out @@ -133,12 +148,12 @@ diff -c $tmp_dir/GpuScalaUDF_new.out $tmp_dir/GpuScalaUDF_old.out > $tmp_dir/Gp sed -n '/object GpuDecimalMultiply/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala > $tmp_dir/GpuDecimalMultiply_new.out sed -n '/object GpuDecimalMultiply/,/def checkForOverflow/{/def checkForOverflow/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala > $tmp_dir/GpuDecimalMultiply_old.out -diff $tmp_dir/GpuDecimalMultiply_new.out $tmp_dir/GpuDecimalMultiply_old.out > $tmp_dir/GpuDecimalMultiply.newdiff +diff $tmp_dir/GpuDecimalMultiply_new.out $tmp_dir/GpuDecimalMultiply_old.out > $tmp_dir/GpuDecimalMultiply.newdiff || true diff -c spark2diffs/GpuDecimalMultiply.diff $tmp_dir/GpuDecimalMultiply.newdiff sed -n '/object GpuDecimalDivide/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala > $tmp_dir/GpuDecimalDivide_new.out sed -n '/object GpuDecimalDivide/,/^}/{/^}/!p}' 
../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala > $tmp_dir/GpuDecimalDivide_old.out -diff $tmp_dir/GpuDecimalDivide_new.out $tmp_dir/GpuDecimalDivide_old.out > $tmp_dir/GpuDecimalDivide.newdiff +diff $tmp_dir/GpuDecimalDivide_new.out $tmp_dir/GpuDecimalDivide_old.out > $tmp_dir/GpuDecimalDivide.newdiff || true diff -c spark2diffs/GpuDecimalDivide.diff $tmp_dir/GpuDecimalDivide.newdiff sed -n '/def isSupportedRelation/,/^ }/{/^ }/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/TrampolineUtil.scala > $tmp_dir/isSupportedRelation_new.out @@ -153,31 +168,31 @@ sed -n '/def getSimpleName/,/^ }/{/^ }/!p}' ../spark2-sql-plugin/src/main/sc sed -n '/def getSimpleName/,/^ }/{/^ }/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala > $tmp_dir/getSimpleName_old.out diff -c $tmp_dir/getSimpleName_new.out $tmp_dir/getSimpleName_old.out -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexParser.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexParser.scala > $tmp_dir/RegexParser.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexParser.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexParser.scala > $tmp_dir/RegexParser.newdiff || true diff -c spark2diffs/RegexParser.diff $tmp_dir/RegexParser.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala > $tmp_dir/TypeChecks.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala > $tmp_dir/TypeChecks.newdiff || true diff -c spark2diffs/TypeChecks.diff $tmp_dir/TypeChecks.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DataTypeUtils.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/DataTypeUtils.scala > $tmp_dir/DataTypeUtils.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DataTypeUtils.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/DataTypeUtils.scala > $tmp_dir/DataTypeUtils.newdiff || true diff -c spark2diffs/DataTypeUtils.diff $tmp_dir/DataTypeUtils.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala > $tmp_dir/GpuOverrides.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala > $tmp_dir/GpuOverrides.newdiff || true diff -c spark2diffs/GpuOverrides.diff $tmp_dir/GpuOverrides.newdiff sed -n '/GpuOverrides.expr\[Cast\]/,/doFloatToIntCheck/p' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/cast_new.out sed -n '/GpuOverrides.expr\[Cast\]/,/doFloatToIntCheck/p' ../sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala > $tmp_dir/cast_old.out -diff $tmp_dir/cast_new.out $tmp_dir/cast_old.out > $tmp_dir/cast.newdiff +diff $tmp_dir/cast_new.out $tmp_dir/cast_old.out > $tmp_dir/cast.newdiff || true diff -c spark2diffs/cast.diff $tmp_dir/cast.newdiff sed -n '/GpuOverrides.expr\[Average\]/,/GpuOverrides.expr\[Abs/p' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/average_new.out sed -n '/GpuOverrides.expr\[Average\]/,/GpuOverrides.expr\[Abs/p' 
../sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala > $tmp_dir/average_old.out -diff $tmp_dir/average_new.out $tmp_dir/average_old.out > $tmp_dir/average.newdiff +diff $tmp_dir/average_new.out $tmp_dir/average_old.out > $tmp_dir/average.newdiff || true diff -c spark2diffs/average.diff $tmp_dir/average.newdiff sed -n '/GpuOverrides.expr\[Abs\]/,/GpuOverrides.expr\[RegExpReplace/p' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/abs_new.out sed -n '/GpuOverrides.expr\[Abs\]/,/GpuOverrides.expr\[RegExpReplace/p' ../sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala > $tmp_dir/abs_old.out -diff $tmp_dir/abs_new.out $tmp_dir/abs_old.out > $tmp_dir/abs.newdiff +diff $tmp_dir/abs_new.out $tmp_dir/abs_old.out > $tmp_dir/abs.newdiff || true diff -c spark2diffs/abs.diff $tmp_dir/abs.newdiff sed -n '/GpuOverrides.expr\[RegExpReplace\]/,/GpuOverrides.expr\[TimeSub/{/GpuOverrides.expr\[TimeSub/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/regexreplace_new.out @@ -186,22 +201,22 @@ diff -c $tmp_dir/regexreplace_new.out $tmp_dir/regexreplace_old.out sed -n '/GpuOverrides.expr\[TimeSub\]/,/GpuOverrides.expr\[ScalaUDF/{/GpuOverrides.expr\[ScalaUDF/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/TimeSub_new.out sed -n '/GpuOverrides.expr\[TimeSub\]/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala > $tmp_dir/TimeSub_old.out -diff -w $tmp_dir/TimeSub_new.out $tmp_dir/TimeSub_old.out > $tmp_dir/TimeSub.newdiff +diff -w $tmp_dir/TimeSub_new.out $tmp_dir/TimeSub_old.out > $tmp_dir/TimeSub.newdiff || true diff -c spark2diffs/TimeSub.diff $tmp_dir/TimeSub.newdiff sed -n '/GpuOverrides.expr\[ScalaUDF\]/,/})/{/})/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/ScalaUDF_new.out sed -n '/GpuOverrides.expr\[ScalaUDF\]/,/})/{/})/!p}' ../sql-plugin/src/main/301until310-all/scala/com/nvidia/spark/rapids/shims/v2/GpuRowBasedScalaUDF.scala > $tmp_dir/ScalaUDF_old.out -diff -w $tmp_dir/ScalaUDF_new.out $tmp_dir/ScalaUDF_old.out > $tmp_dir/ScalaUDF.newdiff +diff -w $tmp_dir/ScalaUDF_new.out $tmp_dir/ScalaUDF_old.out > $tmp_dir/ScalaUDF.newdiff || true diff -c spark2diffs/ScalaUDF.diff $tmp_dir/ScalaUDF.newdiff sed -n '/GpuOverrides.exec\[FileSourceScanExec\]/,/})/{/})/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/FileSourceScanExec_new.out sed -n '/GpuOverrides.exec\[FileSourceScanExec\]/,/override def convertToCpu/{/override def convertToCpu/!p}' ../sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala > $tmp_dir/FileSourceScanExec_old.out -diff -w $tmp_dir/FileSourceScanExec_new.out $tmp_dir/FileSourceScanExec_old.out > $tmp_dir/FileSourceScanExec.newdiff +diff -w $tmp_dir/FileSourceScanExec_new.out $tmp_dir/FileSourceScanExec_old.out > $tmp_dir/FileSourceScanExec.newdiff || true diff -c spark2diffs/FileSourceScanExec.diff $tmp_dir/FileSourceScanExec.newdiff sed -n '/GpuOverrides.exec\[ArrowEvalPythonExec\]/,/})/{/})/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/ArrowEvalPythonExec_new.out sed -n '/GpuOverrides.exec\[ArrowEvalPythonExec\]/,/override def convertToGpu/{/override 
def convertToGpu/!p}' ../sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala > $tmp_dir/ArrowEvalPythonExec_old.out -diff -w $tmp_dir/ArrowEvalPythonExec_new.out $tmp_dir/ArrowEvalPythonExec_old.out > $tmp_dir/ArrowEvalPythonExec.newdiff +diff -w $tmp_dir/ArrowEvalPythonExec_new.out $tmp_dir/ArrowEvalPythonExec_old.out > $tmp_dir/ArrowEvalPythonExec.newdiff || true diff -c spark2diffs/ArrowEvalPythonExec.diff $tmp_dir/ArrowEvalPythonExec.newdiff sed -n '/GpuOverrides.exec\[FlatMapGroupsInPandasExec\]/,/GpuOverrides.exec\[WindowInPandasExec/{/GpuOverrides.exec\[WindowInPandasExec/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimGpuOverrides.scala > $tmp_dir/FlatMapGroupsInPandasExec_new.out @@ -218,119 +233,117 @@ diff -c -w $tmp_dir/AggregateInPandasExec_new.out $tmp_dir/AggregateInPandasExec sed -n '/object GpuOrcScanBase/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScanBase.scala > $tmp_dir/GpuOrcScanBase_new.out sed -n '/object GpuOrcScanBase/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScanBase.scala > $tmp_dir/GpuOrcScanBase_old.out -diff $tmp_dir/GpuOrcScanBase_new.out $tmp_dir/GpuOrcScanBase_old.out > $tmp_dir/GpuOrcScanBase.newdiff +diff $tmp_dir/GpuOrcScanBase_new.out $tmp_dir/GpuOrcScanBase_old.out > $tmp_dir/GpuOrcScanBase.newdiff || true diff -c spark2diffs/GpuOrcScanBase.diff $tmp_dir/GpuOrcScanBase.newdiff sed -n '/class LiteralExprMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/literalsMeta.scala > $tmp_dir/LiteralExprMeta_new.out sed -n '/class LiteralExprMeta/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala > $tmp_dir/LiteralExprMeta_old.out -diff $tmp_dir/LiteralExprMeta_new.out $tmp_dir/LiteralExprMeta_old.out > $tmp_dir/LiteralExprMeta.newdiff +diff $tmp_dir/LiteralExprMeta_new.out $tmp_dir/LiteralExprMeta_old.out > $tmp_dir/LiteralExprMeta.newdiff || true diff -c spark2diffs/LiteralExprMeta.diff $tmp_dir/LiteralExprMeta.newdiff # 2.x doesn't have a base aggregate class so this is much different, check the revision for now -CUR_COMMIT=`git log -1 ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala | grep commit | cut -d ' ' -f 2` -if [ "$CUR_COMMIT" != "b17c685788c0a62763aa8101709e241877f02025" ]; then - echo "sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala has different commit - check manually" -fi +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregateMeta.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala > $tmp_dir/aggregate.newdiff || true +diff -c spark2diffs/aggregate.diff $tmp_dir/aggregate.newdiff sed -n '/class GpuGenerateExecSparkPlanMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExecMeta.scala > $tmp_dir/GpuGenerateExecSparkPlanMeta_new.out sed -n '/class GpuGenerateExecSparkPlanMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala > $tmp_dir/GpuGenerateExecSparkPlanMeta_old.out -diff $tmp_dir/GpuGenerateExecSparkPlanMeta_new.out $tmp_dir/GpuGenerateExecSparkPlanMeta_old.out > $tmp_dir/GpuGenerateExecSparkPlanMeta.newdiff +diff $tmp_dir/GpuGenerateExecSparkPlanMeta_new.out $tmp_dir/GpuGenerateExecSparkPlanMeta_old.out > $tmp_dir/GpuGenerateExecSparkPlanMeta.newdiff || true diff -c spark2diffs/GpuGenerateExecSparkPlanMeta.diff 
$tmp_dir/GpuGenerateExecSparkPlanMeta.newdiff sed -n '/abstract class GeneratorExprMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExecMeta.scala > $tmp_dir/GeneratorExprMeta_new.out sed -n '/abstract class GeneratorExprMeta/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala > $tmp_dir/GeneratorExprMeta_old.out -diff $tmp_dir/GeneratorExprMeta_new.out $tmp_dir/GeneratorExprMeta_old.out > $tmp_dir/GeneratorExprMeta.newdiff +diff $tmp_dir/GeneratorExprMeta_new.out $tmp_dir/GeneratorExprMeta_old.out > $tmp_dir/GeneratorExprMeta.newdiff || true diff -c spark2diffs/GeneratorExprMeta.diff $tmp_dir/GeneratorExprMeta.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ExplainPlan.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/ExplainPlan.scala > $tmp_dir/ExplainPlan.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/ExplainPlan.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/ExplainPlan.scala > $tmp_dir/ExplainPlan.newdiff || true diff spark2diffs/ExplainPlan.diff $tmp_dir/ExplainPlan.newdiff sed -n '/class GpuProjectExecMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperatorsMeta.scala > $tmp_dir/GpuProjectExecMeta_new.out sed -n '/class GpuProjectExecMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala > $tmp_dir/GpuProjectExecMeta_old.out -diff $tmp_dir/GpuProjectExecMeta_new.out $tmp_dir/GpuProjectExecMeta_old.out > $tmp_dir/GpuProjectExecMeta.newdiff +diff $tmp_dir/GpuProjectExecMeta_new.out $tmp_dir/GpuProjectExecMeta_old.out > $tmp_dir/GpuProjectExecMeta.newdiff || true diff -c spark2diffs/GpuProjectExecMeta.diff $tmp_dir/GpuProjectExecMeta.newdiff sed -n '/class GpuSampleExecMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperatorsMeta.scala > $tmp_dir/GpuSampleExecMeta_new.out sed -n '/class GpuSampleExecMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala > $tmp_dir/GpuSampleExecMeta_old.out -diff $tmp_dir/GpuSampleExecMeta_new.out $tmp_dir/GpuSampleExecMeta_old.out > $tmp_dir/GpuSampleExecMeta.newdiff +diff $tmp_dir/GpuSampleExecMeta_new.out $tmp_dir/GpuSampleExecMeta_old.out > $tmp_dir/GpuSampleExecMeta.newdiff || true diff -c spark2diffs/GpuSampleExecMeta.diff $tmp_dir/GpuSampleExecMeta.newdiff sed -n '/class GpuSortMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExecMeta.scala > $tmp_dir/GpuSortMeta_new.out sed -n '/class GpuSortMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala > $tmp_dir/GpuSortMeta_old.out -diff $tmp_dir/GpuSortMeta_new.out $tmp_dir/GpuSortMeta_old.out > $tmp_dir/GpuSortMeta.newdiff +diff $tmp_dir/GpuSortMeta_new.out $tmp_dir/GpuSortMeta_old.out > $tmp_dir/GpuSortMeta.newdiff || true diff -c spark2diffs/GpuSortMeta.diff $tmp_dir/GpuSortMeta.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala > $tmp_dir/InputFileBlockRule.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala > 
$tmp_dir/InputFileBlockRule.newdiff || true diff -c spark2diffs/InputFileBlockRule.diff $tmp_dir/InputFileBlockRule.newdiff sed -n '/abstract class GpuWindowInPandasExecMetaBase/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuPandasMeta.scala > $tmp_dir/GpuWindowInPandasExecMetaBase_new.out sed -n '/abstract class GpuWindowInPandasExecMetaBase/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala > $tmp_dir/GpuWindowInPandasExecMetaBase_old.out -diff $tmp_dir/GpuWindowInPandasExecMetaBase_new.out $tmp_dir/GpuWindowInPandasExecMetaBase_old.out > $tmp_dir/GpuWindowInPandasExecMetaBase.newdiff +diff $tmp_dir/GpuWindowInPandasExecMetaBase_new.out $tmp_dir/GpuWindowInPandasExecMetaBase_old.out > $tmp_dir/GpuWindowInPandasExecMetaBase.newdiff || true diff -c spark2diffs/GpuWindowInPandasExecMetaBase.diff $tmp_dir/GpuWindowInPandasExecMetaBase.newdiff sed -n '/class GpuAggregateInPandasExecMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuPandasMeta.scala > $tmp_dir/GpuAggregateInPandasExecMeta_new.out sed -n '/class GpuAggregateInPandasExecMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala > $tmp_dir/GpuAggregateInPandasExecMeta_old.out -diff $tmp_dir/GpuAggregateInPandasExecMeta_new.out $tmp_dir/GpuAggregateInPandasExecMeta_old.out > $tmp_dir/GpuAggregateInPandasExecMeta.newdiff +diff $tmp_dir/GpuAggregateInPandasExecMeta_new.out $tmp_dir/GpuAggregateInPandasExecMeta_old.out > $tmp_dir/GpuAggregateInPandasExecMeta.newdiff || true diff -c spark2diffs/GpuAggregateInPandasExecMeta.diff $tmp_dir/GpuAggregateInPandasExecMeta.newdiff sed -n '/class GpuFlatMapGroupsInPandasExecMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuPandasMeta.scala > $tmp_dir/GpuFlatMapGroupsInPandasExecMeta_new.out sed -n '/class GpuFlatMapGroupsInPandasExecMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/301+-nondb/scala/org/apache/spark/sql/rapids/execution/python/shims/v2/GpuFlatMapGroupsInPandasExec.scala > $tmp_dir/GpuFlatMapGroupsInPandasExecMeta_old.out -diff $tmp_dir/GpuFlatMapGroupsInPandasExecMeta_new.out $tmp_dir/GpuFlatMapGroupsInPandasExecMeta_old.out > $tmp_dir/GpuFlatMapGroupsInPandasExecMeta.newdiff +diff $tmp_dir/GpuFlatMapGroupsInPandasExecMeta_new.out $tmp_dir/GpuFlatMapGroupsInPandasExecMeta_old.out > $tmp_dir/GpuFlatMapGroupsInPandasExecMeta.newdiff || true diff -c spark2diffs/GpuFlatMapGroupsInPandasExecMeta.diff $tmp_dir/GpuFlatMapGroupsInPandasExecMeta.newdiff sed -n '/class GpuShuffledHashJoinMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExecMeta.scala > $tmp_dir/GpuShuffledHashJoinMeta_new.out sed -n '/class GpuShuffledHashJoinMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala > $tmp_dir/GpuShuffledHashJoinMeta_old.out -diff $tmp_dir/GpuShuffledHashJoinMeta_new.out $tmp_dir/GpuShuffledHashJoinMeta_old.out > $tmp_dir/GpuShuffledHashJoinMeta.newdiff +diff $tmp_dir/GpuShuffledHashJoinMeta_new.out $tmp_dir/GpuShuffledHashJoinMeta_old.out > $tmp_dir/GpuShuffledHashJoinMeta.newdiff || true diff -c spark2diffs/GpuShuffledHashJoinMeta.diff $tmp_dir/GpuShuffledHashJoinMeta.newdiff -diff 
../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/TreeNode.scala ../sql-plugin/src/main/pre320-treenode/scala/com/nvidia/spark/rapids/shims/v2/TreeNode.scala > $tmp_dir/TreeNode.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/TreeNode.scala ../sql-plugin/src/main/pre320-treenode/scala/com/nvidia/spark/rapids/shims/v2/TreeNode.scala > $tmp_dir/TreeNode.newdiff || true diff -c spark2diffs/TreeNode.diff $tmp_dir/TreeNode.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuSortMergeJoinMeta.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala > $tmp_dir/GpuSortMergeJoinMeta.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuSortMergeJoinMeta.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala > $tmp_dir/GpuSortMergeJoinMeta.newdiff || true diff -c spark2diffs/GpuSortMergeJoinMeta.diff $tmp_dir/GpuSortMergeJoinMeta.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuJoinUtils.scala ../sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuJoinUtils.scala > $tmp_dir/GpuJoinUtils.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuJoinUtils.scala ../sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuJoinUtils.scala > $tmp_dir/GpuJoinUtils.newdiff || true diff -c spark2diffs/GpuJoinUtils.diff $tmp_dir/GpuJoinUtils.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/TypeSigUtil.scala ../sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/TypeSigUtil.scala > $tmp_dir/TypeSigUtil.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/TypeSigUtil.scala ../sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/TypeSigUtil.scala > $tmp_dir/TypeSigUtil.newdiff || true diff -c spark2diffs/TypeSigUtil.diff $tmp_dir/TypeSigUtil.newdiff sed -n '/class GpuBroadcastHashJoinMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExecMeta.scala > $tmp_dir/GpuBroadcastHashJoinMeta_new.out sed -n '/class GpuBroadcastHashJoinMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala > $tmp_dir/GpuBroadcastHashJoinMeta_old.out -diff $tmp_dir/GpuBroadcastHashJoinMeta_new.out $tmp_dir/GpuBroadcastHashJoinMeta_old.out > $tmp_dir/GpuBroadcastHashJoinMeta.newdiff +diff $tmp_dir/GpuBroadcastHashJoinMeta_new.out $tmp_dir/GpuBroadcastHashJoinMeta_old.out > $tmp_dir/GpuBroadcastHashJoinMeta.newdiff || true diff -c spark2diffs/GpuBroadcastHashJoinMeta.diff $tmp_dir/GpuBroadcastHashJoinMeta.newdiff sed -n '/object GpuCSVScan/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuCSVScan.scala > $tmp_dir/GpuCSVScan_new.out sed -n '/object GpuCSVScan/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala > $tmp_dir/GpuCSVScan_old.out -diff $tmp_dir/GpuCSVScan_new.out $tmp_dir/GpuCSVScan_old.out > $tmp_dir/GpuCSVScan.newdiff +diff $tmp_dir/GpuCSVScan_new.out $tmp_dir/GpuCSVScan_old.out > $tmp_dir/GpuCSVScan.newdiff || true diff -c spark2diffs/GpuCSVScan.diff $tmp_dir/GpuCSVScan.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/OffsetWindowFunctionMeta.scala 
../sql-plugin/src/main/301until310-all/scala/com/nvidia/spark/rapids/shims/v2/OffsetWindowFunctionMeta.scala > $tmp_dir/OffsetWindowFunctionMeta.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/OffsetWindowFunctionMeta.scala ../sql-plugin/src/main/301until310-all/scala/com/nvidia/spark/rapids/shims/v2/OffsetWindowFunctionMeta.scala > $tmp_dir/OffsetWindowFunctionMeta.newdiff || true diff -c spark2diffs/OffsetWindowFunctionMeta.diff $tmp_dir/OffsetWindowFunctionMeta.newdiff sed -n '/class GpuRegExpReplaceMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceMeta.scala > $tmp_dir/GpuRegExpReplaceMeta_new.out sed -n '/class GpuRegExpReplaceMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceMeta.scala > $tmp_dir/GpuRegExpReplaceMeta_old.out -diff $tmp_dir/GpuRegExpReplaceMeta_new.out $tmp_dir/GpuRegExpReplaceMeta_old.out > $tmp_dir/GpuRegExpReplaceMeta.newdiff +diff $tmp_dir/GpuRegExpReplaceMeta_new.out $tmp_dir/GpuRegExpReplaceMeta_old.out > $tmp_dir/GpuRegExpReplaceMeta.newdiff || true diff -c spark2diffs/GpuRegExpReplaceMeta.diff $tmp_dir/GpuRegExpReplaceMeta.newdiff sed -n '/class GpuWindowExpressionMetaBase/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuWindows.scala > $tmp_dir/GpuWindowExpressionMetaBase_new.out sed -n '/class GpuWindowExpressionMetaBase/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala > $tmp_dir/GpuWindowExpressionMetaBase_old.out -diff $tmp_dir/GpuWindowExpressionMetaBase_new.out $tmp_dir/GpuWindowExpressionMetaBase_old.out > $tmp_dir/GpuWindowExpressionMetaBase.newdiff +diff $tmp_dir/GpuWindowExpressionMetaBase_new.out $tmp_dir/GpuWindowExpressionMetaBase_old.out > $tmp_dir/GpuWindowExpressionMetaBase.newdiff || true diff -c spark2diffs/GpuWindowExpressionMetaBase.diff $tmp_dir/GpuWindowExpressionMetaBase.newdiff sed -n '/abstract class GpuSpecifiedWindowFrameMetaBase/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuWindows.scala > $tmp_dir/GpuSpecifiedWindowFrameMetaBase_new.out sed -n '/abstract class GpuSpecifiedWindowFrameMetaBase/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala > $tmp_dir/GpuSpecifiedWindowFrameMetaBase_old.out -diff $tmp_dir/GpuSpecifiedWindowFrameMetaBase_new.out $tmp_dir/GpuSpecifiedWindowFrameMetaBase_old.out > $tmp_dir/GpuSpecifiedWindowFrameMetaBase.newdiff +diff $tmp_dir/GpuSpecifiedWindowFrameMetaBase_new.out $tmp_dir/GpuSpecifiedWindowFrameMetaBase_old.out > $tmp_dir/GpuSpecifiedWindowFrameMetaBase.newdiff || true diff -c spark2diffs/GpuSpecifiedWindowFrameMetaBase.diff $tmp_dir/GpuSpecifiedWindowFrameMetaBase.newdiff sed -n '/class GpuSpecifiedWindowFrameMeta(/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuWindows.scala > $tmp_dir/GpuSpecifiedWindowFrameMeta_new.out sed -n '/class GpuSpecifiedWindowFrameMeta(/,/^}/{/^}/!p}' ../sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/gpuWindows.scala > $tmp_dir/GpuSpecifiedWindowFrameMeta_old.out -diff $tmp_dir/GpuSpecifiedWindowFrameMeta_new.out $tmp_dir/GpuSpecifiedWindowFrameMeta_old.out > $tmp_dir/GpuSpecifiedWindowFrameMeta.newdiff +diff 
$tmp_dir/GpuSpecifiedWindowFrameMeta_new.out $tmp_dir/GpuSpecifiedWindowFrameMeta_old.out > $tmp_dir/GpuSpecifiedWindowFrameMeta.newdiff || true diff -c spark2diffs/GpuSpecifiedWindowFrameMeta.diff $tmp_dir/GpuSpecifiedWindowFrameMeta.newdiff sed -n '/class GpuWindowExpressionMeta(/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuWindows.scala > $tmp_dir/GpuWindowExpressionMeta_new.out sed -n '/class GpuWindowExpressionMeta(/,/^}/{/^}/!p}' ../sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/gpuWindows.scala > $tmp_dir/GpuWindowExpressionMeta_old.out -diff $tmp_dir/GpuWindowExpressionMeta_new.out $tmp_dir/GpuWindowExpressionMeta_old.out > $tmp_dir/GpuWindowExpressionMeta.newdiff +diff $tmp_dir/GpuWindowExpressionMeta_new.out $tmp_dir/GpuWindowExpressionMeta_old.out > $tmp_dir/GpuWindowExpressionMeta.newdiff || true diff -c spark2diffs/GpuWindowExpressionMeta.diff $tmp_dir/GpuWindowExpressionMeta.newdiff sed -n '/object GpuWindowUtil/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuWindows.scala > $tmp_dir/GpuWindowUtil_new.out @@ -343,66 +356,66 @@ diff -c $tmp_dir/ParsedBoundary_new.out $tmp_dir/ParsedBoundary_old.out sed -n '/class GpuWindowSpecDefinitionMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/gpuWindows.scala > $tmp_dir/GpuWindowSpecDefinitionMeta_new.out sed -n '/class GpuWindowSpecDefinitionMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala > $tmp_dir/GpuWindowSpecDefinitionMeta_old.out -diff $tmp_dir/GpuWindowSpecDefinitionMeta_new.out $tmp_dir/GpuWindowSpecDefinitionMeta_old.out > $tmp_dir/GpuWindowSpecDefinitionMeta.newdiff +diff $tmp_dir/GpuWindowSpecDefinitionMeta_new.out $tmp_dir/GpuWindowSpecDefinitionMeta_old.out > $tmp_dir/GpuWindowSpecDefinitionMeta.newdiff || true diff -c spark2diffs/GpuWindowSpecDefinitionMeta.diff $tmp_dir/GpuWindowSpecDefinitionMeta.newdiff sed -n '/abstract class GpuBaseWindowExecMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowMeta.scala > $tmp_dir/GpuBaseWindowExecMeta_new.out sed -n '/abstract class GpuBaseWindowExecMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala > $tmp_dir/GpuBaseWindowExecMeta_old.out -diff $tmp_dir/GpuBaseWindowExecMeta_new.out $tmp_dir/GpuBaseWindowExecMeta_old.out > $tmp_dir/GpuBaseWindowExecMeta.newdiff +diff $tmp_dir/GpuBaseWindowExecMeta_new.out $tmp_dir/GpuBaseWindowExecMeta_old.out > $tmp_dir/GpuBaseWindowExecMeta.newdiff || true diff -c spark2diffs/GpuBaseWindowExecMeta.diff $tmp_dir/GpuBaseWindowExecMeta.newdiff sed -n '/class GpuWindowExecMeta(/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowMeta.scala > $tmp_dir/GpuWindowExecMeta_new.out sed -n '/class GpuWindowExecMeta(/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala > $tmp_dir/GpuWindowExecMeta_old.out -diff $tmp_dir/GpuWindowExecMeta_new.out $tmp_dir/GpuWindowExecMeta_old.out > $tmp_dir/GpuWindowExecMeta.newdiff +diff $tmp_dir/GpuWindowExecMeta_new.out $tmp_dir/GpuWindowExecMeta_old.out > $tmp_dir/GpuWindowExecMeta.newdiff || true diff -c spark2diffs/GpuWindowExecMeta.diff $tmp_dir/GpuWindowExecMeta.newdiff sed -n '/class GpuExpandExecMeta/,/^}/{/^}/!p}' 
../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandMeta.scala > $tmp_dir/GpuExpandExecMeta_new.out sed -n '/class GpuExpandExecMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala > $tmp_dir/GpuExpandExecMeta_old.out -diff $tmp_dir/GpuExpandExecMeta_new.out $tmp_dir/GpuExpandExecMeta_old.out > $tmp_dir/GpuExpandExecMeta.newdiff +diff $tmp_dir/GpuExpandExecMeta_new.out $tmp_dir/GpuExpandExecMeta_old.out > $tmp_dir/GpuExpandExecMeta.newdiff || true diff -c spark2diffs/GpuExpandExecMeta.diff $tmp_dir/GpuExpandExecMeta.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala > $tmp_dir/RapidsMeta.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala > $tmp_dir/RapidsMeta.newdiff || true diff -c spark2diffs/RapidsMeta.diff $tmp_dir/RapidsMeta.newdiff sed -n '/object GpuReadCSVFileFormat/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadCSVFileFormat.scala > $tmp_dir/GpuReadCSVFileFormat_new.out sed -n '/object GpuReadCSVFileFormat/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadCSVFileFormat.scala > $tmp_dir/GpuReadCSVFileFormat_old.out -diff $tmp_dir/GpuReadCSVFileFormat_new.out $tmp_dir/GpuReadCSVFileFormat_old.out > $tmp_dir/GpuReadCSVFileFormat.newdiff +diff $tmp_dir/GpuReadCSVFileFormat_new.out $tmp_dir/GpuReadCSVFileFormat_old.out > $tmp_dir/GpuReadCSVFileFormat.newdiff || true diff -c spark2diffs/GpuReadCSVFileFormat.diff $tmp_dir/GpuReadCSVFileFormat.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala > $tmp_dir/RapidsConf.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala > $tmp_dir/RapidsConf.newdiff || true diff -c spark2diffs/RapidsConf.diff $tmp_dir/RapidsConf.newdiff sed -n '/object GpuReadParquetFileFormat/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala > $tmp_dir/GpuReadParquetFileFormat_new.out sed -n '/object GpuReadParquetFileFormat/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala > $tmp_dir/GpuReadParquetFileFormat_old.out -diff $tmp_dir/GpuReadParquetFileFormat_new.out $tmp_dir/GpuReadParquetFileFormat_old.out > $tmp_dir/GpuReadParquetFileFormat.newdiff +diff $tmp_dir/GpuReadParquetFileFormat_new.out $tmp_dir/GpuReadParquetFileFormat_old.out > $tmp_dir/GpuReadParquetFileFormat.newdiff || true diff -c spark2diffs/GpuReadParquetFileFormat.diff $tmp_dir/GpuReadParquetFileFormat.newdiff sed -n '/object GpuParquetScanBase/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala > $tmp_dir/GpuParquetScanBase_new.out sed -n '/object GpuParquetScanBase/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala > $tmp_dir/GpuParquetScanBase_old.out -diff $tmp_dir/GpuParquetScanBase_new.out $tmp_dir/GpuParquetScanBase_old.out > $tmp_dir/GpuParquetScanBase.newdiff +diff $tmp_dir/GpuParquetScanBase_new.out $tmp_dir/GpuParquetScanBase_old.out > $tmp_dir/GpuParquetScanBase.newdiff || true diff -c 
spark2diffs/GpuParquetScanBase.diff $tmp_dir/GpuParquetScanBase.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastJoinMeta.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastJoinMeta.scala > $tmp_dir/GpuBroadcastJoinMeta.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastJoinMeta.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastJoinMeta.scala > $tmp_dir/GpuBroadcastJoinMeta.newdiff || true diff -c spark2diffs/GpuBroadcastJoinMeta.diff $tmp_dir/GpuBroadcastJoinMeta.newdiff sed -n '/object AggregateUtils/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/AggregateUtils.scala > $tmp_dir/AggregateUtils_new.out sed -n '/object AggregateUtils/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala > $tmp_dir/AggregateUtils_old.out -diff $tmp_dir/AggregateUtils_new.out $tmp_dir/AggregateUtils_old.out > $tmp_dir/AggregateUtils.newdiff +diff $tmp_dir/AggregateUtils_new.out $tmp_dir/AggregateUtils_old.out > $tmp_dir/AggregateUtils.newdiff || true diff -c spark2diffs/AggregateUtils.diff $tmp_dir/AggregateUtils.newdiff sed -n '/final class CastExprMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCastMeta.scala > $tmp_dir/CastExprMeta_new.out sed -n '/final class CastExprMeta/,/override def convertToGpu/{/override def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala > $tmp_dir/CastExprMeta_old.out -diff $tmp_dir/CastExprMeta_new.out $tmp_dir/CastExprMeta_old.out > $tmp_dir/CastExprMeta.newdiff +diff $tmp_dir/CastExprMeta_new.out $tmp_dir/CastExprMeta_old.out > $tmp_dir/CastExprMeta.newdiff || true diff -c spark2diffs/CastExprMeta.diff $tmp_dir/CastExprMeta.newdiff sed -n '/object GpuReadOrcFileFormat/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadOrcFileFormat.scala > $tmp_dir/GpuReadOrcFileFormat_new.out sed -n '/object GpuReadOrcFileFormat/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadOrcFileFormat.scala > $tmp_dir/GpuReadOrcFileFormat_old.out -diff $tmp_dir/GpuReadOrcFileFormat_new.out $tmp_dir/GpuReadOrcFileFormat_old.out > $tmp_dir/GpuReadOrcFileFormat.newdiff +diff $tmp_dir/GpuReadOrcFileFormat_new.out $tmp_dir/GpuReadOrcFileFormat_old.out > $tmp_dir/GpuReadOrcFileFormat.newdiff || true diff -c spark2diffs/GpuReadOrcFileFormat.diff $tmp_dir/GpuReadOrcFileFormat.newdiff sed -n '/object GpuParquetFileFormat/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala > $tmp_dir/GpuParquetFileFormat_new.out sed -n '/object GpuParquetFileFormat/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala > $tmp_dir/GpuParquetFileFormat_old.out -diff $tmp_dir/GpuParquetFileFormat_new.out $tmp_dir/GpuParquetFileFormat_old.out > $tmp_dir/GpuParquetFileFormat.newdiff +diff $tmp_dir/GpuParquetFileFormat_new.out $tmp_dir/GpuParquetFileFormat_old.out > $tmp_dir/GpuParquetFileFormat.newdiff || true diff -c spark2diffs/GpuParquetFileFormat.diff $tmp_dir/GpuParquetFileFormat.newdiff sed -n '/def asDecimalType/,/^ }/{/^ }/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala > $tmp_dir/asDecimalType_new.out @@ -411,36 +424,36 @@ diff -c $tmp_dir/asDecimalType_new.out $tmp_dir/asDecimalType_old.out sed -n '/def optionallyAsDecimalType/,/^ }/{/^ }/!p}' 
../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala > $tmp_dir/optionallyAsDecimalType_new.out sed -n '/def optionallyAsDecimalType/,/^ }/{/^ }/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala > $tmp_dir/optionallyAsDecimalType_old.out -diff $tmp_dir/optionallyAsDecimalType_new.out $tmp_dir/optionallyAsDecimalType_old.out > $tmp_dir/optionallyAsDecimalType.newdiff +diff $tmp_dir/optionallyAsDecimalType_new.out $tmp_dir/optionallyAsDecimalType_old.out > $tmp_dir/optionallyAsDecimalType.newdiff || true diff -c spark2diffs/optionallyAsDecimalType.diff $tmp_dir/optionallyAsDecimalType.newdiff sed -n '/def getPrecisionForIntegralType/,/^ }/{/^ }/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala > $tmp_dir/getPrecisionForIntegralType_new.out sed -n '/def getPrecisionForIntegralType/,/^ }/{/^ }/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala > $tmp_dir/getPrecisionForIntegralType_old.out -diff $tmp_dir/getPrecisionForIntegralType_new.out $tmp_dir/getPrecisionForIntegralType_old.out > $tmp_dir/getPrecisionForIntegralType.newdiff +diff $tmp_dir/getPrecisionForIntegralType_new.out $tmp_dir/getPrecisionForIntegralType_old.out > $tmp_dir/getPrecisionForIntegralType.newdiff || true diff -c spark2diffs/getPrecisionForIntegralType.diff $tmp_dir/getPrecisionForIntegralType.newdiff # not sure this diff works very well due to java vs scala and quite a bit different but should find any changes in those functions sed -n '/def toRapidsStringOrNull/,/^ }/{/^ }/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala > $tmp_dir/toRapidsStringOrNull_new.out sed -n '/private static DType/,/^ }/{/^ }/!p}' ../sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java > $tmp_dir/toRapidsStringOrNull_old.out -diff $tmp_dir/toRapidsStringOrNull_new.out $tmp_dir/toRapidsStringOrNull_old.out > $tmp_dir/toRapidsStringOrNull.newdiff +diff $tmp_dir/toRapidsStringOrNull_new.out $tmp_dir/toRapidsStringOrNull_old.out > $tmp_dir/toRapidsStringOrNull.newdiff || true diff -c spark2diffs/toRapidsStringOrNull.diff $tmp_dir/toRapidsStringOrNull.newdiff sed -n '/def createCudfDecimal/,/^ }/{/^ }/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala > $tmp_dir/createCudfDecimal_new.out sed -n '/def createCudfDecimal/,/^ }/{/^ }/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/DecimalUtil.scala > $tmp_dir/createCudfDecimal_old.out -diff $tmp_dir/createCudfDecimal_new.out $tmp_dir/createCudfDecimal_old.out > $tmp_dir/createCudfDecimal.newdiff +diff $tmp_dir/createCudfDecimal_new.out $tmp_dir/createCudfDecimal_old.out > $tmp_dir/createCudfDecimal.newdiff || true diff -c spark2diffs/createCudfDecimal.diff $tmp_dir/createCudfDecimal.newdiff sed -n '/abstract class ReplicateRowsExprMeta/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReplicateRowsMeta.scala > $tmp_dir/ReplicateRowsExprMeta_new.out sed -n '/abstract class ReplicateRowsExprMeta/,/override final def convertToGpu/{/override final def convertToGpu/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala > $tmp_dir/ReplicateRowsExprMeta_old.out -diff $tmp_dir/ReplicateRowsExprMeta_new.out $tmp_dir/ReplicateRowsExprMeta_old.out > $tmp_dir/ReplicateRowsExprMeta.newdiff +diff $tmp_dir/ReplicateRowsExprMeta_new.out $tmp_dir/ReplicateRowsExprMeta_old.out > $tmp_dir/ReplicateRowsExprMeta.newdiff || true diff -c 
spark2diffs/ReplicateRowsExprMeta.diff $tmp_dir/ReplicateRowsExprMeta.newdiff -diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala > $tmp_dir/DateUtils.newdiff +diff ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala > $tmp_dir/DateUtils.newdiff || true diff -c spark2diffs/DateUtils.diff $tmp_dir/DateUtils.newdiff sed -n '/object CudfTDigest/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuApproximatePercentile.scala > $tmp_dir/CudfTDigest_new.out sed -n '/object CudfTDigest/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuApproximatePercentile.scala > $tmp_dir/CudfTDigest_old.out -diff $tmp_dir/CudfTDigest_new.out $tmp_dir/CudfTDigest_old.out > $tmp_dir/CudfTDigest.newdiff +diff $tmp_dir/CudfTDigest_new.out $tmp_dir/CudfTDigest_old.out > $tmp_dir/CudfTDigest.newdiff || true diff -c spark2diffs/CudfTDigest.diff $tmp_dir/CudfTDigest.newdiff sed -n '/sealed trait TimeParserPolicy/p' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala > $tmp_dir/TimeParserPolicy_new.out @@ -465,7 +478,7 @@ diff -c $tmp_dir/GpuFloorCeil_new.out $tmp_dir/GpuFloorCeil_old.out sed -n '/object GpuFileSourceScanExec/,/^}/{/^}/!p}' ../spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala > $tmp_dir/GpuFileSourceScanExec_new.out sed -n '/object GpuFileSourceScanExec/,/^}/{/^}/!p}' ../sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala > $tmp_dir/GpuFileSourceScanExec_old.out -diff $tmp_dir/GpuFileSourceScanExec_new.out $tmp_dir/GpuFileSourceScanExec_old.out > $tmp_dir/GpuFileSourceScanExec.newdiff +diff $tmp_dir/GpuFileSourceScanExec_new.out $tmp_dir/GpuFileSourceScanExec_old.out > $tmp_dir/GpuFileSourceScanExec.newdiff || true diff -c spark2diffs/GpuFileSourceScanExec.diff $tmp_dir/GpuFileSourceScanExec.newdiff echo "Done running Diffs of spark2.x files" diff --git a/scripts/spark2diffs/GpuHashJoin.diff b/scripts/spark2diffs/GpuHashJoin.diff index 331be606a48..5771c5f5834 100644 --- a/scripts/spark2diffs/GpuHashJoin.diff +++ b/scripts/spark2diffs/GpuHashJoin.diff @@ -6,13 +6,9 @@ < object GpuHashJoin { --- > object GpuHashJoin extends Arm { -72c72 -< meta: RapidsMeta[_, _], ---- -> meta: RapidsMeta[_, _, _], -99a100 +101a102 > -120c121 +122c123 < } --- > diff --git a/scripts/spark2diffs/GpuSortMergeJoinMeta.diff b/scripts/spark2diffs/GpuSortMergeJoinMeta.diff index a93408146fa..34c8687edd3 100644 --- a/scripts/spark2diffs/GpuSortMergeJoinMeta.diff +++ b/scripts/spark2diffs/GpuSortMergeJoinMeta.diff @@ -13,22 +13,28 @@ < parent: Option[RapidsMeta[_, _]], --- > parent: Option[RapidsMeta[_, _, _]], -76a74,91 +76a74,97 > } > > override def convertToGpu(): GpuExec = { +> val condition = conditionMeta.map(_.convertToGpu()) +> val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) { +> (condition, None) +> } else { +> (None, condition) +> } > val Seq(left, right) = childPlans.map(_.convertIfNeeded()) > val joinExec = GpuShuffledHashJoinExec( > leftKeys.map(_.convertToGpu()), > rightKeys.map(_.convertToGpu()), > join.joinType, > buildSide, -> None, +> joinCondition, > left, > right, > join.isSkewJoin)( > join.leftKeys, > join.rightKeys) -> // The GPU does not yet support conditional joins, so conditions are implemented -> // as a filter after the join when 
possible. -> condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec) +> // For inner joins we can apply a post-join condition for any conditions that cannot be +> // evaluated directly in a mixed join that leverages a cudf AST expression +> filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec) diff --git a/scripts/spark2diffs/aggregate.diff b/scripts/spark2diffs/aggregate.diff new file mode 100644 index 00000000000..c16b158bf5b --- /dev/null +++ b/scripts/spark2diffs/aggregate.diff @@ -0,0 +1,1496 @@ +2c2 +< * Copyright (c) 2022, NVIDIA CORPORATION. +--- +> * Copyright (c) 2019-2022, NVIDIA CORPORATION. +18a19,20 +> import java.util +> +22c24,34 +< import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId, NamedExpression} +--- +> import ai.rapids.cudf +> import ai.rapids.cudf.{DType, NvtxColor} +> import com.nvidia.spark.rapids.GpuMetric._ +> import com.nvidia.spark.rapids.RapidsPluginImplicits._ +> import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode +> +> import org.apache.spark.TaskContext +> import org.apache.spark.internal.Logging +> import org.apache.spark.rdd.RDD +> import org.apache.spark.sql.catalyst.InternalRow +> import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, ExprId, If, NamedExpression, NullsFirst} +24,25c36,44 +< import org.apache.spark.sql.execution.{SortExec, SparkPlan, TrampolineUtil} +< import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +--- +> import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering +> import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +> import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, HashPartitioning, Partitioning, UnspecifiedDistribution} +> import org.apache.spark.sql.catalyst.trees.TreeNodeTag +> import org.apache.spark.sql.catalyst.util.truncatedString +> import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan} +> import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +> import org.apache.spark.sql.rapids.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter} +> import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil} +26a46 +> import org.apache.spark.sql.vectorized.ColumnarBatch +28,29c48 +< // Spark 2.x - had to copy the GpuBaseAggregateMeta into each Hash and Sort Meta because no +< // BaseAggregateExec class in Spark 2.x +--- +> object AggregateUtils { +31,32c50,808 +< class GpuHashAggregateMeta( +< val agg: HashAggregateExec, +--- +> private val aggs = List("min", "max", "avg", "sum", "count", "first", "last") +> +> /** +> * Return true if the Attribute passed is one of aggregates in the aggs list. +> * Use it with caution. We are comparing the name of a column looking for anything that matches +> * with the values in aggs. +> */ +> def validateAggregate(attributes: AttributeSet): Boolean = { +> attributes.toSeq.exists(attr => aggs.exists(agg => attr.name.contains(agg))) +> } +> +> /** +> * Return true if there are multiple distinct functions along with non-distinct functions. +> */ +> def shouldFallbackMultiDistinct(aggExprs: Seq[AggregateExpression]): Boolean = { +> // Check if there is an `If` within `First`. 
This is included in the plan for non-distinct +> // functions only when multiple distincts along with non-distinct functions are present in the +> // query. We fall back to CPU in this case when references of `If` are an aggregate. We cannot +> // call `isDistinct` here on aggregateExpressions to get the total number of distinct functions. +> // If there are multiple distincts, the plan is rewritten by `RewriteDistinctAggregates` where +> // regular aggregations and every distinct aggregation is calculated in a separate group. +> aggExprs.map(e => e.aggregateFunction).exists { +> func => { +> func match { +> case First(If(_, _, _), _) if validateAggregate(func.references) => true +> case _ => false +> } +> } +> } +> } +> +> /** +> * Computes a target input batch size based on the assumption that computation can consume up to +> * 4X the configured batch size. +> * @param confTargetSize user-configured maximum desired batch size +> * @param inputTypes input batch schema +> * @param outputTypes output batch schema +> * @param isReductionOnly true if this is a reduction-only aggregation without grouping +> * @return maximum target batch size to keep computation under the 4X configured batch limit +> */ +> def computeTargetBatchSize( +> confTargetSize: Long, +> inputTypes: Seq[DataType], +> outputTypes: Seq[DataType], +> isReductionOnly: Boolean): Long = { +> def typesToSize(types: Seq[DataType]): Long = +> types.map(GpuBatchUtils.estimateGpuMemory(_, nullable = false, rowCount = 1)).sum +> val inputRowSize = typesToSize(inputTypes) +> val outputRowSize = typesToSize(outputTypes) +> // The cudf hash table implementation allocates four 32-bit integers per input row. +> val hashTableRowSize = 4 * 4 +> +> // Using the memory management for joins as a reference, target 4X batch size as a budget. +> var totalBudget = 4 * confTargetSize +> +> // Compute the amount of memory being consumed per-row in the computation +> var computationBytesPerRow = inputRowSize + hashTableRowSize +> if (isReductionOnly) { +> // Remove the lone output row size from the budget rather than track per-row in computation +> totalBudget -= outputRowSize +> } else { +> // The worst-case memory consumption during a grouping aggregation is the case where the +> // grouping does not combine any input rows, so just as many rows appear in the output. 
+> computationBytesPerRow += outputRowSize +> } +> +> // Calculate the max rows that can be processed during computation within the budget +> val maxRows = totalBudget / computationBytesPerRow +> +> // Finally compute the input target batching size taking into account the cudf row limits +> Math.min(inputRowSize * maxRows, Int.MaxValue) +> } +> } +> +> /** Utility class to hold all of the metrics related to hash aggregation */ +> case class GpuHashAggregateMetrics( +> numOutputRows: GpuMetric, +> numOutputBatches: GpuMetric, +> numTasksFallBacked: GpuMetric, +> opTime: GpuMetric, +> computeAggTime: GpuMetric, +> concatTime: GpuMetric, +> sortTime: GpuMetric, +> semWaitTime: GpuMetric, +> spillCallback: SpillCallback) +> +> /** Utility class to convey information on the aggregation modes being used */ +> case class AggregateModeInfo( +> uniqueModes: Seq[AggregateMode], +> hasPartialMode: Boolean, +> hasPartialMergeMode: Boolean, +> hasFinalMode: Boolean, +> hasCompleteMode: Boolean) +> +> object AggregateModeInfo { +> def apply(uniqueModes: Seq[AggregateMode]): AggregateModeInfo = { +> AggregateModeInfo( +> uniqueModes = uniqueModes, +> hasPartialMode = uniqueModes.contains(Partial), +> hasPartialMergeMode = uniqueModes.contains(PartialMerge), +> hasFinalMode = uniqueModes.contains(Final), +> hasCompleteMode = uniqueModes.contains(Complete) +> ) +> } +> } +> +> /** +> * Iterator that takes another columnar batch iterator as input and emits new columnar batches that +> * are aggregated based on the specified grouping and aggregation expressions. This iterator tries +> * to perform a hash-based aggregation but is capable of falling back to a sort-based aggregation +> * which can operate on data that is either larger than can be represented by a cudf column or +> * larger than can fit in GPU memory. +> * +> * The iterator starts by pulling all batches from the input iterator, performing an initial +> * projection and aggregation on each individual batch via `aggregateInputBatches()`. The resulting +> * aggregated batches are cached in memory as spillable batches. Once all input batches have been +> * aggregated, `tryMergeAggregatedBatches()` is called to attempt a merge of the aggregated batches +> * into a single batch. If this is successful then the resulting batch can be returned, otherwise +> * `buildSortFallbackIterator` is used to sort the aggregated batches by the grouping keys and +> * performs a final merge aggregation pass on the sorted batches. 
+> * +> * @param cbIter iterator providing the input columnar batches +> * @param inputAttributes input attributes to identify the input columns from the input batches +> * @param groupingExpressions expressions used for producing the grouping keys +> * @param aggregateExpressions GPU aggregate expressions used to produce the aggregations +> * @param aggregateAttributes attribute references to each aggregate expression +> * @param resultExpressions output expression for the aggregation +> * @param modeInfo identifies which aggregation modes are being used +> * @param metrics metrics that will be updated during aggregation +> * @param configuredTargetBatchSize user-specified value for the targeted input batch size +> */ +> class GpuHashAggregateIterator( +> cbIter: Iterator[ColumnarBatch], +> inputAttributes: Seq[Attribute], +> groupingExpressions: Seq[NamedExpression], +> aggregateExpressions: Seq[GpuAggregateExpression], +> aggregateAttributes: Seq[Attribute], +> resultExpressions: Seq[NamedExpression], +> modeInfo: AggregateModeInfo, +> metrics: GpuHashAggregateMetrics, +> configuredTargetBatchSize: Long) +> extends Iterator[ColumnarBatch] with Arm with AutoCloseable with Logging { +> +> // Partial mode: +> // 1. boundInputReferences: picks column from raw input +> // 2. boundFinalProjections: is a pass-through of the agg buffer +> // 3. boundResultReferences: is a pass-through of the merged aggregate +> // +> // Final mode: +> // 1. boundInputReferences: is a pass-through of the merged aggregate +> // 2. boundFinalProjections: on merged batches, finalize aggregates +> // (GpuAverage => CudfSum/CudfCount) +> // 3. boundResultReferences: project the result expressions Spark expects in the output. +> // +> // Complete mode: +> // 1. boundInputReferences: picks column from raw input +> // 2. boundFinalProjections: on merged batches, finalize aggregates +> // (GpuAverage => CudfSum/CudfCount) +> // 3. boundResultReferences: project the result expressions Spark expects in the output. 
+> private case class BoundExpressionsModeAggregates( +> boundFinalProjections: Option[Seq[GpuExpression]], +> boundResultReferences: Seq[Expression]) +> +> Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close())) +> +> private[this] val isReductionOnly = groupingExpressions.isEmpty +> private[this] val boundExpressions = setupReferences() +> private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize) +> private[this] val aggregatedBatches = new util.ArrayDeque[LazySpillableColumnarBatch] +> private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None +> +> /** Iterator for fetching aggregated batches if a sort-based fallback has occurred */ +> private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None +> +> /** Whether a batch is pending for a reduction-only aggregation */ +> private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly +> +> override def hasNext: Boolean = { +> sortFallbackIter.map(_.hasNext).getOrElse { +> // reductions produce a result even if the input is empty +> hasReductionOnlyBatch || !aggregatedBatches.isEmpty || cbIter.hasNext +> } +> } +> +> override def next(): ColumnarBatch = { +> val batch = sortFallbackIter.map(_.next()).getOrElse { +> // aggregate and merge all pending inputs +> if (cbIter.hasNext) { +> aggregateInputBatches() +> tryMergeAggregatedBatches() +> } +> +> if (aggregatedBatches.size() > 1) { +> // Unable to merge to a single output, so must fall back to a sort-based approach. +> sortFallbackIter = Some(buildSortFallbackIterator()) +> sortFallbackIter.get.next() +> } else if (aggregatedBatches.isEmpty) { +> if (hasReductionOnlyBatch) { +> hasReductionOnlyBatch = false +> generateEmptyReductionBatch() +> } else { +> throw new NoSuchElementException("batches exhausted") +> } +> } else { +> // this will be the last batch +> hasReductionOnlyBatch = false +> withResource(aggregatedBatches.pop()) { lazyBatch => +> GpuColumnVector.incRefCounts(lazyBatch.getBatch) +> } +> } +> } +> +> finalProjectBatch(batch) +> } +> +> override def close(): Unit = { +> aggregatedBatches.forEach(_.safeClose()) +> aggregatedBatches.clear() +> outOfCoreIter.foreach(_.close()) +> outOfCoreIter = None +> sortFallbackIter = None +> hasReductionOnlyBatch = false +> } +> +> private def computeTargetMergeBatchSize(confTargetSize: Long): Long = { +> val mergedTypes = groupingExpressions.map(_.dataType) ++ aggregateExpressions.map(_.dataType) +> AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes,isReductionOnly) +> } +> +> /** Aggregate all input batches and place the results in the aggregatedBatches queue. */ +> private def aggregateInputBatches(): Unit = { +> val aggHelper = new AggHelper(forceMerge = false) +> while (cbIter.hasNext) { +> withResource(cbIter.next()) { childBatch => +> val isLastInputBatch = GpuColumnVector.isTaggedAsFinalBatch(childBatch) +> withResource(computeAggregate(childBatch, aggHelper)) { aggBatch => +> val batch = LazySpillableColumnarBatch(aggBatch, metrics.spillCallback, "aggbatch") +> // Avoid making batch spillable for the common case of the last and only batch +> if (!(isLastInputBatch && aggregatedBatches.isEmpty)) { +> batch.allowSpilling() +> } +> aggregatedBatches.add(batch) +> } +> } +> } +> } +> +> /** +> * Attempt to merge adjacent batches in the aggregatedBatches queue until either there is only +> * one batch or merging adjacent batches would exceed the target batch size. 
+> */ +> private def tryMergeAggregatedBatches(): Unit = { +> while (aggregatedBatches.size() > 1) { +> val concatTime = metrics.concatTime +> val opTime = metrics.opTime +> withResource(new NvtxWithMetrics("agg merge pass", NvtxColor.BLUE, concatTime, +> opTime)) { _ => +> // continue merging as long as some batches are able to be combined +> if (!mergePass()) { +> if (aggregatedBatches.size() > 1 && isReductionOnly) { +> // We were unable to merge the aggregated batches within the target batch size limit, +> // which means normally we would fallback to a sort-based approach. However for +> // reduction-only aggregation there are no keys to use for a sort. The only way this +> // can work is if all batches are merged. This will exceed the target batch size limit, +> // but at this point it is either risk an OOM/cudf error and potentially work or +> // not work at all. +> logWarning(s"Unable to merge reduction-only aggregated batches within " + +> s"target batch limit of $targetMergeBatchSize, attempting to merge remaining " + +> s"${aggregatedBatches.size()} batches beyond limit") +> withResource(mutable.ArrayBuffer[LazySpillableColumnarBatch]()) { batchesToConcat => +> aggregatedBatches.forEach(b => batchesToConcat += b) +> aggregatedBatches.clear() +> val batch = concatenateAndMerge(batchesToConcat) +> // batch does not need to be marked spillable since it is the last and only batch +> // and will be immediately retrieved on the next() call. +> aggregatedBatches.add(batch) +> } +> } +> return +> } +> } +> } +> } +> +> /** +> * Perform a single pass over the aggregated batches attempting to merge adjacent batches. +> * @return true if at least one merge operation occurred +> */ +> private def mergePass(): Boolean = { +> val batchesToConcat: mutable.ArrayBuffer[LazySpillableColumnarBatch] = mutable.ArrayBuffer.empty +> var wasBatchMerged = false +> // Current size in bytes of the batches targeted for the next concatenation +> var concatSize: Long = 0L +> var batchesLeftInPass = aggregatedBatches.size() +> +> while (batchesLeftInPass > 0) { +> closeOnExcept(batchesToConcat) { _ => +> var isConcatSearchFinished = false +> // Old batches are picked up at the front of the queue and freshly merged batches are +> // appended to the back of the queue. Although tempting to allow the pass to "wrap around" +> // and pick up batches freshly merged in this pass, it's avoided to prevent changing the +> // order of aggregated batches. +> while (batchesLeftInPass > 0 && !isConcatSearchFinished) { +> val candidate = aggregatedBatches.getFirst +> val potentialSize = concatSize + candidate.deviceMemorySize +> isConcatSearchFinished = concatSize > 0 && potentialSize > targetMergeBatchSize +> if (!isConcatSearchFinished) { +> batchesLeftInPass -= 1 +> batchesToConcat += aggregatedBatches.removeFirst() +> concatSize = potentialSize +> } +> } +> } +> +> val mergedBatch = if (batchesToConcat.length > 1) { +> wasBatchMerged = true +> val batch = concatenateAndMerge(batchesToConcat) +> batch.allowSpilling() +> batch +> } else { +> // Unable to find a neighboring buffer to produce a valid merge in this pass, +> // so simply put this buffer back on the queue for other passes. +> batchesToConcat.remove(0) +> } +> +> // Add the merged batch to the end of the aggregated batch queue. Only a single pass over +> // the batches is being performed due to the batch count check above, so the single-pass +> // loop will terminate before picking up this new batch. 
+> aggregatedBatches.addLast(mergedBatch) +> batchesToConcat.clear() +> concatSize = 0 +> } +> +> wasBatchMerged +> } +> +> private lazy val concatAndMergeHelper = new AggHelper(forceMerge = true) +> +> /** +> * Concatenate batches together and perform a merge aggregation on the result. The input batches +> * will be closed as part of this operation. +> * @param batches batches to concatenate and merge aggregate +> * @return lazy spillable batch which has NOT been marked spillable +> */ +> private def concatenateAndMerge( +> batches: mutable.ArrayBuffer[LazySpillableColumnarBatch]): LazySpillableColumnarBatch = { +> withResource(batches) { _ => +> withResource(concatenateBatches(batches)) { concatBatch => +> withResource(computeAggregate(concatBatch, concatAndMergeHelper)) { mergedBatch => +> LazySpillableColumnarBatch(mergedBatch, metrics.spillCallback, "agg merged batch") +> } +> } +> } +> } +> +> /** Build an iterator that uses a sort-based approach to merge aggregated batches together. */ +> private def buildSortFallbackIterator(): Iterator[ColumnarBatch] = { +> logInfo(s"Falling back to sort-based aggregation with ${aggregatedBatches.size()} batches") +> metrics.numTasksFallBacked += 1 +> val aggregatedBatchIter = new Iterator[ColumnarBatch] { +> override def hasNext: Boolean = !aggregatedBatches.isEmpty +> +> override def next(): ColumnarBatch = { +> withResource(aggregatedBatches.removeFirst()) { lazyBatch => +> GpuColumnVector.incRefCounts(lazyBatch.getBatch) +> } +> } +> } +> +> if (isReductionOnly) { +> // Normally this should never happen because `tryMergeAggregatedBatches` should have done +> // a last-ditch effort to concatenate all batches together regardless of target limits. +> throw new IllegalStateException("Unable to fallback to sort-based aggregation " + +> "without grouping keys") +> } +> +> val shims = ShimLoader.getSparkShims +> val groupingAttributes = groupingExpressions.map(_.toAttribute) +> val ordering = groupingAttributes.map(shims.sortOrder(_, Ascending, NullsFirst)) +> val aggBufferAttributes = groupingAttributes ++ +> aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) +> val sorter = new GpuSorter(ordering, aggBufferAttributes) +> val aggBatchTypes = aggBufferAttributes.map(_.dataType) +> +> // Use the out of core sort iterator to sort the batches by grouping key +> outOfCoreIter = Some(GpuOutOfCoreSortIterator( +> aggregatedBatchIter, +> sorter, +> LazilyGeneratedOrdering.forSchema(TrampolineUtil.fromAttributes(groupingAttributes)), +> configuredTargetBatchSize, +> opTime = metrics.opTime, +> sortTime = metrics.sortTime, +> outputBatches = NoopMetric, +> outputRows = NoopMetric, +> peakDevMemory = NoopMetric, +> spillCallback = metrics.spillCallback)) +> +> // The out of core sort iterator does not guarantee that a batch contains all of the values +> // for a particular key, so add a key batching iterator to enforce this. That allows each batch +> // to be merge-aggregated safely since all values associated with a particular key are +> // guaranteed to be in the same batch. 
+> val keyBatchingIter = new GpuKeyBatchingIterator( +> outOfCoreIter.get, +> sorter, +> aggBatchTypes.toArray, +> configuredTargetBatchSize, +> numInputRows = NoopMetric, +> numInputBatches = NoopMetric, +> numOutputRows = NoopMetric, +> numOutputBatches = NoopMetric, +> concatTime = metrics.concatTime, +> opTime = metrics.opTime, +> peakDevMemory = NoopMetric, +> spillCallback = metrics.spillCallback) +> +> // Finally wrap the key batching iterator with a merge aggregation on the output batches. +> new Iterator[ColumnarBatch] { +> override def hasNext: Boolean = keyBatchingIter.hasNext +> +> private val mergeSortedHelper = new AggHelper(true, isSorted = true) +> +> override def next(): ColumnarBatch = { +> // batches coming out of the sort need to be merged +> withResource(keyBatchingIter.next()) { batch => +> computeAggregate(batch, mergeSortedHelper) +> } +> } +> } +> } +> +> /** +> * Generates the result of a reduction-only aggregation on empty input by emitting the +> * initial value of each aggregator. +> */ +> private def generateEmptyReductionBatch(): ColumnarBatch = { +> val aggregateFunctions = aggregateExpressions.map(_.aggregateFunction) +> val defaultValues = +> aggregateFunctions.flatMap(_.initialValues) +> // We have to grab the semaphore in this scenario, since this is a reduction that produces +> // rows on the GPU out of empty input, meaning that if a batch has 0 rows, a new single +> // row is getting created with 0 as the count (if count is the operation), and other default +> // values. +> GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics.semWaitTime) +> val vecs = defaultValues.safeMap { ref => +> withResource(GpuScalar.from(ref.asInstanceOf[GpuLiteral].value, ref.dataType)) { +> scalar => GpuColumnVector.from(scalar, 1, ref.dataType) +> } +> } +> new ColumnarBatch(vecs.toArray, 1) +> } +> +> /** +> * Project a merged aggregated batch result to the layout that Spark expects +> * i.e.: select avg(foo) from bar group by baz will produce: +> * Partial mode: 3 columns => [bar, sum(foo) as sum_foo, count(foo) as count_foo] +> * Final mode: 2 columns => [bar, sum(sum_foo) / sum(count_foo)] +> */ +> private def finalProjectBatch(batch: ColumnarBatch): ColumnarBatch = { +> val aggTime = metrics.computeAggTime +> val opTime = metrics.opTime +> withResource(new NvtxWithMetrics("finalize agg", NvtxColor.DARK_GREEN, aggTime, +> opTime)) { _ => +> val finalBatch = if (boundExpressions.boundFinalProjections.isDefined) { +> withResource(batch) { _ => +> val finalCvs = boundExpressions.boundFinalProjections.get.map { ref => +> // aggregatedCb is made up of ColumnVectors +> // and the final projections from the aggregates won't change that, +> // so we can assume they will be vectors after we eval +> ref.columnarEval(batch).asInstanceOf[GpuColumnVector] +> } +> new ColumnarBatch(finalCvs.toArray, finalCvs.head.getRowCount.toInt) +> } +> } else { +> batch +> } +> +> // If `resultCvs` empty, it means we don't have any `resultExpressions` for this +> // aggregate. In these cases, the row count is the only important piece of information +> // that this aggregate exec needs to report up, so it will return batches that have no columns +> // but that do have a row count. If `resultCvs` is non-empty, the row counts match +> // `finalBatch.numRows` since `columnarEvalToColumn` cannot change the number of rows. +> val finalNumRows = finalBatch.numRows() +> +> // Perform the last project to get the correct shape that Spark expects. 
Note this may +> // add things like literals that were not part of the aggregate into the batch. +> val resultCvs = withResource(finalBatch) { _ => +> boundExpressions.boundResultReferences.safeMap { ref => +> // Result references can be virtually anything, we need to coerce +> // them to be vectors since this is going into a ColumnarBatch +> GpuExpressionsUtils.columnarEvalToColumn(ref, finalBatch) +> } +> } +> closeOnExcept(resultCvs) { _ => +> metrics.numOutputRows += finalNumRows +> metrics.numOutputBatches += 1 +> new ColumnarBatch(resultCvs.toArray, finalNumRows) +> } +> } +> } +> +> /** +> * Concatenates batches after extracting them from `LazySpillableColumnarBatch` +> * @note the input batches are not closed as part of this operation +> * @param spillableBatchesToConcat lazy spillable batches to concatenate +> * @return concatenated batch result +> */ +> private def concatenateBatches( +> spillableBatchesToConcat: mutable.ArrayBuffer[LazySpillableColumnarBatch]): ColumnarBatch = { +> val concatTime = metrics.concatTime +> val opTime = metrics.opTime +> withResource(new NvtxWithMetrics("concatenateBatches", NvtxColor.BLUE, concatTime, +> opTime)) { _ => +> val batchesToConcat = spillableBatchesToConcat.map(_.getBatch) +> val numCols = batchesToConcat.head.numCols() +> val dataTypes = (0 until numCols).map { +> c => batchesToConcat.head.column(c).dataType +> }.toArray +> withResource(batchesToConcat.map(GpuColumnVector.from)) { tbl => +> withResource(cudf.Table.concatenate(tbl: _*)) { concatenated => +> GpuColumnVector.from(concatenated, dataTypes) +> } +> } +> } +> } +> +> /** +> * `setupReferences` binds input, final and result references for the aggregate. +> * - input: used to obtain columns coming into the aggregate from the child +> * - final: some aggregates like average use this to specify an expression to produce +> * the final output of the aggregate. Average keeps sum and count throughout, +> * and at the end it has to divide the two, to produce the single sum/count result. +> * - result: used at the end to output to our parent +> */ +> private def setupReferences(): BoundExpressionsModeAggregates = { +> val groupingAttributes = groupingExpressions.map(_.toAttribute) +> val aggBufferAttributes = groupingAttributes ++ +> aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) +> +> val boundFinalProjections = if (modeInfo.hasFinalMode || modeInfo.hasCompleteMode) { +> val finalProjections = groupingExpressions ++ +> aggregateExpressions.map(_.aggregateFunction.evaluateExpression) +> Some(GpuBindReferences.bindGpuReferences(finalProjections, aggBufferAttributes)) +> } else { +> None +> } +> +> // allAttributes can be different things, depending on aggregation mode: +> // - Partial mode: grouping key + cudf aggregates (e.g. no avg, intead sum::count +> // - Final mode: grouping key + spark aggregates (e.g. avg) +> val finalAttributes = groupingAttributes ++ aggregateAttributes +> +> // boundResultReferences is used to project the aggregated input batch(es) for the result. +> // - Partial mode: it's a pass through. We take whatever was aggregated and let it come +> // out of the node as is. 
+> // - Final or Complete mode: we use resultExpressions to pick out the correct columns that +> // finalReferences has pre-processed for us +> val boundResultReferences = if (modeInfo.hasPartialMode || modeInfo.hasPartialMergeMode) { +> GpuBindReferences.bindGpuReferences( +> resultExpressions, +> resultExpressions.map(_.toAttribute)) +> } else if (modeInfo.hasFinalMode || modeInfo.hasCompleteMode) { +> GpuBindReferences.bindGpuReferences( +> resultExpressions, +> finalAttributes) +> } else { +> GpuBindReferences.bindGpuReferences( +> resultExpressions, +> groupingAttributes) +> } +> BoundExpressionsModeAggregates( +> boundFinalProjections, +> boundResultReferences) +> } +> +> /** +> * Internal class used in `computeAggregates` for the pre, agg, and post steps +> * +> * @param forceMerge - if true, we are merging two pre-aggregated batches, so we should use +> * the merge steps for each aggregate function +> * @param isSorted - if the batch is sorted this is set to true and is passed to cuDF +> * as an optimization hint +> */ +> class AggHelper(forceMerge: Boolean, isSorted: Boolean = false) { +> // `CudfAggregate` instances to apply, either update or merge aggregates +> private val cudfAggregates = new mutable.ArrayBuffer[CudfAggregate]() +> +> // integers for each column the aggregate is operating on +> private val aggOrdinals = new mutable.ArrayBuffer[Int] +> +> // the resulting data type from the cuDF aggregate (from +> // the update or merge aggregate, be it reduction or group by) +> private val postStepDataTypes = new mutable.ArrayBuffer[DataType]() +> +> private val groupingAttributes = groupingExpressions.map(_.toAttribute) +> private val aggBufferAttributes = groupingAttributes ++ +> aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) +> +> // `GpuAggregateFunction` can add a pre and post step for update +> // and merge aggregates. +> private val preStep = new mutable.ArrayBuffer[Expression]() +> private val postStep = new mutable.ArrayBuffer[Expression]() +> private val postStepAttr = new mutable.ArrayBuffer[Attribute]() +> +> // we add the grouping expression first, which should bind as pass-through +> if (forceMerge) { +> // a grouping expression can do actual computation, but we cannot do that computation again +> // on a merge, nor would we want to if we could. So use the attributes instead of the +> // original expression when we are forcing a merge. 
+> preStep ++= groupingAttributes +> } else { +> preStep ++= groupingExpressions +> } +> postStep ++= groupingAttributes +> postStepAttr ++= groupingAttributes +> postStepDataTypes ++= +> groupingExpressions.map(_.dataType) +> +> private var ix = groupingAttributes.length +> for (aggExp <- aggregateExpressions) { +> val aggFn = aggExp.aggregateFunction +> if ((aggExp.mode == Partial || aggExp.mode == Complete) && !forceMerge) { +> val ordinals = (ix until ix + aggFn.updateAggregates.length) +> aggOrdinals ++= ordinals +> ix += ordinals.length +> val updateAggs = aggFn.updateAggregates +> postStepDataTypes ++= updateAggs.map(_.dataType) +> cudfAggregates ++= updateAggs +> preStep ++= aggFn.inputProjection +> postStep ++= aggFn.postUpdate +> postStepAttr ++= aggFn.postUpdateAttr +> } else { +> val ordinals = (ix until ix + aggFn.mergeAggregates.length) +> aggOrdinals ++= ordinals +> ix += ordinals.length +> val mergeAggs = aggFn.mergeAggregates +> postStepDataTypes ++= mergeAggs.map(_.dataType) +> cudfAggregates ++= mergeAggs +> preStep ++= aggFn.preMerge +> postStep ++= aggFn.postMerge +> postStepAttr ++= aggFn.postMergeAttr +> } +> } +> +> // a bound expression that is applied before the cuDF aggregate +> private val preStepBound = if (forceMerge) { +> GpuBindReferences.bindGpuReferences(preStep.toList, aggBufferAttributes.toList) +> } else { +> GpuBindReferences.bindGpuReferences(preStep, inputAttributes) +> } +> +> // a bound expression that is applied after the cuDF aggregate +> private val postStepBound = +> GpuBindReferences.bindGpuReferences(postStep, postStepAttr) +> +> /** +> * Apply the "pre" step: preMerge for merge, or pass-through in the update case +> * @param toAggregateBatch - input (to the agg) batch from the child directly in the +> * merge case, or from the `inputProjection` in the update case. 
+> * @return a pre-processed batch that can be later cuDF aggregated +> */ +> def preProcess(toAggregateBatch: ColumnarBatch): ColumnarBatch = { +> GpuProjectExec.project(toAggregateBatch, preStepBound) +> } +> +> /** +> * Invoke reduction functions as defined in each `CudfAggreagte` +> * @param preProcessed - a batch after the "pre" step +> * @return +> */ +> def performReduction(preProcessed: ColumnarBatch): ColumnarBatch = { +> val cvs = mutable.ArrayBuffer[GpuColumnVector]() +> cudfAggregates.zipWithIndex.foreach { case (cudfAgg, ix) => +> val aggFn = cudfAgg.reductionAggregate +> val cols = GpuColumnVector.extractColumns(preProcessed) +> val reductionCol = cols(aggOrdinals(ix)) +> withResource(aggFn(reductionCol.getBase)) { res => +> cvs += GpuColumnVector.from( +> cudf.ColumnVector.fromScalar(res, 1), cudfAgg.dataType) +> } +> } +> new ColumnarBatch(cvs.toArray, 1) +> } +> +> /** +> * Used to produce a group-by aggregate +> * @param preProcessed the batch after the "pre" step +> * @return a Table that has been cuDF aggregated +> */ +> def performGroupByAggregation(preProcessed: ColumnarBatch): ColumnarBatch = { +> withResource(GpuColumnVector.from(preProcessed)) { preProcessedTbl => +> val groupOptions = cudf.GroupByOptions.builder() +> .withIgnoreNullKeys(false) +> .withKeysSorted(isSorted) +> .build() +> +> val cudfAggsOnColumn = cudfAggregates.zip(aggOrdinals).map { +> case (cudfAgg, ord) => cudfAgg.groupByAggregate.onColumn(ord) +> } +> +> // perform the aggregate +> val aggTbl = preProcessedTbl +> .groupBy(groupOptions, groupingExpressions.indices: _*) +> .aggregate(cudfAggsOnColumn: _*) +> +> withResource(aggTbl) { _ => +> GpuColumnVector.from(aggTbl, postStepDataTypes.toArray) +> } +> } +> } +> +> /** +> * Used to produce the outbound batch from the aggregate that could be +> * shuffled or could be passed through the evaluateExpression if we are in the final +> * stage. +> * It takes a cuDF aggregated batch and applies the "post" step: +> * postUpdate for update, or postMerge for merge +> * @param resultBatch - cuDF aggregated batch +> * @return output batch from the aggregate +> */ +> def postProcess(resultBatch: ColumnarBatch): ColumnarBatch = { +> withResource(resultBatch) { _ => +> GpuProjectExec.project(resultBatch, postStepBound) +> } +> } +> } +> +> /** +> * Compute the aggregations on the projected input columns. +> * @param toAggregateBatch input batch to aggregate +> * @param helper an internal object that carries state required to execute computeAggregate from +> * different parts of the codebase. 
+> * @return aggregated batch +> */ +> private def computeAggregate( +> toAggregateBatch: ColumnarBatch, helper: AggHelper): ColumnarBatch = { +> val computeAggTime = metrics.computeAggTime +> val opTime = metrics.opTime +> withResource(new NvtxWithMetrics("computeAggregate", NvtxColor.CYAN, computeAggTime, +> opTime)) { _ => +> // a pre-processing step required before we go into the cuDF aggregate, in some cases +> // casting and in others creating a struct (MERGE_M2 for instance, requires a struct) +> withResource(helper.preProcess(toAggregateBatch)) { preProcessed => +> val resultBatch = if (groupingExpressions.nonEmpty) { +> helper.performGroupByAggregation(preProcessed) +> } else { +> helper.performReduction(preProcessed) +> } +> +> // a post-processing step required in some scenarios, casting or picking +> // apart a struct +> helper.postProcess(resultBatch) +> } +> } +> } +> } +> +> abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( +> plan: INPUT, +> aggRequiredChildDistributionExpressions: Option[Seq[Expression]], +34,36c810,813 +< parent: Option[RapidsMeta[_, _]], +< rule: DataFromReplacementRule) +< extends SparkPlanMeta[HashAggregateExec](agg, conf, parent, rule) { +--- +> parent: Option[RapidsMeta[_, _, _]], +> rule: DataFromReplacementRule) extends SparkPlanMeta[INPUT](plan, conf, parent, rule) { +> +> val agg: BaseAggregateExec +63c840 +< dt.asInstanceOf[DecimalType].precision > GpuOverrides.DECIMAL64_MAX_PRECISION)) +--- +> dt.asInstanceOf[DecimalType].precision > DType.DECIMAL64_MAX_PRECISION)) +68a846,854 +> +> if (agg.aggregateExpressions.exists(expr => expr.isDistinct) +> && agg.aggregateExpressions.exists(expr => expr.filter.isDefined)) { +> // Distinct with Filter is not supported on the GPU currently, +> // This makes sure that if we end up here, the plan falls back to the CPU +> // which will do the right thing. +> willNotWorkOnGpu( +> "DISTINCT and FILTER cannot be used in aggregate functions at the same time") +> } +132a919,929 +> +> override def convertToGpu(): GpuExec = { +> GpuHashAggregateExec( +> aggRequiredChildDistributionExpressions, +> groupingExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), +> aggregateExpressions.map(_.convertToGpu().asInstanceOf[GpuAggregateExpression]), +> aggregateAttributes.map(_.convertToGpu().asInstanceOf[Attribute]), +> resultExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), +> childPlans.head.convertIfNeeded(), +> conf.gpuTargetBatchSizeBytes) +> } +135,136c932,938 +< class GpuSortAggregateExecMeta( +< val agg: SortAggregateExec, +--- +> /** +> * Base class for metadata around `SortAggregateExec` and `ObjectHashAggregateExec`, which may +> * contain TypedImperativeAggregate functions in aggregate expressions. 
+> */ +> abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggregateExec]( +> plan: INPUT, +> aggRequiredChildDistributionExpressions: Option[Seq[Expression]], +138,140c940,948 +< parent: Option[RapidsMeta[_, _]], +< rule: DataFromReplacementRule) +< extends SparkPlanMeta[SortAggregateExec](agg, conf, parent, rule) { +--- +> parent: Option[RapidsMeta[_, _, _]], +> rule: DataFromReplacementRule) extends GpuBaseAggregateMeta[INPUT](plan, +> aggRequiredChildDistributionExpressions, conf, parent, rule) { +> +> private val mayNeedAggBufferConversion: Boolean = +> agg.aggregateExpressions.exists { expr => +> expr.aggregateFunction.isInstanceOf[TypedImperativeAggregate[_]] && +> (expr.mode == Partial || expr.mode == PartialMerge) +> } +142,149c950,951 +< val groupingExpressions: Seq[BaseExprMeta[_]] = +< agg.groupingExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) +< val aggregateExpressions: Seq[BaseExprMeta[_]] = +< agg.aggregateExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) +< val aggregateAttributes: Seq[BaseExprMeta[_]] = +< agg.aggregateAttributes.map(GpuOverrides.wrapExpr(_, conf, Some(this))) +< val resultExpressions: Seq[BaseExprMeta[_]] = +< agg.resultExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) +--- +> // overriding data types of Aggregation Buffers if necessary +> if (mayNeedAggBufferConversion) overrideAggBufTypes() +151,152c953,964 +< override val childExprs: Seq[BaseExprMeta[_]] = +< groupingExpressions ++ aggregateExpressions ++ aggregateAttributes ++ resultExpressions +--- +> override protected lazy val outputTypeMetas: Option[Seq[DataTypeMeta]] = +> if (mayNeedAggBufferConversion) { +> Some(resultExpressions.map(_.typeMeta)) +> } else { +> None +> } +> +> override val availableRuntimeDataTransition: Boolean = +> aggregateExpressions.map(_.childExprs.head).forall { +> case aggMeta: TypedImperativeAggExprMeta[_] => aggMeta.supportBufferConversion +> case _ => true +> } +155,161c967,1011 +< // We don't support Arrays and Maps as GroupBy keys yet, even they are nested in Structs. So, +< // we need to run recursive type check on the structs. +< val arrayOrMapGroupings = agg.groupingExpressions.exists(e => +< TrampolineUtil.dataTypeExistsRecursively(e.dataType, +< dt => dt.isInstanceOf[ArrayType] || dt.isInstanceOf[MapType])) +< if (arrayOrMapGroupings) { +< willNotWorkOnGpu("ArrayTypes or MapTypes in grouping expressions are not supported") +--- +> super.tagPlanForGpu() +> +> // If a typedImperativeAggregate function run across CPU and GPU (ex: Partial mode on CPU, +> // Merge mode on GPU), it will lead to a runtime crash. Because aggregation buffers produced +> // by the previous stage of function are NOT in the same format as the later stage consuming. +> // If buffer converters are available for all incompatible buffers, we will build these buffer +> // converters and bind them to certain plans, in order to integrate these converters into +> // R2C/C2R Transitions during GpuTransitionOverrides to fix the gap between GPU and CPU. +> // Otherwise, we have to fall back all Aggregate stages to CPU once any of them did fallback. +> // +> // The binding also works when AQE is on, since it leverages the TreeNodeTag to cache buffer +> // converters. 
+> GpuTypedImperativeSupportedAggregateExecMeta.handleAggregationBuffer(this) +> } +> +> override def convertToGpu(): GpuExec = { +> if (mayNeedAggBufferConversion) { +> // transforms the data types of aggregate attributes with typeMeta +> val aggAttributes = aggregateAttributes.map { +> case meta if meta.typeMeta.typeConverted => +> val ref = meta.wrapped.asInstanceOf[AttributeReference] +> val converted = ref.copy( +> dataType = meta.typeMeta.dataType.get)(ref.exprId, ref.qualifier) +> GpuOverrides.wrapExpr(converted, conf, Some(this)) +> case meta => meta +> } +> // transforms the data types of result expressions with typeMeta +> val retExpressions = resultExpressions.map { +> case meta if meta.typeMeta.typeConverted => +> val ref = meta.wrapped.asInstanceOf[AttributeReference] +> val converted = ref.copy( +> dataType = meta.typeMeta.dataType.get)(ref.exprId, ref.qualifier) +> GpuOverrides.wrapExpr(converted, conf, Some(this)) +> case meta => meta +> } +> GpuHashAggregateExec( +> aggRequiredChildDistributionExpressions, +> groupingExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), +> aggregateExpressions.map(_.convertToGpu().asInstanceOf[GpuAggregateExpression]), +> aggAttributes.map(_.convertToGpu().asInstanceOf[Attribute]), +> retExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), +> childPlans.head.convertIfNeeded(), +> conf.gpuTargetBatchSizeBytes) +> } else { +> super.convertToGpu() +162a1013 +> } +164,169c1015,1039 +< val dec128Grouping = agg.groupingExpressions.exists(e => +< TrampolineUtil.dataTypeExistsRecursively(e.dataType, +< dt => dt.isInstanceOf[DecimalType] && +< dt.asInstanceOf[DecimalType].precision > GpuOverrides.DECIMAL64_MAX_PRECISION)) +< if (dec128Grouping) { +< willNotWorkOnGpu("grouping by a 128-bit decimal value is not currently supported") +--- +> /** +> * The method replaces data types of aggregation buffers created by TypedImperativeAggregate +> * functions with the actual data types used in the GPU runtime. +> * +> * Firstly, this method traverses aggregateFunctions, to search attributes referring to +> * aggregation buffers of TypedImperativeAggregate functions. +> * Then, we extract the desired (actual) data types on GPU runtime for these attributes, +> * and map them to expression IDs of attributes. +> * At last, we traverse aggregateAttributes and resultExpressions, overriding data type in +> * RapidsMeta if necessary, in order to ensure TypeChecks tagging exact data types in runtime. +> */ +> private def overrideAggBufTypes(): Unit = { +> val desiredAggBufTypes = mutable.HashMap.empty[ExprId, DataType] +> val desiredInputAggBufTypes = mutable.HashMap.empty[ExprId, DataType] +> // Collects exprId from TypedImperativeAggBufferAttributes, and maps them to the data type +> // of `TypedImperativeAggExprMeta.aggBufferAttribute`. 
+> aggregateExpressions.map(_.childExprs.head).foreach { +> case aggMeta: TypedImperativeAggExprMeta[_] => +> val aggFn = aggMeta.wrapped.asInstanceOf[TypedImperativeAggregate[_]] +> val desiredType = aggMeta.aggBufferAttribute.dataType +> +> desiredAggBufTypes(aggFn.aggBufferAttributes.head.exprId) = desiredType +> desiredInputAggBufTypes(aggFn.inputAggBufferAttributes.head.exprId) = desiredType +> +> case _ => +172c1042,1276 +< tagForReplaceMode() +--- +> // Overrides the data types of typed imperative aggregation buffers for type checking +> aggregateAttributes.foreach { attrMeta => +> attrMeta.wrapped match { +> case ar: AttributeReference if desiredAggBufTypes.contains(ar.exprId) => +> attrMeta.overrideDataType(desiredAggBufTypes(ar.exprId)) +> case _ => +> } +> } +> resultExpressions.foreach { retMeta => +> retMeta.wrapped match { +> case ar: AttributeReference if desiredInputAggBufTypes.contains(ar.exprId) => +> retMeta.overrideDataType(desiredInputAggBufTypes(ar.exprId)) +> case _ => +> } +> } +> } +> } +> +> object GpuTypedImperativeSupportedAggregateExecMeta { +> +> private val bufferConverterInjected = TreeNodeTag[Boolean]( +> "rapids.gpu.bufferConverterInjected") +> +> /** +> * The method will bind buffer converters (CPU Expressions) to certain CPU Plans if necessary, +> * so the method returns nothing. The binding work will help us to insert these converters as +> * pre/post processing of GpuRowToColumnarExec/GpuColumnarToRowExec during the materialization +> * of these transition Plans. +> * +> * In the beginning, it collects all physical Aggregate Plans (stages) of current Aggregate. +> * Then, it goes through all stages in pair, to check whether buffer converters need to be +> * inserted between its child and itself. +> * If it is necessary, it goes on to check whether all these buffers are available to convert. +> * If not, it falls back all stages to CPU to keep consistency of intermediate data; Otherwise, +> * it creates buffer converters based on the createBufferConverter methods of +> * TypedImperativeAggExprMeta, and binds these newly-created converters to certain Plans. +> * +> * In terms of binding, it is critical and sophisticated to find out where to bind these +> * converters. Generally, we need to find out CPU plans right before/after the potential R2C/C2R +> * transitions, so as to integrate these converters during post columnar overriding. These plans +> * may be Aggregate stages themselves. They may also be plans inserted between Aggregate stages, +> * such as: ShuffleExchangeExec, SortExec. +> * +> * The binding carries out by storing buffer converters as tag values of certain CPU Plans. These +> * plans are either the child of GpuRowToColumnarExec or the parent of GpuColumnarToRowExec. The +> * converters are cached inside CPU Plans rather than GPU ones (like: the child of +> * GpuColumnarToRowExec), because we can't access the GPU Plans during binding since they are +> * yet created. And GPU Plans created by RapidsMeta don't keep the tags of their CPU +> * counterparts. +> */ +> private def handleAggregationBuffer( +> meta: GpuTypedImperativeSupportedAggregateExecMeta[_]): Unit = { +> // We only run the check for final stages which contain TypedImperativeAggregate. +> val needToCheck = containTypedImperativeAggregate(meta, Some(Final)) +> if (!needToCheck) return +> // Avoid duplicated check and fallback. 
+> val checked = meta.agg.getTagValue[Boolean](bufferConverterInjected).contains(true) +> if (checked) return +> meta.agg.setTagValue(bufferConverterInjected, true) +> +> // Fetch AggregateMetas of all stages which belong to current Aggregate +> val stages = getAggregateOfAllStages(meta, meta.agg.logicalLink.get) +> +> // Find out stages in which the buffer converters are essential. +> val needBufferConversion = stages.indices.map { +> case i if i == stages.length - 1 => false +> case i => +> // Buffer converters are only needed to inject between two stages if [A] and [B]. +> // [A]. there will be a R2C or C2R transition between them +> // [B]. there exists TypedImperativeAggregate functions in each of them +> (stages(i).canThisBeReplaced ^ stages(i + 1).canThisBeReplaced) && +> containTypedImperativeAggregate(stages(i)) && +> containTypedImperativeAggregate(stages(i + 1)) +> } +> +> // Return if all internal aggregation buffers are compatible with GPU Overrides. +> if (needBufferConversion.forall(!_)) return +> +> // Fall back all GPU supported stages to CPU, if there exists TypedImperativeAggregate buffer +> // who doesn't support data format transition in runtime. Otherwise, build buffer converters +> // and bind them to certain SparkPlans. +> val entireFallback = !stages.zip(needBufferConversion).forall { case (stage, needConvert) => +> !needConvert || stage.availableRuntimeDataTransition +> } +> if (entireFallback) { +> stages.foreach { +> case aggMeta if aggMeta.canThisBeReplaced => +> aggMeta.willNotWorkOnGpu("Associated fallback for TypedImperativeAggregate") +> case _ => +> } +> } else { +> bindBufferConverters(stages, needBufferConversion) +> } +> } +> +> /** +> * Bind converters as TreeNodeTags into the CPU plans who are right before/after the potential +> * R2C/C2R transitions (the transitions are yet inserted). +> * +> * These converters are CPU expressions, and they will be integrated into GpuRowToColumnarExec/ +> * GpuColumnarToRowExec as pre/post transitions. What we do here, is to create these converters +> * according to the stage pairs. +> */ +> private def bindBufferConverters(stages: Seq[GpuBaseAggregateMeta[_]], +> needBufferConversion: Seq[Boolean]): Unit = { +> +> needBufferConversion.zipWithIndex.foreach { +> case (needConvert, i) if needConvert => +> // Find the next edge from the given stage which is a splitting point between CPU plans +> // and GPU plans. The method returns a pair of plans around the edge. For instance, +> // +> // stage(i): GpuObjectHashAggregateExec -> +> // pair_head: GpuShuffleExec -> +> // pair_tail and stage(i + 1): ObjectHashAggregateExec +> // +> // In example above, stage(i) is a GpuPlan while stage(i + 1) is not. And we assume +> // both of them including TypedImperativeAgg. So, in terms of stage(i) the buffer +> // conversion is necessary. Then, we call the method nextEdgeForConversion to find +> // the edge next to stage(i), which is between GpuShuffleExec and stage(i + 1). 
+> nextEdgeForConversion(stages(i)) match { +> // create preRowToColumnarTransition, and bind it to the child node (CPU plan) of +> // GpuRowToColumnarExec +> case List(parent, child) if parent.canThisBeReplaced => +> val childPlan = child.wrapped.asInstanceOf[SparkPlan] +> val expressions = createBufferConverter(stages(i), stages(i + 1), true) +> childPlan.setTagValue(GpuOverrides.preRowToColProjection, expressions) +> // create postColumnarToRowTransition, and bind it to the parent node (CPU plan) of +> // GpuColumnarToRowExec +> case List(parent, _) => +> val parentPlan = parent.wrapped.asInstanceOf[SparkPlan] +> val expressions = createBufferConverter(stages(i), stages(i + 1), false) +> parentPlan.setTagValue(GpuOverrides.postColToRowProjection, expressions) +> } +> case _ => +> } +> } +> +> private def containTypedImperativeAggregate(meta: GpuBaseAggregateMeta[_], +> desiredMode: Option[AggregateMode] = None): Boolean = +> meta.agg.aggregateExpressions.exists { +> case e: AggregateExpression if desiredMode.forall(_ == e.mode) => +> e.aggregateFunction.isInstanceOf[TypedImperativeAggregate[_]] +> case _ => false +> } +> +> private def createBufferConverter(mergeAggMeta: GpuBaseAggregateMeta[_], +> partialAggMeta: GpuBaseAggregateMeta[_], +> fromCpuToGpu: Boolean): Seq[NamedExpression] = { +> +> val converters = mutable.Queue[Either[ +> CpuToGpuAggregateBufferConverter, GpuToCpuAggregateBufferConverter]]() +> mergeAggMeta.childExprs.foreach { +> case e if e.childExprs.length == 1 && +> e.childExprs.head.isInstanceOf[TypedImperativeAggExprMeta[_]] => +> e.wrapped.asInstanceOf[AggregateExpression].mode match { +> case Final | PartialMerge => +> val typImpAggMeta = e.childExprs.head.asInstanceOf[TypedImperativeAggExprMeta[_]] +> val converter = if (fromCpuToGpu) { +> Left(typImpAggMeta.createCpuToGpuBufferConverter()) +> } else { +> Right(typImpAggMeta.createGpuToCpuBufferConverter()) +> } +> converters.enqueue(converter) +> case _ => +> } +> case _ => +> } +> +> val expressions = partialAggMeta.resultExpressions.map { +> case retExpr if retExpr.typeMeta.typeConverted => +> val resultExpr = retExpr.wrapped.asInstanceOf[AttributeReference] +> val ref = if (fromCpuToGpu) { +> resultExpr +> } else { +> resultExpr.copy(dataType = retExpr.typeMeta.dataType.get)( +> resultExpr.exprId, resultExpr.qualifier) +> } +> converters.dequeue() match { +> case Left(converter) => +> ShimLoader.getSparkShims.alias(converter.createExpression(ref), +> ref.name + "_converted")(NamedExpression.newExprId) +> case Right(converter) => +> ShimLoader.getSparkShims.alias(converter.createExpression(ref), +> ref.name + "_converted")(NamedExpression.newExprId) +> } +> case retExpr => +> retExpr.wrapped.asInstanceOf[NamedExpression] +> } +> +> expressions +> } +> +> private def getAggregateOfAllStages( +> currentMeta: SparkPlanMeta[_], logical: LogicalPlan): List[GpuBaseAggregateMeta[_]] = { +> currentMeta match { +> case aggMeta: GpuBaseAggregateMeta[_] if aggMeta.agg.logicalLink.contains(logical) => +> List[GpuBaseAggregateMeta[_]](aggMeta) ++ +> getAggregateOfAllStages(aggMeta.childPlans.head, logical) +> case shuffleMeta: GpuShuffleMeta => +> getAggregateOfAllStages(shuffleMeta.childPlans.head, logical) +> case sortMeta: GpuSortMeta => +> getAggregateOfAllStages(sortMeta.childPlans.head, logical) +> case _ => +> List[GpuBaseAggregateMeta[_]]() +> } +> } +> +> @tailrec +> private def nextEdgeForConversion(meta: SparkPlanMeta[_]): Seq[SparkPlanMeta[_]] = { +> val child = meta.childPlans.head +> if 
(meta.canThisBeReplaced ^ child.canThisBeReplaced) { +> List(meta, child) +> } else { +> nextEdgeForConversion(child) +> } +> } +> } +> +> class GpuHashAggregateMeta( +> override val agg: HashAggregateExec, +> conf: RapidsConf, +> parent: Option[RapidsMeta[_, _, _]], +> rule: DataFromReplacementRule) +> extends GpuBaseAggregateMeta(agg, agg.requiredChildDistributionExpressions, +> conf, parent, rule) +> +> class GpuSortAggregateExecMeta( +> override val agg: SortAggregateExec, +> conf: RapidsConf, +> parent: Option[RapidsMeta[_, _, _]], +> rule: DataFromReplacementRule) +> extends GpuTypedImperativeSupportedAggregateExecMeta(agg, +> agg.requiredChildDistributionExpressions, conf, parent, rule) { +> override def tagPlanForGpu(): Unit = { +> super.tagPlanForGpu() +199,261d1302 +< +< /** +< * Tagging checks tied to configs that control the aggregation modes that are replaced. +< * +< * The rule of replacement is determined by `spark.rapids.sql.hashAgg.replaceMode`, which +< * is a string configuration consisting of AggregateMode names in lower cases connected by +< * &(AND) and |(OR). The default value of this config is `all`, which indicates replacing all +< * aggregates if possible. +< * +< * The `|` serves as the outer connector, which represents patterns of both sides are able to be +< * replaced. For instance, `final|partialMerge` indicates that aggregate plans purely in either +< * Final mode or PartialMerge mode can be replaced. But aggregate plans also contain +< * AggExpressions of other mode will NOT be replaced, such as: stage 3 of single distinct +< * aggregate who contains both Partial and PartialMerge. +< * +< * On the contrary, the `&` serves as the inner connector, which intersects modes of both sides +< * to form a mode pattern. The replacement only takes place for aggregate plans who have the +< * exact same mode pattern as what defined the rule. For instance, `partial&partialMerge` means +< * that aggregate plans can be only replaced if they contain AggExpressions of Partial and +< * contain AggExpressions of PartialMerge and don't contain AggExpressions of other modes. +< * +< * In practice, we need to combine `|` and `&` to form some sophisticated patterns. For instance, +< * `final&complete|final|partialMerge` represents aggregate plans in three different patterns are +< * GPU-replaceable: plans contain both Final and Complete modes; plans only contain Final mode; +< * plans only contain PartialMerge mode. +< */ +< private def tagForReplaceMode(): Unit = { +< val aggPattern = agg.aggregateExpressions.map(_.mode).toSet +< val strPatternToReplace = conf.hashAggReplaceMode.toLowerCase +< +< if (aggPattern.nonEmpty && strPatternToReplace != "all") { +< val aggPatternsCanReplace = strPatternToReplace.split("\\|").map { subPattern => +< subPattern.split("&").map { +< case "partial" => Partial +< case "partialmerge" => PartialMerge +< case "final" => Final +< case "complete" => Complete +< case s => throw new IllegalArgumentException(s"Invalid Aggregate Mode $s") +< }.toSet +< } +< if (!aggPatternsCanReplace.contains(aggPattern)) { +< val message = aggPattern.map(_.toString).mkString(",") +< willNotWorkOnGpu(s"Replacing mode pattern `$message` hash aggregates disabled") +< } else if (aggPattern == Set(Partial)) { +< // In partial mode, if there are non-distinct functions and multiple distinct functions, +< // non-distinct functions are computed using the First operator. The final result would be +< // incorrect for non-distinct functions for partition size > 1. 
Reason for this is - if +< // the first batch computed and sent to CPU doesn't contain all the rows required to +< // compute non-distinct function(s), then Spark would consider that value as final result +< // (due to First). Fall back to CPU in this case. +< if (AggregateUtils.shouldFallbackMultiDistinct(agg.aggregateExpressions)) { +< willNotWorkOnGpu("Aggregates of non-distinct functions with multiple distinct " + +< "functions are non-deterministic for non-distinct functions as it is " + +< "computed using First.") +< } +< } +< } +< +< if (!conf.partialMergeDistinctEnabled && aggPattern.contains(PartialMerge)) { +< willNotWorkOnGpu("Replacing Partial Merge aggregates disabled. " + +< s"Set ${conf.partialMergeDistinctEnabled} to true if desired") +< } +< } +264,266d1304 +< // SPARK 2.x we can't check for the TypedImperativeAggregate properly so don't say we do the +< // ObjectHashAggregate +< /* +270c1308 +< parent: Option[RapidsMeta[_, _]], +--- +> parent: Option[RapidsMeta[_, _, _]], +275c1313,1514 +< */ +--- +> /** +> * The GPU version of HashAggregateExec +> * +> * @param requiredChildDistributionExpressions this is unchanged by the GPU. It is used in +> * EnsureRequirements to be able to add shuffle nodes +> * @param groupingExpressions The expressions that, when applied to the input batch, return the +> * grouping key +> * @param aggregateExpressions The GpuAggregateExpression instances for this node +> * @param aggregateAttributes References to each GpuAggregateExpression (attribute references) +> * @param resultExpressions the expected output expression of this hash aggregate (which this +> * node should project) +> * @param child incoming plan (where we get input columns from) +> * @param configuredTargetBatchSize user-configured maximum device memory size of a batch +> */ +> case class GpuHashAggregateExec( +> requiredChildDistributionExpressions: Option[Seq[Expression]], +> groupingExpressions: Seq[NamedExpression], +> aggregateExpressions: Seq[GpuAggregateExpression], +> aggregateAttributes: Seq[Attribute], +> resultExpressions: Seq[NamedExpression], +> child: SparkPlan, +> configuredTargetBatchSize: Long) extends ShimUnaryExecNode with GpuExec with Arm { +> +> // lifted directly from `BaseAggregateExec.inputAttributes`, edited comment. +> def inputAttributes: Seq[Attribute] = { +> val modes = aggregateExpressions.map(_.mode).distinct +> if (modes.contains(Final) || modes.contains(PartialMerge)) { +> // SPARK-31620: when planning aggregates, the partial aggregate uses aggregate function's +> // `inputAggBufferAttributes` as its output. And Final and PartialMerge aggregate rely on the +> // output to bind references used by `mergeAggregates`. But if we copy the +> // aggregate function somehow after aggregate planning, the `DeclarativeAggregate` will +> // be replaced by a new instance with new `inputAggBufferAttributes`. Then Final and +> // PartialMerge aggregate can't bind the references used by `mergeAggregates` with the output +> // of the partial aggregate, as they use the `inputAggBufferAttributes` of the +> // original `DeclarativeAggregate` before copy. Instead, we shall use +> // `inputAggBufferAttributes` after copy to match the new `mergeExpressions`. 
+>       val aggAttrs = inputAggBufferAttributes
+>       child.output.dropRight(aggAttrs.length) ++ aggAttrs
+>     } else {
+>       child.output
+>     }
+>   }
+>
+>   private val inputAggBufferAttributes: Seq[Attribute] = {
+>     aggregateExpressions
+>       // there're exactly four cases needs `inputAggBufferAttributes` from child according to the
+>       // agg planning in `AggUtils`: Partial -> Final, PartialMerge -> Final,
+>       // Partial -> PartialMerge, PartialMerge -> PartialMerge.
+>       .filter(a => a.mode == Final || a.mode == PartialMerge)
+>       .flatMap(_.aggregateFunction.aggBufferAttributes)
+>   }
+>
+>   private lazy val uniqueModes: Seq[AggregateMode] = aggregateExpressions.map(_.mode).distinct
+>
+>   protected override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
+>   protected override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
+>   override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
+>     NUM_TASKS_FALL_BACKED -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_TASKS_FALL_BACKED),
+>     OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
+>     AGG_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_AGG_TIME),
+>     CONCAT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_TIME),
+>     SORT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_SORT_TIME)
+>   ) ++ spillMetrics
+>
+>   // requiredChildDistributions are CPU expressions, so remove it from the GPU expressions list
+>   override def gpuExpressions: Seq[Expression] =
+>     groupingExpressions ++ aggregateExpressions ++ aggregateAttributes ++ resultExpressions
+>
+>   override def verboseStringWithOperatorId(): String = {
+>     s"""
+>        |$formattedNodeName
+>        |${ExplainUtils.generateFieldString("Input", child.output)}
+>        |${ExplainUtils.generateFieldString("Keys", groupingExpressions)}
+>        |${ExplainUtils.generateFieldString("Functions", aggregateExpressions)}
+>        |${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)}
+>        |${ExplainUtils.generateFieldString("Results", resultExpressions)}
+>        |""".stripMargin
+>   }
+>
+>   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+>     val aggMetrics = GpuHashAggregateMetrics(
+>       numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS),
+>       numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES),
+>       numTasksFallBacked = gpuLongMetric(NUM_TASKS_FALL_BACKED),
+>       opTime = gpuLongMetric(OP_TIME),
+>       computeAggTime = gpuLongMetric(AGG_TIME),
+>       concatTime = gpuLongMetric(CONCAT_TIME),
+>       sortTime = gpuLongMetric(SORT_TIME),
+>       semWaitTime = gpuLongMetric(SEMAPHORE_WAIT_TIME),
+>       makeSpillCallback(allMetrics))
+>
+>     // cache in a local variable to avoid serializing the full child plan
+>     val groupingExprs = groupingExpressions
+>     val aggregateExprs = aggregateExpressions
+>     val aggregateAttrs = aggregateAttributes
+>     val resultExprs = resultExpressions
+>     val modeInfo = AggregateModeInfo(uniqueModes)
+>
+>     val rdd = child.executeColumnar()
+>
+>     rdd.mapPartitions { cbIter =>
+>       new GpuHashAggregateIterator(
+>         cbIter,
+>         inputAttributes,
+>         groupingExprs,
+>         aggregateExprs,
+>         aggregateAttrs,
+>         resultExprs,
+>         modeInfo,
+>         aggMetrics,
+>         configuredTargetBatchSize)
+>     }
+>   }
+>
+>   protected def outputExpressions: Seq[NamedExpression] = resultExpressions
+>
+>   //
+>   // This section is derived (copied in most cases) from HashAggregateExec
+>   //
+>   private[this] val aggregateBufferAttributes = {
+>     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
+>   }
+>
+>   final override def outputPartitioning: Partitioning = {
+>     if (hasAlias) {
+>       child.outputPartitioning match {
+>         case h: HashPartitioning => h.copy(expressions = replaceAliases(h.expressions))
+>         case other => other
+>       }
+>     } else {
+>       child.outputPartitioning
+>     }
+>   }
+>
+>   protected def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined
+>
+>   protected def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = {
+>     exprs.map {
+>       case a: AttributeReference => replaceAlias(a).getOrElse(a)
+>       case other => other
+>     }
+>   }
+>
+>   protected def replaceAlias(attr: AttributeReference): Option[Attribute] = {
+>     outputExpressions.collectFirst {
+>       case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) =>
+>         a.toAttribute
+>     }
+>   }
+>
+>   // Used in de-duping and optimizer rules
+>   override def producedAttributes: AttributeSet =
+>     AttributeSet(aggregateAttributes) ++
+>       AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
+>       AttributeSet(aggregateBufferAttributes)
+>
+>   // AllTuples = distribution with a single partition and all tuples of the dataset are co-located.
+>   // Clustered = dataset with tuples co-located in the same partition if they share a specific value
+>   // Unspecified = distribution with no promises about co-location
+>   override def requiredChildDistribution: List[Distribution] = {
+>     requiredChildDistributionExpressions match {
+>       case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
+>       case Some(exprs) if exprs.nonEmpty => ClusteredDistribution(exprs) :: Nil
+>       case None => UnspecifiedDistribution :: Nil
+>     }
+>   }
+>
+>   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
+>
+>   override def doExecute(): RDD[InternalRow] = throw new IllegalStateException(
+>     "Row-based execution should not occur for this class")
+>
+>   /**
+>    * All the attributes that are used for this plan. NOT used for aggregation
+>    */
+>   override lazy val allAttributes: AttributeSeq =
+>     child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
+>       aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
+>
+>   override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)
+>
+>   override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
+>
+>   private def toString(verbose: Boolean, maxFields: Int): String = {
+>     val allAggregateExpressions = aggregateExpressions
+>
+>     val keyString =
+>       truncatedString(groupingExpressions, "[", ", ", "]", maxFields)
+>     val functionString =
+>       truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
+>     val outputString = truncatedString(output, "[", ", ", "]", maxFields)
+>     if (verbose) {
+>       s"GpuHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
+>     } else {
+>       s"GpuHashAggregate(keys=$keyString, functions=$functionString)," +
+>         s" filters=${aggregateExpressions.map(_.filter)})"
+>     }
+>   }
+>   //
+>   // End copies from HashAggregateExec
+>   //
+> }
diff --git a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExecMeta.scala b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExecMeta.scala
index 92aa71728e4..8c19f858cff 100644
--- a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExecMeta.scala
+++ b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuBroadcastHashJoinExecMeta.scala
@@ -33,18 +33,18 @@ class GpuBroadcastHashJoinMeta(
     join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
   val rightKeys: Seq[BaseExprMeta[_]] =
     join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
-  val condition: Option[BaseExprMeta[_]] =
+  val conditionMeta: Option[BaseExprMeta[_]] =
     join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
   val buildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide)

   override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
-    JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
+    JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

-  override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
+  override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

   override def tagPlanForGpu(): Unit = {
     GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
-      join.condition)
+      conditionMeta)
     val Seq(leftChild, rightChild) = childPlans
     val buildSideMeta = buildSide match {
       case GpuBuildLeft => leftChild
diff --git a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExecMeta.scala b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExecMeta.scala
index 8b3382b3db8..888e8b4f8fb 100644
--- a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExecMeta.scala
+++ b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuShuffledHashJoinExecMeta.scala
@@ -31,17 +31,17 @@ class GpuShuffledHashJoinMeta(
     join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
   val rightKeys: Seq[BaseExprMeta[_]] =
     join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
-  val condition: Option[BaseExprMeta[_]] =
+  val conditionMeta: Option[BaseExprMeta[_]] =
     join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
   val buildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide)

-  override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
+  override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

   override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
-    JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
+    JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

   override def tagPlanForGpu(): Unit = {
     GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
-      join.condition)
+      conditionMeta)
   }
 }
diff --git a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuSortMergeJoinMeta.scala b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuSortMergeJoinMeta.scala
index fa78118b484..c19b1213798 100644
--- a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuSortMergeJoinMeta.scala
+++ b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/v2/GpuSortMergeJoinMeta.scala
@@ -34,7 +34,7 @@ class GpuSortMergeJoinMeta(
     join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
   val rightKeys: Seq[BaseExprMeta[_]] =
     join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
-  val condition: Option[BaseExprMeta[_]] = join.condition.map(
+  val conditionMeta: Option[BaseExprMeta[_]] = join.condition.map(
     GpuOverrides.wrapExpr(_, conf, Some(this)))
   val buildSide: GpuBuildSide = if (GpuHashJoin.canBuildRight(join.joinType)) {
     GpuBuildRight
@@ -44,15 +44,15 @@ class GpuSortMergeJoinMeta(
     throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join")
   }

-  override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
+  override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

   override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
-    JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
+    JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

   override def tagPlanForGpu(): Unit = {
     // Use conditions from Hash Join
     GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
-      join.condition)
+      conditionMeta)

     if (!conf.enableReplaceSortMergeJoin) {
       willNotWorkOnGpu(s"Not replacing sort merge join with hash join, " +
diff --git a/spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
index c01dc173c64..8d2886c6f80 100644
--- a/spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
+++ b/spark2-sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
@@ -94,15 +94,15 @@ object JoinTypeChecks {
 object GpuHashJoin {
   def tagJoin(
-      meta: RapidsMeta[_, _],
+      meta: SparkPlanMeta[_],
       joinType: JoinType,
       buildSide: GpuBuildSide,
       leftKeys: Seq[Expression],
       rightKeys: Seq[Expression],
-      condition: Option[Expression]): Unit = {
+      conditionMeta: Option[BaseExprMeta[_]]): Unit = {
     val keyDataTypes = (leftKeys ++ rightKeys).map(_.dataType)
-    def unSupportNonEqualCondition(): Unit = if (condition.isDefined) {
+    def unSupportNonEqualCondition(): Unit = if (conditionMeta.isDefined) {
       meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions")
     }
     def unSupportStructKeys(): Unit = if (keyDataTypes.exists(_.isInstanceOf[StructType])) {
@@ -111,10 +111,12 @@ object GpuHashJoin {
     JoinTypeChecks.tagForGpu(joinType, meta)
     joinType match {
       case _: InnerLike =>
-      case RightOuter | LeftOuter | LeftSemi | LeftAnti =>
+      case RightOuter | LeftOuter =>
+        conditionMeta.foreach(meta.requireAstForGpuOn)
+      case LeftSemi | LeftAnti =>
         unSupportNonEqualCondition()
       case FullOuter =>
-        unSupportNonEqualCondition()
+        conditionMeta.foreach(meta.requireAstForGpuOn)
         // FullOuter join cannot support with struct keys as two issues below
         // * https://github.com/NVIDIA/spark-rapids/issues/2126
         // * https://github.com/rapidsai/cudf/issues/7947
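
To summarize the behavior change in the tagJoin hunk above: inner-like joins leave any join condition to be checked elsewhere, right/left outer and full outer joins now require the condition to be AST-convertible (via `requireAstForGpuOn`), and left semi/anti joins still fall back to the CPU when a condition is present. A hedged, illustrative sketch of that dispatch follows; the `ConditionPolicy` names and plain string join types are not part of the plugin.

    // Illustrative only: the per-join-type condition policy encoded by the tagJoin change above.
    object JoinConditionPolicySketch {
      sealed trait ConditionPolicy
      case object NoRestrictionHere extends ConditionPolicy      // inner-like joins
      case object RequireAstCondition extends ConditionPolicy    // right/left outer, full outer
      case object ConditionNotSupported extends ConditionPolicy  // left semi, left anti

      def policyFor(joinType: String): ConditionPolicy = joinType match {
        case "Inner" | "Cross" => NoRestrictionHere
        case "RightOuter" | "LeftOuter" | "FullOuter" => RequireAstCondition
        case "LeftSemi" | "LeftAnti" => ConditionNotSupported
        case other => throw new IllegalArgumentException(s"unhandled join type: $other")
      }

      def main(args: Array[String]): Unit =
        Seq("Inner", "LeftOuter", "FullOuter", "LeftSemi").foreach { jt =>
          println(s"$jt -> ${policyFor(jt)}")
        }
    }
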