Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Support default values for filename dataset #160

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 112 additions & 36 deletions hybridbackend/tensorflow/common/arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,53 +381,115 @@ Status MakeTensorsFromArrowArray(
return Status::OK();
}

#define CASE_TENSOR_FILL(ENUM, OUT, IN) \
case ENUM: { \
OUT.flat<EnumToDataType<ENUM>::Type>().setConstant( \
IN.scalar<EnumToDataType<ENUM>::Type>()()); \
break; \
}

Status MakeTensorsFromRecordDefaultValue(const DataType type,
const int32 ragged_rank,
const PartialTensorShape& shape,
const int64 actual_batch_size,
const Tensor& record_default,
std::vector<Tensor>* output_tensors) {
TensorShape actual_shape;
if (!TF_PREDICT_TRUE(PartialTensorShape({actual_batch_size})
.Concatenate(shape)
.AsTensorShape(&actual_shape))) {
return errors::InvalidArgument(
"Calculated shape of input batch is not fully defined");
}
Tensor values_tensor(type, actual_shape);
switch (type) {
CASE_TENSOR_FILL(DT_INT8, values_tensor, record_default);
CASE_TENSOR_FILL(DT_UINT8, values_tensor, record_default);
CASE_TENSOR_FILL(DT_INT32, values_tensor, record_default);
CASE_TENSOR_FILL(DT_UINT32, values_tensor, record_default);
CASE_TENSOR_FILL(DT_INT64, values_tensor, record_default);
CASE_TENSOR_FILL(DT_UINT64, values_tensor, record_default);
CASE_TENSOR_FILL(DT_HALF, values_tensor, record_default);
CASE_TENSOR_FILL(DT_FLOAT, values_tensor, record_default);
CASE_TENSOR_FILL(DT_DOUBLE, values_tensor, record_default);
CASE_TENSOR_FILL(DT_STRING, values_tensor, record_default);
default:
return errors::Unimplemented("Data type ", DataTypeString(type),
" not supported.");
}
output_tensors->emplace_back(std::move(values_tensor));
int32 remained_ragged_rank = ragged_rank;
int64 stride = shape.num_elements();
if (stride < 1) {
stride = 1;
}
while (remained_ragged_rank > 0) {
Tensor split_tensor(DT_INT32, {actual_batch_size + 1});
auto split_tensor_flat = split_tensor.tensor<int32, 1>();
split_tensor_flat(0) = 0;
for (size_t i = 0; i < actual_batch_size; i++) {
split_tensor_flat(i + 1) = (i + 1) * stride;
}
output_tensors->emplace_back(std::move(split_tensor));
remained_ragged_rank--;
}

return Status::OK();
}

