Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multicolumn transformations for AbstractDataFrame #2461

Merged
merged 21 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 117 additions & 83 deletions src/abstractdataframe/selection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ normalize_selection(idx::AbstractIndex, sel::Pair{<:ColumnIndex, <:AbstractStrin

function normalize_selection(idx::AbstractIndex,
sel::Pair{<:Any,<:Pair{<:Base.Callable,
<:Union{Symbol, AbstractString, DataType,
AbstractVector{Symbol}, AbstractVector{<:AbstractString}}}},
<:Union{Symbol, AbstractString, DataType,
AbstractVector{Symbol},
AbstractVector{<:AbstractString}}}},
renamecols::Bool)
lls = last(last(sel))
if lls isa DataType
Expand Down Expand Up @@ -169,33 +170,25 @@ function normalize_selection(idx::AbstractIndex,
return (wanttable ? AsTable(c) : c) => fun => newcol
end

function _transformation_helper(df, col_idx, @nospecialize(fun))
function _transformation_helper(df::AbstractDataFrame,
col_idx::Union{Nothing, Int, AbstractVector{Int}, AsTable},
@nospecialize(fun))
if col_idx === nothing
return fun(df)
elseif col_idx isa Int
return fun(df[!, col_idx])
elseif col_idx isa AsTable
tbl = Tables.columntable(select(df, col_idx.cols, copycols=false))
if isempty(tbl) && fun isa ByRow
if isempty(df)
T = Base.return_types(fun.fun, (NamedTuple{(),Tuple{}},))[1]
return T[]
else
return [fun.fun(NamedTuple()) for _ in 1:nrow(df)]
end
return [fun.fun(NamedTuple()) for _ in 1:nrow(df)]
else
return fun(tbl)
end
else
# it should be fast enough here as we do not expect to do it millions of times
@assert col_idx isa AbstractVector{Int}
if isempty(col_idx) && fun isa ByRow
if isempty(df)
T = Base.return_types(fun.fun, ())[1]
return T[]
else
return [fun.fun() for _ in 1:nrow(df)]
end
return [fun.fun() for _ in 1:nrow(df)]
else
cdf = eachcol(df)
return fun(map(c -> cdf[c], col_idx)...)
Expand All @@ -204,40 +197,47 @@ function _transformation_helper(df, col_idx, @nospecialize(fun))
throw(ErrorException("unreachable reached"))
end

# Determine the output column names for a multi-column transformation result.
#
# Arguments:
# - `res`: the transformation result; for an `AbstractMatrix` names are produced
#   by `gennames` for `size(res, 2)` columns, otherwise `propertynames(res)` is used.
# - `newname`: `AsTable` or `nothing` to keep the names derived from `res`,
#   or a vector of `Symbol`s that overrides them.
#
# Returns a `Vector{Symbol}`.
# Throws `ArgumentError` if `newname` is a vector whose length differs from the
# number of names derived from `res`.
function _gen_colnames(@nospecialize(res),
                       newname::Union{AbstractVector{Symbol}, Type{AsTable}, Nothing})
    if res isa AbstractMatrix
        colnames = gennames(size(res, 2))
    else
        colnames = propertynames(res)
    end

    if newname !== AsTable && newname !== nothing
        if length(colnames) != length(newname)
            throw(ArgumentError("Number of returned columns does not match the " *
                                "length of requested output"))
        end
        colnames = newname
    end

    # fix the type to avoid unnecessary compilations of methods
    # this should be cheap
    return colnames isa Vector{Symbol} ? colnames : collect(Symbol, colnames)
end

# Convert a transformation result into a table-like object whose columns can be
# added to the output data frame.

# Fallback: let Tables.jl build a column table from `res`.
_expand_to_table(res) = Tables.columntable(res)

# These result types are returned unchanged.
_expand_to_table(res::Union{AbstractDataFrame, NamedTuple,
                            DataFrameRow, AbstractMatrix}) = res

# A vector of keyed elements: build one column per key of the first element.
# Throws `ArgumentError` if any element has keys different from the first one.
function _expand_to_table(res::AbstractVector)
    isempty(res) && return Tables.columntable(res)
    kp1 = keys(first(res))
    if any(x -> !isequal(keys(x), kp1), res)
        throw(ArgumentError("keys of the returned elements must be identical"))
    end
    newres = DataFrame()
    # when all keys are integers the column names become :x1, :x2, ...
    prepend = all(x -> x isa Integer, kp1)
    for n in kp1
        newres[!, prepend ? Symbol("x", n) : Symbol(n)] = [x[n] for x in res]
    end
    return newres
end

function _insert_row_multicolumn(df, newdf, allow_resizing_newdf, colnames, res)
function _insert_row_multicolumn(newdf::DataFrame, df::AbstractDataFrame,
allow_resizing_newdf::Ref{Bool}, colnames::AbstractVector{Symbol},
res::Union{NamedTuple, DataFrameRow})
if ncol(newdf) == 0
# if allow_resizing_newdf[] is false we know this is select or transform
rows = allow_resizing_newdf[] ? 1 : nrow(df)
Expand All @@ -247,12 +247,14 @@ function _insert_row_multicolumn(df, newdf, allow_resizing_newdf, colnames, res)
end
@assert length(colnames) == length(res)
for (newname, v) in zip(colnames, res)
# note that newdf potentially can contain c in general
# note that newdf potentially can contain newname in general
newdf[!, newname] = fill!(Tables.allocatecolumn(typeof(v), rows), v)
end
end

function _fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, lr, @nospecialize(fun))
function _fix_existing_columns_for_vector(newdf::DataFrame, df::AbstractDataFrame,
allow_resizing_newdf::Ref{Bool}, lr::Int,
@nospecialize(fun))
# allow shortening to 0 rows
if allow_resizing_newdf[] && nrow(newdf) == 1
newdfcols = _columns(newdf)
Expand All @@ -270,20 +272,85 @@ function _fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, lr, @
allow_resizing_newdf[] = false
end

