From 269a9a77b950b36c7c812c57889aa41f23c0b98a Mon Sep 17 00:00:00 2001
From: Sebastian Krantz
Date: Thu, 6 Jun 2024 17:34:03 +0200
Subject: [PATCH] Fix collapse groupby q8 (#54)

Thanks! Planning on running all the solutions again soon on the latest versions
---
 _benchplot/benchplot-dict.R | 4 +-
 collapse/groupby-collapse.R | 79 ++++++++++++++++---------------------
 2 files changed, 35 insertions(+), 48 deletions(-)

diff --git a/_benchplot/benchplot-dict.R b/_benchplot/benchplot-dict.R
index 59a13bf8..9d650875 100644
--- a/_benchplot/benchplot-dict.R
+++ b/_benchplot/benchplot-dict.R
@@ -75,8 +75,8 @@ groupby.syntax.dict = {list(
"sum v1:v3 by id6" = "x |> group_by(id6) |> select(v1:v3) |> sum()",
"median v3 sd v3 by id4 id5" = "x |> group_by(id4, id5) |> summarise(v3_median = median(v3), v3_sd = sd(v3))",
"max v1 - min v2 by id3" = "x |> group_by(id3) |> summarise(range_v1_v2=max(v1)%-=%min(v2))",
-"largest two v3 by id6" = "x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, 1-1e-7, ties = 'min'))",
-"regression v1 v2 by id2 id4" = "x |> group_by(id2, id4) |> mutate(tmp = scale(v1)%*=%scale(v2)) |> summarise(r2 = (sum(tmp)%/=%(nobs(tmp)%-=%1))^2)",
+"largest two v3 by id6" = "x |> gby(id6) |> smr(max=max(v3), sec=nth(v3,.99999,ties='min')) |> pivot('id6') |> compute(largest2_v3=value, keep='id6')",
+"regression v1 v2 by id2 id4" = "x |> group_by(id2, id4) |> summarise(r2 = cor(v1, v2, use='na.or.complete')^2)",
"sum v3 count by id1:id6" = "x |> group_by(id1:id6) |> summarise(v3=sum(v3), count=n())"
)},
"data.table" = {c(
diff --git a/collapse/groupby-collapse.R b/collapse/groupby-collapse.R
index 1a81f82f..9574a810 100755
--- a/collapse/groupby-collapse.R
+++ b/collapse/groupby-collapse.R
@@ -6,7 +6,7 @@ source("./_helpers/helpers.R")
stopifnot(requireNamespace("data.table", quietly=TRUE)) # collapse does not support integer64. Oversized ints will be summed to double.
.libPaths("./collapse/r-collapse") # tidyverse/collapse#4641 -suppressPackageStartupMessages(library("collapse", lib.loc="./collapse/r-collapse", warn.conflicts=FALSE)) +suppressPackageStartupMessages(library("collapse", lib.loc="./collapse/r-collapse", warn.conflicts=FALSE)) ver = packageVersion("collapse") git = "" # uses stable version now #124 task = "groupby" @@ -23,12 +23,10 @@ x = data.table::fread(src_grp, showProgress=FALSE, stringsAsFactors=TRUE, na.str print(nrow(x)) gc() -big_data <- nrow(x) > 1e8 - # Setting collapse options: namespace masking and performance -oldopts <- set_collapse(nthreads = data.table::getDTthreads(), +oldopts <- set_collapse(nthreads = max(data.table::getDTthreads(), 4), mask = "all", - sort = endsWith(data_name, "_1"), + sort = endsWith(data_name, "_1") || nrow(x) > 2e8, na.rm = anyNA(num_vars(x)), stable.algo = FALSE) @@ -36,12 +34,12 @@ task_init = proc.time()[["elapsed"]] cat("grouping...\n") question = "sum v1 by id1" # q1 -t = system.time(print(dim(ans<-collap(x, v1 ~ id1, sum))))[["elapsed"]] +t = system.time(print(dim(ans<-collap(x, v1 ~ id1, sum, fill = TRUE))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-sum(ans$v1))[["elapsed"]] write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) rm(ans) -t = system.time(print(dim(ans<-collap(x, v1 ~ id1, sum))))[["elapsed"]] +t = system.time(print(dim(ans<-collap(x, v1 ~ id1, sum, fill = TRUE))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-sum(ans$v1))[["elapsed"]] write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) @@ -50,12 +48,12 @@ print(tail(ans, 3)) rm(ans) question = "sum v1 by id1:id2" # q2 -t = system.time(print(dim(ans<-collap(x, v1 ~ id1 + id2, sum))))[["elapsed"]] +t = system.time(print(dim(ans<-collap(x, v1 ~ id1 + id2, sum, fill = TRUE))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-sum(x$v1))[["elapsed"]] write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) rm(ans) -t = system.time(print(dim(ans<-collap(x, v1 ~ id1 + id2, sum))))[["elapsed"]] +t = system.time(print(dim(ans<-collap(x, v1 ~ id1 + id2, sum, fill = TRUE))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-sum(x$v1))[["elapsed"]] write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) @@ -64,12 +62,13 @@ print(tail(ans, 3)) rm(ans) question = "sum v1 mean v3 by id3" # q3 -t = system.time(print(dim(ans<-collap(x, ~ id3, custom = list(sum = "v1", mean = "v3")))))[["elapsed"]] +options(collapse_unused_arg_action = "none") +t = system.time(print(dim(ans<-collap(x, ~ id3, custom = list(sum = "v1", mean = "v3"), fill = TRUE))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-qDF(list(v1=sum(ans$v1), v3=sum(ans$v3))))[["elapsed"]] write.log(run=1L, task=task, data=data_name, 
in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) rm(ans) -t = system.time(print(dim(ans<-collap(x, ~ id3, custom = list(sum = "v1", mean = "v3")))))[["elapsed"]] +t = system.time(print(dim(ans<-collap(x, ~ id3, custom = list(sum = "v1", mean = "v3"), fill = TRUE))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-qDF(list(v1=sum(ans$v1), v3=sum(ans$v3))))[["elapsed"]] write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) @@ -92,12 +91,12 @@ print(tail(ans, 3)) rm(ans) question = "sum v1:v3 by id6" # q5 -t = system.time(print(dim(ans<-x |> group_by(id6) |> select(v1:v3) |> sum())))[["elapsed"]] +t = system.time(print(dim(ans<-x |> group_by(id6) |> select(v1:v3) |> sum(fill = TRUE))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-ans |> select(v1, v2, v3) |> sum())[["elapsed"]] write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) rm(ans) -t = system.time(print(dim(ans<-x |> group_by(id6) |> select(v1:v3) |> sum())))[["elapsed"]] +t = system.time(print(dim(ans<-x |> group_by(id6) |> select(v1:v3) |> sum(fill = TRUE))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-ans |> select(v1, v2, v3) |> sum())[["elapsed"]] write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) @@ -106,7 +105,6 @@ print(tail(ans, 3)) rm(ans) question = "median v3 sd v3 by id4 id5" # q6 -if(big_data) set_collapse(sort = TRUE) # This is because with sort = FALSE, an internal ordering vector for the elements to be passed to quickselect still needs to be computed. It turns out that the cost of this increases disproportionally with data size, so grouping directly with sort = TRUE is faster on big data. t = system.time(print(dim(ans<-x |> group_by(id4, id5) |> summarize(v3_median = median(v3), v3_sd = sd(v3)))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-ans |> select(v3_median, v3_sd) |> sum())[["elapsed"]] @@ -116,7 +114,6 @@ t = system.time(print(dim(ans<-x |> group_by(id4, id5) |> summarize(v3_median = m = memory_usage() chkt = system.time(chk<-ans |> select(v3_median, v3_sd) |> sum())[["elapsed"]] write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -if(big_data) set_collapse(sort = endsWith(data_name, "_1")) print(head(ans, 3)) print(tail(ans, 3)) rm(ans) @@ -136,39 +133,30 @@ print(tail(ans, 3)) rm(ans) # Note: this is a native collapse solution to this problem: utilizing the fast fnth() function and quickselect to get the second largest element -# please fix the following error: Occurs when all benchmark results are compared with each other. 
-# all other solutions have out_rows=200000, collapse has out_rows=100000 -# Quitting from lines 26-55 [init] (index.Rmd) -# Error in `model_time()`: -# ! Value of 'out_rows' varies for different runs for single question -# Backtrace: -# 1. global time_logs() -# 2. global model_time(new_ct) -# question = "largest two v3 by id6" # q8 -# if(big_data) set_collapse(sort = TRUE) # This is because with sort = FALSE, an internal ordering vector for the elements to be passed to quickselect still needs to be computed. It turns out that the cost of this increases disproportionally with data size, so grouping directly with sort = TRUE is faster on big data. -# t = system.time(print(dim(ans<-x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, 1-1e-7, ties = "min")))))[["elapsed"]] -# m = memory_usage() -# chkt = system.time(chk<-summarise(ans, largest2_v3=sum(max_v3+second_v3)))[["elapsed"]] -# write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# rm(ans) -# t = system.time(print(dim(ans<-x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, 1-1e-7, ties = "min")))))[["elapsed"]] -# m = memory_usage() -# chkt = system.time(chk<-summarise(ans, largest2_v3=sum(max_v3+second_v3)))[["elapsed"]] -# write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# if(big_data) set_collapse(sort = endsWith(data_name, "_1")) -# print(head(ans, 3)) -# print(tail(ans, 3)) -# rm(ans) - -# Note: this is also a native collapse solution: this expression is fully vectorized using the functions the package provides. It could be executed on many more groups -# without large performance decay. The package does not currently provide a vectorized correlation function. 
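+# q8 fix: pivot() reshapes the max/second-largest summary columns to long
+# format, so each id6 group now contributes two rows and out_rows matches the
+# 200,000 reported by the other solutions (previously 100,000 in wide format).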
+ +question = "largest two v3 by id6" # q8 +t = system.time(print(dim(ans<-x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, .99999, ties = "min")) |> pivot("id6") |> compute(largest2_v3 = value, keep = "id6"))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-summarise(ans, largest2_v3=sum(largest2_v3)))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +rm(ans) +t = system.time(print(dim(ans<-x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, .99999, ties = "min")) |> pivot("id6") |> compute(largest2_v3 = value, keep = "id6"))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-summarise(ans, largest2_v3=sum(largest2_v3)))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(head(ans, 3)) +print(tail(ans, 3)) +rm(ans) + + +# Previous: x |> group_by(id2, id4) |> mutate(tmp = scale(v1)%*=%scale(v2)) |> summarise(r2 = (sum(tmp)%/=%(nobs(tmp)%-=%1))^2) question = "regression v1 v2 by id2 id4" # q9 -t = system.time(print(dim(ans<-x |> group_by(id2, id4) |> mutate(tmp = scale(v1)%*=%scale(v2)) |> summarise(r2 = (sum(tmp)%/=%(nobs(tmp)%-=%1))^2))))[["elapsed"]] +t = system.time(print(dim(ans<-x |> group_by(id2, id4) |> summarise(r2 = cor(v1, v2, use = "na.or.complete")^2))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-summarise(ans, r2=sum(r2)))[["elapsed"]] write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) rm(ans) -t = system.time(print(dim(ans<-x |> group_by(id2, id4) |> mutate(tmp = scale(v1)%*=%scale(v2)) |> summarise(r2 = (sum(tmp)%/=%(nobs(tmp)%-=%1))^2))))[["elapsed"]] +t = system.time(print(dim(ans<-x |> group_by(id2, id4) |> summarise(r2 = cor(v1, v2, use = "na.or.complete")^2))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-summarise(ans, r2=sum(r2)))[["elapsed"]] write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) @@ -176,14 +164,13 @@ print(head(ans, 3)) print(tail(ans, 3)) rm(ans) -# TODO: it could be that on really big data, radix ordering (sort = TRUE) is faster than hashing question = "sum v3 count by id1:id6" # q10 -t = system.time(print(dim(ans<-x |> group_by(id1:id6) |> summarise(v3=sum(v3), count=n()))))[["elapsed"]] +t = system.time(print(dim(ans<-x |> group_by(id1:id6) |> summarise(v3=sum(v3, fill = TRUE), count=n()))))[["elapsed"]] m = memory_usage() chkt = system.time(chk<-summarise(ans, v3=sum(v3), count=sum(count)))[["elapsed"]] write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) rm(ans) -t = system.time(print(dim(ans<-x |> 
group_by(id1:id6) |> summarise(v3=sum(v3), count=n()))))[["elapsed"]]
+t = system.time(print(dim(ans<-x |> group_by(id1:id6) |> summarise(v3=sum(v3, fill = TRUE), count=n()))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-summarise(ans, v3=sum(v3), count=sum(count)))[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
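
Note on the simplified q9 expression: scale() standardizes to mean 0 and sd 1
(sample sd, i.e. the n-1 denominator), so the previous sum(z1*z2)/(n-1) is
exactly the Pearson correlation of v1 and v2, and its square is the R^2 of a
simple linear regression. A minimal single-group base-R sketch of this
identity (toy data, illustrative only, not part of the patch):

set.seed(1)
v1 <- rnorm(100); v2 <- 0.5 * v1 + rnorm(100)
z1 <- scale(v1); z2 <- scale(v2)                  # (x - mean(x)) / sd(x)
r2_manual <- (sum(z1 * z2) / (length(v1) - 1))^2  # previous q9 formula
r2_cor <- cor(v1, v2, use = "na.or.complete")^2   # new q9 formula
all.equal(r2_manual, r2_cor)                      # TRUE
all.equal(r2_cor, summary(lm(v2 ~ v1))$r.squared) # equals the regression R^2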