
Optimize SELECT min/max queries with limit #7198


Closed
alamb opened this issue Aug 4, 2023 · 7 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Aug 4, 2023

Is your feature request related to a problem or challenge?

The following query pattern shows up in many of our use cases:

SELECT tag, max(time)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10

The query may also include predicates (a WHERE clause).

In English, this query returns the 10 groups with the most recent values, i.e. the 10 tags whose latest time is greatest.
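For reference, a plan without a special optimization has to aggregate every group before it can sort and apply the limit. The following Rust sketch (the function `naive_top_k_by_max` and the `(tag, time)` row layout are hypothetical, not DataFusion APIs) shows that evaluation: the hash map holds one entry per distinct tag, even though only `k` groups are returned, so memory grows with the number of groups rather than with the limit.

```rust
use std::collections::HashMap;

/// Hypothetical sketch of the naive evaluation of
/// `SELECT tag, max(time) FROM t GROUP BY tag ORDER BY max(time) DESC LIMIT k`.
/// Rows are `(tag, time)` pairs.
fn naive_top_k_by_max(rows: &[(u32, i64)], k: usize) -> Vec<(u32, i64)> {
    // Full hash aggregation: one entry per distinct tag, however many tags exist.
    let mut max_by_tag: HashMap<u32, i64> = HashMap::new();
    for &(tag, time) in rows {
        let current = max_by_tag.entry(tag).or_insert(time);
        *current = (*current).max(time);
    }
    // Sort all groups by max(time) DESC (tag ASC breaks ties), then apply LIMIT k.
    let mut groups: Vec<(u32, i64)> = max_by_tag.into_iter().collect();
    groups.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
    groups.truncate(k);
    groups
}
```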

As a more specific example, @JayjeetAtGithub found that the Jaeger tool issues the following query to show the 20 most recent traces:

SELECT "trace_id", MAX("time") AS t
FROM 'spans'
WHERE "service.name" = 'frontend'
  AND "time" >= to_timestamp(1688713200000000000)
  AND "time" <= to_timestamp(1689000240000000000)
GROUP BY "trace_id"
ORDER BY t DESC
LIMIT 20;

Describe the solution you'd like

Implement an optimization for this query pattern.

Describe alternatives you've considered

I believe #7191 / #7192 from @avantgardnerio are designed for this use case, so they may be sufficient. I still thought it was worth documenting the actual end-user effect of the change as a separate item, which is why I filed this ticket.

Additional context

No response

@alamb alamb added the enhancement New feature or request label Aug 4, 2023
@alamb
Contributor Author

alamb commented Aug 4, 2023

I am not sure if we can get a general purpose optimization that also handles queries with different aggregates, such as:

SELECT tag, field, max(time), min(other_field)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10

Though maybe that is not so useful.

@avantgardnerio
Contributor

avantgardnerio commented Aug 5, 2023

if we can get a general purpose optimization that also handles queries with different aggregates

I don't think #7192 can handle your example. It works by "evicting" (nice term, @tustvold) groups from the accumulator unless their current min/max is among the top K.
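A minimal sketch of the eviction idea, assuming a single `max(time)` aggregate ordered DESC and a linear scan to find the victim (a real operator would presumably use a heap or similar structure). `evicting_top_k_by_max` is a hypothetical name, not the #7192 code. It never holds more than `k` groups. Dropping a group is safe for `max` because the smallest retained max never decreases, so a later row for an evicted group only matters if it beats that threshold, in which case the group re-enters with the larger value.

```rust
use std::collections::HashMap;

/// Hypothetical sketch of "evicting" top-K aggregation for
/// `GROUP BY tag ORDER BY max(time) DESC LIMIT k`; rows are `(tag, time)`.
fn evicting_top_k_by_max(rows: &[(u32, i64)], k: usize) -> Vec<(u32, i64)> {
    if k == 0 {
        return Vec::new();
    }
    // Never holds more than k groups.
    let mut retained: HashMap<u32, i64> = HashMap::with_capacity(k);
    for &(tag, time) in rows {
        if let Some(current) = retained.get_mut(&tag) {
            // Group already retained: update its running max.
            *current = (*current).max(time);
        } else if retained.len() < k {
            // Room left: admit the new group.
            retained.insert(tag, time);
        } else {
            // Full: find the retained group with the smallest max (linear scan for clarity).
            let (&victim, &victim_max) = retained
                .iter()
                .min_by_key(|&(_, &m)| m)
                .expect("retained is non-empty when full");
            // Admit the new group only if it beats the smallest retained max;
            // the victim is evicted together with its state.
            if time > victim_max {
                retained.remove(&victim);
                retained.insert(tag, time);
            }
        }
    }
    let mut groups: Vec<(u32, i64)> = retained.into_iter().collect();
    groups.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
    groups
}
```

For a lone `max` (or `min` with ASC), this bounded state gives the same answer as a full aggregation; the trouble starts when other aggregates ride along with the evicted groups.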

So if we run your example:

SELECT tag, field, max(time), min(other_field)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10

on:

+-----+-------+-------+-------------+
| tag | time  | field | other_field |
+-----+-------+-------+-------------+
| 1   | 01:00 |       | 0           |
| 2   | 02:00 |       | 1           |
| 3   | 03:00 |       | 1           |
| 4   | 04:00 |       | 1           |
| 5   | 05:00 |       | 1           |
| 6   | 06:00 |       | 1           |
| 7   | 07:00 |       | 1           |
| 8   | 08:00 |       | 1           |
| 9   | 09:00 |       | 1           |
| 10  | 10:00 |       | 1           |
| 11  | 11:00 |       | 1           |
| 1   | 12:00 |       | 999         |
+-----+-------+-------+-------------+

We will:

  1. accumulate 10 tag-groups
  2. run into tag-group 11 with a greater time than tag-group 1 (11:00 vs 01:00)
  3. evict tag-group 1 from our accumulator, along with its other_field=0 value
  4. run into tag-group 1 again with a greater time than tag-group 2 (12:00 vs 02:00)
  5. evict tag-group 2, and replace it with tag-group 1 and its new "min" of 999 (the correct min for tag-group 1 is 0)
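The walk-through above can be replayed with a short Rust sketch that carries `min(other_field)` alongside `max(time)` while evicting. The names `evicting_with_min` and `exact_min_for` are hypothetical, and the times 01:00 to 12:00 are encoded as the integers 1 to 12. With the table's rows, the evicting accumulator reports 999 for tag 1, while an exact aggregation over all rows gives 0, because the row with `other_field = 0` was discarded when tag-group 1 was evicted.

```rust
use std::collections::HashMap;

/// Per-group state kept by the sketch: (max(time), min(other_field)).
type State = (i64, i64);

/// Hypothetical sketch: eviction by max(time), but also carrying
/// min(other_field). Rows are `(tag, time, other_field)`.
fn evicting_with_min(rows: &[(u32, i64, i64)], k: usize) -> HashMap<u32, State> {
    let mut retained: HashMap<u32, State> = HashMap::new();
    if k == 0 {
        return retained;
    }
    for &(tag, time, other) in rows {
        if let Some(state) = retained.get_mut(&tag) {
            state.0 = state.0.max(time);
            state.1 = state.1.min(other);
        } else if retained.len() < k {
            retained.insert(tag, (time, other));
        } else {
            // Victim = retained group with the smallest max(time).
            let (&victim, &(victim_max, _)) = retained
                .iter()
                .min_by_key(|&(_, &(m, _))| m)
                .expect("retained is non-empty when full");
            if time > victim_max {
                // The victim's min(other_field) is lost here.
                retained.remove(&victim);
                retained.insert(tag, (time, other));
            }
        }
    }
    retained
}

/// Exact min(other_field) for one tag, computed over every row (no eviction).
fn exact_min_for(rows: &[(u32, i64, i64)], tag: u32) -> Option<i64> {
    rows.iter().filter(|r| r.0 == tag).map(|r| r.2).min()
}
```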

Hopefully this example makes it clear why, given the approach in #7192 and the functional requirement of #7191 (no sorting), we can only accumulate values present in the ORDER BY clause.

@alamb
Contributor Author

alamb commented Aug 5, 2023

The more I think about this, the more I like where @avantgardnerio is going with #7192, and I think we could use the same operator from #7192 for this ticket as well as for #6899 and #7196.

I hope we can use the same operator for all these queries because:

  1. It will allow us to pool resources (to make it very fast and efficient)
  2. It will keep the boundaries clearly defined (and thus keep the long-term maintenance cost down)

"Observation" -- No Aggregates

One key observation that @avantgardnerio made (perhaps implicitly) in #7192 is that even though the query in this ticket has aggregates (max(time)), there is no actual aggregation -- what is needed is to 'keep the top K items per group', where 'top' is defined by some particular sort order.

Proposal

Thus, I think we could make the code in #7192 into a TopKPerPartition ExecutionPlan (see footnote 1) with the following semantics:

Keeps the top K values, as defined by order_exprs, for each distinct value of the partition expressions (gby_exprs)

┌───────────────────────────────┐
│       TopKPerPartition        │
│           gby_exprs           │
│          order_exprs          │
│               K               │
└───────────────────────────────┘
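To make the proposed semantics concrete, here is a hedged Rust sketch of "keep the top K values per distinct group key", assuming a single `i64` order key sorted DESC and one bounded min-heap per group. `top_k_per_partition` is a hypothetical name; an actual ExecutionPlan would operate on Arrow record batches and arbitrary `order_exprs`.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical sketch of TopKPerPartition semantics with
/// `gby_exprs: [group]`, `order_exprs: [key DESC]`; rows are `(group, key)`.
/// Returns, per group, its k largest keys in descending order.
fn top_k_per_partition(rows: &[(u32, i64)], k: usize) -> HashMap<u32, Vec<i64>> {
    if k == 0 {
        return HashMap::new();
    }
    // One min-heap per group (Reverse flips the ordering), bounded at k entries,
    // so the heap top is the smallest key retained for that group.
    let mut heaps: HashMap<u32, BinaryHeap<Reverse<i64>>> = HashMap::new();
    for &(group, key) in rows {
        let heap = heaps.entry(group).or_default();
        if heap.len() < k {
            heap.push(Reverse(key));
        } else {
            let Reverse(smallest) = *heap.peek().expect("heap is full, so non-empty");
            if key > smallest {
                heap.pop();
                heap.push(Reverse(key));
            }
        }
    }
    heaps
        .into_iter()
        .map(|(group, heap)| {
            // Ascending order of Reverse<i64> is descending order of the keys.
            let keys: Vec<i64> = heap.into_sorted_vec().into_iter().map(|Reverse(v)| v).collect();
            (group, keys)
        })
        .collect()
}
```

Per-group memory is bounded by `k`, but the number of heaps still grows with the number of distinct groups.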

Use for min/max queries with limit (this ticket)

So for the query

SELECT tag, max(time)
FROM t
GROUP BY tag
ORDER BY max(time) DESC
LIMIT 10

We would use

TopKPerPartition
  gby_exprs: [tag]
  order_exprs: [time DESC]
  k: 10

General purpose ORDER BY limit query #7196

SELECT c1, c2
FROM t
ORDER BY c3
LIMIT 10

We could use the same operator (though maybe it has a more optimized implementation when there are no groups, like we have for no-group aggregate streams):

TopKPerPartition
  gby_exprs: []
  order_exprs: [c3]
  k: 10
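With no groups, the operator reduces to a plain top-K. A minimal sketch, assuming a single `i64` column `c3` sorted ASC (`top_k_ascending` is a hypothetical helper): a max-heap bounded at `k` entries holds the current best candidates, so memory stays O(k) rather than O(rows).

```rust
use std::collections::BinaryHeap;

/// Hypothetical sketch of the no-group case (`gby_exprs: []`) for
/// `ORDER BY c3 LIMIT k` (ascending): returns the k smallest values in order.
fn top_k_ascending(values: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    if k == 0 {
        return Vec::new();
    }
    // Max-heap bounded at k entries: the top is the worst value still retained.
    let mut heap: BinaryHeap<i64> = BinaryHeap::with_capacity(k);
    for v in values {
        if heap.len() < k {
            heap.push(v);
        } else if v < *heap.peek().expect("heap is full, so non-empty") {
            // v beats the largest retained value: replace it.
            heap.pop();
            heap.push(v);
        }
    }
    // into_sorted_vec returns ascending order, matching ORDER BY c3 ASC.
    heap.into_sorted_vec()
}
```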

Queries that have a predicate on row_number() in #6899

SELECT * FROM (
  SELECT ...,
    ROW_NUMBER() OVER (PARTITION BY value1 ORDER BY value2) AS rn
  FROM t
)
WHERE rn <= 10

we could use

TopKPerPartition
  gby_exprs: [value1]
  order_exprs: [value2]
  k: 10
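For comparison, this sketch evaluates the window-function form literally, assuming a `(value1, value2)` row layout and a predicate keeping rows with `rn <= k` (`row_number_filter` is a hypothetical name). It buffers and sorts every partition in full; a per-partition top-K operator would need to return the same rows while retaining only `k` values per partition.

```rust
use std::collections::BTreeMap;

/// Hypothetical reference evaluation of
/// `ROW_NUMBER() OVER (PARTITION BY value1 ORDER BY value2) AS rn` filtered to `rn <= k`.
/// Returns, per partition, the surviving `(rn, value2)` pairs.
fn row_number_filter(rows: &[(u32, i64)], k: usize) -> BTreeMap<u32, Vec<(usize, i64)>> {
    // Buffer every row of every partition.
    let mut partitions: BTreeMap<u32, Vec<i64>> = BTreeMap::new();
    for &(value1, value2) in rows {
        partitions.entry(value1).or_default().push(value2);
    }
    partitions
        .into_iter()
        .map(|(value1, mut values)| {
            values.sort(); // ORDER BY value2 (ASC), sorting the whole partition
            let numbered: Vec<(usize, i64)> = values
                .into_iter()
                .enumerate()
                .map(|(i, v)| (i + 1, v)) // rn starts at 1
                .filter(|&(rn, _)| rn <= k)
                .collect();
            (value1, numbered)
        })
        .collect()
}
```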

P.S. I also tried, and failed, to think of a clever rewrite at the SQL level.

Footnotes

  1. I think this is what @ozankabak and @comphead were hinting at in https://github.com/apache/arrow-datafusion/issues/6899#issuecomment-1630479576

@JayjeetAtGithub
Contributor

Reproducer using Jaeger/IOx: https://github.com/JayjeetAtGithub/iox_observe_bench/blob/main/docs/oom_kill.md
Dataset: https://drive.google.com/drive/folders/1nd3FaZXlsvM8JelXHJjHZONDzvB9UeVs?usp=sharing

@avantgardnerio
Contributor

Dataset

@JayjeetAtGithub thank you! I just sent a request for access...

@alamb
Contributor Author

alamb commented Aug 8, 2023

Note that the dataset in the above example is in the form of an influxdb_iox catalog.

If you prefer a datafusion-cli-only version, here it is:

Download traces.zip (240MB):

In datafusion-cli:

❯ create external table traces stored as parquet location 'traces';
0 rows in set. Query took 0.030 seconds.

❯ SELECT trace_id, MAX(time) FROM traces GROUP BY trace_id ORDER BY MAX(time) DESC LIMIT 1;

@alamb
Contributor Author

alamb commented Sep 13, 2023

Completed in #7192

@alamb alamb closed this as completed Sep 13, 2023