# Store `v` in `newdf` under `newname`, copying it first when required.
#
# `v` is copied only when `copycols` is true, `fun` is not a `ByRow`, and `v` is
# either a `SubArray` or shares its `parent` with one of the source columns of
# `df` selected by `col_idx` (all columns of `df` when `col_idx === nothing`).
# In every other case `v` is stored as is.
function _add_col_check_copy(newdf::DataFrame, df::AbstractDataFrame,
                             col_idx::Union{Nothing, Int, AbstractVector{Int}, AsTable},
                             copycols::Bool, @nospecialize(fun),
                             newname::Symbol, @nospecialize(v))
    cdf = eachcol(df)
    vpar = parent(v)
    parent_cols = col_idx isa AsTable ? col_idx.cols : something(col_idx, 1:ncol(df))
    if copycols && !(fun isa ByRow) &&
       (v isa SubArray || any(i -> vpar === parent(cdf[i]), parent_cols))
        newdf[!, newname] = copy(v)
    else
        newdf[!, newname] = v
    end
end

# Data frame result: every column of `res` is added to `newdf` under the matching
# entry of `colnames`, after the columns already present in `newdf` have been
# adjusted to `nrow(res)` rows by `_fix_existing_columns_for_vector`.
function _add_multicol_res(res::AbstractDataFrame, newdf::DataFrame,
                           df::AbstractDataFrame,
                           colnames::AbstractVector{Symbol},
                           allow_resizing_newdf::Ref{Bool}, @nospecialize(fun),
                           col_idx::Union{Nothing, Int, AbstractVector{Int}, AsTable},
                           copycols::Bool,
                           newname::Union{Nothing, Type{AsTable}, AbstractVector{Symbol}})
    nrows = nrow(res)
    _fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, nrows, fun)
    @assert length(colnames) == ncol(res)
    for (colname, col) in zip(colnames, eachcol(res))
        _add_col_check_copy(newdf, df, col_idx, copycols, fun, colname, col)
    end
end

# Matrix result: the i-th column of `res` (extracted with `res[:, i]`) is stored
# in `newdf` under the i-th entry of `colnames`, after the columns already present
# in `newdf` have been adjusted to `size(res, 1)` rows.
function _add_multicol_res(res::AbstractMatrix, newdf::DataFrame,
                           df::AbstractDataFrame,
                           colnames::AbstractVector{Symbol},
                           allow_resizing_newdf::Ref{Bool}, @nospecialize(fun),
                           col_idx::Union{Nothing, Int, AbstractVector{Int}, AsTable},
                           copycols::Bool,
                           newname::Union{Nothing, Type{AsTable}, AbstractVector{Symbol}})
    nrows = size(res, 1)
    _fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, nrows, fun)
    @assert length(colnames) == size(res, 2)
    for (j, colname) in enumerate(colnames)
        newdf[!, colname] = res[:, j]
    end
end