Status ValidateSchema(const string& filename,
const std::vector<string>& field_names,
const DataTypeVector& field_dtypes,
const std::vector<int32>& field_ragged_ranks,
std::shared_ptr<::arrow::Schema>& schema,
std::vector<int>* out_column_indices) {
std::vector<int>* columns,
std::vector<int>* field_column_indices) {
if (TF_PREDICT_FALSE(columns == nullptr || field_column_indices == nullptr)) {
return errors::Internal("columns and field_column_indices must be valid");
}
if (TF_PREDICT_FALSE(!schema->HasDistinctFieldNames())) {
return errors::InvalidArgument(filename, " must has distinct column names");
}
int column_idx = 0;
for (size_t i = 0; i < field_names.size(); ++i) {
auto& cname = field_names[i];
int column_index = schema->GetFieldIndex(cname);
if (TF_PREDICT_FALSE(column_index < 0)) {
return errors::NotFound("No column called `", cname, "` found in ",
filename);
}
if (out_column_indices != nullptr) {
out_column_indices->push_back(column_index);
}
const auto& expected_dtype = field_dtypes[i];
const auto& expected_ragged_rank = field_ragged_ranks[i];
DataType actual_dtype;
int32 actual_ragged_rank = 0;
TF_RETURN_IF_ERROR(MakeDataTypeAndRaggedRankFromArrowDataType(
schema->field(column_index)->type(), &actual_dtype,
&actual_ragged_rank));
if (TF_PREDICT_FALSE(actual_dtype != expected_dtype)) {
return errors::InvalidArgument(
"Field ", cname, " in ", filename, " has unexpected data type ",
DataTypeString(actual_dtype), ", which should be ",
DataTypeString(expected_dtype));
}
if (TF_PREDICT_FALSE(actual_ragged_rank != expected_ragged_rank)) {
return errors::InvalidArgument(
"Field ", cname, " in ", filename, " has unexpected ragged rank ",
actual_ragged_rank, ", which should be ", expected_ragged_rank);
int column = schema->GetFieldIndex(cname);
if (TF_PREDICT_FALSE(column < 0)) {
field_column_indices->push_back(-1);
} else {
columns->push_back(column);
field_column_indices->push_back(column_idx);
column_idx++;
const auto& expected_dtype = field_dtypes[i];
const auto& expected_ragged_rank = field_ragged_ranks[i];
DataType actual_dtype;
int32 actual_ragged_rank = 0;
TF_RETURN_IF_ERROR(MakeDataTypeAndRaggedRankFromArrowDataType(
schema->field(column)->type(), &actual_dtype, &actual_ragged_rank));
if (TF_PREDICT_FALSE(actual_dtype != expected_dtype)) {
return errors::InvalidArgument(
"Field ", cname, " in ", filename, " has unexpected data type ",
DataTypeString(actual_dtype), ", which should be ",
DataTypeString(expected_dtype));
}
if (TF_PREDICT_FALSE(actual_ragged_rank != expected_ragged_rank)) {
return errors::InvalidArgument(
"Field ", cname, " in ", filename, " has unexpected ragged rank ",
actual_ragged_rank, ", which should be ", expected_ragged_rank);
}
}
}
return Status::OK();
}

Status ReadRecordBatch(::arrow::RecordBatchReader* batch_reader,
const string& filename, const int64 batch_size,
const std::vector<Tensor>& record_defaults,
const std::vector<string>& field_names,
const DataTypeVector& field_dtypes,
const std::vector<int32>& field_ragged_ranks,
const std::vector<PartialTensorShape>& field_shapes,
const std::vector<int>& field_column_indices,
const bool drop_remainder, const int64 row_limit,
std::vector<Tensor>* output_tensors,
int64* row_counter) {
Expand All @@ -450,19 +512,33 @@ Status ReadRecordBatch(::arrow::RecordBatchReader* batch_reader,

// Populate tensors from record batch.
auto arrays = batch->columns();
for (size_t i = 0; i < arrays.size(); ++i) {
auto s =
MakeTensorsFromArrowArray(field_dtypes[i], field_ragged_ranks[i],
field_shapes[i], arrays[i], output_tensors);
if (!s.ok()) {
return errors::DataLoss("Failed to parse row #", *row_counter, " - #",
(*row_counter) + batch->num_rows(), " at column ",
field_names[i], " (#", i, ") in ", filename, ": ",
s.error_message());
const int64 actual_batch_size = batch->num_rows();
for (size_t i = 0; i < field_names.size(); ++i) {
int column_idx = field_column_indices[i];
if (column_idx == -1) {
auto s = MakeTensorsFromRecordDefaultValue(
field_dtypes[i], field_ragged_ranks[i], field_shapes[i],
actual_batch_size, record_defaults[i], output_tensors);
if (!s.ok()) {
return errors::DataLoss(
"Failed to populate default value for row #", *row_counter, " - #",
(*row_counter) + actual_batch_size, " at column ", field_names[i],
" (#", i, ") in ", filename, ": ", s.error_message());
}
} else {
auto s = MakeTensorsFromArrowArray(field_dtypes[i], field_ragged_ranks[i],
field_shapes[i], arrays[column_idx],
output_tensors);
if (!s.ok()) {
return errors::DataLoss("Failed to parse row #", *row_counter, " - #",
(*row_counter) + actual_batch_size,
" at column ", field_names[i], " (#", i,
") in ", filename, ": ", s.error_message());
}
}
}

(*row_counter) += batch->num_rows();
(*row_counter) += actual_batch_size;
return Status::OK();
#else
return errors::Unimplemented("HYBRIDBACKEND_WITH_ARROW must be ON");
Expand Down
10 changes: 9 additions & 1 deletion hybridbackend/tensorflow/common/arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,27 @@ Status MakeTensorsFromArrowArray(
const std::shared_ptr<::arrow::Array>& arrow_array,
std::vector<Tensor>* output_tensors);

Status MakeTensorsFromDefaultValue(const DataType type, const int32 ragged_rank,
const PartialTensorShape& shape,
const Tensor& default_value,
std::vector<Tensor>* output_tensors);

Status ValidateSchema(const string& filename,
const std::vector<string>& field_names,
const DataTypeVector& field_dtypes,
const std::vector<int32>& field_ragged_ranks,
std::shared_ptr<::arrow::Schema>& schema,
std::vector<int>* out_column_indices);
std::vector<int>* columns,
std::vector<int>* field_column_indices);

Status ReadRecordBatch(::arrow::RecordBatchReader* batch_reader,
const string& filename, const int64 batch_size,
const std::vector<Tensor>& record_defaults,
const std::vector<string>& field_names,
const DataTypeVector& field_dtypes,
const std::vector<int32>& field_ragged_ranks,
const std::vector<PartialTensorShape>& field_shapes,
const std::vector<int>& field_column_indices,
const bool drop_remainder, const int64 row_limit,
std::vector<Tensor>* output_tensors, int64* row_counter);

Expand Down
1 change: 0 additions & 1 deletion hybridbackend/tensorflow/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

from hybridbackend.tensorflow.data.dataframe import DataFrame
from hybridbackend.tensorflow.data.dataframe import parse
from hybridbackend.tensorflow.data.dataframe import populate_defaults
from hybridbackend.tensorflow.data.dataframe import unbatch_and_to_sparse
from hybridbackend.tensorflow.data.deduplicate.dataset import deduplicate
from hybridbackend.tensorflow.data.prefetch.iterator import Iterator
Expand Down
Loading