diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 89cf1b0ad39..6a3a34657ab 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -2213,13 +2213,25 @@ object SupportedOpsForTools { private def operatorMappingWithScore(): Unit = { val header = Seq("CPUOperator", "Score") println(header.mkString(",")) + val operatorCustomSpeedUp = Map( + ("BroadcastHashJoinExec", "3.0"), + ("ShuffleExchangeExec", "3.1"), + ("FilterExec", "2.4"), + ("HashAggregateExec", "3.4"), + ("SortExec", "6.0"), + ("SortMergeJoinExec", "14.9")) GpuOverrides.execs.values.toSeq.sortBy(_.tag.toString).foreach { rule => val checks = rule.getChecks if (rule.isVisible && checks.forall(_.shown)) { val cpuName = rule.tag.runtimeClass.getSimpleName - // We are assigning speed up of 2 to all the Execs supported by the plugin. This can be - // adjusted later. - val allCols = Seq(cpuName, "2") + // We have estimated the speedup of some operators by running a variety of queries. Assign + // the custom speedup to each operator that has been evaluated; all other operators + // default to a speedup of 2.0. + val allCols = if (operatorCustomSpeedUp.contains(cpuName)) { + Seq(cpuName, operatorCustomSpeedUp(cpuName)) + } else { + Seq(cpuName, "2.0") + } println(s"${allCols.mkString(",")}") } } diff --git a/tools/src/main/resources/operatorsScore.csv b/tools/src/main/resources/operatorsScore.csv index c17d710a56c..7b4d776d451 100644 --- a/tools/src/main/resources/operatorsScore.csv +++ b/tools/src/main/resources/operatorsScore.csv @@ -1,39 +1,39 @@ CPUOperator,Score -CoalesceExec,2 -CollectLimitExec,2 -ExpandExec,2 -FileSourceScanExec,2 -FilterExec,2 -GenerateExec,2 -GlobalLimitExec,2 -LocalLimitExec,2 -ProjectExec,2 -RangeExec,2 -SampleExec,2 -SortExec,2 -SubqueryBroadcastExec,2 -TakeOrderedAndProjectExec,2 -UnionExec,2 -CustomShuffleReaderExec,2 -HashAggregateExec,2 -ObjectHashAggregateExec,2 -SortAggregateExec,2 -InMemoryTableScanExec,2 -DataWritingCommandExec,2 -BatchScanExec,2 -BroadcastExchangeExec,2 -ShuffleExchangeExec,2 -BroadcastHashJoinExec,2 -BroadcastNestedLoopJoinExec,2 -CartesianProductExec,2 -ShuffledHashJoinExec,2 -SortMergeJoinExec,2 -AggregateInPandasExec,2 -ArrowEvalPythonExec,2 -FlatMapGroupsInPandasExec,2 -MapInPandasExec,2 -WindowInPandasExec,2 -WindowExec,2 +CoalesceExec,2.0 +CollectLimitExec,2.0 +ExpandExec,2.0 +FileSourceScanExec,2.0 +FilterExec,2.4 +GenerateExec,2.0 +GlobalLimitExec,2.0 +LocalLimitExec,2.0 +ProjectExec,2.0 +RangeExec,2.0 +SampleExec,2.0 +SortExec,6.0 +SubqueryBroadcastExec,2.0 +TakeOrderedAndProjectExec,2.0 +UnionExec,2.0 +CustomShuffleReaderExec,2.0 +HashAggregateExec,3.4 +ObjectHashAggregateExec,2.0 +SortAggregateExec,2.0 +InMemoryTableScanExec,2.0 +DataWritingCommandExec,2.0 +BatchScanExec,2.0 +BroadcastExchangeExec,2.0 +ShuffleExchangeExec,3.1 +BroadcastHashJoinExec,3.0 +BroadcastNestedLoopJoinExec,2.0 +CartesianProductExec,2.0 +ShuffledHashJoinExec,2.0 +SortMergeJoinExec,14.9 +AggregateInPandasExec,2.0 +ArrowEvalPythonExec,2.0 +FlatMapGroupsInPandasExec,2.0 +MapInPandasExec,2.0 +WindowInPandasExec,2.0 +WindowExec,2.0 Abs,3 Acos,3 Acosh,3 diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala index d961dc51953..e978c011963 100644 ---
a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/AggregateInPandasExecParser.scala @@ -33,7 +33,7 @@ case class AggregateInPandasExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala index 77611a1b667..e13e590f704 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ArrowEvalPythonExecParser.scala @@ -33,7 +33,7 @@ case class ArrowEvalPythonExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala index 373c44817be..e2af2ae788a 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BatchScanExecParser.scala @@ -37,7 +37,7 @@ case class BatchScanExecParser( // don't use the isExecSupported because we have finer grain. val score = ReadParser.calculateReadScoreRatio(readInfo, checker) val speedupFactor = checker.getSpeedupFactor(fullExecName) - val overallSpeedup = Math.max((speedupFactor * score).toInt, 1) + val overallSpeedup = Math.max((speedupFactor * score), 1.0) // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, s"${node.name} ${readInfo.format}", "", overallSpeedup, diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala index 30a824194af..7ad10d9c528 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastExchangeExecParser.scala @@ -42,7 +42,7 @@ case class BroadcastExchangeExecParser( val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? 
ExecInfo(sqlID, node.name, "", filterSpeedupFactor, diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala index cd11459e1f7..c52a7424738 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastHashJoinExecParser.scala @@ -33,7 +33,7 @@ case class BroadcastHashJoinExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala index c6f00e51c13..2e65adf2e1e 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/BroadcastNestedLoopJoinExecParser.scala @@ -33,7 +33,7 @@ case class BroadcastNestedLoopJoinExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala index 348142d4e24..66ec127d58a 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CartesianProductExecParser.scala @@ -33,7 +33,7 @@ case class CartesianProductExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala index 66fbccde9be..7f0c0f228f8 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CoalesceExecParser.scala @@ -33,7 +33,7 @@ case class CoalesceExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? 
ExecInfo(sqlID, node.name, "", speedupFactor, diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala index d729ecae84e..4623bdfbe9f 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CollectLimitExecParser.scala @@ -33,7 +33,7 @@ case class CollectLimitExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala index bfe6bfda08c..63a09f79f2c 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/CustomShuffleReaderExecParser.scala @@ -35,7 +35,7 @@ case class CustomShuffleReaderExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) } diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala index 2e273a7ed1c..63619020862 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ExpandExecParser.scala @@ -33,7 +33,7 @@ case class ExpandExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala index a9359ec0dcc..1ceca1f91b9 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FileSourceScanExecParser.scala @@ -39,7 +39,7 @@ case class FileSourceScanExecParser( // don't use the isExecSupported because we have finer grain. val score = ReadParser.calculateReadScoreRatio(readInfo, checker) val speedupFactor = checker.getSpeedupFactor(fullExecName) - val overallSpeedup = Math.max((speedupFactor * score).toInt, 1) + val overallSpeedup = Math.max((speedupFactor * score), 1.0) // TODO - add in parsing expressions - average speedup across? 
ExecInfo(sqlID, node.name, "", overallSpeedup, maxDuration, node.id, score > 0, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala index c256d453fc5..213232d75fa 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FilterExecParser.scala @@ -33,7 +33,7 @@ case class FilterExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala index 74a2e481dd7..1e96f0fd8fd 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/FlatMapGroupsInPandasExecParser.scala @@ -33,7 +33,7 @@ case class FlatMapGroupsInPandasExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala index 1a351e46a07..a8662dc2721 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenerateExecParser.scala @@ -33,7 +33,7 @@ case class GenerateExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala index 3a0af88a993..a707c32b226 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GlobalLimitExecParser.scala @@ -33,7 +33,7 @@ case class GlobalLimitExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? 
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala index d070e867d35..8518554a481 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/HashAggregateExecParser.scala @@ -38,7 +38,7 @@ case class HashAggregateExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala index 2d95da2c46c..81705b9001f 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/InMemoryTableScanExecParser.scala @@ -32,7 +32,7 @@ case class InMemoryTableScanExecParser( val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala index cde34251445..a550d509ef8 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/LocalLimitExecParser.scala @@ -33,7 +33,7 @@ case class LocalLimitExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala index 7e056968a39..4e32714baab 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/MapInPandasExecParser.scala @@ -33,7 +33,7 @@ case class MapInPandasExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? 
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala index 0d25ea4d81f..e9292fbe244 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ObjectHashAggregateExecParser.scala @@ -38,7 +38,7 @@ case class ObjectHashAggregateExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala index 676ae4928e1..354ccc15869 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ProjectExecParser.scala @@ -33,7 +33,7 @@ case class ProjectExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala index 8d67bc7326f..fd228833455 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/RangeExecParser.scala @@ -33,7 +33,7 @@ case class RangeExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index f51b663ffb9..466cbaec054 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -29,7 +29,7 @@ case class ExecInfo( sqlID: Long, exec: String, expr: String, - speedupFactor: Int, + speedupFactor: Double, duration: Option[Long], nodeId: Long, isSupported: Boolean, @@ -189,12 +189,12 @@ object SQLPlanParser extends Logging { /** * This function is used to calculate an average speedup factor. The input - * is assumed to an array of ints where each element is >= 1. If the input array + * is assumed to be an array of doubles where each element is >= 1. If the input array * is empty we return 1 because we assume we don't slow things down. Generally * the array shouldn't be empty, but if there is some weird case we don't want to * blow up, just say we don't speed it up.
*/ - def averageSpeedup(arr: Seq[Int]): Int = if (arr.isEmpty) 1 else arr.sum / arr.size + def averageSpeedup(arr: Seq[Double]): Double = if (arr.isEmpty) 1.0 else arr.sum / arr.size /** * Get the total duration by finding the accumulator with the largest value. diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala index 80f3b89300f..a2596efd32b 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SampleExecParser.scala @@ -33,7 +33,7 @@ case class SampleExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala index a72222163df..471e1369c33 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffleExchangeExecParser.scala @@ -42,7 +42,7 @@ case class ShuffleExchangeExecParser( val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala index e461bf1bc17..7035d9d0d1d 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/ShuffledHashJoinExecParser.scala @@ -37,7 +37,7 @@ case class ShuffledHashJoinExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala index 75a816e64b1..4c7ac2ce589 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortAggregateExecParser.scala @@ -33,7 +33,7 @@ case class SortAggregateExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? 
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala index 81822685588..5bfb98f9fb4 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortExecParser.scala @@ -33,7 +33,7 @@ case class SortExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala index 059e150d756..d8b4c0c84ed 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SortMergeJoinExecParser.scala @@ -33,7 +33,7 @@ case class SortMergeJoinExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala index c72ac665c14..e04e8a7b540 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SubqueryBroadcastExecParser.scala @@ -27,7 +27,7 @@ case class SubqueryBroadcastExecParser( sqlID: Long, app: AppBase) extends ExecParser { - val fullExecName = "ShuffleExchangeExec" + val fullExecName = node.name + "Exec" override def parse: ExecInfo = { val collectTimeId = node.metrics.find(_.name == "time to collect (ms)").map(_.accumulatorId) @@ -35,7 +35,7 @@ case class SubqueryBroadcastExecParser( val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - check is broadcast associated can be replaced // TODO - add in parsing expressions - average speedup across? diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala index d1a6b25de97..61ae48c72b2 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/TakeOrderedAndProjectExecParser.scala @@ -33,7 +33,7 @@ case class TakeOrderedAndProjectExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? 
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala index aae64a80bf7..9b4807af5af 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/UnionExecParser.scala @@ -33,7 +33,7 @@ case class UnionExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala index 139a106fc21..9d507ce0aec 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowExecParser.scala @@ -33,7 +33,7 @@ case class WindowExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala index 99a873c9972..280634a5de9 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WindowInPandasExecParser.scala @@ -33,7 +33,7 @@ case class WindowInPandasExecParser( val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) { (checker.getSpeedupFactor(fullExecName), true) } else { - (1, false) + (1.0, false) } // TODO - add in parsing expressions - average speedup across? 
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala index b9bbee7ef76..9d4fa2de30a 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala @@ -65,7 +65,7 @@ class PluginTypeChecker extends Logging { def setOperatorScore(filePath: String): Unit = { val source = Source.fromFile(filePath) - supportedOperatorsScore = readSupportedOperators(source).map(x => (x._1, x._2.toInt)) + supportedOperatorsScore = readSupportedOperators(source).map(x => (x._1, x._2.toDouble)) } def setSupportedExecs(filePath: String): Unit = { @@ -82,9 +82,9 @@ class PluginTypeChecker extends Logging { def getSupportedExprs: Map[String, String] = supportedExprs - private def readOperatorsScore: Map[String, Int] = { + private def readOperatorsScore: Map[String, Double] = { val source = Source.fromResource(OPERATORS_SCORE_FILE) - readSupportedOperators(source).map(x => (x._1, x._2.toInt)) + readSupportedOperators(source).map(x => (x._1, x._2.toDouble)) } private def readSupportedExecs: Map[String, String] = { @@ -231,7 +231,7 @@ class PluginTypeChecker extends Logging { writeFormats.map(x => x.trim).contains(_)) } - def getSpeedupFactor(execOrExpr: String): Int = { + def getSpeedupFactor(execOrExpr: String): Double = { supportedOperatorsScore.get(execOrExpr).getOrElse(-1) } diff --git a/tools/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/tools/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index f66d028fcd4..9cfbd0d11c3 100644 --- a/tools/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/tools/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -53,11 +53,11 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging { } } - private def assertSizeAndSupported(size: Int, execs: Seq[ExecInfo], + private def assertSizeAndSupported(size: Int, execs: Seq[ExecInfo], speedUpFactor: Double = 2.0, expectedDur: Seq[Option[Long]] = Seq.empty, extraText: String = ""): Unit = { for (t <- Seq(execs)) { assert(t.size == size, s"$extraText $t") - assert(t.forall(_.speedupFactor == 2), s"$extraText $t") + assert(t.forall(_.speedupFactor == speedUpFactor), s"$extraText $t") assert(t.forall(_.isSupported == true), s"$extraText $t") assert(t.forall(_.children.isEmpty), s"$extraText $t") if (expectedDur.nonEmpty) { @@ -118,13 +118,13 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging { val allChildren = wholeStages.flatMap(_.children).flatten assert(allChildren.size == 10) val filters = allChildren.filter(_.exec == "Filter") - assertSizeAndSupported(2, filters) + assertSizeAndSupported(2, filters, 2.4) val projects = allChildren.filter(_.exec == "Project") assertSizeAndSupported(2, projects) val sorts = allChildren.filter(_.exec == "Sort") - assertSizeAndSupported(3, sorts) + assertSizeAndSupported(3, sorts, 6.0) val smj = allChildren.filter(_.exec == "SortMergeJoin") - assertSizeAndSupported(1, smj) + assertSizeAndSupported(1, smj, 14.9) } } } @@ -149,7 +149,7 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging { assert(wholeStages.forall(_.duration.nonEmpty)) val 
allChildren = wholeStages.flatMap(_.children).flatten val hashAggregate = allChildren.filter(_.exec == "HashAggregate") - assertSizeAndSupported(2, hashAggregate) + assertSizeAndSupported(2, hashAggregate, 3.4) } } } @@ -282,11 +282,12 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging { } val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq) val broadcasts = allExecInfo.filter(_.exec == "BroadcastExchange") - assertSizeAndSupported(3, broadcasts.toSeq, Seq(Some(1154), Some(1154), Some(1855))) + assertSizeAndSupported(3, broadcasts.toSeq, + expectedDur = Seq(Some(1154), Some(1154), Some(1855))) val subqueryBroadcast = allExecInfo.filter(_.exec == "SubqueryBroadcast") - assertSizeAndSupported(1, subqueryBroadcast.toSeq, Seq(Some(1175))) + assertSizeAndSupported(1, subqueryBroadcast.toSeq, expectedDur = Seq(Some(1175))) val exchanges = allExecInfo.filter(_.exec == "Exchange") - assertSizeAndSupported(2, exchanges.toSeq, Seq(Some(15688), Some(8))) + assertSizeAndSupported(2, exchanges.toSeq, 3.1, expectedDur = Seq(Some(15688), Some(8))) } test("CustomShuffleReaderExec") { @@ -343,7 +344,7 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging { val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq) for (execName <- supportedExecs) { val execs = allExecInfo.filter(_.exec == execName) - assertSizeAndSupported(1, execs.toSeq, Seq.empty, execName) + assertSizeAndSupported(1, execs.toSeq, expectedDur = Seq.empty, extraText = execName) } for (execName <- unsupportedExecs) { val execs = allExecInfo.filter(_.exec == execName) @@ -403,10 +404,12 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging { SQLPlanParser.parseSQLPlan(plan, sqlID, pluginTypeChecker, app) } val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq) - for (execName <- supportedExecs) { - val supportedExec = allExecInfo.filter(_.exec == execName) - assertSizeAndSupported(1, supportedExec) - } + val bhj = allExecInfo.filter(_.exec == "BroadcastHashJoin") + assertSizeAndSupported(1, bhj, 3.0) + val broadcastNestedJoin = allExecInfo.filter(_.exec == "BroadcastNestedLoopJoin") + assertSizeAndSupported(1, broadcastNestedJoin) + val shj = allExecInfo.filter(_.exec == "ShuffledHashJoin") + assertSizeAndSupported(1, shj) } } diff --git a/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala b/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala index fd413eb0c16..9974cb88a7c 100644 --- a/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala +++ b/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala @@ -113,7 +113,7 @@ class PluginTypeCheckerSuite extends FunSuite with Logging { test("supported operator score from default file") { val checker = new PluginTypeChecker - assert(checker.getSpeedupFactor("FilterExec") == 2) + assert(checker.getSpeedupFactor("FilterExec") == 2.4) assert(checker.getSpeedupFactor("Ceil") == 3) }
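Note (not part of the patch): a minimal, illustrative sketch of how the qualification tool's speedup math behaves once scores are Doubles. The object name and the hard-coded map below are hypothetical; the two helpers mirror PluginTypeChecker.getSpeedupFactor and SQLPlanParser.averageSpeedup as changed above.

object SpeedupSketch {
  // Hypothetical subset of operatorsScore.csv, hard-coded for illustration.
  val scores: Map[String, Double] = Map(
    "FilterExec" -> 2.4,
    "SortExec" -> 6.0,
    "SortMergeJoinExec" -> 14.9,
    "ProjectExec" -> 2.0)

  // Mirrors PluginTypeChecker.getSpeedupFactor: unknown operators fall back to -1.
  def getSpeedupFactor(execOrExpr: String): Double =
    scores.getOrElse(execOrExpr, -1.0)

  // Mirrors SQLPlanParser.averageSpeedup: an empty input averages to 1.0 so the
  // tool never reports a slowdown for a plan it could not parse.
  def averageSpeedup(arr: Seq[Double]): Double =
    if (arr.isEmpty) 1.0 else arr.sum / arr.size

  def main(args: Array[String]): Unit = {
    val factors = Seq("FilterExec", "SortExec", "ProjectExec").map(getSpeedupFactor)
    println(averageSpeedup(factors)) // 3.466..., no longer truncated by integer division
  }
}

With the previous Int-based scores, fractional per-operator estimates such as 2.4 or 14.9 were impossible and averages were truncated by integer division (e.g. 10 / 3 == 3), which is why ExecInfo.speedupFactor, the parsers' fallback value, and the CSV entries all move to Double together.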