
[FEA] Support SubqueryBroadcast on GPU to enable exchange reuse during DPP #4027

Closed
sperlingxx opened this issue Nov 4, 2021 · 0 comments · Fixed by #4150
Assignees
Labels
feature request New feature or request

Comments

@sperlingxx
Collaborator

Is your feature request related to a problem? Please describe.
Currently, when the GPU overrides are applied, DPP cannot reuse a BroadcastExchange as a subquery. The reason is that the RAPIDS plugin falls back the broadcast exchange for the subquery to the CPU, since SubqueryBroadcastExec is not yet supported on the GPU. Meanwhile, the original broadcast exchange, which is supposed to be reused by the subquery, is converted into a GpuBroadcastExchange. As a result, the planned exchange reuse does not survive into the final plan.
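
To see why the reuse fails, note that Spark reuses an exchange only when the candidate canonicalizes to the same plan as an existing one. Here is a toy model of that matching (illustrative Python only; these are not the real Spark or RAPIDS classes) showing how converting only one side to a GPU exchange breaks it:

```python
# Toy stand-ins for exchange plan nodes (not the real Spark/RAPIDS classes).
# Exchange reuse works by matching canonicalized plans: once the join-side
# exchange becomes a GPU node while the subquery-side one stays on the CPU,
# the two canonical forms no longer match, so no ReusedExchange is planned.
from dataclasses import dataclass

@dataclass(frozen=True)
class BroadcastExchange:
    child: str
    def canonical(self) -> str:
        return f"BroadcastExchange({self.child})"

@dataclass(frozen=True)
class GpuBroadcastExchange:
    child: str
    def canonical(self) -> str:
        return f"GpuBroadcastExchange({self.child})"

def can_reuse(existing, candidate) -> bool:
    # Reuse is only possible when both canonical forms match exactly.
    return existing.canonical() == candidate.canonical()
```

With both sides on the CPU (or both on the GPU) the canonical forms match and reuse happens; with a mixed CPU/GPU pair, as in the GPU plan below, it does not.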

Here is a comparison between the CPU plan and the GPU plan for DPP with broadcast reuse:

*(3) HashAggregate(keys=[key#156], functions=[sum(value#155)], output=[key#156, sum(value)#161L])
+- Exchange hashpartitioning(key#156, 12), ENSURE_REQUIREMENTS, [id=#221]
   +- *(2) HashAggregate(keys=[key#156], functions=[partial_sum(value#155)], output=[key#156, sum#166L])
      +- *(2) Project [value#155, key#156]
         +- *(2) BroadcastHashJoin [key#156], [key#157], Inner, BuildRight, false
            :- *(2) Filter (isnotnull(value#155) AND (value#155 > 0))
            :  +- *(2) ColumnarToRow
            :     +- FileScan parquet default.tmp_table_574386_0[value#155,key#156] Batched: true, DataFilters: [isnotnull(value#155), (value#155 > 0)], Format: Parquet, Location: InMemoryFileIndex(10 paths)[file:/home/alfred/workspace/codes/spark-rapids/integration_tests/targ..., PartitionFilters: [isnotnull(key#156), dynamicpruningexpression(key#156 IN dynamicpruning#164)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
            :           +- SubqueryBroadcast dynamicpruning#164, 0, [key#157], [id=#126]
            :              +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#125]
            :                 +- *(1) Project [key#157]
            :                    +- *(1) Filter ((isnotnull(filter#159) AND (filter#159 = 1552)) AND isnotnull(key#157))
            :                       +- *(1) ColumnarToRow
            :                          +- FileScan parquet default.tmp_table_574386_1[key#157,filter#159] Batched: true, DataFilters: [isnotnull(filter#159), (filter#159 = 1552), isnotnull(key#157)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/alfred/workspace/codes/spark-rapids/integration_tests/targe..., PartitionFilters: [], PushedFilters: [IsNotNull(filter), EqualTo(filter,1552), IsNotNull(key)], ReadSchema: struct<key:int,filter:int>
            +- ReusedExchange [key#157], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#125]

The GPU plan, by contrast, keeps the subquery's SubqueryBroadcast and BroadcastExchange on the CPU while the join's build side becomes a GpuBroadcastExchange, so no ReusedExchange appears:

GpuColumnarToRow false
+- GpuHashAggregate(keys=[key#156], functions=[gpusum(value#155, LongType)], output=[key#156, sum(value)#161L])
   +- GpuShuffleCoalesce 2147483647
      +- GpuColumnarExchange gpuhashpartitioning(key#156, 12), ENSURE_REQUIREMENTS, [id=#199]
         +- GpuHashAggregate(keys=[key#156], functions=[partial_gpusum(value#155, LongType)], output=[key#156, sum#166L])
            +- GpuProject [value#155, key#156]
               +- GpuBroadcastHashJoin [key#156], [key#157], Inner, GpuBuildRight
                  :- GpuCoalesceBatches targetsize(2147483647)
                  :  +- GpuFilter (gpuisnotnull(value#155) AND (value#155 > 0)), true
                  :     +- GpuFileGpuScan parquet default.tmp_table_625824_0[value#155,key#156] Batched: true, DataFilters: [isnotnull(value#155), (value#155 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/home/alfred/workspace/codes/spark-rapids/integration_tests/target/run_dir..., PartitionFilters: [isnotnull(key#156), dynamicpruningexpression(key#156 IN dynamicpruning#164)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
                  :           +- SubqueryBroadcast dynamicpruning#164, 0, [key#157], [id=#136]
                  :              +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#135]
                  :                 +- GpuColumnarToRow false
                  :                    +- GpuProject [key#157]
                  :                       +- GpuCoalesceBatches targetsize(2147483647)
                  :                          +- GpuFilter ((gpuisnotnull(filter#159) AND (filter#159 = 1552)) AND gpuisnotnull(key#157)), true
                  :                             +- GpuFileGpuScan parquet default.tmp_table_625824_1[key#157,filter#159] Batched: true, DataFilters: [isnotnull(filter#159), (filter#159 = 1552), isnotnull(key#157)], Format: Parquet, Location: InMemoryFileIndex[file:/home/alfred/workspace/codes/spark-rapids/integration_tests/target/run_dir..., PartitionFilters: [], PushedFilters: [IsNotNull(filter), EqualTo(filter,1552), IsNotNull(key)], ReadSchema: struct<key:int,filter:int>
                  +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#195]
                     +- GpuProject [key#157]
                        +- GpuCoalesceBatches targetsize(2147483647)
                           +- GpuFilter ((gpuisnotnull(filter#159) AND (filter#159 = 1552)) AND gpuisnotnull(key#157)), true
                              +- GpuFileGpuScan parquet default.tmp_table_625824_1[key#157,filter#159] Batched: true, DataFilters: [isnotnull(filter#159), (filter#159 = 1552), isnotnull(key#157)], Format: Parquet, Location: InMemoryFileIndex[file:/home/alfred/workspace/codes/spark-rapids/integration_tests/target/run_dir..., PartitionFilters: [], PushedFilters: [IsNotNull(filter), EqualTo(filter,1552), IsNotNull(key)], ReadSchema: struct<key:int,filter:int>

Describe the solution you'd like
Implement GpuSubqueryBroadcastExec, which is able to reuse GpuBroadcastExchangeExec in a row-wise view (an Array of UnsafeRow).
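
The core of such an operator is exposing the broadcast result, which lives in columnar batches, as rows. A minimal self-contained sketch of that columnar-to-row flattening (plain Python with integer columns for illustration; the real implementation would produce an Array of UnsafeRow from the GPU broadcast data):

```python
# Illustrative columnar-to-row conversion: each input list is one column,
# and the output is the same data viewed as a list of rows, which is the
# shape DPP's subquery consumers expect from a SubqueryBroadcast.
def columnar_to_rows(columns):
    if not columns:
        raise ValueError("need at least one column")
    num_rows = len(columns[0])
    if any(len(col) != num_rows for col in columns):
        raise ValueError("ragged columns")
    return [[col[r] for col in columns] for r in range(num_rows)]
```

Because the GPU exchange is reused as-is and only this thin row-wise view is added on top, the exchange reuse visible in the CPU plan can be preserved in the GPU plan.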

@sperlingxx sperlingxx added feature request New feature or request ? - Needs Triage Need team to review and classify labels Nov 4, 2021
@Salonijain27 Salonijain27 removed the ? - Needs Triage Need team to review and classify label Nov 9, 2021
@sperlingxx sperlingxx self-assigned this Nov 17, 2021
sperlingxx added a commit that referenced this issue Dec 17, 2021
Signed-off-by: sperlingxx lovedreamf@gmail.com

Closes #4027

This PR supports reusing the broadcast exchange for SubqueryBroadcast (which is inserted by DPP) on the GPU. It can only reuse GpuBroadcast when AQE is off. We need to modify GpuBroadcastToCpuExec to reuse GpuBroadcast with AQE on.