Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"count" aggregations fix #5639

Merged
merged 3 commits into from
Sep 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 36 additions & 39 deletions store/postgres/src/relational/rollup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,18 +332,16 @@ impl<'a> RollupSql<'a> {
Ok(IdType::String) | Ok(IdType::Int8) => "max(id)",
Err(_) => unreachable!("we make sure that the primary key has an id_type"),
};
write!(w, "select {max_id} as id, timestamp, ")?;
write!(w, "select {max_id} as id, timestamp")?;
if with_block {
write!(w, "$3, ")?;
write!(w, ", $3")?;
}
write_dims(self.dimensions, w)?;
comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| {
agg.aggregate("id", w)
})?;
comma_sep(self.aggregates, w, |w, agg| agg.aggregate("id", w))?;
let secs = self.interval.as_duration().as_secs();
write!(
w,
" from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp, "
" from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp"
)?;
write_dims(self.dimensions, w)?;
let agg_srcs: Vec<&str> = {
Expand All @@ -358,9 +356,7 @@ impl<'a> RollupSql<'a> {
agg_srcs.dedup();
agg_srcs
};
comma_sep(agg_srcs, self.dimensions.is_empty(), w, |w, col: &str| {
write!(w, "\"{}\"", col)
})?;
comma_sep(agg_srcs, w, |w, col: &str| write!(w, "\"{}\"", col))?;
write!(
w,
" from {src_table} where {src_table}.timestamp >= $1 and {src_table}.timestamp < $2",
Expand All @@ -371,10 +367,7 @@ impl<'a> RollupSql<'a> {
" order by {src_table}.timestamp) data group by timestamp",
src_table = self.src_table
)?;
Ok(if !self.dimensions.is_empty() {
write!(w, ", ")?;
write_dims(self.dimensions, w)?;
})
Ok(write_dims(self.dimensions, w)?)
}

fn select(&self, w: &mut dyn fmt::Write) -> fmt::Result {
Expand All @@ -388,11 +381,11 @@ impl<'a> RollupSql<'a> {
fn insert_into(&self, w: &mut dyn fmt::Write) -> fmt::Result {
write!(
w,
"insert into {}(id, timestamp, block$, ",
"insert into {}(id, timestamp, block$",
self.agg_table.qualified_name
)?;
write_dims(self.dimensions, w)?;
comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| {
comma_sep(self.aggregates, w, |w, agg| {
write!(w, "\"{}\"", agg.agg_column.name)
})?;
write!(w, ") ")
Expand All @@ -413,10 +406,10 @@ impl<'a> RollupSql<'a> {
/// for any group keys that appear in `bucket`
fn select_prev(&self, w: &mut dyn fmt::Write) -> fmt::Result {
write!(w, "select bucket.id, bucket.timestamp")?;
comma_sep(self.dimensions, false, w, |w, col| {
comma_sep(self.dimensions, w, |w, col| {
write!(w, "bucket.\"{}\"", col.name)
})?;
comma_sep(self.aggregates, false, w, |w, agg| agg.prev_agg(w))?;
comma_sep(self.aggregates, w, |w, agg| agg.prev_agg(w))?;
write!(w, " from bucket cross join lateral (")?;
write!(w, "select * from {} prev", self.agg_table.qualified_name)?;
write!(w, " where prev.timestamp < $1")?;
Expand All @@ -432,19 +425,14 @@ impl<'a> RollupSql<'a> {

fn select_combined(&self, w: &mut dyn fmt::Write) -> fmt::Result {
write!(w, "select id, timestamp")?;
comma_sep(self.dimensions, false, w, |w, col| {
write!(w, "\"{}\"", col.name)
})?;
comma_sep(self.aggregates, false, w, |w, agg| agg.combine("seq", w))?;
comma_sep(self.dimensions, w, |w, col| write!(w, "\"{}\"", col.name))?;
comma_sep(self.aggregates, w, |w, agg| agg.combine("seq", w))?;
write!(
w,
" from (select *, 1 as seq from prev union all select *, 2 as seq from bucket) u "
)?;
write!(w, " group by id, timestamp")?;
if !self.dimensions.is_empty() {
write!(w, ", ")?;
write_dims(self.dimensions, w)?;
}
write_dims(self.dimensions, w)?;
Ok(())
}

Expand Down Expand Up @@ -476,9 +464,9 @@ impl<'a> RollupSql<'a> {
self.select_cte(w)?;
write!(w, " ")?;
self.insert_into(w)?;
write!(w, "select id, timestamp, $3 as block$, ")?;
write!(w, "select id, timestamp, $3 as block$")?;
write_dims(self.dimensions, w)?;
comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| {
comma_sep(self.aggregates, w, |w, agg| {
write!(w, "\"{}\"", agg.agg_column.name)
})?;
write!(w, " from combined")
Expand All @@ -495,20 +483,12 @@ impl<'a> RollupSql<'a> {

/// Write the elements in `list` separated by commas into `w`. The list
/// elements are written by calling `out` with each of them.
fn comma_sep<T, F>(
list: impl IntoIterator<Item = T>,
mut first: bool,
w: &mut dyn fmt::Write,
out: F,
) -> fmt::Result
fn comma_sep<T, F>(list: impl IntoIterator<Item = T>, w: &mut dyn fmt::Write, out: F) -> fmt::Result
where
F: Fn(&mut dyn fmt::Write, T) -> fmt::Result,
{
for elem in list {
if !first {
write!(w, ", ")?;
}
first = false;
write!(w, ", ")?;
out(w, elem)?;
}
Ok(())
Expand All @@ -517,7 +497,7 @@ where
/// Write the names of the columns in `dimensions` into `w` as a
/// comma-separated list of quoted column names.
fn write_dims(dimensions: &[&Column], w: &mut dyn fmt::Write) -> fmt::Result {
comma_sep(dimensions, true, w, |w, col| write!(w, "\"{}\"", col.name))
comma_sep(dimensions, w, |w, col| write!(w, "\"{}\"", col.name))
}

#[cfg(test)]
Expand Down Expand Up @@ -592,6 +572,12 @@ mod tests {
total_count: Int8! @aggregate(fn: "count", cumulative: true)
total_sum: BigDecimal! @aggregate(fn: "sum", arg: "amount", cumulative: true)
}

type CountOnly @aggregation(intervals: ["day"], source: "Data") {
id: Int8!
timestamp: Timestamp!
count: Int8! @aggregate(fn: "count")
}
"#;

const STATS_HOUR_SQL: &str = r#"\
Expand Down Expand Up @@ -664,6 +650,14 @@ mod tests {
select id, timestamp, $3 as block$, "count", "sum", "total_count", "total_sum" from combined
"#;

const COUNT_ONLY_SQL: &str = r#"\
insert into "sgd007"."count_only_day"(id, timestamp, block$, "count") \
select max(id) as id, timestamp, $3, count(*) as "count" \
from (select id, date_bin('86400s', timestamp, 'epoch'::timestamptz) as timestamp from "sgd007"."data" \
where "sgd007"."data".timestamp >= $1 and "sgd007"."data".timestamp < $2 \
order by "sgd007"."data".timestamp) data \
group by timestamp"#;

#[track_caller]
fn rollup_for<'a>(layout: &'a Layout, table_name: &str) -> &'a Rollup {
layout
Expand All @@ -679,7 +673,7 @@ mod tests {
let site = Arc::new(make_dummy_site(hash, nsp, "rollup".to_string()));
let catalog = Catalog::for_tests(site.clone(), BTreeSet::new()).unwrap();
let layout = Layout::new(site, &schema, catalog).unwrap();
assert_eq!(5, layout.rollups.len());
assert_eq!(6, layout.rollups.len());

// Intervals are non-decreasing
assert!(layout.rollups[0].interval <= layout.rollups[1].interval);
Expand All @@ -698,5 +692,8 @@ mod tests {

let lifetime = rollup_for(&layout, "lifetime_day");
check_eqv(LIFETIME_SQL, &lifetime.insert_sql);

let count_only = rollup_for(&layout, "count_only_day");
check_eqv(COUNT_ONLY_SQL, &count_only.insert_sql);
}
}
Loading