# Named tuple whose fields are all vectors: each field becomes a column of `newdf`
# named by the matching entry of `colnames`. The row count passed to
# `_fix_existing_columns_for_vector` is the length of the first field.
function _add_multicol_res(res::NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}},
                           newdf::DataFrame, df::AbstractDataFrame,
                           colnames::AbstractVector{Symbol},
                           allow_resizing_newdf::Ref{Bool}, @nospecialize(fun),
                           col_idx::Union{Nothing, Int, AbstractVector{Int}, AsTable},
                           copycols::Bool,
                           newname::Union{Nothing, Type{AsTable}, AbstractVector{Symbol}})
    nrows = length(res[1])
    _fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, nrows, fun)
    @assert length(colnames) == length(res)
    for (colname, col) in zip(colnames, res)
        _add_col_check_copy(newdf, df, col_idx, copycols, fun, colname, col)
    end
end

# Generic named tuple (not matched by the all-vectors method): it is inserted as
# a single row via `_insert_row_multicolumn`, unless any of its fields is a
# vector, in which case an `ArgumentError` is thrown.
function _add_multicol_res(res::NamedTuple, newdf::DataFrame, df::AbstractDataFrame,
                           colnames::AbstractVector{Symbol},
                           allow_resizing_newdf::Ref{Bool}, @nospecialize(fun),
                           col_idx::Union{Nothing, Int, AbstractVector{Int}, AsTable},
                           copycols::Bool,
                           newname::Union{Nothing, Type{AsTable}, AbstractVector{Symbol}})
    has_vector_field = any(v -> v isa AbstractVector, res)
    has_vector_field &&
        throw(ArgumentError("mixing single values and vectors in a named tuple is not allowed"))
    return _insert_row_multicolumn(newdf, df, allow_resizing_newdf, colnames, res)
end

# Data frame row result: delegated unchanged to `_insert_row_multicolumn`.
function _add_multicol_res(res::DataFrameRow, newdf::DataFrame, df::AbstractDataFrame,
                           colnames::AbstractVector{Symbol},
                           allow_resizing_newdf::Ref{Bool}, @nospecialize(fun),
                           col_idx::Union{Nothing, Int, AbstractVector{Int}, AsTable},
                           copycols::Bool,
                           newname::Union{Nothing, Type{AsTable}, AbstractVector{Symbol}})
    return _insert_row_multicolumn(newdf, df, allow_resizing_newdf, colnames, res)
end

