
[FEA] Support for a custom DataSource V2 which supplies Arrow data #1072

Closed

Dooyoung-Hwang opened this issue Nov 5, 2020 · 9 comments

Labels: feature request (New feature or request), P0 (Must have for release)

@Dooyoung-Hwang (Contributor)

Is your feature request related to a problem? Please describe.
When I executed an aggregation query with our custom data source, I found that the physical plan of the query looked like this:

spark.sql("SELECT bucket, count(*) FROM test_table GROUP BY bucket").explain(true)

== Physical Plan ==
*(2) GpuColumnarToRow false
+- GpuHashAggregate(keys=[bucket#423], functions=[gpucount(1)], output=[bucket#423, count(1)#570L])
   +- GpuCoalesceBatches TargetSize(2147483647)
      +- GpuColumnarExchange gpuhashpartitioning(bucket#423, 10), true, [id=#1210]
         +- GpuHashAggregate(keys=[bucket#423], functions=[partial_gpucount(1)], output=[bucket#423, count#574L])
            +- GpuRowToColumnar TargetSize(2147483647)
               +- *(1) Scan R2Relation(com.skt.spark.r2.RedisConfig@232fa9c6,2147483647) [bucket#423] PushedFilters: [], ReadSchema: struct<bucket:string>

This shows that InternalRows are built first and then transformed into ColumnarBatches by the GpuRowToColumnar plan. If the custom DataSource could provide RDD[ColumnarBatch] to spark-rapids directly, it would be more efficient because the conversion overhead would be removed.

Describe the solution you'd like

  1. In spark-rapids, add a Scala trait (or Java interface) through which a data source can supply an RDD of ColumnarBatch (a sketch follows below).
  2. If the BaseRelation subclass in a custom V1 DataSource also implements this interface, spark-rapids can override the physical plan that scans the custom V1 source as well.
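
A minimal sketch of what such a trait could look like (the name and method signature are hypothetical, for illustration only, not an actual spark-rapids API):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical trait: a V1 BaseRelation that also implements this could
// hand spark-rapids columnar batches directly, skipping the
// GpuRowToColumnar conversion step.
trait SupportsColumnarScan {
  // Produce the scan result as columnar batches instead of InternalRows.
  def buildColumnarScan(requiredColumns: Array[String]): RDD[ColumnarBatch]
}
```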

The changed physical plan would look like this:

== Physical Plan ==
*(1) GpuColumnarToRow false
+- GpuHashAggregate(keys=[bucket#423], functions=[gpucount(1)], output=[bucket#423, count(1)#570L])
   +- GpuCoalesceBatches TargetSize(2147483647)
      +- GpuColumnarExchange gpuhashpartitioning(bucket#423, 10), true, [id=#1210]
         +- GpuHashAggregate(keys=[bucket#423], functions=[partial_gpucount(1)], output=[bucket#423, count#574L])
            +- GpuV1SourceScan Batched: true, DataFilters: [], Format: r2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bucket:string>
@Dooyoung-Hwang added the "? - Needs Triage" (Need team to review and classify) and "feature request" (New feature or request) labels Nov 5, 2020
@jlowe (Contributor) commented Nov 5, 2020

> If the custom DataSource could provide RDD[ColumnarBatch] to spark-rapids directly, it would be more efficient because the conversion overhead would be removed.

Does this RDD[ColumnarBatch] contain GPU data or CPU data? If the latter, there would still be a conversion from host columnar data to device columnar data. That type of conversion is already supported by the plugin, but it's important to note that a (cheaper) conversion would still occur. The plan would have a HostColumnarToGpu node instead of a GpuRowToColumnar node.
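
Concretely, the scan fragment of the plan in the opening comment would then look something like this (illustrative only, not actual explain output):

```
+- GpuHashAggregate(keys=[bucket#423], functions=[partial_gpucount(1)], output=[bucket#423, count#574L])
   +- HostColumnarToGpu
      +- *(1) Scan R2Relation(...) [bucket#423] PushedFilters: [], ReadSchema: struct<bucket:string>
```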

@sameerz removed the "? - Needs Triage" (Need team to review and classify) label Nov 10, 2020
@tgravescs added the "P0" (Must have for release) label Jan 5, 2021
@tgravescs (Collaborator)

After discussion: DataSource V1 doesn't support columnar data, so we are switching to DataSource V2. With DataSource V2, custom data sources just work, and we insert a HostColumnarToGpu transition to get the data onto the GPU.

In this case I believe the data will already be in Arrow format (ArrowColumnVector); we can investigate making HostColumnarToGpu smarter about getting the data onto the GPU.
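
For context, a DataSource V2 source opts into columnar reads through Spark's PartitionReaderFactory API. Here is a minimal sketch, assuming the source already produces Arrow record batches; ArrowReaderFactory and fetchNextRoot are hypothetical names, and the fetch logic is stubbed out:

```scala
import scala.collection.JavaConverters._

import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}

// Sketch of a reader factory for a DataSource V2 source whose data already
// lives in Arrow memory. Not part of any real connector.
class ArrowReaderFactory extends PartitionReaderFactory {

  // Tell Spark this source can supply ColumnarBatch instead of InternalRow.
  override def supportColumnarReads(partition: InputPartition): Boolean = true

  // Row-based path is unused in this sketch.
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    throw new UnsupportedOperationException("only columnar reads are supported")

  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] =
    new PartitionReader[ColumnarBatch] {
      private var root: VectorSchemaRoot = _

      // Stands in for the source-specific logic that yields the next Arrow
      // record batch (e.g. from an Arrow Flight stream).
      private def fetchNextRoot(): VectorSchemaRoot = null // stub

      override def next(): Boolean = {
        root = fetchNextRoot()
        root != null
      }

      override def get(): ColumnarBatch = {
        // Wrap each Arrow vector in Spark's ArrowColumnVector; this is a
        // zero-copy view over the underlying Arrow buffers.
        val cols = root.getFieldVectors.asScala
          .map(v => new ArrowColumnVector(v): ColumnVector)
          .toArray
        new ColumnarBatch(cols, root.getRowCount)
      }

      override def close(): Unit = if (root != null) root.close()
    }
}
```

Because each column is an ArrowColumnVector wrapping contiguous Arrow buffers, a smarter HostColumnarToGpu could copy those buffers to the GPU in bulk rather than element by element.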

@tgravescs changed the title from "[FEA] Support API which a custom V1 DataSource can provide RDD[ColumnarBatch] to spark-rapids instead of RDD[InternalRow] or RDD[Row]" to "[FEA] Support for a custom DataSource V2 which supplies Arrow data" Jan 6, 2021
@tgravescs (Collaborator)

Note: looking at a couple of sample queries, they use round of a decimal (support for that is in progress) and also average of a decimal, which we don't support yet.

@tgravescs (Collaborator)

Note: for sample queries and data we can look at the NYC taxi ride dataset and queries:

https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/
explanation - https://tech.marksblogg.com/billion-nyc-taxi-rides-redshift.html
The 4 queries can also be found here: https://tech.marksblogg.com/omnisci-macos-macbookpro-mbp.html

Results for other solutions are here:
https://tech.marksblogg.com/benchmarks.html

@sameerz (Collaborator) commented Jan 6, 2021

Rounding support is being worked on in #1244.
Average should work once we support casting, which is being tracked in #1330.

@tgravescs (Collaborator) commented Jan 19, 2021

Note we may also need percentile_approx here.

@tgravescs (Collaborator)

The cuDF issue for percentile_approx: rapidsai/cudf#7170

@tgravescs (Collaborator)

The main functionality supporting a faster copy when a DataSource V2 source supplies Arrow data is committed under #1622. It supports primitive types and Strings; it does not support Decimal or nested types yet.

@tgravescs (Collaborator)

Note: filed a separate issue for the write side: #1648.
I'm going to close this, as the initial version is committed.

@tgravescs modified the milestone: Feb 1 - Feb 12, on Feb 2, 2021