From 0c6491c8680482a9671a5751848212e41a668168 Mon Sep 17 00:00:00 2001 From: Yin Ye Date: Mon, 10 Mar 2025 12:50:46 -0700 Subject: [PATCH 1/6] skeleton of parquet op kernel --- WORKSPACE | 1 - tensorflow_io/arrow.py | 3 + .../core/kernels/arrow/arrow_dataset_ops.cc | 463 +++++++++++++----- tensorflow_io/core/ops/arrow_ops.cc | 28 +- 4 files changed, 364 insertions(+), 131 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index 19c30f7e9..5c7c9e63f 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -54,7 +54,6 @@ http_archive( sha256 = "14bf9bf97431b890e0ae5dca8f8904841d4883b8596a7108a42f5700ae58d711", strip_prefix = "google-cloud-cpp-1.21.0", urls = [ - "https://storage.googleapis.com/mirror.tensorflow.org/github.com/googleapis/google-cloud-cpp/archive/v1.21.0.tar.gz", "https://github.com/googleapis/google-cloud-cpp/archive/v1.21.0.tar.gz", ], ) diff --git a/tensorflow_io/arrow.py b/tensorflow_io/arrow.py index 44de3253c..2f325f385 100644 --- a/tensorflow_io/arrow.py +++ b/tensorflow_io/arrow.py @@ -17,6 +17,7 @@ @@ArrowDataset @@ArrowFeatherDataset @@ArrowStreamDataset +@@ArrowParquetDataset @@list_feather_columns """ @@ -26,6 +27,7 @@ from tensorflow_io.python.ops.arrow_dataset_ops import ArrowDataset from tensorflow_io.python.ops.arrow_dataset_ops import ArrowFeatherDataset from tensorflow_io.python.ops.arrow_dataset_ops import ArrowStreamDataset +from tensorflow_io.python.ops.arrow_dataset_ops import ArrowParquetDataset from tensorflow_io.python.ops.arrow_dataset_ops import list_feather_columns @@ -33,6 +35,7 @@ "ArrowDataset", "ArrowFeatherDataset", "ArrowStreamDataset", + "ArrowParquetDataset", "list_feather_columns", ] diff --git a/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc b/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc index f6ee8a434..124af2a6d 100644 --- a/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc +++ b/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc @@ -17,6 +17,7 @@ limitations under the License. #include "arrow/io/stdio.h" #include "arrow/ipc/api.h" #include "arrow/result.h" +#include "parquet/arrow/reader.h" #include "tensorflow/core/framework/dataset.h" #include "tensorflow/core/framework/dataset_options.pb.h" #include "tensorflow/core/graph/graph.h" @@ -29,13 +30,22 @@ limitations under the License. namespace tensorflow { namespace data { +namespace { + +// TODO(yye): implement this func and support reading parquet data in +// streamining way. 
+Status OpenParquetFile(arrow::fs::FileSystem *fs, + arrow::io::RandomAccessFile *file) {} + +} // namespace + enum ArrowBatchMode { BATCH_KEEP_REMAINDER, BATCH_DROP_REMAINDER, BATCH_AUTO, }; -Status GetBatchModeStr(ArrowBatchMode batch_mode, tstring* batch_mode_str) { +Status GetBatchModeStr(ArrowBatchMode batch_mode, tstring *batch_mode_str) { switch (batch_mode) { case ArrowBatchMode::BATCH_KEEP_REMAINDER: *batch_mode_str = "keep_remainder"; @@ -53,7 +63,7 @@ Status GetBatchModeStr(ArrowBatchMode batch_mode, tstring* batch_mode_str) { return OkStatus(); } -Status GetBatchMode(string batch_mode_str, ArrowBatchMode* batch_mode) { +Status GetBatchMode(string batch_mode_str, ArrowBatchMode *batch_mode) { if (batch_mode_str == "keep_remainder") { *batch_mode = ArrowBatchMode::BATCH_KEEP_REMAINDER; } else if (batch_mode_str == "drop_remainder") { @@ -70,10 +80,10 @@ Status GetBatchMode(string batch_mode_str, ArrowBatchMode* batch_mode) { // iterator that iterates over rows of the batch to get Tensors class ArrowDatasetBase : public DatasetBase { public: - ArrowDatasetBase(OpKernelContext* ctx, const std::vector& columns, + ArrowDatasetBase(OpKernelContext *ctx, const std::vector &columns, const int64 batch_size, const ArrowBatchMode batch_mode, - const DataTypeVector& output_types, - const std::vector& output_shapes) + const DataTypeVector &output_types, + const std::vector &output_shapes) : DatasetBase(DatasetContext(ctx)), columns_(columns), batch_size_(batch_size), @@ -81,9 +91,9 @@ class ArrowDatasetBase : public DatasetBase { output_types_(output_types), output_shapes_(output_shapes) {} - const DataTypeVector& output_dtypes() const override { return output_types_; } + const DataTypeVector &output_dtypes() const override { return output_types_; } - const std::vector& output_shapes() const override { + const std::vector &output_shapes() const override { return output_shapes_; } @@ -95,12 +105,12 @@ class ArrowDatasetBase : public DatasetBase { class ArrowBaseIterator : public DatasetIterator { public: ArrowBaseIterator( - const typename DatasetIterator::Params& params) + const typename DatasetIterator::Params ¶ms) : DatasetIterator(params) {} - Status GetNextInternal(IteratorContext* ctx, - std::vector* out_tensors, - bool* end_of_sequence) override { + Status GetNextInternal(IteratorContext *ctx, + std::vector *out_tensors, + bool *end_of_sequence) override { mutex_lock l(mu_); // If in initial state, setup and read first batch @@ -108,7 +118,7 @@ class ArrowDatasetBase : public DatasetBase { TF_RETURN_IF_ERROR(SetupStreamsLocked(ctx->env())); } - std::vector* result_tensors = out_tensors; + std::vector *result_tensors = out_tensors; auto partial_batches = std::vector>>(); int64 partial_batch_size = 0; @@ -206,9 +216,9 @@ class ArrowDatasetBase : public DatasetBase { private: Status AppendPartialTensors( - IteratorContext* ctx, int64 batch_size, - const std::vector>>& partials, - std::vector* out_tensors) { + IteratorContext *ctx, int64 batch_size, + const std::vector>> &partials, + std::vector *out_tensors) { int64 batch_index = 0; // If only one partial batch, can just move to output @@ -222,7 +232,7 @@ class ArrowDatasetBase : public DatasetBase { it_partial++) { int64 partial_batch_size = 0; for (size_t i = 0; i < (*it_partial)->size(); ++i) { - const Tensor& element = (*it_partial)->at(i); + const Tensor &element = (*it_partial)->at(i); partial_batch_size = element.dim_size(0); // Allocate tensor sized to batch on first iteration @@ -243,7 +253,7 @@ class ArrowDatasetBase : public 
DatasetBase { } template - Status HandleElementsToParent(const Tensor& element, Tensor* parent, + Status HandleElementsToParent(const Tensor &element, Tensor *parent, int64 index) { // TODO: look into removing this loop, move tensor instead of copy for (int64 i = 0; i < element.dim_size(0); ++i) { @@ -253,7 +263,7 @@ class ArrowDatasetBase : public DatasetBase { return OkStatus(); } - Status CopyElementsToParent(const Tensor& element, Tensor* parent, + Status CopyElementsToParent(const Tensor &element, Tensor *parent, int64 index) { #define HANDLE_TYPE(T) \ case DataTypeToEnum::value: { \ @@ -275,24 +285,24 @@ class ArrowDatasetBase : public DatasetBase { } protected: - Status SaveInternal(SerializationContext* ctx, - IteratorStateWriter* writer) override { + Status SaveInternal(SerializationContext *ctx, + IteratorStateWriter *writer) override { return errors::Unimplemented("SaveInternal is currently not supported"); } - Status RestoreInternal(IteratorContext* ctx, - IteratorStateReader* reader) override { + Status RestoreInternal(IteratorContext *ctx, + IteratorStateReader *reader) override { return errors::Unimplemented( "RestoreInternal is currently not supported"); } // Setup Arrow record batch consumer and initialze current_batch_ - virtual Status SetupStreamsLocked(Env* env) + virtual Status SetupStreamsLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) = 0; // Get the next Arrow record batch, if available. If not then // current_batch_ will be set to nullptr to indicate no further batches. - virtual Status NextStreamLocked(Env* env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + virtual Status NextStreamLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { current_batch_ = nullptr; current_row_idx_ = 0; return OkStatus(); @@ -336,14 +346,14 @@ class ArrowOpKernelBase : public DatasetOpKernel { public: using DatasetOpKernel::DatasetOpKernel; - ArrowOpKernelBase(OpKernelConstruction* ctx) : DatasetOpKernel(ctx) { + ArrowOpKernelBase(OpKernelConstruction *ctx) : DatasetOpKernel(ctx) { OP_REQUIRES_OK(ctx, ctx->GetAttr("output_types", &output_types_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("output_shapes", &output_shapes_)); - for (const DataType& dt : output_types_) { + for (const DataType &dt : output_types_) { std::shared_ptr arrow_type; OP_REQUIRES_OK(ctx, ArrowUtil::GetArrowType(dt, &arrow_type)); } - for (const PartialTensorShape& pts : output_shapes_) { + for (const PartialTensorShape &pts : output_shapes_) { OP_REQUIRES(ctx, -1 <= pts.dims() && pts.dims() <= 2, errors::InvalidArgument("Output shape must be a scalar, " "vector, matrix or unknown")); @@ -351,8 +361,8 @@ class ArrowOpKernelBase : public DatasetOpKernel { } private: - void MakeDataset(OpKernelContext* ctx, DatasetBase** output) override { - const Tensor* columns_tensor; + void MakeDataset(OpKernelContext *ctx, DatasetBase **output) override { + const Tensor *columns_tensor; OP_REQUIRES_OK(ctx, ctx->input("columns", &columns_tensor)); OP_REQUIRES( ctx, columns_tensor->dims() <= 1, @@ -374,7 +384,7 @@ class ArrowOpKernelBase : public DatasetOpKernel { ArrowBatchMode batch_mode; OP_REQUIRES_OK(ctx, GetBatchMode(batch_mode_str, &batch_mode)); - ArrowDatasetBase* arrow_output; + ArrowDatasetBase *arrow_output; MakeArrowDataset(ctx, columns, batch_size, batch_mode, output_types_, output_shapes_, &arrow_output); *output = arrow_output; @@ -383,11 +393,11 @@ class ArrowOpKernelBase : public DatasetOpKernel { protected: // Define to construct an implementation of ArrowDatasetBase virtual void MakeArrowDataset( - OpKernelContext* ctx, const 
std::vector& columns, + OpKernelContext *ctx, const std::vector &columns, const int64 batch_size, const ArrowBatchMode batch_mode, - const DataTypeVector& output_types, - const std::vector& output_shapes, - ArrowDatasetBase** output) = 0; + const DataTypeVector &output_types, + const std::vector &output_shapes, + ArrowDatasetBase **output) = 0; DataTypeVector output_types_; std::vector output_shapes_; @@ -397,19 +407,19 @@ class ArrowOpKernelBase : public DatasetOpKernel { // from a memory buffer address owned in Python. class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase { public: - explicit ArrowZeroCopyDatasetOp(OpKernelConstruction* ctx) + explicit ArrowZeroCopyDatasetOp(OpKernelConstruction *ctx) : ArrowOpKernelBase(ctx) {} virtual void MakeArrowDataset( - OpKernelContext* ctx, const std::vector& columns, + OpKernelContext *ctx, const std::vector &columns, const int64 batch_size, const ArrowBatchMode batch_mode, - const DataTypeVector& output_types, - const std::vector& output_shapes, - ArrowDatasetBase** output) override { + const DataTypeVector &output_types, + const std::vector &output_shapes, + ArrowDatasetBase **output) override { uintptr_t buffer_address; OP_REQUIRES_OK(ctx, ParseScalarArgument(ctx, "buffer_address", &buffer_address)); - const uint8_t* buffer = reinterpret_cast(buffer_address); + const uint8_t *buffer = reinterpret_cast(buffer_address); int64_t buffer_size; OP_REQUIRES_OK( @@ -421,11 +431,11 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase { private: class Dataset : public ArrowDatasetBase { public: - Dataset(OpKernelContext* ctx, const uint8_t* buffer_ptr, - const int64 buffer_size, const std::vector& columns, + Dataset(OpKernelContext *ctx, const uint8_t *buffer_ptr, + const int64 buffer_size, const std::vector &columns, const int64 batch_size, const ArrowBatchMode batch_mode, - const DataTypeVector& output_types, - const std::vector& output_shapes) + const DataTypeVector &output_types, + const std::vector &output_shapes) : ArrowDatasetBase(ctx, columns, batch_size, batch_mode, output_types, output_shapes), buffer_ptr_(buffer_ptr), @@ -438,20 +448,20 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase { } protected: - Status AsGraphDefInternal(SerializationContext* ctx, - DatasetGraphDefBuilder* b, - Node** output) const override { - Node* buffer = nullptr; + Status AsGraphDefInternal(SerializationContext *ctx, + DatasetGraphDefBuilder *b, + Node **output) const override { + Node *buffer = nullptr; uintptr_t buffer_temp = reinterpret_cast(buffer_ptr_); uint64 buffer_address = buffer_temp; TF_RETURN_IF_ERROR(b->AddScalar(buffer_address, &buffer)); - Node* size = nullptr; + Node *size = nullptr; TF_RETURN_IF_ERROR(b->AddScalar(static_cast(buffer_size_), &size)); - Node* columns = nullptr; + Node *columns = nullptr; TF_RETURN_IF_ERROR(b->AddVector(columns_, &columns)); - Node* batch_size = nullptr; + Node *batch_size = nullptr; TF_RETURN_IF_ERROR(b->AddScalar(batch_size_, &batch_size)); - Node* batch_mode = nullptr; + Node *batch_mode = nullptr; tstring batch_mode_str; TF_RETURN_IF_ERROR(GetBatchModeStr(batch_mode_, &batch_mode_str)); TF_RETURN_IF_ERROR(b->AddScalar(batch_mode_str, &batch_mode)); @@ -461,7 +471,7 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase { } std::unique_ptr MakeIteratorInternal( - const string& prefix) const override { + const string &prefix) const override { return std::unique_ptr( new Iterator({this, strings::StrCat(prefix, "::Arrow")})); } @@ -469,11 +479,11 @@ class ArrowZeroCopyDatasetOp : public 
ArrowOpKernelBase { private: class Iterator : public ArrowBaseIterator { public: - explicit Iterator(const Params& params) + explicit Iterator(const Params ¶ms) : ArrowBaseIterator(params) {} private: - Status SetupStreamsLocked(Env* env) + Status SetupStreamsLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { buffer_ = std::make_shared(dataset()->buffer_ptr_, dataset()->buffer_size_); @@ -494,7 +504,7 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase { return OkStatus(); } - Status NextStreamLocked(Env* env) + Status NextStreamLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { ArrowBaseIterator::NextStreamLocked(env); if (++current_batch_idx_ < num_batches_) { @@ -522,25 +532,229 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase { int num_batches_ TF_GUARDED_BY(mu_) = 0; }; - const uint8_t* buffer_ptr_; + const uint8_t *buffer_ptr_; const int64 buffer_size_; }; }; +class ArrowParquetDatasetOp : public ArrowOpKernelBase { + public: + explicit ArrowParquetDatasetOp(OpKernelConstruction *ctx) + : ArrowOpKernelBase(ctx) {} + + virtual void MakeArrowDataset( + OpKernelContext *ctx, const std::vector &_, const int64 batch_size, + const ArrowBatchMode batch_mode, const DataTypeVector &output_types, + const std::vector &output_shapes, + ArrowDatasetBase **output) override { + const Tensor *file_paths_as_tensor; + const Tensor *column_names_as_tensor; + OP_REQUIRES_OK(ctx, ctx->input("file_paths", &file_paths_as_tensor)); + std::vector file_paths, column_names; + file_paths.reserve(file_paths_as_tensor->NumElements()); + for (int i = 0; i < file_paths_as_tensor->NumElements(); i++) { + file_paths.push_back(file_paths_as_tensor->flat()(i)) + } + column_names.reserve(column_names_as_tensor->NumElements()); + for (int i = 0; i < column_names_as_tensor->NumElements(); i++) { + column_names.push_back(column_names_as_tensor->flat()(i)); + } + *output = new Dataset(ctx, file_paths, column_names, batch_size, batch_mode, + output_types_, output_shapes_); + } + + private: + class Dataset : public ArrowDatasetBase { + public: + Dataset(OpKernelContext *ctx, const std::vector &file_paths, + const std::vector &column_names, + const int64 batch_size, const ArrowBatchMode batch_mode, + const DataTypeVector &output_types, + const std::vector &output_shapes) + : ArrowDatasetBase(ctx, /*columns=*/std::vector(), batch_size, + batch_mode, output_types, output_shapes), + file_paths_(file_paths), + column_names_(column_names) {} + + string DebugString() const override { + return "ArrowParquetDatasetOp::Dataset"; + } + Status InputDatasets(std::vector *inputs) const { + return OkStatus(); + } + Status CheckExternalState() const override { return OkStatus(); } + + protected: + Status AsGraphDefInternal(SerializationContext *ctx, + DatasetGraphDefBuilder *b, + Node **output) const override { + Node *file_paths = nullptr; + TF_RETURN_IF_ERROR(b->AddVector(file_paths_, &file_paths)); + Node *column_names = nullptr; + TF_RETURN_IF_ERROR(b->AddVector(column_names_, &column_names)); + Node *columns = nullptr; + TF_RETURN_IF_ERROR(b->AddVector(columns_, &columns)); + Node *batch_size = nullptr; + TF_RETURN_IF_ERROR(b->AddScalar(batch_size_, &batch_size)); + Node *batch_mode = nullptr; + tstring batch_mode_str; + TF_RETURN_IF_ERROR(GetBatchModeStr(batch_mode_, &batch_mode_str)); + TF_RETURN_IF_ERROR(b->AddScalar(batch_mode_str, &batch_mode)); + TF_RETURN_IF_ERROR(b->AddDataset( + this, {file_paths, column_names, batch_size, batch_mode}, output)); + return OkStatus(); + } + + std::unique_ptr 
MakeIteratorInternal( + const string &prefix) const override { + return std::unique_ptr( + new Iterator({this, strings::StrCat(prefix, "::Parquet")})); + } + + private: + class Iterator : public ArrowBaseIterator { + public: + explicit Iterator(const Params ¶ms) + : ArrowBaseIterator(params) {} + + private: + // TODO(yye): implementation of getting the first batch. + Status SetupStreamsLocked(Env *env) + TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { + TF_RETURN_IF_ERROR(ReadFile(current_file_idx_)); + + // Open and read parquet file. + while (record_batches_.empty() && + ++current_file_idx_ < dataset()->file_paths_.size()) { + TF_RETURN_IF_ERROR(ReadFile(current_file_idx_)); + } + + return OkStatus(); + } + + // TODO(yye): implementation of getting the next batch. + Status NextStreamLocked(Env *env) + TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { + return OkStatus(); + } + + void ResetStreamsLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { + ArrowBaseIterator::ResetStreamsLocked(); + current_file_idx_ = 0; + record_batches_.clear(); + } + + Status ReadFile(int file_index) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + Status res = OkStatus(); + do { + std::shared_ptr file; + res = OpenParquetFile(&fs_, file.get()); + if (!res.ok()) { + break; + } + + parquet::ArrowReaderProperties properties; + properties.set_use_threads(true); + properties.set_pre_buffer(true); + parquet::ReaderProperties parquet_properties = + parquet::default_reader_properties(); + + std::shared_ptr builder = + std::make_shared(); + builder->Open(file, parquet_properties); + + std::unique_ptr reader; + builder->properties(properties)->Build(&reader); + + if (column_indices_.empty()) { + column_indices_.clear(); + std::shared_ptr schema; + reader->GetSchema(&schema); + // check column name exist + std::string err_column_names; + for (const auto &name : dataset()->column_names_) { + int fieldIndex = schema->GetFieldIndex(name); + column_indices_.push_back(fieldIndex); + if (-1 == fieldIndex) { + err_column_names = err_column_names + " " + name; + } + } + + if (err_column_names.length() != 0) { + res = errors::InvalidArgument( + "these column names don't exist: ", err_column_names, + " when read file: ", dataset()->file_paths_[file_index]); + break; + } + } + // Read file columns and build a table + std::shared_ptr<::arrow::Table> table; + arrow::Status arrow_status = + reader->ReadTable(column_indices_, &table); + if (!arrow_status.ok()) { + res = errors::Internal(arrow_status.ToString()); + break; + } + // Convert the table to a sequence of batches + std::shared_ptr batch_reader = + std::make_shared(table); + std::shared_ptr batch = nullptr; + + arrow_status = batch_reader->ReadNext(&batch); + if (!arrow_status.ok()) { + res = errors::Internal(arrow_status.ToString()); + break; + } + res = CheckBatchColumnTypes(batch); + if (!res.ok()) { + break; + } + record_batches_.clear(); + while (batch != nullptr) { + if (batch->num_rows() != 0) { + record_batches_.emplace_back(batch); + } + arrow_status = batch_reader->ReadNext(&batch); + if (!arrow_status.ok()) { + res = errors::Internal(arrow_status.ToString()); + break; + } + } + } while (0); + return res; + } + + size_t current_file_idx_ TF_GUARDED_BY(mu_) = 0; + // TODO(yye): stop maintaining/holding all the record batches. + std::vector> record_batches_ + TF_GUARDED_BY(mu_); + std::shared_ptr fs_ TF_GUARDED_BY(mu_) = nullptr; + + // Maintains the index of the columns to read. + std::vector column_indices_ TF_GUARDED_BY(mu_); + }; + + // path of parquet files. 
+ const std::vector file_paths_; + // column names to read from the parquet files. + const std::vector column_names_; + }; +}; + // Op to create an ArrowSerializedDataset that consumes Arrow record batches // serialized in a Tensor buffer. class ArrowSerializedDatasetOp : public ArrowOpKernelBase { public: - explicit ArrowSerializedDatasetOp(OpKernelConstruction* ctx) + explicit ArrowSerializedDatasetOp(OpKernelConstruction *ctx) : ArrowOpKernelBase(ctx) {} virtual void MakeArrowDataset( - OpKernelContext* ctx, const std::vector& columns, + OpKernelContext *ctx, const std::vector &columns, const int64 batch_size, const ArrowBatchMode batch_mode, - const DataTypeVector& output_types, - const std::vector& output_shapes, - ArrowDatasetBase** output) override { - const Tensor* batches_tensor; + const DataTypeVector &output_types, + const std::vector &output_shapes, + ArrowDatasetBase **output) override { + const Tensor *batches_tensor; OP_REQUIRES_OK(ctx, ctx->input("serialized_batches", &batches_tensor)); OP_REQUIRES(ctx, TensorShapeUtils::IsScalar(batches_tensor->shape()), errors::InvalidArgument("serialized_batches must be a scalar")); @@ -553,10 +767,10 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase { public: // Construct a Dataset that consumed Arrow batches from serialized bytes // in a string. Record batches should be serialized in Arrow File format. - Dataset(OpKernelContext* ctx, const Tensor batches_tensor, - const std::vector& columns, const int64 batch_size, - const ArrowBatchMode batch_mode, const DataTypeVector& output_types, - const std::vector& output_shapes) + Dataset(OpKernelContext *ctx, const Tensor batches_tensor, + const std::vector &columns, const int64 batch_size, + const ArrowBatchMode batch_mode, const DataTypeVector &output_types, + const std::vector &output_shapes) : ArrowDatasetBase(ctx, columns, batch_size, batch_mode, output_types, output_shapes), batches_(std::move(batches_tensor)) {} @@ -568,10 +782,10 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase { Status CheckExternalState() const override { return OkStatus(); } protected: - Status AsGraphDefInternal(SerializationContext* ctx, - DatasetGraphDefBuilder* b, - Node** output) const override { - Node* batches = nullptr; + Status AsGraphDefInternal(SerializationContext *ctx, + DatasetGraphDefBuilder *b, + Node **output) const override { + Node *batches = nullptr; // optimization_only has been removed in // https://github.com/tensorflow/tensorflow/commit/6d8f05acd72df61e5f4e5b4c72837b7caed3e942#diff-5eac6c133a3a701a696767960e796bd3 // if (ctx->optimization_only()) { @@ -582,11 +796,11 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase { // TF_RETURN_IF_ERROR(b->AddTensor(batches_, &batches)); //} TF_RETURN_IF_ERROR(b->AddTensor(batches_, &batches)); - Node* columns = nullptr; + Node *columns = nullptr; TF_RETURN_IF_ERROR(b->AddVector(columns_, &columns)); - Node* batch_size = nullptr; + Node *batch_size = nullptr; TF_RETURN_IF_ERROR(b->AddScalar(batch_size_, &batch_size)); - Node* batch_mode = nullptr; + Node *batch_mode = nullptr; tstring batch_mode_str; TF_RETURN_IF_ERROR(GetBatchModeStr(batch_mode_, &batch_mode_str)); TF_RETURN_IF_ERROR(b->AddScalar(batch_mode_str, &batch_mode)); @@ -596,7 +810,7 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase { } std::unique_ptr MakeIteratorInternal( - const string& prefix) const override { + const string &prefix) const override { return std::unique_ptr( new Iterator({this, strings::StrCat(prefix, "::Arrow")})); } @@ -604,13 
+818,13 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase { private: class Iterator : public ArrowBaseIterator { public: - explicit Iterator(const Params& params) + explicit Iterator(const Params ¶ms) : ArrowBaseIterator(params) {} private: - Status SetupStreamsLocked(Env* env) + Status SetupStreamsLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - const string& batches = dataset()->batches_.scalar()(); + const string &batches = dataset()->batches_.scalar()(); auto buffer = std::make_shared(batches); auto buffer_reader = std::make_shared(buffer); auto result = arrow::ipc::RecordBatchFileReader::Open(buffer_reader); @@ -626,7 +840,7 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase { return OkStatus(); } - Status NextStreamLocked(Env* env) + Status NextStreamLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { ArrowBaseIterator::NextStreamLocked(env); if (++current_batch_idx_ < num_batches_) { @@ -659,16 +873,16 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase { // ideal for simple writing of Pandas DataFrames. class ArrowFeatherDatasetOp : public ArrowOpKernelBase { public: - explicit ArrowFeatherDatasetOp(OpKernelConstruction* ctx) + explicit ArrowFeatherDatasetOp(OpKernelConstruction *ctx) : ArrowOpKernelBase(ctx) {} virtual void MakeArrowDataset( - OpKernelContext* ctx, const std::vector& columns, + OpKernelContext *ctx, const std::vector &columns, const int64 batch_size, const ArrowBatchMode batch_mode, - const DataTypeVector& output_types, - const std::vector& output_shapes, - ArrowDatasetBase** output) override { - const Tensor* filenames_tensor; + const DataTypeVector &output_types, + const std::vector &output_shapes, + ArrowDatasetBase **output) override { + const Tensor *filenames_tensor; OP_REQUIRES_OK(ctx, ctx->input("filenames", &filenames_tensor)); OP_REQUIRES( ctx, filenames_tensor->dims() <= 1, @@ -686,10 +900,10 @@ class ArrowFeatherDatasetOp : public ArrowOpKernelBase { private: class Dataset : public ArrowDatasetBase { public: - Dataset(OpKernelContext* ctx, const std::vector& filenames, - const std::vector& columns, const int64 batch_size, - const ArrowBatchMode batch_mode, const DataTypeVector& output_types, - const std::vector& output_shapes) + Dataset(OpKernelContext *ctx, const std::vector &filenames, + const std::vector &columns, const int64 batch_size, + const ArrowBatchMode batch_mode, const DataTypeVector &output_types, + const std::vector &output_shapes) : ArrowDatasetBase(ctx, columns, batch_size, batch_mode, output_types, output_shapes), filenames_(filenames) {} @@ -701,16 +915,16 @@ class ArrowFeatherDatasetOp : public ArrowOpKernelBase { Status CheckExternalState() const override { return OkStatus(); } protected: - Status AsGraphDefInternal(SerializationContext* ctx, - DatasetGraphDefBuilder* b, - Node** output) const override { - Node* filenames = nullptr; + Status AsGraphDefInternal(SerializationContext *ctx, + DatasetGraphDefBuilder *b, + Node **output) const override { + Node *filenames = nullptr; TF_RETURN_IF_ERROR(b->AddVector(filenames_, &filenames)); - Node* columns = nullptr; + Node *columns = nullptr; TF_RETURN_IF_ERROR(b->AddVector(columns_, &columns)); - Node* batch_size = nullptr; + Node *batch_size = nullptr; TF_RETURN_IF_ERROR(b->AddScalar(batch_size_, &batch_size)); - Node* batch_mode = nullptr; + Node *batch_mode = nullptr; tstring batch_mode_str; TF_RETURN_IF_ERROR(GetBatchModeStr(batch_mode_, &batch_mode_str)); TF_RETURN_IF_ERROR(b->AddScalar(batch_mode_str, &batch_mode)); @@ -720,7 
+934,7 @@ class ArrowFeatherDatasetOp : public ArrowOpKernelBase { } std::unique_ptr MakeIteratorInternal( - const string& prefix) const override { + const string &prefix) const override { return std::unique_ptr( new Iterator({this, strings::StrCat(prefix, "::ArrowFeather")})); } @@ -728,13 +942,13 @@ class ArrowFeatherDatasetOp : public ArrowOpKernelBase { private: class Iterator : public ArrowBaseIterator { public: - explicit Iterator(const Params& params) + explicit Iterator(const Params ¶ms) : ArrowBaseIterator(params) {} private: - Status SetupStreamsLocked(Env* env) + Status SetupStreamsLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - const string& filename = dataset()->filenames_[current_file_idx_]; + const string &filename = dataset()->filenames_[current_file_idx_]; // Init a TF file from the filename and determine size // TODO: set optional memory to nullptr until input arg is added @@ -773,7 +987,7 @@ class ArrowFeatherDatasetOp : public ArrowOpKernelBase { return OkStatus(); } - Status NextStreamLocked(Env* env) + Status NextStreamLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { ArrowBaseIterator::NextStreamLocked(env); if (++current_batch_idx_ < record_batches_.size()) { @@ -809,16 +1023,16 @@ class ArrowFeatherDatasetOp : public ArrowOpKernelBase { // "unix://", and STDIN with endpoint "fd://0" or "fd://-". class ArrowStreamDatasetOp : public ArrowOpKernelBase { public: - explicit ArrowStreamDatasetOp(OpKernelConstruction* ctx) + explicit ArrowStreamDatasetOp(OpKernelConstruction *ctx) : ArrowOpKernelBase(ctx) {} virtual void MakeArrowDataset( - OpKernelContext* ctx, const std::vector& columns, + OpKernelContext *ctx, const std::vector &columns, const int64 batch_size, const ArrowBatchMode batch_mode, - const DataTypeVector& output_types, - const std::vector& output_shapes, - ArrowDatasetBase** output) override { - const Tensor* endpoints_tensor; + const DataTypeVector &output_types, + const std::vector &output_shapes, + ArrowDatasetBase **output) override { + const Tensor *endpoints_tensor; OP_REQUIRES_OK(ctx, ctx->input("endpoints", &endpoints_tensor)); OP_REQUIRES( ctx, endpoints_tensor->dims() <= 1, @@ -836,10 +1050,10 @@ class ArrowStreamDatasetOp : public ArrowOpKernelBase { private: class Dataset : public ArrowDatasetBase { public: - Dataset(OpKernelContext* ctx, const std::vector& endpoints, - const std::vector& columns, const int64 batch_size, - const ArrowBatchMode batch_mode, const DataTypeVector& output_types, - const std::vector& output_shapes) + Dataset(OpKernelContext *ctx, const std::vector &endpoints, + const std::vector &columns, const int64 batch_size, + const ArrowBatchMode batch_mode, const DataTypeVector &output_types, + const std::vector &output_shapes) : ArrowDatasetBase(ctx, columns, batch_size, batch_mode, output_types, output_shapes), endpoints_(endpoints) {} @@ -851,16 +1065,16 @@ class ArrowStreamDatasetOp : public ArrowOpKernelBase { Status CheckExternalState() const override { return OkStatus(); } protected: - Status AsGraphDefInternal(SerializationContext* ctx, - DatasetGraphDefBuilder* b, - Node** output) const override { - Node* endpoints = nullptr; + Status AsGraphDefInternal(SerializationContext *ctx, + DatasetGraphDefBuilder *b, + Node **output) const override { + Node *endpoints = nullptr; TF_RETURN_IF_ERROR(b->AddVector(endpoints_, &endpoints)); - Node* columns = nullptr; + Node *columns = nullptr; TF_RETURN_IF_ERROR(b->AddVector(columns_, &columns)); - Node* batch_size = nullptr; + Node *batch_size = nullptr; 
TF_RETURN_IF_ERROR(b->AddScalar(batch_size_, &batch_size)); - Node* batch_mode = nullptr; + Node *batch_mode = nullptr; tstring batch_mode_str; TF_RETURN_IF_ERROR(GetBatchModeStr(batch_mode_, &batch_mode_str)); TF_RETURN_IF_ERROR(b->AddScalar(batch_mode_str, &batch_mode)); @@ -870,7 +1084,7 @@ class ArrowStreamDatasetOp : public ArrowOpKernelBase { } std::unique_ptr MakeIteratorInternal( - const string& prefix) const override { + const string &prefix) const override { return std::unique_ptr( new Iterator({this, strings::StrCat(prefix, "::ArrowStream")})); } @@ -878,13 +1092,13 @@ class ArrowStreamDatasetOp : public ArrowOpKernelBase { private: class Iterator : public ArrowBaseIterator { public: - explicit Iterator(const Params& params) + explicit Iterator(const Params ¶ms) : ArrowBaseIterator(params) {} private: - Status SetupStreamsLocked(Env* env) + Status SetupStreamsLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - const string& endpoint = dataset()->endpoints_[current_endpoint_idx_]; + const string &endpoint = dataset()->endpoints_[current_endpoint_idx_]; string endpoint_type; string endpoint_value; TF_RETURN_IF_ERROR(ArrowUtil::ParseEndpoint(endpoint, &endpoint_type, @@ -910,7 +1124,7 @@ class ArrowStreamDatasetOp : public ArrowOpKernelBase { return OkStatus(); } - Status NextStreamLocked(Env* env) + Status NextStreamLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { ArrowBaseIterator::NextStreamLocked(env); CHECK_ARROW(reader_->ReadNext(¤t_batch_)); @@ -947,6 +1161,9 @@ REGISTER_KERNEL_BUILDER(Name("IO>ArrowSerializedDataset").Device(DEVICE_CPU), REGISTER_KERNEL_BUILDER(Name("IO>ArrowFeatherDataset").Device(DEVICE_CPU), ArrowFeatherDatasetOp); +REGISTER_KERNEL_BUILDER(Name("IO>ArrowParquetDataset").Device(DEVICE_CPU), + ArrowParquetDatasetOp); + REGISTER_KERNEL_BUILDER(Name("IO>ArrowStreamDataset").Device(DEVICE_CPU), ArrowStreamDatasetOp); diff --git a/tensorflow_io/core/ops/arrow_ops.cc b/tensorflow_io/core/ops/arrow_ops.cc index 34d027032..e657885e5 100644 --- a/tensorflow_io/core/ops/arrow_ops.cc +++ b/tensorflow_io/core/ops/arrow_ops.cc @@ -38,6 +38,20 @@ in file format. buffer_size: Buffer size in bytes )doc"); +REGISTER_OP("IO>ArrowParquetDataset") + .Input("file_paths: string") + .Input("column_names: string") + .Input("batch_size: int64") + .Input("batch_mode: string") + .Output("handle: variant") + .Attr("output_types: list(type) >= 1") + .Attr("output_shapes: list(shape) >= 1") + .SetIsStateful() + .SetShapeFn(shape_inference::ScalarShape) + .Doc(R"doc( +Creates a dataset from parquet files. 
+)doc"); + REGISTER_OP("IO>ArrowSerializedDataset") .Input("serialized_batches: string") .Input("columns: int32") @@ -92,7 +106,7 @@ REGISTER_OP("IO>ListFeatherColumns") .Output("columns: string") .Output("dtypes: string") .Output("shapes: int64") - .SetShapeFn([](shape_inference::InferenceContext* c) { + .SetShapeFn([](shape_inference::InferenceContext *c) { c->set_output(0, c->MakeShape({c->UnknownDim()})); c->set_output(1, c->MakeShape({c->UnknownDim()})); c->set_output(2, c->MakeShape({c->UnknownDim(), c->UnknownDim()})); @@ -105,7 +119,7 @@ REGISTER_OP("IO>FeatherReadableInit") .Output("components: string") .Attr("container: string = ''") .Attr("shared_name: string = ''") - .SetShapeFn([](shape_inference::InferenceContext* c) { + .SetShapeFn([](shape_inference::InferenceContext *c) { c->set_output(0, c->Scalar()); c->set_output(1, c->MakeShape({})); return OkStatus(); @@ -116,7 +130,7 @@ REGISTER_OP("IO>FeatherReadableSpec") .Output("shape: int64") .Output("dtype: int64") .Attr("component: string") - .SetShapeFn([](shape_inference::InferenceContext* c) { + .SetShapeFn([](shape_inference::InferenceContext *c) { c->set_output(0, c->MakeShape({c->UnknownDim()})); c->set_output(1, c->MakeShape({})); return OkStatus(); @@ -130,7 +144,7 @@ REGISTER_OP("IO>FeatherReadableRead") .Attr("component: string") .Attr("shape: shape") .Attr("dtype: type") - .SetShapeFn([](shape_inference::InferenceContext* c) { + .SetShapeFn([](shape_inference::InferenceContext *c) { PartialTensorShape shape; TF_RETURN_IF_ERROR(c->GetAttr("shape", &shape)); shape_inference::ShapeHandle entry; @@ -148,7 +162,7 @@ REGISTER_OP("IO>ArrowReadableFromMemoryInit") .Output("resource: resource") .Attr("container: string = ''") .Attr("shared_name: string = ''") - .SetShapeFn([](shape_inference::InferenceContext* c) { + .SetShapeFn([](shape_inference::InferenceContext *c) { c->set_output(0, c->Scalar()); return OkStatus(); }); @@ -159,7 +173,7 @@ REGISTER_OP("IO>ArrowReadableSpec") .Input("column_name: string") .Output("shape: int64") .Output("dtype: int64") - .SetShapeFn([](shape_inference::InferenceContext* c) { + .SetShapeFn([](shape_inference::InferenceContext *c) { c->set_output(0, c->MakeShape({c->UnknownDim()})); c->set_output(1, c->MakeShape({})); return OkStatus(); @@ -174,7 +188,7 @@ REGISTER_OP("IO>ArrowReadableRead") .Input("stop: int64") .Output("value: dtype") .Attr("dtype: type") - .SetShapeFn([](shape_inference::InferenceContext* c) { + .SetShapeFn([](shape_inference::InferenceContext *c) { shape_inference::ShapeHandle full; TF_RETURN_IF_ERROR(c->MakeShapeFromShapeTensor(3, &full)); if (!(c->RankKnown(full) && c->Rank(full) > 0)) { From d786733ce7073c4561a6e74dcf02bdf1e9bc88eb Mon Sep 17 00:00:00 2001 From: Yin Ye Date: Sun, 16 Mar 2025 21:44:42 -0700 Subject: [PATCH 2/6] implementation of parquet arrow reader --- .../core/kernels/arrow/arrow_dataset_ops.cc | 208 ++++++++++-------- 1 file changed, 111 insertions(+), 97 deletions(-) diff --git a/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc b/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc index 124af2a6d..bbaea5770 100644 --- a/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc +++ b/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc @@ -14,6 +14,7 @@ limitations under the License. 
==============================================================================*/ #include "arrow/api.h" +#include "arrow/filesystem/localfs.h" #include "arrow/io/stdio.h" #include "arrow/ipc/api.h" #include "arrow/result.h" @@ -32,10 +33,68 @@ namespace data { namespace { -// TODO(yye): implement this func and support reading parquet data in -// streamining way. -Status OpenParquetFile(arrow::fs::FileSystem *fs, - arrow::io::RandomAccessFile *file) {} +// Struct to hold all the resource needed to read a parquet file for better +// management. +struct ParquetReaderResource { + void reset() { + if (this->batch_reader != nullptr) { + this->batch_reader.reset(); + } + if (this->table != nullptr) { + this->table.reset(); + } + if (this->reader != nullptr) { + this->reader.reset(); + } + if (this->file != nullptr) { + this->file.reset(); + } + } + + std::shared_ptr<::arrow::io::RandomAccessFile> file; + std::unique_ptr reader; + std::shared_ptr<::arrow::Table> table; + std::shared_ptr batch_reader; +}; + +arrow::Status OpenParquetFile(const std::string &file_name, + const std::vector &column_names, + const int64 batch_size, arrow::fs::FileSystem *fs, + ParquetReaderResource *resource) { + resource->reset(); + ARROW_ASSIGN_OR_RAISE(resource->file, fs->OpenInputFile(file_name)); + parquet::ArrowReaderProperties properties; + properties.set_use_threads(true); + properties.set_pre_buffer(true); + parquet::ReaderProperties parquet_properties = + parquet::default_reader_properties(); + + std::shared_ptr builder = + std::make_shared(); + builder->Open(resource->file, parquet_properties); + builder->properties(properties)->Build(&resource->reader); + + std::shared_ptr schema; + resource->reader->GetSchema(&schema); + // check column name exist + std::vector missing_columns; + std::vector column_indices; + for (const auto &name : column_names) { + int fieldIndex = schema->GetFieldIndex(name); + column_indices.push_back(fieldIndex); + if (-1 == fieldIndex) { + missing_columns.push_back(name); + } + } + + ARROW_RETURN_NOT_OK( + resource->reader->ReadTable(column_indices, &resource->table)); + // Convert the table to a sequence of batches + resource->batch_reader = + std::make_shared(*(resource->table)); + resource->batch_reader->set_chunksize(batch_size); + return arrow::Status::OK(); +} } // namespace @@ -553,7 +612,7 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { std::vector file_paths, column_names; file_paths.reserve(file_paths_as_tensor->NumElements()); for (int i = 0; i < file_paths_as_tensor->NumElements(); i++) { - file_paths.push_back(file_paths_as_tensor->flat()(i)) + file_paths.push_back(file_paths_as_tensor->flat()(i)); } column_names.reserve(column_names_as_tensor->NumElements()); for (int i = 0; i < column_names_as_tensor->NumElements(); i++) { @@ -618,118 +677,73 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { : ArrowBaseIterator(params) {} private: - // TODO(yye): implementation of getting the first batch. Status SetupStreamsLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - TF_RETURN_IF_ERROR(ReadFile(current_file_idx_)); - - // Open and read parquet file. 
- while (record_batches_.empty() && - ++current_file_idx_ < dataset()->file_paths_.size()) { - TF_RETURN_IF_ERROR(ReadFile(current_file_idx_)); - } + this->fs_ = std::make_shared(); + do { + auto arrow_status = + OpenParquetFile(dataset()->file_paths_[current_file_idx_], + dataset()->column_names_, dataset()->batch_size_, + this->fs_.get(), &parquet_reader_resource_); + if (!arrow_status.ok()) { + return errors::Internal(arrow_status.message()); + } + std::shared_ptr next_batch = nullptr; + arrow_status = this->parquet_reader_resource_.batch_reader->ReadNext( + ¤t_batch_); + if (!arrow_status.ok()) { + return errors::Internal(arrow_status.message()); + } + if (current_batch_ == nullptr) { + current_file_idx_ = current_file_idx_ + 1; + } + } while (current_file_idx_ < dataset()->file_paths_.size()); return OkStatus(); } - // TODO(yye): implementation of getting the next batch. Status NextStreamLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - return OkStatus(); - } - - void ResetStreamsLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - ArrowBaseIterator::ResetStreamsLocked(); - current_file_idx_ = 0; - record_batches_.clear(); - } - - Status ReadFile(int file_index) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - Status res = OkStatus(); + ArrowBaseIterator::NextStreamLocked(env); do { - std::shared_ptr file; - res = OpenParquetFile(&fs_, file.get()); - if (!res.ok()) { - break; - } - - parquet::ArrowReaderProperties properties; - properties.set_use_threads(true); - properties.set_pre_buffer(true); - parquet::ReaderProperties parquet_properties = - parquet::default_reader_properties(); - - std::shared_ptr builder = - std::make_shared(); - builder->Open(file, parquet_properties); - - std::unique_ptr reader; - builder->properties(properties)->Build(&reader); - - if (column_indices_.empty()) { - column_indices_.clear(); - std::shared_ptr schema; - reader->GetSchema(&schema); - // check column name exist - std::string err_column_names; - for (const auto &name : dataset()->column_names_) { - int fieldIndex = schema->GetFieldIndex(name); - column_indices_.push_back(fieldIndex); - if (-1 == fieldIndex) { - err_column_names = err_column_names + " " + name; - } - } - - if (err_column_names.length() != 0) { - res = errors::InvalidArgument( - "these column names don't exist: ", err_column_names, - " when read file: ", dataset()->file_paths_[file_index]); - break; - } - } - // Read file columns and build a table - std::shared_ptr<::arrow::Table> table; - arrow::Status arrow_status = - reader->ReadTable(column_indices_, &table); + auto arrow_status = + this->parquet_reader_resource_.batch_reader->ReadNext( + ¤t_batch_); if (!arrow_status.ok()) { - res = errors::Internal(arrow_status.ToString()); - break; + return errors::Internal(arrow_status.message()); } - // Convert the table to a sequence of batches - std::shared_ptr batch_reader = - std::make_shared(table); - std::shared_ptr batch = nullptr; - - arrow_status = batch_reader->ReadNext(&batch); - if (!arrow_status.ok()) { - res = errors::Internal(arrow_status.ToString()); - break; - } - res = CheckBatchColumnTypes(batch); - if (!res.ok()) { - break; - } - record_batches_.clear(); - while (batch != nullptr) { - if (batch->num_rows() != 0) { - record_batches_.emplace_back(batch); + if (current_batch_ != nullptr) { + return OkStatus(); + } else { + current_file_idx_ = current_file_idx_ + 1; + if (current_file_idx_ == dataset()->file_paths_.size()) { + break; } - arrow_status = batch_reader->ReadNext(&batch); + auto arrow_status = OpenParquetFile( + 
dataset()->file_paths_[current_file_idx_], + dataset()->column_names_, dataset()->batch_size_, + this->fs_.get(), &parquet_reader_resource_); if (!arrow_status.ok()) { - res = errors::Internal(arrow_status.ToString()); - break; + return errors::Internal(arrow_status.message()); } } - } while (0); - return res; + } while (true); + return OkStatus(); + } + + void ResetStreamsLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { + ArrowBaseIterator::ResetStreamsLocked(); + current_file_idx_ = 0; + parquet_reader_resource_.reset(); + fs_.reset(); } size_t current_file_idx_ TF_GUARDED_BY(mu_) = 0; - // TODO(yye): stop maintaining/holding all the record batches. - std::vector> record_batches_ - TF_GUARDED_BY(mu_); + std::shared_ptr fs_ TF_GUARDED_BY(mu_) = nullptr; + ParquetReaderResource parquet_reader_resource_ TF_GUARDED_BY(mu_); + // Maintains the index of the columns to read. std::vector column_indices_ TF_GUARDED_BY(mu_); }; From 1681930eb5bf75b737273eca6903793cf88c83d3 Mon Sep 17 00:00:00 2001 From: Yin Ye Date: Sun, 16 Mar 2025 21:57:59 -0700 Subject: [PATCH 3/6] build python wrapper --- tensorflow_io/python/ops/arrow_dataset_ops.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tensorflow_io/python/ops/arrow_dataset_ops.py b/tensorflow_io/python/ops/arrow_dataset_ops.py index e051c0b75..782807db6 100644 --- a/tensorflow_io/python/ops/arrow_dataset_ops.py +++ b/tensorflow_io/python/ops/arrow_dataset_ops.py @@ -651,6 +651,39 @@ def gen_record_batches(): ) +class ArrowParquetDataset(ArrowBaseDataset): + """An Arrow Dataset for reading record batches from parquet files. + """ + + def __init__( + self, + parquet_files, + column_names, + columns, + output_types, + output_shapes=None, + batch_size=None, + batch_mode="keep_remainder", + ): + parquet_files = tf.convert_to_tensor( + parquet_files, dtype=dtypes.string, name="parquet_files" + ) + column_names = tf.convert_to_tensor( + column_names, dtype=dtypes.string, name="column_names" + ) + super().__init__( + partial( + core_ops.io_arrow_parquet_dataset, + parquet_files, + column_names, + ), + columns, + output_types, + output_shapes, + batch_size, + batch_mode, + ) + def list_feather_columns(filename, **kwargs): """list_feather_columns""" if not tf.executing_eagerly(): From ad52c2f817ea770dbfd1fb361f9a7fc6e0beb05b Mon Sep 17 00:00:00 2001 From: yye Date: Fri, 21 Mar 2025 05:18:33 +0000 Subject: [PATCH 4/6] fix build errors --- WORKSPACE | 9 +-- .../core/kernels/arrow/arrow_dataset_ops.cc | 28 ++++++---- tensorflow_io/core/ops/arrow_ops.cc | 1 + tensorflow_io/python/ops/arrow_dataset_ops.py | 8 +-- third_party/arrow.BUILD | 18 +++++- third_party/aws-sdk-cpp.BUILD | 55 +++++++++++++++++++ 6 files changed, 100 insertions(+), 19 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index 5c7c9e63f..ecd18d4e0 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -173,11 +173,12 @@ http_archive( http_archive( name = "arrow", build_file = "//third_party:arrow.BUILD", - sha256 = "57e13c62f27b710e1de54fd30faed612aefa22aa41fa2c0c3bacd204dd18a8f3", - strip_prefix = "arrow-apache-arrow-7.0.0", + patch_cmds = ["""sed -i.bak '24i\\'$'\\n#undef ARROW_WITH_OPENTELEMETRY\\n' cpp/src/arrow/util/tracing_internal.h"""], + sha256 = "19ece12de48e51ce4287d2dee00dc358fbc5ff02f41629d16076f77b8579e272", + strip_prefix = "arrow-apache-arrow-8.0.0", urls = [ - "https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/arrow/archive/apache-arrow-7.0.0.tar.gz", - "https://github.com/apache/arrow/archive/apache-arrow-7.0.0.tar.gz", + 
"https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/arrow/archive/apache-arrow-8.0.0.tar.gz", + "https://github.com/apache/arrow/archive/apache-arrow-8.0.0.tar.gz", ], ) diff --git a/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc b/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc index bbaea5770..e77c9eeb9 100644 --- a/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc +++ b/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc @@ -13,6 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ +#include + #include "arrow/api.h" #include "arrow/filesystem/localfs.h" #include "arrow/io/stdio.h" @@ -602,8 +604,9 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { : ArrowOpKernelBase(ctx) {} virtual void MakeArrowDataset( - OpKernelContext *ctx, const std::vector &_, const int64 batch_size, - const ArrowBatchMode batch_mode, const DataTypeVector &output_types, + OpKernelContext *ctx, const std::vector &columns, + const int64 batch_size, const ArrowBatchMode batch_mode, + const DataTypeVector &output_types, const std::vector &output_shapes, ArrowDatasetBase **output) override { const Tensor *file_paths_as_tensor; @@ -614,12 +617,13 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { for (int i = 0; i < file_paths_as_tensor->NumElements(); i++) { file_paths.push_back(file_paths_as_tensor->flat()(i)); } + OP_REQUIRES_OK(ctx, ctx->input("column_names", &column_names_as_tensor)); column_names.reserve(column_names_as_tensor->NumElements()); for (int i = 0; i < column_names_as_tensor->NumElements(); i++) { column_names.push_back(column_names_as_tensor->flat()(i)); } - *output = new Dataset(ctx, file_paths, column_names, batch_size, batch_mode, - output_types_, output_shapes_); + *output = new Dataset(ctx, file_paths, column_names, columns, batch_size, + batch_mode, output_types_, output_shapes_); } private: @@ -627,13 +631,15 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { public: Dataset(OpKernelContext *ctx, const std::vector &file_paths, const std::vector &column_names, - const int64 batch_size, const ArrowBatchMode batch_mode, - const DataTypeVector &output_types, + const std::vector &columns, const int64 batch_size, + const ArrowBatchMode batch_mode, const DataTypeVector &output_types, const std::vector &output_shapes) - : ArrowDatasetBase(ctx, /*columns=*/std::vector(), batch_size, - batch_mode, output_types, output_shapes), + : ArrowDatasetBase(ctx, columns, batch_size, batch_mode, output_types, + output_shapes), file_paths_(file_paths), - column_names_(column_names) {} + column_names_(column_names) { + LOG(INFO) << "Dataset called "; + } string DebugString() const override { return "ArrowParquetDatasetOp::Dataset"; @@ -660,7 +666,8 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { TF_RETURN_IF_ERROR(GetBatchModeStr(batch_mode_, &batch_mode_str)); TF_RETURN_IF_ERROR(b->AddScalar(batch_mode_str, &batch_mode)); TF_RETURN_IF_ERROR(b->AddDataset( - this, {file_paths, column_names, batch_size, batch_mode}, output)); + this, {file_paths, column_names, columns, batch_size, batch_mode}, + output)); return OkStatus(); } @@ -679,6 +686,7 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { private: Status SetupStreamsLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { + std::cout << "SetupStreamLocked called " << std::endl; this->fs_ = std::make_shared(); do { auto arrow_status = diff 
--git a/tensorflow_io/core/ops/arrow_ops.cc b/tensorflow_io/core/ops/arrow_ops.cc index e657885e5..5a2fd251c 100644 --- a/tensorflow_io/core/ops/arrow_ops.cc +++ b/tensorflow_io/core/ops/arrow_ops.cc @@ -41,6 +41,7 @@ buffer_size: Buffer size in bytes REGISTER_OP("IO>ArrowParquetDataset") .Input("file_paths: string") .Input("column_names: string") + .Input("columns: int32") .Input("batch_size: int64") .Input("batch_mode: string") .Output("handle: variant") diff --git a/tensorflow_io/python/ops/arrow_dataset_ops.py b/tensorflow_io/python/ops/arrow_dataset_ops.py index 782807db6..8cbefe555 100644 --- a/tensorflow_io/python/ops/arrow_dataset_ops.py +++ b/tensorflow_io/python/ops/arrow_dataset_ops.py @@ -657,7 +657,7 @@ class ArrowParquetDataset(ArrowBaseDataset): def __init__( self, - parquet_files, + file_paths, column_names, columns, output_types, @@ -665,8 +665,8 @@ def __init__( batch_size=None, batch_mode="keep_remainder", ): - parquet_files = tf.convert_to_tensor( - parquet_files, dtype=dtypes.string, name="parquet_files" + file_paths = tf.convert_to_tensor( + file_paths, dtype=dtypes.string, name="file_paths" ) column_names = tf.convert_to_tensor( column_names, dtype=dtypes.string, name="column_names" @@ -674,7 +674,7 @@ def __init__( super().__init__( partial( core_ops.io_arrow_parquet_dataset, - parquet_files, + file_paths, column_names, ), columns, diff --git a/third_party/arrow.BUILD b/third_party/arrow.BUILD index 53ba65ba9..fdcf80af0 100644 --- a/third_party/arrow.BUILD +++ b/third_party/arrow.BUILD @@ -37,7 +37,12 @@ cc_library( [ "cpp/src/arrow/*.cc", "cpp/src/arrow/array/*.cc", + "cpp/src/arrow/compute/*.cc", + "cpp/src/arrow/compute/exec/*.cc", + "cpp/src/arrow/compute/kernels/*.cc", "cpp/src/arrow/csv/*.cc", + "cpp/src/arrow/dataset/*.cc", + "cpp/src/arrow/filesystem/*.cc", "cpp/src/arrow/io/*.cc", "cpp/src/arrow/ipc/*.cc", "cpp/src/arrow/json/*.cc", @@ -46,6 +51,10 @@ cc_library( "cpp/src/arrow/vendored/optional.hpp", "cpp/src/arrow/vendored/string_view.hpp", "cpp/src/arrow/vendored/variant.hpp", + "cpp/src/arrow/vendored/base64.cpp", + "cpp/src/arrow/vendored/datetime/tz.cpp", + "cpp/src/arrow/vendored/uriparser/*.c", + "cpp/src/arrow/vendored/pcg/*.hpp", "cpp/src/arrow/**/*.h", "cpp/src/parquet/**/*.h", "cpp/src/parquet/**/*.cc", @@ -58,9 +67,11 @@ cc_library( "cpp/src/**/*_main.cc", "cpp/src/**/*_nossl.cc", "cpp/src/**/*_test.cc", - "cpp/src/**/test_*.cc", + "cpp/src/**/*test*.h", + "cpp/src/**/*test*.cc", "cpp/src/**/*hdfs*.cc", "cpp/src/**/*fuzz*.cc", + "cpp/src/**/*gcsfs*.cc", "cpp/src/**/file_to_stream.cc", "cpp/src/**/stream_to_file.cc", "cpp/src/arrow/util/bpacking_avx2.cc", @@ -99,9 +110,12 @@ cc_library( "PARQUET_STATIC", "PARQUET_EXPORT=", "WIN32_LEAN_AND_MEAN", + "ARROW_DS_STATIC", + "URI_STATIC_BUILD", ], includes = [ "cpp/src", + "cpp/src/generated", "cpp/src/arrow/vendored/xxhash", "cpp/thirdparty/flatbuffers/include", ], @@ -109,6 +123,8 @@ cc_library( "cpp/src/arrow/vendored/xxhash/xxhash.c", ], deps = [ + "@aws-sdk-cpp//:identity-management", + "@aws-sdk-cpp//:s3", "@boringssl//:crypto", "@brotli", "@bzip2", diff --git a/third_party/aws-sdk-cpp.BUILD b/third_party/aws-sdk-cpp.BUILD index ba7d90bcb..16e9cc9d1 100644 --- a/third_party/aws-sdk-cpp.BUILD +++ b/third_party/aws-sdk-cpp.BUILD @@ -163,6 +163,61 @@ cc_library( ], ) +cc_library( + name = "cognito-identity", + srcs = glob([ + "aws-cpp-sdk-cognito-identity/source/*.cpp", + "aws-cpp-sdk-cognito-identity/source/model/*.cpp", + ]), + hdrs = glob([ + 
"aws-cpp-sdk-cognito-identity/include/aws/cognito-identity/*.h", + "aws-cpp-sdk-cognito-identity/include/aws/cognito-identity/model/*.h", + ]), + includes = [ + "aws-cpp-sdk-cognito-identity/include", + ], + deps = [ + ":core", + ], +) + +cc_library( + name = "sts", + srcs = glob([ + "aws-cpp-sdk-sts/source/*.cpp", + "aws-cpp-sdk-sts/source/model/*.cpp", + ]), + hdrs = glob([ + "aws-cpp-sdk-sts/include/aws/sts/*.h", + "aws-cpp-sdk-sts/include/aws/sts/model/*.h", + ]), + includes = [ + "aws-cpp-sdk-sts/include", + ], + deps = [ + ":core", + ], +) + +cc_library( + name = "identity-management", + srcs = glob([ + "aws-cpp-sdk-identity-management/source/auth/*.cpp", + ]), + hdrs = glob([ + "aws-cpp-sdk-identity-management/include/aws/identity-management/*.h", + "aws-cpp-sdk-identity-management/include/aws/identity-management/auth/*.h", + ]), + includes = [ + "aws-cpp-sdk-identity-management/include", + ], + deps = [ + ":cognito-identity", + ":core", + ":sts", + ], +) + genrule( name = "SDKConfig_h", outs = [ From fb4be3f0562d4a2ff88c047f2190221019fa0413 Mon Sep 17 00:00:00 2001 From: yye Date: Fri, 21 Mar 2025 20:05:00 +0000 Subject: [PATCH 5/6] fix --- .../core/kernels/arrow/arrow_dataset_ops.cc | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc b/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc index e77c9eeb9..696ba85d4 100644 --- a/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc +++ b/tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc @@ -637,9 +637,7 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { : ArrowDatasetBase(ctx, columns, batch_size, batch_mode, output_types, output_shapes), file_paths_(file_paths), - column_names_(column_names) { - LOG(INFO) << "Dataset called "; - } + column_names_(column_names) {} string DebugString() const override { return "ArrowParquetDatasetOp::Dataset"; @@ -686,7 +684,6 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { private: Status SetupStreamsLocked(Env *env) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { - std::cout << "SetupStreamLocked called " << std::endl; this->fs_ = std::make_shared(); do { auto arrow_status = @@ -701,13 +698,14 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { ¤t_batch_); if (!arrow_status.ok()) { return errors::Internal(arrow_status.message()); - } - if (current_batch_ == nullptr) { + } else if (current_batch_ == nullptr) { current_file_idx_ = current_file_idx_ + 1; - } + } else { + return OkStatus(); + } } while (current_file_idx_ < dataset()->file_paths_.size()); - return OkStatus(); + return errors::Interal("Expected EOF."); } Status NextStreamLocked(Env *env) @@ -736,7 +734,7 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase { } } } while (true); - return OkStatus(); + return errors::Interal("Expected EOF."); } void ResetStreamsLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override { From ed99fff5f65dd31e79cd40b1d9f277736c8f96f1 Mon Sep 17 00:00:00 2001 From: yye Date: Fri, 21 Mar 2025 20:06:04 +0000 Subject: [PATCH 6/6] add test python script for parquet dataset. 
---
 test_parquet_dataset.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
 create mode 100644 test_parquet_dataset.py

diff --git a/test_parquet_dataset.py b/test_parquet_dataset.py
new file mode 100644
index 000000000..f0ce98566
--- /dev/null
+++ b/test_parquet_dataset.py
@@ -0,0 +1,15 @@
+import tensorflow as tf
+import tensorflow_io.arrow as arrow_io
+
+dataset = arrow_io.ArrowParquetDataset(
+    file_paths = ['/home/yye/training-platform/training-platform/bento/apps/demos/chicago_taxi/data/test.parquet'],
+    column_names=('tips'),
+    columns=(),
+    output_types=(tf.float32),
+    output_shapes=([]),
+    batch_size=4,
+    batch_mode='keep_remainder')
+
+# This will iterate over each row of each file provided
+for row in dataset:
+    print(row)
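
As a companion to the test script above (not part of the patches themselves): a minimal sketch, assuming pyarrow is installed, that writes a small single-column parquet file to a placeholder path (/tmp/test_tips.parquet) which file_paths can point at instead of the hard-coded /home/yye/... path. The column is written as float32 so it matches the declared output_types.

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical sample data: a handful of float32 "tips" values.
tips = pa.array([0.0, 1.5, 2.0, 3.25, 4.0], type=pa.float32())
table = pa.table({"tips": tips})

# Write the table to a local parquet file; substitute this path into file_paths above.
pq.write_table(table, "/tmp/test_tips.parquet")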