function select_transform!(@nospecialize(nc::Union{Base.Callable, Pair{<:Union{Int, AbstractVector{Int}, AsTable},
<:Pair{<:Base.Callable,
<:Union{Symbol, AbstractVector{Symbol}, DataType}}}}),
<:Pair{<:Base.Callable,
<:Union{Symbol,
AbstractVector{Symbol},
DataType}}}}),
df::AbstractDataFrame, newdf::DataFrame,
transformed_cols::Set{Symbol}, copycols::Bool,
allow_resizing_newdf::Ref{Bool})
Expand All @@ -300,8 +367,7 @@ function select_transform!(@nospecialize(nc::Union{Base.Callable, Pair{<:Union{I
# in _manipulate, therefore in select_transform! such a duplicate should not happen
res = _transformation_helper(df, col_idx, fun)

if (newname === AsTable || newname isa AbstractVector{Symbol}) &&
!(res isa Union{AbstractDataFrame, NamedTuple, DataFrameRow, AbstractMatrix})
if (newname === AsTable || newname isa AbstractVector{Symbol})
res = _expand_to_table(res)
end

Expand All @@ -313,60 +379,33 @@ function select_transform!(@nospecialize(nc::Union{Base.Callable, Pair{<:Union{I
isempty(colnames) && return # nothing to do

if any(in(transformed_cols), colnames)
throw(ArgumentError("Duplicate column name returned"))
throw(ArgumentError("Duplicate column name(s) returned: :" *
"$(join(intersect(colnames, transformed_cols), ", :"))"))
else
startlen = length(transformed_cols)
union!(transformed_cols, colnames)
@assert startlen + length(colnames) == length(transformed_cols)
end
if res isa AbstractDataFrame
lr = nrow(res)
_fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, lr, fun)
@assert length(colnames) == ncol(res)
for (newname, v) in zip(colnames, eachcol(res))
_add_col_check_copy(df, newdf, col_idx, copycols, fun, newname, v)
end
elseif res isa AbstractMatrix
lr = size(res, 1)
_fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, lr, fun)
@assert length(colnames) == size(res, 2)
for (i, newname) in enumerate(colnames)
newdf[!, newname] = res[:, i]
end
elseif res isa NamedTuple
if all(v -> v isa AbstractVector, res)
lr = length(res[1])
_fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, lr, fun)
@assert length(colnames) == length(res)
for (newname, v) in zip(colnames, res)
_add_col_check_copy(df, newdf, col_idx, copycols, fun, newname, v)
end
elseif any(v -> v isa AbstractVector, res)
throw(ArgumentError("mixing single values and vectors in a named tuple is not allowed"))
else
_insert_row_multicolumn(df, newdf, allow_resizing_newdf, colnames, res)
end
elseif res isa DataFrameRow
_insert_row_multicolumn(df, newdf, allow_resizing_newdf, colnames, res)
end
_add_multicol_res(res, newdf, df, colnames, allow_resizing_newdf, fun,
col_idx, copycols, newname)
elseif res isa AbstractVector
if newname === nothing
newname = :x1
end
if newname in transformed_cols
throw(ArgumentError("duplicate name of a transformed column"))
throw(ArgumentError("duplicate output column name: :$newname"))
else
push!(transformed_cols, newname)
end
lr = length(res)
_fix_existing_columns_for_vector(newdf, df, allow_resizing_newdf, lr, fun)
_add_col_check_copy(df, newdf, col_idx, copycols, fun, newname, res)
_add_col_check_copy(newdf, df, col_idx, copycols, fun, newname, res)
else
if newname === nothing
newname = :x1
end
if newname in transformed_cols
throw(ArgumentError("duplicate name of a transformed column"))
throw(ArgumentError("duplicate output column name: :$newname"))
else
push!(transformed_cols, newname)
end
Expand Down Expand Up @@ -535,8 +574,7 @@ select!(df::DataFrame, args...; renamecols::Bool=true) =

# Function-first form of `select!`: forwards to `select!(df, arg)`.
#
# `:` matches `Base.Callable` (`Colon <: Function`) but is not accepted as a
# transformation here, so it is rejected with an `ArgumentError`.
# NOTE(review): `renamecols` is accepted but not forwarded to the inner call;
# confirm this is intentional.
function select!(arg::Base.Callable, df::AbstractDataFrame; renamecols::Bool=true)
    if arg isa Colon
        throw(ArgumentError("First argument must be a transformation if the " *
                            "second argument is a data frame"))
    end
    return select!(df, arg)
end
Expand All @@ -555,8 +593,7 @@ transform!(df::DataFrame, args...; renamecols::Bool=true) =

# Function-first form of `transform!`: forwards to `transform!(df, arg)`.
#
# `:` matches `Base.Callable` (`Colon <: Function`) but is not accepted as a
# transformation here, so it is rejected with an `ArgumentError`.
# NOTE(review): `renamecols` is accepted but not forwarded to the inner call;
# confirm this is intentional.
function transform!(arg::Base.Callable, df::AbstractDataFrame; renamecols::Bool=true)
    if arg isa Colon
        throw(ArgumentError("First argument must be a transformation if the " *
                            "second argument is a data frame"))
    end
    return transform!(df, arg)
end
Expand Down Expand Up @@ -686,8 +723,7 @@ select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=tru

# Function-first form of `select`: forwards to `select(df, arg)`.
#
# `:` matches `Base.Callable` (`Colon <: Function`) but is not accepted as a
# transformation here, so it is rejected with an `ArgumentError`.
# NOTE(review): `renamecols` is accepted but not forwarded to the inner call;
# confirm this is intentional.
function select(arg::Base.Callable, df::AbstractDataFrame; renamecols::Bool=true)
    if arg isa Colon
        throw(ArgumentError("First argument must be a transformation if the " *
                            "second argument is a data frame"))
    end
    return select(df, arg)
end
Expand All @@ -707,8 +743,7 @@ transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=

# Function-first form of `transform`: forwards to `transform(df, arg)`.
#
# `:` matches `Base.Callable` (`Colon <: Function`) but is not accepted as a
# transformation here, so it is rejected with an `ArgumentError`.
# NOTE(review): `renamecols` is accepted but not forwarded to the inner call;
# confirm this is intentional.
function transform(arg::Base.Callable, df::AbstractDataFrame; renamecols::Bool=true)
    if arg isa Colon
        throw(ArgumentError("First argument must be a transformation if the " *
                            "second argument is a data frame"))
    end
    return transform(df, arg)
end
Expand Down Expand Up @@ -750,8 +785,7 @@ combine(df::AbstractDataFrame, args...; renamecols::Bool=true) =

# Function-first form of `combine`: forwards to `combine(df, arg)`.
#
# `:` matches `Base.Callable` (`Colon <: Function`) but is not accepted as a
# transformation here, so it is rejected with an `ArgumentError`.
# NOTE(review): `renamecols` is accepted but not forwarded to the inner call;
# confirm this is intentional.
function combine(arg::Base.Callable, df::AbstractDataFrame; renamecols::Bool=true)
    if arg isa Colon
        throw(ArgumentError("First argument must be a transformation if the " *
                            "second argument is a data frame"))
    end
    return combine(df, arg)
end
Expand Down
Loading