Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qualification tool: Update speedup factor for few operators #5490

Merged
merged 2 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2213,13 +2213,25 @@ object SupportedOpsForTools {
private def operatorMappingWithScore(): Unit = {
val header = Seq("CPUOperator", "Score")
println(header.mkString(","))
val operatorCustomSpeedUp = Map(
("BroadcastHashJoinExec", "3.0"),
("ShuffleExchangeExec", "3.1"),
("FilterExec", "2.4"),
("HashAggregateExec", "3.4"),
("SortExec", "6.0"),
("SortMergeJoinExec", "14.9"))
GpuOverrides.execs.values.toSeq.sortBy(_.tag.toString).foreach { rule =>
val checks = rule.getChecks
if (rule.isVisible && checks.forall(_.shown)) {
val cpuName = rule.tag.runtimeClass.getSimpleName
// We are assigning speed up of 2 to all the Execs supported by the plugin. This can be
// adjusted later.
val allCols = Seq(cpuName, "2")
// We have estimated the speedup of some operators by running various queries. Assign the
// custom speedup to the operators that were evaluated. All other operators are assigned a
// default speedup of 2.0.
val allCols = if (operatorCustomSpeedUp.contains(cpuName)) {
Seq(cpuName, operatorCustomSpeedUp(cpuName))
} else {
Seq(cpuName, "2.0")
}
println(s"${allCols.mkString(",")}")
}
}
Expand Down
70 changes: 35 additions & 35 deletions tools/src/main/resources/operatorsScore.csv
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
CPUOperator,Score
CoalesceExec,2
CollectLimitExec,2
ExpandExec,2
FileSourceScanExec,2
FilterExec,2
GenerateExec,2
GlobalLimitExec,2
LocalLimitExec,2
ProjectExec,2
RangeExec,2
SampleExec,2
SortExec,2
SubqueryBroadcastExec,2
TakeOrderedAndProjectExec,2
UnionExec,2
CustomShuffleReaderExec,2
HashAggregateExec,2
ObjectHashAggregateExec,2
SortAggregateExec,2
InMemoryTableScanExec,2
DataWritingCommandExec,2
BatchScanExec,2
BroadcastExchangeExec,2
ShuffleExchangeExec,2
BroadcastHashJoinExec,2
BroadcastNestedLoopJoinExec,2
CartesianProductExec,2
ShuffledHashJoinExec,2
SortMergeJoinExec,2
AggregateInPandasExec,2
ArrowEvalPythonExec,2
FlatMapGroupsInPandasExec,2
MapInPandasExec,2
WindowInPandasExec,2
WindowExec,2
CoalesceExec,2.0
CollectLimitExec,2.0
ExpandExec,2.0
FileSourceScanExec,2.0
FilterExec,2.4
GenerateExec,2.0
GlobalLimitExec,2.0
LocalLimitExec,2.0
ProjectExec,2.0
RangeExec,2.0
SampleExec,2.0
SortExec,6.0
SubqueryBroadcastExec,2.0
TakeOrderedAndProjectExec,2.0
UnionExec,2.0
CustomShuffleReaderExec,2.0
HashAggregateExec,3.4
ObjectHashAggregateExec,2.0
SortAggregateExec,2.0
InMemoryTableScanExec,2.0
DataWritingCommandExec,2.0
BatchScanExec,2.0
BroadcastExchangeExec,2.0
ShuffleExchangeExec,3.1
BroadcastHashJoinExec,3.0
BroadcastNestedLoopJoinExec,2.0
CartesianProductExec,2.0
ShuffledHashJoinExec,2.0
SortMergeJoinExec,14.9
AggregateInPandasExec,2.0
ArrowEvalPythonExec,2.0
FlatMapGroupsInPandasExec,2.0
MapInPandasExec,2.0
WindowInPandasExec,2.0
WindowExec,2.0
Abs,3
Acos,3
Acosh,3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class AggregateInPandasExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class ArrowEvalPythonExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ case class BatchScanExecParser(
// don't use the isExecSupported because we have finer grain.
val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
val speedupFactor = checker.getSpeedupFactor(fullExecName)
val overallSpeedup = Math.max((speedupFactor * score).toInt, 1)
val overallSpeedup = Math.max((speedupFactor * score), 1.0)

// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, s"${node.name} ${readInfo.format}", "", overallSpeedup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class BroadcastExchangeExecParser(
val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", filterSpeedupFactor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class BroadcastHashJoinExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class BroadcastNestedLoopJoinExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class CartesianProductExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class CoalesceExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class CollectLimitExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ case class CustomShuffleReaderExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class ExpandExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ case class FileSourceScanExecParser(
// don't use the isExecSupported because we have finer grain.
val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
val speedupFactor = checker.getSpeedupFactor(fullExecName)
val overallSpeedup = Math.max((speedupFactor * score).toInt, 1)
val overallSpeedup = Math.max((speedupFactor * score), 1.0)

// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", overallSpeedup, maxDuration, node.id, score > 0, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class FilterExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class FlatMapGroupsInPandasExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class GenerateExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class GlobalLimitExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ case class HashAggregateExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}

// TODO - add in parsing expressions - average speedup across?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ case class InMemoryTableScanExecParser(
val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class LocalLimitExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class MapInPandasExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ case class ObjectHashAggregateExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}

// TODO - add in parsing expressions - average speedup across?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class ProjectExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class RangeExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ case class ExecInfo(
sqlID: Long,
exec: String,
expr: String,
speedupFactor: Int,
speedupFactor: Double,
duration: Option[Long],
nodeId: Long,
isSupported: Boolean,
Expand Down Expand Up @@ -189,12 +189,12 @@ object SQLPlanParser extends Logging {

/**
* This function is used to calculate an average speedup factor. The input
* is assumed to be an array of ints where each element is >= 1. If the input array
* is assumed to be an array of doubles where each element is >= 1. If the input array
* is empty we return 1 because we assume we don't slow things down. Generally
* the array shouldn't be empty, but if there is some weird case we don't want to
* blow up, just say we don't speed it up.
*/
def averageSpeedup(arr: Seq[Int]): Int = if (arr.isEmpty) 1 else arr.sum / arr.size
def averageSpeedup(arr: Seq[Double]): Double = if (arr.isEmpty) 1.0 else arr.sum / arr.size

/**
* Get the total duration by finding the accumulator with the largest value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class SampleExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class ShuffleExchangeExecParser(
val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", filterSpeedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ case class ShuffledHashJoinExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}

// TODO - add in parsing expressions - average speedup across?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class SortAggregateExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class SortExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class SortMergeJoinExecParser(
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
(1.0, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
Expand Down
Loading