-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Improve performance of Sort for the common single batch use case #10572
Conversation
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
build |
val spillableIter = iter.flatMap { cb => | ||
// Filter out empty batches and make them spillable | ||
if (cb.numRows() > 0) { | ||
Some(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) | ||
} else { | ||
cb.close() | ||
None | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If single batch is the common case it does not seem to matter but if we needed to save intermediate Option generation we could use an explicit PartialFunction instance:
val spillableIter = iter.flatMap { cb => | |
// Filter out empty batches and make them spillable | |
if (cb.numRows() > 0) { | |
Some(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) | |
} else { | |
cb.close() | |
None | |
} | |
} | |
val spillableIter = iter.collect { | |
// Filter out empty batches and make them spillable | |
new PartialFunction[ColumnarBatch, SpillableColumnarBatch] { | |
override def isDefinedAt(cb: ColumnarBatch): Boolean = if (cb.numRows() > 0) { | |
true | |
} else { | |
cb.close() | |
false | |
} | |
override def apply(cb: ColumnarBatch): SpillableColumnarBatch = | |
SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll keep it in mind, but I'm not sure it matters that much here.
if (cb.numRows() > 0) { | ||
Some(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) | ||
} else { | ||
cb.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if this throws?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When someone calls next the exception is thrown and they would need to handle it.
*/ | ||
private final def firstPassReadBatches(scb: SpillableColumnarBatch): Unit = { | ||
splitOneSortedBatch(scb) | ||
while(alreadySortedIter.hasNext) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: space
while(alreadySortedIter.hasNext) { | |
while (alreadySortedIter.hasNext) { |
build |
This fixes #10570
The performance speed up is not that huge, but it is there.
I ran
Both with this patch and without it. I captured the metric for sort time and the total run time of the query.
With this patch, on my desktop, the median run time was 6968 ms, and from the Spark UI the median sort time was 5.5 seconds.
Without this patch the runtime was 7051 and the sort time was 5.9 seconds. That saves 83 ms (about 1% which is not really that huge), but the sort time metric showed about 0.4 seconds of savings or about 6% less time, which is a lot better.