
Intelligent Rollups and Querying of Aggregated Data #7198

Open
jsternberg opened this issue Aug 24, 2016 · 63 comments

@jsternberg
Contributor

jsternberg commented Aug 24, 2016

Feature Request

The database should support more intelligent rollups and querying of
aggregated data. Currently, the only way to roll up data is to manually
set up continuous queries and then manually modify the select statements
to query that data, which requires the user to know which retention
policies exist rather than having them discovered automatically.

Proposal:

It should be simple for an administrator to set up rollups for an entire
database, and users should not need any knowledge of the rollups in order
to start using them. Using rollups should be automatic and performant.

Current behavior:

Rollups require an administrator to create a retention policy and a
continuous query like this:

CREATE RETENTION POLICY "5m" ON mydb DURATION 0s REPLICATION 1;
CREATE CONTINUOUS QUERY mycq ON mydb BEGIN
    SELECT mean(*) INTO mydb."5m".:MEASUREMENT FROM /.*/ GROUP BY time(5m)
END;

It then requires the user to query the mean of a measurement like this:

SELECT mean_value FROM mydb."5m".cpu

If the server is not running when an interval should be calculated, the
query for that interval will never be run and the user needs to run it
manually. There is no way to automatically reuse the information in
continuous queries to backfill data either.

Also, if data is written after the last calculation, it will never enter
the aggregation.

It is possible to obtain partial data, but this involves telling the
continuous query to resample more frequently than the default interval.

CREATE CONTINUOUS QUERY mycq ON mydb RESAMPLE EVERY 1m BEGIN
    SELECT mean(*) INTO mydb."5m".:MEASUREMENT FROM /.*/ GROUP BY time(5m)
END;

This will produce partial results every minute, but they will not
necessarily reflect all of the data that is available.

Desired behavior:

Administrators should have easier commands to create rollups (optional
since the commands above are fairly easy to write).

Users should not need to care about retention policies when trying to
get the data they want. For the example above, the query the user would
write is:

SELECT mean(value) FROM cpu GROUP BY time(5m)

This should use the rollup automatically if one is available and would
return the same value as querying the raw data.

Along with using the rollup automatically, we would also include syntax to automatically select the appropriate precision interval to be used based on the time range or number of points requested. So if we have raw data that is retained for 1 week, 1 minute aggregated data for 2 weeks, and 5 minute aggregated data for 4 weeks and we said this:

SELECT mean(value) FROM cpu WHERE time >= now() - 1w GROUP BY time(auto)

This would automatically select the 1 minute interval because that is the finest precision whose retention covers the requested range. If we scaled this query to the past 3 weeks, it would return the 5 minute precision level.
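
Purely as an illustration of that selection rule (not a spec for the feature), here is a minimal sketch in Go; the RollupLevel type and pickInterval helper are assumptions made up for this example:

package main

import (
    "fmt"
    "time"
)

// RollupLevel describes one rollup: its aggregation interval and how far
// back it retains data.
type RollupLevel struct {
    Interval  time.Duration
    Retention time.Duration
}

// pickInterval assumes levels are sorted from finest to coarsest and returns
// the finest interval whose retention covers the requested range.
func pickInterval(levels []RollupLevel, rangeBack time.Duration) time.Duration {
    for _, l := range levels {
        if l.Retention >= rangeBack {
            return l.Interval
        }
    }
    // Nothing covers the whole range; fall back to the coarsest level.
    return levels[len(levels)-1].Interval
}

func main() {
    day := 24 * time.Hour
    levels := []RollupLevel{
        {Interval: time.Minute, Retention: 14 * day},     // 1m rollup kept 2 weeks
        {Interval: 5 * time.Minute, Retention: 28 * day}, // 5m rollup kept 4 weeks
    }
    fmt.Println(pickInterval(levels, 7*day))  // 1m0s
    fmt.Println(pickInterval(levels, 21*day)) // 5m0s
}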

Use case:

Downsampling long term data into new retention policies, and better
performance from precalculating certain aggregates for certain intervals.
This was the original use case for continuous queries, but continuous
queries are currently too cumbersome for it.

Documentation

@pauldix
Member

pauldix commented Aug 24, 2016

Your desired behavior section is a bit off. If the user specifies GROUP BY time(...) then they're asking for a specific rollup interval, which is great if we have it, but that's not generally why the user specifies the group by time.

More often what they want is this: they have a graph of a certain width in pixels and they want to draw a line. They have a start time and end time for the graph and they want the right number of data points to come back (in whatever rollup time precision makes sense) to draw the graph as quickly as possible.

This feature should introduce new syntax into the query language to let the user specify that they want the database to select whatever precision makes sense based on the time range of the query and the desired number of data points to draw a line on a graph.
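
For illustration only, a minimal sketch (assuming the server knows which rollup intervals exist; the intervalFor helper is hypothetical) of picking a precision from the time range and a desired point count:

package main

import (
    "fmt"
    "time"
)

// intervalFor returns the smallest available interval that yields at most
// maxPoints points over the given time range. available must be sorted
// ascending.
func intervalFor(available []time.Duration, rng time.Duration, maxPoints int) time.Duration {
    target := rng / time.Duration(maxPoints)
    for _, iv := range available {
        if iv >= target {
            return iv
        }
    }
    return available[len(available)-1]
}

func main() {
    available := []time.Duration{10 * time.Second, time.Minute, 5 * time.Minute, time.Hour}
    // A ~1000px wide graph of the last 24 hours needs buckets of at least
    // 86.4s, so the 5m rollup is the first one coarse enough.
    fmt.Println(intervalFor(available, 24*time.Hour, 1000)) // 5m0s
}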

@jsternberg
Contributor Author

That sounds like a completely different issue and I think we should make a new issue for it. New syntax for grouping points doesn't appear to have anything to do with intelligent rollups; we could hash out that syntax and implement it right now without any major improvements elsewhere. I can describe my point more at length, but should I make another issue so we can discuss that separately and this issue doesn't get off track? I want to keep this issue as bare-bones as I can, otherwise we will never manage to ship it.

The reason why I put that as the desired behavior is because I was specifically referring to the desired behavior of intelligent rollups and my problem statement doesn't mention anything about graphs and the width in pixels.

@pauldix
Member

pauldix commented Aug 24, 2016

That one part of desired behavior I mention is the entire reason for this feature. I don't think the two can or should be separated. That's the high level use case. If the user specifies a specific time aggregate, that's something else and something we already support.

If you don't feel comfortable talking about syntax then the GROUP BY time part should be eliminated from that section too.

The point is, the reason to create this feature is for that high level use case. It should absolutely be included. The syntax can be figured out later.

@pauldix
Member

pauldix commented Aug 24, 2016

My concern is that without thinking about that high level use case, the design of this thing could be done in such a way that creating that functionality would be difficult to impossible. It's important to note the real high level goals behind why we're doing intelligent rollups at all, just to make sure that whatever implementation we put out is in service of those user facing needs.

@jsternberg
Contributor Author

Ok, I updated the above issue to include a very broad overview of what has been mentioned and to focus on the Graphite use-case. We're going to include this more thoroughly in the requirements and design documents.

@jsternberg
Contributor Author

@pauldix a question about choosing a retention policy based on the number of desired points. Imagine I have two rollups. One is at 1m and one is 5m. I tell the query that I want the last 3d of data and say that I want a graph with 1,000 points.

With these rollups, 1m for 3d gives 4320 points and 5m for 3d gives 864 points. The second one is obviously closer to the number I requested, but this can get murkier if I chose something like 2,000 points. How should ties be broken?

@pauldix
Member

pauldix commented Aug 31, 2016

@jsternberg I think it makes sense to have the requested number of points be a minimum or a maximum and we choose based on that.
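
A sketch of how that could break the tie, using the 3d / 1,000-point example from the previous comment (the chooseMax and chooseMin helpers are hypothetical, not anything that exists in InfluxDB):

package main

import (
    "fmt"
    "time"
)

// points returns how many buckets an interval produces over a range.
func points(rng, interval time.Duration) int {
    return int(rng / interval)
}

// chooseMax picks the finest interval whose point count does not exceed the
// requested number of points (the request treated as a maximum).
func chooseMax(intervals []time.Duration, rng time.Duration, limit int) time.Duration {
    for _, iv := range intervals { // assumed sorted finest-first
        if points(rng, iv) <= limit {
            return iv
        }
    }
    return intervals[len(intervals)-1]
}

// chooseMin picks the coarsest interval that still returns at least the
// requested number of points (the request treated as a minimum).
func chooseMin(intervals []time.Duration, rng time.Duration, limit int) time.Duration {
    chosen := intervals[0]
    for _, iv := range intervals {
        if points(rng, iv) >= limit {
            chosen = iv
        }
    }
    return chosen
}

func main() {
    intervals := []time.Duration{time.Minute, 5 * time.Minute}
    rng := 3 * 24 * time.Hour
    fmt.Println(chooseMax(intervals, rng, 1000)) // 5m0s -> 864 points
    fmt.Println(chooseMin(intervals, rng, 1000)) // 1m0s -> 4320 points
}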

@jimmys

jimmys commented Sep 8, 2016

How would tags work with the intelligent rollups being proposed? We make heavy use of tags in our InfluxDB deployment to monitor our microservices. All of our Grafana dashboards depend on using tag data to work.

For example, we have a view of HTTP request counts and a tag on the measurement for service name lets us visualize those. Another case is a count of users by platform - the measurement is user logins, say, and the tags give the platform. This lets us show a total as well as per-platform breakdowns.

We would need the rollups to handle this case or else the visualizations are not going to be useful for us.

@pkittenis

pkittenis commented Nov 10, 2016

Couple points to consider from the Graphite POV.

Example real world use case:

  • InfluxDB as back-end datastore to Graphite API
  • Retention period 5m with down sampled data from CQs with group by time(5m)
  • Storage finder has configuration to dynamically assign a group by interval of 5min for queries ranging >= 7d
  • InfluxDB storage finder for graphite api has configuration to use retention period 5m for group by intervals of 5min and above

User queries 7 days worth of data. Storage finder calculates a group by interval of 5min for the query duration which points it to retention policy 5m. Data is queried as select <..> from "5m".<measurement> where <..> GROUP BY time(5m)

This is a real-world use case with the existing Graphite API and InfluxDB storage finder projects.

Grafana with the InfluxDB API does the same group-by interval calculation, though it cannot dynamically choose a retention policy based on that interval; the user has to select one.

In both cases, a group-by interval is calculated from the range of the query to stay under a desired maximum number of data points.

The number of data points returned by all 7d queries is constant and graphs are drawn quickly. Both Grafana and the Graphite API storage finder will keep increasing the group-by interval as the date/time range of the query increases.

From this real world use case, would it not make sense to have the DB automatically select a retention policy/intelligent roll-up data based on the group by interval? As a bonus, queries would not have to be re-written to support a new syntax.

If the DB has intelligent rollups of data at regular intervals, either dynamic or pre-configured, should it not use that rolled-up data when a requested group-by interval matches the rollup, or the rolled-up data nearest the interval if there is no direct match?

In short, in all graphing/Graphite use cases where users define a specific time aggregate, they do so in order to get no more than a maximum number of data points for the date/time range of the query.

It would therefore seemingly make sense from a user's perspective for the DB to automatically use whatever intelligently rolled up data it has nearest that time aggregate. Some food for thought.
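
A client-side sketch of the routing described here, purely for illustration (the rule table, the 7d threshold and the retention policy names are made up, not taken from any existing tool):

package main

import (
    "fmt"
    "time"
)

type rule struct {
    minRange time.Duration // query ranges at or above this...
    interval string        // ...use this GROUP BY interval...
    rp       string        // ...and read from this retention policy
}

// Ordered from widest range to narrowest; the first match wins.
var rules = []rule{
    {7 * 24 * time.Hour, "5m", `"5m"`},
    {0, "30s", `"autogen"`},
}

// route maps a query's time range to a (retention policy, GROUP BY interval) pair.
func route(rng time.Duration) (rp, interval string) {
    for _, r := range rules {
        if rng >= r.minRange {
            return r.rp, r.interval
        }
    }
    return `"autogen"`, "30s"
}

func main() {
    fmt.Println(route(7 * 24 * time.Hour)) // "5m" 5m      -> query the downsampled RP
    fmt.Println(route(time.Hour))          // "autogen" 30s -> query raw data
}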

@jsternberg
Contributor Author

jsternberg commented Jan 12, 2017

I want to just spend some time marking down some notes to help spark discussion on possible implementations of this. I also want to nail down the primary purpose of this since I think there might be too much conflated in this issue.

We have at least two different problems that may or may not be the same problem (and I definitely think they are related, but maybe not directly).

  1. Automatically switching to use a retention policy that is filled by a continuous query.
  2. Automatically adjusting the grouping interval depending on the desired time window (when zooming in and out of data visualization).

I don't personally think that these two are inextricably linked, but I do think they complement each other. For the second, we can determine a suitable grouping interval depending on the time interval requested. You don't need the alternative retention policy for this and it can be a one-off query, but it wouldn't necessarily be the quickest since it would always have to recalculate the values. But we can do it pretty easily.

There's also the first issue. While important, I don't think it's necessarily the same issue. I still think it should possibly be done at the same time since I personally find it to be more important. When you want to query the results of a continuous query, you have to change the query itself to
read from a new source and remove the group by clause. This substantially alters the query and isn't particularly user-friendly.

For the first, we can try to do some kind of silent change of where we get the data from. When we see an interval and a call iterator, we can check the database's list of continuous queries; if we find one whose source is the same measurement with the same interval, we can switch the iterator creation to retrieve the data from the target measurement instead. This might have some additional problems, but it should be good enough for the most part.
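
A very rough sketch of that rewrite step, just to make the idea concrete (the ContinuousQuery struct and rewriteSource function are hypothetical, not the actual query engine):

package main

import (
    "fmt"
    "time"
)

type ContinuousQuery struct {
    SourceDB, SourceMeasurement string
    TargetRP, TargetMeasurement string
    Interval                    time.Duration
    Aggregate                   string // e.g. "mean"
}

// rewriteSource returns the (rp, measurement, field) to read from if a CQ
// already materializes this aggregate, or ok=false to read the raw data.
func rewriteSource(cqs []ContinuousQuery, db, m, agg string, interval time.Duration) (rp, measurement, field string, ok bool) {
    for _, cq := range cqs {
        if cq.SourceDB == db && cq.SourceMeasurement == m &&
            cq.Interval == interval && cq.Aggregate == agg {
            // Wildcard CQs prefix the written field with the aggregate name.
            return cq.TargetRP, cq.TargetMeasurement, agg + "_value", true
        }
    }
    return "", "", "", false
}

func main() {
    cqs := []ContinuousQuery{{
        SourceDB: "mydb", SourceMeasurement: "cpu",
        TargetRP: "5m", TargetMeasurement: "cpu",
        Interval: 5 * time.Minute, Aggregate: "mean",
    }}
    fmt.Println(rewriteSource(cqs, "mydb", "cpu", "mean", 5*time.Minute))
}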

For that though, we would also have to consider how to rerun continuous queries automatically when data is written to the source and how to deal with data in an active shard.

I think the first one is the focus of this issue if I understand correctly though. Thoughts?

I think my comment here may echo some of the points in the comment above, but I'm not 100% sure.

@aderumier
Contributor

@jsternberg: personally, I'm waiting more for 2 than 1. I don't use continuous queries, but I would like to display graphs over ranges like 3 months or 1 year without needing to retrieve all the values. (I'm polling every 30s with telegraf, with 1 year retention.)

@pkittenis

| 1. Automatically switching to use a retention policy that is filled by a continuous query.

Yes, this echoes what I was describing in earlier comment.

However, won't this still result in queries needing to be changed to read from different fields as CQs with aggregation will have changed field names? (prefixed with aggregation term when using wildcard CQs).

Note though that switching implies only using data present in the particular retention policy. What about newly added data not yet ingested by the continuous query populating that retention policy? How about missing data that was never processed by continuous query because the DB was down at the time the CQ was scheduled to run?

On large data sets, continuous queries that run on all measurements can be several hours behind, which is fine since they are usually storing historical data, but it also means they will be missing several hours' worth of data if they are the sole data source.

What would be ideal IMO is what is described above, but instead of switching outright, merging the data in the retention policy matching (or closest to) the interval with any gaps or missing data taken from the default retention policy. This may be wishful thinking though.

Just not sure the 'switch retention policy filled by CQ if there is matching interval' would result in a good experience, given how CQs are currently implemented which can result in gaps in data and missing latest data while CQ runs, plus the changed field names and all. At least the last part, field names, can be handled client side but missing data is missing data.

| 2. Automatically adjusting the grouping interval

While it would be easier on the user for the DB to do this automatically, presumably with a flag to turn on/off, existing clients already do this so purely from the Grafana/Graphite PoV it would not add significant value.

@jsternberg
Contributor Author

Just not sure the 'switch retention policy filled by CQ if there is matching interval' would result in a good experience, given how CQs are currently implemented which can result in gaps in data and missing latest data while CQ runs, plus the changed field names and all. At least the last part, field names, can be handled client side but missing data is missing data.

Yes, I've been thinking about that a lot and it's a good point. The current method for how continuous queries work would have to be rethought. One idea that I had is having a way to know when the last write to a shard was. That would allow this behavior to work when dealing with cold shards (not receiving active writes) and it would query live data when querying an active shard (or keep a buffer that was described in the original proposal for this by @pauldix). So while it sounds simple to just say, "switch where we get the data from", it's a lot more complicated than that. If we managed to do that though, it would improve both this experience and the current experience of those frustrated with continuous queries and their current shortcomings.

However, won't this still result in queries needing to be changed to read from different fields as CQs with aggregation will have changed field names? (prefixed with aggregation term when using wildcard CQs).

That's pretty easy to do behind the scenes. The difficult part would likely be the backwards index that would be necessary. CQs take a single source and have a target they write to, which is easy. But we would need to know which CQs refer to which source and have a fast and easy way to match those with the original query. That's less straightforward. It's possible using an O(n) search of all continuous queries, or limiting it to just the current database, but you can technically have a CQ that reads from one database and writes to a second database. I'm also not comfortable with an O(n) search on every query to find the appropriate database/retention policy to read from.

I think I didn't explain myself well in the last paragraph so if I need to clarify, please just say so.

@pkittenis

That makes sense, thanks for clarifying. Sounds like the new functionality does indeed intend to merge both the CQ-filled dataset and the real-time dataset, which is ideal.

What is not clear to me is whether or not gaps in aggregated CQ-filled data can be filled by data from default RP, if present. Gaps meaning periods of missing data in between data in the RP where CQ did not run or did not run successfully for whatever reason. Could you please clarify?

Regarding matching CQs, at least for the wildcard-CQ case it can safely be said that the CQ matches all measurements, so no matching is needed. For non-wildcard CQs, yes, an index will be needed.

A simple mapping from measurement-in-CQ to CQ target should suffice and be O(1) for queries looking up the CQ targets of a measurement, though I'm not sure how memory intensive that will be on large datasets.
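
Something along these lines, where the cqIndex type and its fields are made up for illustration (wildcard CQs kept in a separate slice so exact-name lookups stay O(1)):

package main

import "fmt"

type cqTarget struct {
    DB, RP, Measurement string
    Interval            string
}

type cqIndex struct {
    byMeasurement map[string][]cqTarget // CQs with an explicit source measurement
    wildcards     []cqTarget            // CQs selecting FROM /.*/
}

func (idx *cqIndex) targetsFor(measurement string) []cqTarget {
    // Wildcard CQs apply to every measurement, so append them to exact matches.
    return append(idx.byMeasurement[measurement], idx.wildcards...)
}

func main() {
    idx := &cqIndex{
        byMeasurement: map[string][]cqTarget{},
        wildcards:     []cqTarget{{DB: "mydb", RP: "5m", Measurement: ":MEASUREMENT", Interval: "5m"}},
    }
    fmt.Println(idx.targetsFor("cpu"))
}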

@jsternberg
Contributor Author

Just to make sure I clarify, everything I said is currently brainstorming of what I would like to see and discussing the feature. I'll admit that my current ideas are a bit lofty so while we'll try our best, there are no guarantees that everything I say will be the end result of what we do.

My idea for gaps in the CQ-filled data was partially related to the hot/cold shard idea. A hot shard would end up being defined per-CQ rather than the shard itself just being hot or cold. So for a specific CQ mapping, the shard is hot if it has been written to more recently than the CQ has run. We might also want a cool-off period so we aren't constantly switching between hot and cold, and so the CQ doesn't have to run constantly. It would likely not be feasible to have the CQ running constantly on shards that are under heavy write load anyway; an in-memory buffer would work better if we want to handle those.

Under that idea, that means that old shards (which we presume wouldn't be actively written to) would commonly have the CQ for the entire interval run and then have the data written by the CQ be queried instead. If you then write some historical data, the shard may become hot again and it would query the actual shard rather than the retention policy written to by the CQ. That is, until the CQ could be run again. We don't want to be too aggressive in running CQs because it is reasonable to believe that a person writing data to one shard may write more data to that shard shortly.
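
A tiny sketch of the per-CQ hot/cold test described above; the function and its inputs are hypothetical, and a real implementation would track these timestamps per CQ mapping:

package main

import (
    "fmt"
    "time"
)

// shardIsHot reports whether a query should read the raw shard: new data
// arrived after the CQ last aggregated it, or the last write is still
// within the cool-off window.
func shardIsHot(lastWrite, lastCQRun time.Time, coolOff time.Duration) bool {
    return lastWrite.After(lastCQRun) || time.Since(lastWrite) < coolOff
}

func main() {
    lastCQRun := time.Now().Add(-10 * time.Minute)
    lastWrite := time.Now().Add(-2 * time.Minute)
    fmt.Println(shardIsHot(lastWrite, lastCQRun, 5*time.Minute)) // true: query raw data
}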

I don't think the index will be too big of a deal personally. It's just necessary to say that it will probably be needed. A lot of what I'm writing are also personal notes for myself.

@mscso

mscso commented May 29, 2017

Isn't this the most basic and most important feature of a metrics database? I was looking at InfluxDB and I'm shocked that this is not part of release candidate 0.0.1. Honestly, this is what rrdtool has been used for forever, and if I just wanted to store all the metrics individually without roll-ups, I'd just write them to flat files.

Really hoping InfluxDB decides to prioritize this feature.

@bedrin

bedrin commented Apr 3, 2018

@ewillia1983 I would try adding something like Apache Kafka for writes to InfluxDB and implementing downsampling using Kafka Streams. It's supported as an output by telegraf, so it shouldn't be rocket science to integrate.

@eWilliams35

@krokodilerian - can you share the setting you changed to multithread the CQs? I have not seen that option anywhere in the documentation. It does appear to fire 3 or 4 at once, but it's still too slow.

@bedrin - that's our last resort option, pre-aggregating the data before inserting. You raise an interesting point in regards to re-reading the raw data kafka topic and doing the math within the poll loop, then writing the aggregate to the desired retention policy.

@krokodilerian

@ewillia1983, I wasn't very clear, sorry: it's an external python script (https://gist.github.com/krokodilerian/8434e15248d6da7d8947bd2935bdb3fe) that I wrote to do this, and it was definitely able to deal with our kind of load, but YMMV. Also, it works for me but might not work for you (and has the roll-up queries somewhat hardcoded).

@DonDebonair

Not trying to be an ass here, but having to include Kafka in your setup, just to do proper rollups, seems a bit silly, doesn't it? This is something that Graphite has supported for ages, including seamless querying over different rollup periods.
I'm not saying it's easy to implement for the InfluxDB team, but it might be a good idea to start looking at how Graphite does it.

@pkittenis

FWIW, it's not rocket science to have queries use a particular RP depending on the date range of the request, as long as the query is not manually written of course.

Some third party tools do this already to enable Graphite queries over Influxdb with transparent down-sampled data, aggregation and so on - eg InfluxGraph.

Actually writing the downsampled data without having it fall behind (from CQs) is a different matter. The best solution so far seems to be what @krokodilerian suggested - thanks for the tip. The only issue with that is that running multiple queries at a time can put a lot of load on the DB, particularly when the downsampling queries are run against large databases, meaning a large number of data points to process, a million+ measurements and so on.


@CAFxX

CAFxX commented Jun 28, 2018

If the server is not running when an interval should be calculated, that
interval will never be run and the user needs to run that query
manually. There is no way to automatically reuse the information in
continuous queries to backfill data either.
Also, if data is written after the last calculation, it will never enter
the aggregation.

See also #9571 about this.

@oplehto

oplehto commented Sep 12, 2018

We implemented this functionality in the latest release of the influx-spout metrics relay, which can downsample the InfluxDB line protocol stream.

Basically this is a version of the proposed Kafka solution, but a bit more lightweight (based around NATS) and designed from the ground up with the InfluxDB line protocol in mind. It's definitely not a be-all and end-all solution to this issue, but it might be helpful to some people interested in doing this.

Downsampler: https://github.com/jumptrading/influx-spout#downsampler
Blog post about the release: https://menno.io/posts/influx-spout-v210/

@e-dard e-dard removed this from the Future Point Release milestone Nov 6, 2018
@dgnorton dgnorton added the 1.x label Jan 7, 2019
@kaiterramike

We really needed this functionality too, so I took a slightly different approach from @oplehto and wrote a solution to do the downsampling in the DB itself. We're an IoT startup so we're dealing with data from remote sensors that could be buffered for hours or days if network quality is poor, so we just can't maintain in memory several days of data from an entire network of sensors.

The solution is in two parts:

  1. If a client writes old data to the DB, and the data is old enough that it won't get picked up by continuous queries (for us, that's 1-2 hours), then the client also registers a "downsample request". These requests are per-sensor and per-month -- I've chosen that level of granularity as a tradeoff between storing excessively detailed information about downsample requests, and doing excessively large downsample queries with millions of data points. The requests are stored in InfluxDB in a separate measurement, which means there's no additional dependencies, and the state of the DB is consistent if restored from a backup.
  2. A daemon is constantly scanning for downsample requests. When it finds one, it does the backfill query for that month, then deletes the request.
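
A rough sketch of what steps 1 and 2 above could look like, just to make the shape of the approach concrete; the measurement name, tags, field and queries here are invented for illustration, not the actual implementation described in this comment:

package main

import (
    "fmt"
    "time"
)

// registerDownsampleRequest returns a line-protocol point that marks one
// sensor/month as needing a backfill. Writing it to an ordinary measurement
// keeps the request state inside InfluxDB itself.
func registerDownsampleRequest(sensorID string, month time.Time) string {
    return fmt.Sprintf("downsample_request,sensor=%s,month=%s pending=true %d",
        sensorID, month.Format("2006-01"), time.Now().UnixNano())
}

// backfillQuery is the kind of query a scanning daemon could run for one
// request before deleting it (retention policy and field names illustrative).
func backfillQuery(sensorID string, month time.Time) string {
    start := time.Date(month.Year(), month.Month(), 1, 0, 0, 0, 0, time.UTC)
    end := start.AddDate(0, 1, 0)
    return fmt.Sprintf(
        `SELECT mean(*) INTO "5m".:MEASUREMENT FROM /.*/ WHERE sensor = '%s' AND time >= '%s' AND time < '%s' GROUP BY time(5m), *`,
        sensorID, start.Format(time.RFC3339), end.Format(time.RFC3339))
}

func main() {
    month := time.Date(2019, time.March, 1, 0, 0, 0, 0, time.UTC)
    fmt.Println(registerDownsampleRequest("sensor42", month)) // written when old data arrives
    fmt.Println(backfillQuery("sensor42", month))             // run later by the daemon
}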

So far it's working well. Single downsample queries usually complete in well under a second, and there aren't very many of them (one or two a minute), so we've got a lot of breathing room.

The kind of tricky parts are:

  1. How to avoid writing thousands of downsample requests if you're writing thousands of old data points
  2. How to solve the race condition between deleting fulfilled downsample requests and new downsample requests arriving simultaneously
  3. Finding the balance between waiting until all the old data arrives before fulfilling downsample requests, and making downsampled data available sooner at the cost of redoing downsample queries multiple times

Unfortunately, the code is pretty tightly coupled to our business logic and use case so I won't be able to open source it any time soon. Also, the Right Place to put the downsample request writer is in a proxy in front of InfluxDB, but I didn't do that because it's a lot more work for no benefit. But if someone else is moving forward on an implementation and needs a few pointers, I'd be happy to fill in some of the details.

@nogweii

nogweii commented Apr 24, 2019

Will this feature be included in InfluxDB 2.0 with the introduction of Flux?

@CWempe

CWempe commented Jun 29, 2019

Any news about this feature?
Is it already implemented or will be in the next 1.x or 2.0 release?

@azhurbilo

Any ETA to implement this feature?

@Vox1984

Vox1984 commented Oct 24, 2019

bump

@heipei

heipei commented Nov 8, 2019

I know you guys at InfluxDB are focused on enterprise features, but I just wanted to reiterate what others have said. Having this feature would transform InfluxDB for me from "nice for situational awareness about what's going on on my 100 hosts right now" into "I can throw everything into there and keep metrics forever and go back and do a 1-year-retrospective about how my system evolved".

@p3r7

p3r7 commented Jan 15, 2020

Hello,

Facing the same situation, at Transatel we decided to solve it by making an HTTP proxy to handle this: Transatel/cleanflux.

We've been using it for 2 whole years, both from Grafana and various scripts (including Jupyter notebooks).

For now, it is quite declarative (retention policies have to be redeclared in proxy configuration) but this solution is pretty robust and transparent for the client.

In addition, it also solves issues #5961 (and dup #8010) and #6451.

@rnsc

rnsc commented Jan 15, 2020

@p3r7 thanks for sharing this!

@DonDebonair

@p3r7 thanks for sharing this, but be aware that Python 2.7 is End of Life.

@p3r7

p3r7 commented Jan 20, 2020

@dandydev I have a working python 3.7 port.

I just want to finish validating that there is no regression before publishing it.

Now fully tested and merged on master.

@bonesoul

bonesoul commented May 1, 2020

Does InfluxDB 2.0 have rollup functionality?

@ouinouin

ouinouin commented Feb 9, 2021

Hi, interested to see if Influx will close the gap with 20-year-old round robin databases...

@strstr1

strstr1 commented Jun 10, 2022

Sorry for the bump. I am having the same issue; I want to use InfluxDB as an rrdtool replacement. Is there any news?

@RoundRobin-fr

Don't use InfluxDB as a replacement for rrdtool; it won't fit.

I've been using it professionally as a tech leader in an ISP:

  • Disk space usage goes out of control
  • Missing key features like the one you ask for
  • Poor SQL-like implementation

Use TimescaleDB instead; it sits on top of PostgreSQL so you avoid bad surprises, and you can still plug in Grafana for data visualisation.

@strstr1

strstr1 commented Jun 11, 2022

@ToucanTocard - Thank you for your detailed reply.

Unfortunately I have already built everything around InfluxDB, because I got along well with it right away and I saw that, for example, LibreNMS also uses it. I thought I would include some solution/workaround for it later (e.g. run multiple queries to combine the data), but this seems to be more complex than I originally thought.

Yes, I know, it was absolutely my mistake to rely so hastily on software that fits only at first glance, but at least I can rewrite this quickly for other software like TimescaleDB 😄

Nevertheless, I hope that InfluxDB will introduce such a functionality one day, because otherwise I liked InfluxDB very much.

@Sineos

Sineos commented Jun 11, 2022

https://www.timescale.com/blog/speed-up-grafana-autoswitching/

I'm bitterly missing something like what is shown in the link above.

@derkgort

derkgort commented Jun 11, 2022

I ended up doing this in Grafana.
I made a variable named $bucket and use it in all Influx/Flux queries as the bucket.
It's updated on time range change.
I have 5 buckets with different lengths. Those are aggregated by influxdb tasks.

This will change the bucket variable to the one that has data for the "from" date in the range.

import "math"
import "dict"
import "array"

_hour = 60*60
_day = _hour*24
// _start = 1637848779379 // ${__from}
_start = ${__from}
_startfromnow = float(v: (int(v: now()) - (_start * 1000000))/1000000000)

// 6h 5d 15d 65d 370d

set = [
    {key: 6*_hour, value: "testbucket-6h"},
    {key: 5*_day, value: "testbucket-5d"},
    {key: 15*_day, value: "testbucket-15d"},
    {key: 65*_day, value: "testbucket-65d"},
    {key: 370*_day, value: "testbucket-370d"},
  ]

bins = [
  float(v: 6*_hour),
  float(v: 5*_day),
  float(v: 15*_day),
  float(v: 65*_day),
  float(v: 370*_day),
]

bucket_dict = dict.fromList(pairs: set)
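
// histogram() produces a cumulative count per bin upper bound (le): 0 for
// bins shorter than the age of the dashboard's "from" time, 1 for bins long
// enough to contain it. Keeping the first bin with count 1 therefore selects
// the shortest bucket whose length covers the requested range.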

array.from(rows: [{"dummy": 1}])
  |> map(fn: (r) => ({
        _value: _startfromnow,
    }))
  |> histogram(bins: bins)
  |> filter(fn: (r) => r["_value"] == 1)
  |> first()
  |> map(fn: (r) => ({
      bucket: dict.get(dict: bucket_dict, key: int(v: r.le), default: "testbucket"),
  }))
