From 77fd23a5d46f609ba5af3479a5853355c9d7616d Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Sun, 8 Sep 2024 23:45:10 -0400 Subject: [PATCH 1/3] fix comma bug #5634 --- store/postgres/src/relational/rollup.rs | 43 ++++++++++++++++++------- 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/store/postgres/src/relational/rollup.rs b/store/postgres/src/relational/rollup.rs index 89aa22675a3..b25eb32da98 100644 --- a/store/postgres/src/relational/rollup.rs +++ b/store/postgres/src/relational/rollup.rs @@ -332,20 +332,24 @@ impl<'a> RollupSql<'a> { Ok(IdType::String) | Ok(IdType::Int8) => "max(id)", Err(_) => unreachable!("we make sure that the primary key has an id_type"), }; - write!(w, "select {max_id} as id, timestamp, ")?; + write!(w, "select {max_id} as id, timestamp")?; if with_block { - write!(w, "$3, ")?; + write!(w, ", $3")?; } - write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| { - agg.aggregate("id", w) - })?; + if !self.dimensions.is_empty() { + write!(w, ", ")?; + write_dims(self.dimensions, w)?; + } + comma_sep(self.aggregates, false, w, |w, agg| agg.aggregate("id", w))?; let secs = self.interval.as_duration().as_secs(); write!( w, - " from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp, " + " from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp" )?; - write_dims(self.dimensions, w)?; + if !self.dimensions.is_empty() { + write!(w, ", ")?; + write_dims(self.dimensions, w)?; + } let agg_srcs: Vec<&str> = { let mut agg_srcs: Vec<_> = self .aggregates @@ -358,9 +362,7 @@ impl<'a> RollupSql<'a> { agg_srcs.dedup(); agg_srcs }; - comma_sep(agg_srcs, self.dimensions.is_empty(), w, |w, col: &str| { - write!(w, "\"{}\"", col) - })?; + comma_sep(agg_srcs, false, w, |w, col: &str| write!(w, "\"{}\"", col))?; write!( w, " from {src_table} where {src_table}.timestamp >= $1 and {src_table}.timestamp < $2", @@ -592,6 +594,12 @@ mod tests { 
total_count: Int8! @aggregate(fn: "count", cumulative: true) total_sum: BigDecimal! @aggregate(fn: "sum", arg: "amount", cumulative: true) } + + type CountOnly @aggregation(intervals: ["day"], source: "Data") { + id: Int8! + timestamp: Timestamp! + count: Int8! @aggregate(fn: "count") + } "#; const STATS_HOUR_SQL: &str = r#"\ @@ -664,6 +672,14 @@ mod tests { select id, timestamp, $3 as block$, "count", "sum", "total_count", "total_sum" from combined "#; + const COUNT_ONLY_SQL: &str = r#"\ + insert into "sgd007"."count_only_day"(id, timestamp, block$, "count") \ + select max(id) as id, timestamp, $3, count(*) as "count" \ + from (select id, date_bin('86400s', timestamp, 'epoch'::timestamptz) as timestamp from "sgd007"."data" \ + where "sgd007"."data".timestamp >= $1 and "sgd007"."data".timestamp < $2 \ + order by "sgd007"."data".timestamp) data \ + group by timestamp"#; + #[track_caller] fn rollup_for<'a>(layout: &'a Layout, table_name: &str) -> &'a Rollup { layout @@ -679,7 +695,7 @@ mod tests { let site = Arc::new(make_dummy_site(hash, nsp, "rollup".to_string())); let catalog = Catalog::for_tests(site.clone(), BTreeSet::new()).unwrap(); let layout = Layout::new(site, &schema, catalog).unwrap(); - assert_eq!(5, layout.rollups.len()); + assert_eq!(6, layout.rollups.len()); // Intervals are non-decreasing assert!(layout.rollups[0].interval <= layout.rollups[1].interval); @@ -698,5 +714,8 @@ mod tests { let lifetime = rollup_for(&layout, "lifetime_day"); check_eqv(LIFETIME_SQL, &lifetime.insert_sql); + + let count_only = rollup_for(&layout, "count_only_day"); + check_eqv(COUNT_ONLY_SQL, &count_only.insert_sql); } } From d29f8f678127f18a47fc333aeee21ce58a7fbdce Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Sun, 8 Sep 2024 23:53:43 -0400 Subject: [PATCH 2/3] refactor sql comma logic --- store/postgres/src/relational/rollup.rs | 30 ++++++++----------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/store/postgres/src/relational/rollup.rs 
b/store/postgres/src/relational/rollup.rs index b25eb32da98..a172ecaa7ad 100644 --- a/store/postgres/src/relational/rollup.rs +++ b/store/postgres/src/relational/rollup.rs @@ -336,20 +336,14 @@ impl<'a> RollupSql<'a> { if with_block { write!(w, ", $3")?; } - if !self.dimensions.is_empty() { - write!(w, ", ")?; - write_dims(self.dimensions, w)?; - } + write_dims(self.dimensions, w)?; comma_sep(self.aggregates, false, w, |w, agg| agg.aggregate("id", w))?; let secs = self.interval.as_duration().as_secs(); write!( w, " from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp" )?; - if !self.dimensions.is_empty() { - write!(w, ", ")?; - write_dims(self.dimensions, w)?; - } + write_dims(self.dimensions, w)?; let agg_srcs: Vec<&str> = { let mut agg_srcs: Vec<_> = self .aggregates @@ -373,10 +367,7 @@ impl<'a> RollupSql<'a> { " order by {src_table}.timestamp) data group by timestamp", src_table = self.src_table )?; - Ok(if !self.dimensions.is_empty() { - write!(w, ", ")?; - write_dims(self.dimensions, w)?; - }) + Ok(write_dims(self.dimensions, w)?) 
} fn select(&self, w: &mut dyn fmt::Write) -> fmt::Result { @@ -390,11 +381,11 @@ impl<'a> RollupSql<'a> { fn insert_into(&self, w: &mut dyn fmt::Write) -> fmt::Result { write!( w, - "insert into {}(id, timestamp, block$, ", + "insert into {}(id, timestamp, block$", self.agg_table.qualified_name )?; write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| { + comma_sep(self.aggregates, false, w, |w, agg| { write!(w, "\"{}\"", agg.agg_column.name) })?; write!(w, ") ") @@ -443,10 +434,7 @@ impl<'a> RollupSql<'a> { " from (select *, 1 as seq from prev union all select *, 2 as seq from bucket) u " )?; write!(w, " group by id, timestamp")?; - if !self.dimensions.is_empty() { - write!(w, ", ")?; - write_dims(self.dimensions, w)?; - } + write_dims(self.dimensions, w)?; Ok(()) } @@ -478,9 +466,9 @@ impl<'a> RollupSql<'a> { self.select_cte(w)?; write!(w, " ")?; self.insert_into(w)?; - write!(w, "select id, timestamp, $3 as block$, ")?; + write!(w, "select id, timestamp, $3 as block$")?; write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| { + comma_sep(self.aggregates, false, w, |w, agg| { write!(w, "\"{}\"", agg.agg_column.name) })?; write!(w, " from combined") @@ -519,7 +507,7 @@ where /// Write the names of the columns in `dimensions` into `w` as a /// comma-separated list of quoted column names. 
fn write_dims(dimensions: &[&Column], w: &mut dyn fmt::Write) -> fmt::Result { - comma_sep(dimensions, true, w, |w, col| write!(w, "\"{}\"", col.name)) + comma_sep(dimensions, false, w, |w, col| write!(w, "\"{}\"", col.name)) } #[cfg(test)] From b9967f94da3f5b81dfd9753f26e38e9b7b42bfdf Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Mon, 9 Sep 2024 13:16:53 -0400 Subject: [PATCH 3/3] simplify helper function --- store/postgres/src/relational/rollup.rs | 32 +++++++++---------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/store/postgres/src/relational/rollup.rs b/store/postgres/src/relational/rollup.rs index a172ecaa7ad..7a55bf20a75 100644 --- a/store/postgres/src/relational/rollup.rs +++ b/store/postgres/src/relational/rollup.rs @@ -337,7 +337,7 @@ impl<'a> RollupSql<'a> { write!(w, ", $3")?; } write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, false, w, |w, agg| agg.aggregate("id", w))?; + comma_sep(self.aggregates, w, |w, agg| agg.aggregate("id", w))?; let secs = self.interval.as_duration().as_secs(); write!( w, @@ -356,7 +356,7 @@ impl<'a> RollupSql<'a> { agg_srcs.dedup(); agg_srcs }; - comma_sep(agg_srcs, false, w, |w, col: &str| write!(w, "\"{}\"", col))?; + comma_sep(agg_srcs, w, |w, col: &str| write!(w, "\"{}\"", col))?; write!( w, " from {src_table} where {src_table}.timestamp >= $1 and {src_table}.timestamp < $2", @@ -385,7 +385,7 @@ impl<'a> RollupSql<'a> { self.agg_table.qualified_name )?; write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, false, w, |w, agg| { + comma_sep(self.aggregates, w, |w, agg| { write!(w, "\"{}\"", agg.agg_column.name) })?; write!(w, ") ") @@ -406,10 +406,10 @@ impl<'a> RollupSql<'a> { /// for any group keys that appear in `bucket` fn select_prev(&self, w: &mut dyn fmt::Write) -> fmt::Result { write!(w, "select bucket.id, bucket.timestamp")?; - comma_sep(self.dimensions, false, w, |w, col| { + comma_sep(self.dimensions, w, |w, col| { write!(w, "bucket.\"{}\"", col.name) })?; - 
comma_sep(self.aggregates, false, w, |w, agg| agg.prev_agg(w))?; + comma_sep(self.aggregates, w, |w, agg| agg.prev_agg(w))?; write!(w, " from bucket cross join lateral (")?; write!(w, "select * from {} prev", self.agg_table.qualified_name)?; write!(w, " where prev.timestamp < $1")?; @@ -425,10 +425,8 @@ impl<'a> RollupSql<'a> { fn select_combined(&self, w: &mut dyn fmt::Write) -> fmt::Result { write!(w, "select id, timestamp")?; - comma_sep(self.dimensions, false, w, |w, col| { - write!(w, "\"{}\"", col.name) - })?; - comma_sep(self.aggregates, false, w, |w, agg| agg.combine("seq", w))?; + comma_sep(self.dimensions, w, |w, col| write!(w, "\"{}\"", col.name))?; + comma_sep(self.aggregates, w, |w, agg| agg.combine("seq", w))?; write!( w, " from (select *, 1 as seq from prev union all select *, 2 as seq from bucket) u " @@ -468,7 +466,7 @@ impl<'a> RollupSql<'a> { self.insert_into(w)?; write!(w, "select id, timestamp, $3 as block$")?; write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, false, w, |w, agg| { + comma_sep(self.aggregates, w, |w, agg| { write!(w, "\"{}\"", agg.agg_column.name) })?; write!(w, " from combined") @@ -485,20 +483,12 @@ impl<'a> RollupSql<'a> { /// Write the elements in `list` separated by commas into `w`. The list /// elements are written by calling `out` with each of them. -fn comma_sep<T, F>( - list: impl IntoIterator<Item = T>, - mut first: bool, - w: &mut dyn fmt::Write, - out: F, -) -> fmt::Result +fn comma_sep<T, F>(list: impl IntoIterator<Item = T>, w: &mut dyn fmt::Write, out: F) -> fmt::Result where F: Fn(&mut dyn fmt::Write, T) -> fmt::Result, { for elem in list { - if !first { - write!(w, ", ")?; - } - first = false; + write!(w, ", ")?; out(w, elem)?; } Ok(()) @@ -507,7 +497,7 @@ where /// Write the names of the columns in `dimensions` into `w` as a /// comma-separated list of quoted column names. 
fn write_dims(dimensions: &[&Column], w: &mut dyn fmt::Write) -> fmt::Result { - comma_sep(dimensions, false, w, |w, col| write!(w, "\"{}\"", col.name)) + comma_sep(dimensions, w, |w, col| write!(w, "\"{}\"", col.name)) } #[cfg(test)]