Skip to content

Commit

Permalink
Fix collapse groupby q8 (#54)
Browse files — browse the repository at this point in the history
Thanks! Planning on running all the solutions again soon on latest versions
  • Loading branch information
SebKrantz authored Jun 6, 2024
1 parent b05e367 commit 269a9a7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 48 deletions.
4 changes: 2 additions & 2 deletions _benchplot/benchplot-dict.R
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ groupby.syntax.dict = {list(
"sum v1:v3 by id6" = "x |> group_by(id6) |> select(v1:v3) |> sum()",
"median v3 sd v3 by id4 id5" = "x |> group_by(id4, id5) |> summarise(v3_median = median(v3), v3_sd = sd(v3))",
"max v1 - min v2 by id3" = "x |> group_by(id3) |> summarise(range_v1_v2=max(v1)%-=%min(v2))",
"largest two v3 by id6" = "x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, 1-1e-7, ties = 'min'))",
"regression v1 v2 by id2 id4" = "x |> group_by(id2, id4) |> mutate(tmp = scale(v1)%*=%scale(v2)) |> summarise(r2 = (sum(tmp)%/=%(nobs(tmp)%-=%1))^2)",
"largest two v3 by id6" = "x |> gby(id6) |> smr(max=max(v3), sec=nth(v3,.9999,ties='min')) |> pivot('id6') |> compute(largest2_v3=value, keep='id6')",
"regression v1 v2 by id2 id4" = "x |> group_by(id2, id4) |> summarise(r2 = cor(v1, v2, use='na.or.complete')^2)",
"sum v3 count by id1:id6" = "x |> group_by(id1:id6) |> summarise(v3=sum(v3), count=n())"
)},
"data.table" = {c(
Expand Down
79 changes: 33 additions & 46 deletions collapse/groupby-collapse.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ source("./_helpers/helpers.R")

stopifnot(requireNamespace("data.table", quietly=TRUE)) # collapse does not support integer64. Oversized ints will be summed to double.
.libPaths("./collapse/r-collapse") # tidyverse/collapse#4641
suppressPackageStartupMessages(library("collapse", lib.loc="./collapse/r-collapse", warn.conflicts=FALSE))
suppressPackageStartupMessages(library("collapse", lib.loc="./collapse/r-collapse", warn.conflicts=FALSE))
ver = packageVersion("collapse")
git = "" # uses stable version now #124
task = "groupby"
Expand All @@ -23,25 +23,23 @@ x = data.table::fread(src_grp, showProgress=FALSE, stringsAsFactors=TRUE, na.str
print(nrow(x))
gc()

big_data <- nrow(x) > 1e8

# Setting collapse options: namespace masking and performance
oldopts <- set_collapse(nthreads = data.table::getDTthreads(),
oldopts <- set_collapse(nthreads = max(data.table::getDTthreads(), 4),
mask = "all",
sort = endsWith(data_name, "_1"),
sort = endsWith(data_name, "_1") || nrow(x) > 2e8,
na.rm = anyNA(num_vars(x)),
stable.algo = FALSE)

task_init = proc.time()[["elapsed"]]
cat("grouping...\n")

question = "sum v1 by id1" # q1
t = system.time(print(dim(ans<-collap(x, v1 ~ id1, sum))))[["elapsed"]]
t = system.time(print(dim(ans<-collap(x, v1 ~ id1, sum, fill = TRUE))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-sum(ans$v1))[["elapsed"]]
write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
rm(ans)
t = system.time(print(dim(ans<-collap(x, v1 ~ id1, sum))))[["elapsed"]]
t = system.time(print(dim(ans<-collap(x, v1 ~ id1, sum, fill = TRUE))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-sum(ans$v1))[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
Expand All @@ -50,12 +48,12 @@ print(tail(ans, 3))
rm(ans)

question = "sum v1 by id1:id2" # q2
t = system.time(print(dim(ans<-collap(x, v1 ~ id1 + id2, sum))))[["elapsed"]]
t = system.time(print(dim(ans<-collap(x, v1 ~ id1 + id2, sum, fill = TRUE))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-sum(x$v1))[["elapsed"]]
write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
rm(ans)
t = system.time(print(dim(ans<-collap(x, v1 ~ id1 + id2, sum))))[["elapsed"]]
t = system.time(print(dim(ans<-collap(x, v1 ~ id1 + id2, sum, fill = TRUE))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-sum(x$v1))[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
Expand All @@ -64,12 +62,13 @@ print(tail(ans, 3))
rm(ans)

question = "sum v1 mean v3 by id3" # q3
t = system.time(print(dim(ans<-collap(x, ~ id3, custom = list(sum = "v1", mean = "v3")))))[["elapsed"]]
options(collapse_unused_arg_action = "none")
t = system.time(print(dim(ans<-collap(x, ~ id3, custom = list(sum = "v1", mean = "v3"), fill = TRUE))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-qDF(list(v1=sum(ans$v1), v3=sum(ans$v3))))[["elapsed"]]
write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
rm(ans)
t = system.time(print(dim(ans<-collap(x, ~ id3, custom = list(sum = "v1", mean = "v3")))))[["elapsed"]]
t = system.time(print(dim(ans<-collap(x, ~ id3, custom = list(sum = "v1", mean = "v3"), fill = TRUE))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-qDF(list(v1=sum(ans$v1), v3=sum(ans$v3))))[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
Expand All @@ -92,12 +91,12 @@ print(tail(ans, 3))
rm(ans)

question = "sum v1:v3 by id6" # q5
t = system.time(print(dim(ans<-x |> group_by(id6) |> select(v1:v3) |> sum())))[["elapsed"]]
t = system.time(print(dim(ans<-x |> group_by(id6) |> select(v1:v3) |> sum(fill = TRUE))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-ans |> select(v1, v2, v3) |> sum())[["elapsed"]]
write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
rm(ans)
t = system.time(print(dim(ans<-x |> group_by(id6) |> select(v1:v3) |> sum())))[["elapsed"]]
t = system.time(print(dim(ans<-x |> group_by(id6) |> select(v1:v3) |> sum(fill = TRUE))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-ans |> select(v1, v2, v3) |> sum())[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
Expand All @@ -106,7 +105,6 @@ print(tail(ans, 3))
rm(ans)

question = "median v3 sd v3 by id4 id5" # q6
if(big_data) set_collapse(sort = TRUE) # This is because with sort = FALSE, an internal ordering vector for the elements to be passed to quickselect still needs to be computed. It turns out that the cost of this increases disproportionally with data size, so grouping directly with sort = TRUE is faster on big data.
t = system.time(print(dim(ans<-x |> group_by(id4, id5) |> summarize(v3_median = median(v3), v3_sd = sd(v3)))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-ans |> select(v3_median, v3_sd) |> sum())[["elapsed"]]
Expand All @@ -116,7 +114,6 @@ t = system.time(print(dim(ans<-x |> group_by(id4, id5) |> summarize(v3_median =
m = memory_usage()
chkt = system.time(chk<-ans |> select(v3_median, v3_sd) |> sum())[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
if(big_data) set_collapse(sort = endsWith(data_name, "_1"))
print(head(ans, 3))
print(tail(ans, 3))
rm(ans)
Expand All @@ -136,54 +133,44 @@ print(tail(ans, 3))
rm(ans)

# Note: this is a native collapse solution to this problem: utilizing the fast fnth() function and quickselect to get the second largest element
# please fix the following error: Occurs when all benchmark results are compared with each other.
# all other solutions have out_rows=200000, collapse has out_rows=100000
# Quitting from lines 26-55 [init] (index.Rmd)
# Error in `model_time()`:
# ! Value of 'out_rows' varies for different runs for single question
# Backtrace:
# 1. global time_logs()
# 2. global model_time(new_ct)
# question = "largest two v3 by id6" # q8
# if(big_data) set_collapse(sort = TRUE) # This is because with sort = FALSE, an internal ordering vector for the elements to be passed to quickselect still needs to be computed. It turns out that the cost of this increases disproportionally with data size, so grouping directly with sort = TRUE is faster on big data.
# t = system.time(print(dim(ans<-x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, 1-1e-7, ties = "min")))))[["elapsed"]]
# m = memory_usage()
# chkt = system.time(chk<-summarise(ans, largest2_v3=sum(max_v3+second_v3)))[["elapsed"]]
# write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
# rm(ans)
# t = system.time(print(dim(ans<-x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, 1-1e-7, ties = "min")))))[["elapsed"]]
# m = memory_usage()
# chkt = system.time(chk<-summarise(ans, largest2_v3=sum(max_v3+second_v3)))[["elapsed"]]
# write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
# if(big_data) set_collapse(sort = endsWith(data_name, "_1"))
# print(head(ans, 3))
# print(tail(ans, 3))
# rm(ans)

# Note: this is also a native collapse solution: this expression is fully vectorized using the functions the package provides. It could be executed on many more groups
# without large performance decay. The package does not currently provide a vectorized correlation function.

question = "largest two v3 by id6" # q8
# Run 1: nth(v3, .99999, ties = "min") picks the second-largest value per group
# (quantile just below the max with min-ties), then pivot("id6") reshapes the
# two summary columns to long format so each group yields TWO rows — matching
# the out_rows of the other solutions (see the error note above: collapse
# previously reported 100000 rows where others report 200000).
# compute(largest2_v3 = value, keep = "id6") renames the long 'value' column
# and drops the variable-name column.
t = system.time(print(dim(ans<-x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, .99999, ties = "min")) |> pivot("id6") |> compute(largest2_v3 = value, keep = "id6"))))[["elapsed"]]
m = memory_usage()
# Checksum: sum over the long column covers both the max and second values.
chkt = system.time(chk<-summarise(ans, largest2_v3=sum(largest2_v3)))[["elapsed"]]
write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
rm(ans)
# Run 2: identical expression, timed again on a warm cache per benchmark protocol.
t = system.time(print(dim(ans<-x |> group_by(id6) |> summarize(max_v3 = max(v3), second_v3 = nth(v3, .99999, ties = "min")) |> pivot("id6") |> compute(largest2_v3 = value, keep = "id6"))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-summarise(ans, largest2_v3=sum(largest2_v3)))[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(head(ans, 3))
print(tail(ans, 3))
rm(ans)


# Previous: x |> group_by(id2, id4) |> mutate(tmp = scale(v1)%*=%scale(v2)) |> summarise(r2 = (sum(tmp)%/=%(nobs(tmp)%-=%1))^2)
question = "regression v1 v2 by id2 id4" # q9
t = system.time(print(dim(ans<-x |> group_by(id2, id4) |> mutate(tmp = scale(v1)%*=%scale(v2)) |> summarise(r2 = (sum(tmp)%/=%(nobs(tmp)%-=%1))^2))))[["elapsed"]]
t = system.time(print(dim(ans<-x |> group_by(id2, id4) |> summarise(r2 = cor(v1, v2, use = "na.or.complete")^2))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-summarise(ans, r2=sum(r2)))[["elapsed"]]
write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
rm(ans)
t = system.time(print(dim(ans<-x |> group_by(id2, id4) |> mutate(tmp = scale(v1)%*=%scale(v2)) |> summarise(r2 = (sum(tmp)%/=%(nobs(tmp)%-=%1))^2))))[["elapsed"]]
t = system.time(print(dim(ans<-x |> group_by(id2, id4) |> summarise(r2 = cor(v1, v2, use = "na.or.complete")^2))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-summarise(ans, r2=sum(r2)))[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(head(ans, 3))
print(tail(ans, 3))
rm(ans)

# TODO: it could be that on really big data, radix ordering (sort = TRUE) is faster than hashing
question = "sum v3 count by id1:id6" # q10
t = system.time(print(dim(ans<-x |> group_by(id1:id6) |> summarise(v3=sum(v3), count=n()))))[["elapsed"]]
t = system.time(print(dim(ans<-x |> group_by(id1:id6) |> summarise(v3=sum(v3, fill = TRUE), count=n()))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-summarise(ans, v3=sum(v3), count=sum(count)))[["elapsed"]]
write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
rm(ans)
t = system.time(print(dim(ans<-x |> group_by(id1:id6) |> summarise(v3=sum(v3), count=n()))))[["elapsed"]]
t = system.time(print(dim(ans<-x |> group_by(id1:id6) |> summarise(v3=sum(v3, fill = TRUE), count=n()))))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-summarise(ans, v3=sum(v3), count=sum(count)))[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
Expand Down

0 comments on commit 269a9a7

Please sign in to comment.