Skip to content

Commit

Permalink
Standalone 12/N: Implement and test ZarrV3ArrayWriter (#304)
Browse files Browse the repository at this point in the history
Depends on #303.
  • Loading branch information
aliddell authored Oct 4, 2024
1 parent 41f71a7 commit aa59079
Show file tree
Hide file tree
Showing 22 changed files with 885 additions and 38 deletions.
2 changes: 1 addition & 1 deletion acquire-common
2 changes: 2 additions & 0 deletions src/streaming/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ add_library(${tgt}
array.writer.cpp
zarrv2.array.writer.hh
zarrv2.array.writer.cpp
zarrv3.array.writer.hh
zarrv3.array.writer.cpp
)

target_include_directories(${tgt}
Expand Down
2 changes: 0 additions & 2 deletions src/streaming/array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,6 @@ zarr::ArrayWriter::compress_buffers_()
} catch (const std::exception& exc) {
err = "Failed to compress chunk: " +
std::string(exc.what());
} catch (...) {
err = "Failed to compress chunk (unknown)";
}
latch.count_down();

Expand Down
6 changes: 0 additions & 6 deletions src/streaming/sink.creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,6 @@ zarr::SinkCreator::make_files_(std::queue<std::string>& file_paths,
} catch (const std::exception& exc) {
err = "Failed to create file '" + filename +
"': " + exc.what();
} catch (...) {
err = "Failed to create file '" + filename +
"': (unknown).";
}

latch.count_down();
Expand Down Expand Up @@ -396,9 +393,6 @@ zarr::SinkCreator::make_files_(
} catch (const std::exception& exc) {
err = "Failed to create file '" + filename +
"': " + exc.what();
} catch (...) {
err = "Failed to create file '" + filename +
"': (unknown).";
}

latch.count_down();
Expand Down
8 changes: 1 addition & 7 deletions src/streaming/zarr.stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,7 @@ ZarrStream_s::create_writers_()
void
ZarrStream_s::create_scaled_frames_()
{
if (multiscale_) {
// TODO (aliddell): implement this
}
// TODO (aliddell): implement this
}

bool
Expand Down Expand Up @@ -465,9 +463,5 @@ void
ZarrStream_s::write_multiscale_frames_(const uint8_t* data,
size_t bytes_of_data)
{
if (multiscale_) {
return;
}

// TODO (aliddell): implement this
}
2 changes: 0 additions & 2 deletions src/streaming/zarrv2.array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ zarr::ZarrV2ArrayWriter::flush_impl_()
} catch (const std::exception& exc) {
err = "Failed to write chunk: " +
std::string(exc.what());
} catch (...) {
err = "Failed to write chunk: (unknown)";
}

latch.count_down();
Expand Down
254 changes: 254 additions & 0 deletions src/streaming/zarrv3.array.writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
#include "macros.hh"
#include "zarrv3.array.writer.hh"
#include "sink.creator.hh"
#include "zarr.common.hh"

#include <nlohmann/json.hpp>

#include <algorithm> // std::fill
#include <latch>
#include <stdexcept>

#ifdef max
#undef max
#endif

namespace {
/// Map a ZarrDataType sample type to its Zarr dtype string.
/// @throws std::runtime_error if @p t is not a recognized sample type.
std::string
sample_type_to_dtype(ZarrDataType t)
{
    const char* dtype = nullptr;

    switch (t) {
        case ZarrDataType_uint8:
            dtype = "uint8";
            break;
        case ZarrDataType_uint16:
            dtype = "uint16";
            break;
        case ZarrDataType_uint32:
            dtype = "uint32";
            break;
        case ZarrDataType_uint64:
            dtype = "uint64";
            break;
        case ZarrDataType_int8:
            dtype = "int8";
            break;
        case ZarrDataType_int16:
            dtype = "int16";
            break;
        case ZarrDataType_int32:
            dtype = "int32";
            break;
        case ZarrDataType_int64:
            dtype = "int64";
            break;
        case ZarrDataType_float32:
            dtype = "float32";
            break;
        case ZarrDataType_float64:
            dtype = "float64";
            break;
        default:
            break;
    }

    if (dtype == nullptr) {
        throw std::runtime_error("Invalid ZarrDataType: " +
                                 std::to_string(static_cast<int>(t)));
    }

    return dtype;
}
} // namespace

// Filesystem-only construction: delegates to the S3-aware constructor
// with a null connection pool.
zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter(
  ArrayWriterConfig&& config,
  std::shared_ptr<ThreadPool> thread_pool)
  : ZarrV3ArrayWriter(std::move(config), thread_pool, nullptr)
{
}

/// Construct a Zarr v3 array writer, sizing per-shard bookkeeping from the
/// configured dimensions. @p s3_connection_pool may be null (filesystem).
zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter(
  ArrayWriterConfig&& config,
  std::shared_ptr<ThreadPool> thread_pool,
  std::shared_ptr<S3ConnectionPool> s3_connection_pool)
  : ArrayWriter(std::move(config), thread_pool, s3_connection_pool)
{
    // One running file offset and one chunk-index table per shard.
    const auto n_shards = config_.dimensions->number_of_shards();
    const auto chunks_per_shard = config_.dimensions->chunks_per_shard();

    shard_file_offsets_.assign(n_shards, 0);

    // Each chunk occupies two table entries ({offset, size}); entries are
    // initialized to the uint64 max sentinel, meaning "chunk not written".
    shard_tables_.resize(n_shards);
    for (auto& table : shard_tables_) {
        table.assign(2 * chunks_per_shard,
                     std::numeric_limits<uint64_t>::max());
    }
}

/// Flush buffered chunks to their shard sinks, one thread-pool job per
/// shard. When finalizing or rolling over, append each shard's chunk-index
/// table after its chunk data, then reset the per-shard state.
/// @return false if the data sinks could not be created; true otherwise.
bool
zarr::ZarrV3ArrayWriter::flush_impl_()
{
    // create shard files if they don't exist
    if (data_sinks_.empty() && !make_data_sinks_()) {
        return false;
    }

    const auto n_shards = config_.dimensions->number_of_shards();
    CHECK(data_sinks_.size() == n_shards);

    // get shard indices for each chunk
    std::vector<std::vector<size_t>> chunk_in_shards(n_shards);
    for (size_t i = 0; i < chunk_buffers_.size(); ++i) {
        const auto index = config_.dimensions->shard_index_for_chunk(i);
        chunk_in_shards.at(index).push_back(i);
    }

    // write out chunks to shards; the table is appended only when the shard
    // is complete (rollover) or the writer is finalizing
    const auto write_table = is_finalizing_ || should_rollover_();
    std::latch latch(n_shards);
    for (size_t i = 0; i < n_shards; ++i) {
        const auto& chunks = chunk_in_shards.at(i);
        auto& chunk_table = shard_tables_.at(i);
        auto* file_offset = &shard_file_offsets_.at(i);

        EXPECT(thread_pool_->push_job([&sink = data_sinks_.at(i),
                                       &chunks,
                                       &chunk_table,
                                       file_offset,
                                       write_table,
                                       &latch,
                                       this](std::string& err) {
            // A shard with no chunks this flush is not a failure — start
            // from success and let any failed write flip it.
            bool success = true;

            try {
                for (const auto& chunk_idx : chunks) {
                    auto& chunk = chunk_buffers_.at(chunk_idx);
                    std::span data{ reinterpret_cast<std::byte*>(chunk.data()),
                                    chunk.size() };
                    success = sink->write(*file_offset, data);
                    if (!success) {
                        break;
                    }

                    // record {offset, size} for this chunk in the table
                    const auto internal_idx =
                      config_.dimensions->shard_internal_index(chunk_idx);
                    chunk_table.at(2 * internal_idx) = *file_offset;
                    chunk_table.at(2 * internal_idx + 1) = chunk.size();

                    *file_offset += chunk.size();
                }

                if (success && write_table) {
                    auto* table =
                      reinterpret_cast<std::byte*>(chunk_table.data());
                    std::span data{ table,
                                    chunk_table.size() * sizeof(uint64_t) };
                    success = sink->write(*file_offset, data);
                }
            } catch (const std::exception& exc) {
                err = "Failed to write chunk: " + std::string(exc.what());
                // an exception after a successful write must not report
                // success
                success = false;
            }

            latch.count_down();
            return success;
        }),
               "Failed to push job to thread pool");
    }

    // wait for all threads to finish
    latch.wait();

    // reset shard tables and file offsets for the next shard epoch
    if (write_table) {
        for (auto& table : shard_tables_) {
            std::fill(table.begin(),
                      table.end(),
                      std::numeric_limits<uint64_t>::max());
        }

        std::fill(shard_file_offsets_.begin(), shard_file_offsets_.end(), 0);
    }

    return true;
}

// Serialize and write the Zarr v3 array metadata document (shape, chunk
// grid, dtype, compressor, sharding storage transformer) to the metadata
// sink.
// Returns false if the metadata sink could not be created or written.
bool
zarr::ZarrV3ArrayWriter::write_array_metadata_()
{
    if (!make_metadata_sink_()) {
        return false;
    }

    using json = nlohmann::json;

    std::vector<size_t> array_shape, chunk_shape, shard_shape;

    // Compute the current size of the append (outermost) dimension from the
    // number of frames written, dividing (rounding up) by the size of each
    // interior non-spatial dimension (indices 1..ndims-3).
    // NOTE(review): assumes ndims() >= 3 — `ndims() - 3` would wrap if
    // unsigned and smaller; confirm that invariant is enforced upstream.
    size_t append_size = frames_written_;
    for (auto i = config_.dimensions->ndims() - 3; i > 0; --i) {
        const auto& dim = config_.dimensions->at(i);
        const auto& array_size_px = dim.array_size_px;
        CHECK(array_size_px);
        append_size = (append_size + array_size_px - 1) / array_size_px;
    }
    array_shape.push_back(append_size);

    // Remaining dimensions come straight from the config. Note shard_shape
    // is measured in chunks per shard, not pixels.
    const auto& final_dim = config_.dimensions->final_dim();
    chunk_shape.push_back(final_dim.chunk_size_px);
    shard_shape.push_back(final_dim.shard_size_chunks);
    for (auto i = 1; i < config_.dimensions->ndims(); ++i) {
        const auto& dim = config_.dimensions->at(i);
        array_shape.push_back(dim.array_size_px);
        chunk_shape.push_back(dim.chunk_size_px);
        shard_shape.push_back(dim.shard_size_chunks);
    }

    json metadata;
    metadata["attributes"] = json::object();
    metadata["chunk_grid"] = json::object({
      { "chunk_shape", chunk_shape },
      { "separator", "/" },
      { "type", "regular" },
    });

    metadata["chunk_memory_layout"] = "C";
    metadata["data_type"] = sample_type_to_dtype(config_.dtype);
    metadata["extensions"] = json::array();
    metadata["fill_value"] = 0;
    metadata["shape"] = array_shape;

    // Blosc compressor configuration, or null when writing uncompressed.
    if (config_.compression_params) {
        const auto params = *config_.compression_params;
        metadata["compressor"] = json::object({
          { "codec", "https://purl.org/zarr/spec/codec/blosc/1.0" },
          { "configuration",
            json::object({
              { "blocksize", 0 },
              { "clevel", params.clevel },
              { "cname", params.codec_id },
              { "shuffle", params.shuffle },
            }) },
        });
    } else {
        metadata["compressor"] = nullptr;
    }

    // sharding storage transformer
    // TODO (aliddell):
    // https://github.com/zarr-developers/zarr-python/issues/877
    metadata["storage_transformers"] = json::array();
    metadata["storage_transformers"][0] = json::object({
      { "type", "indexed" },
      { "extension",
        "https://purl.org/zarr/spec/storage_transformers/sharding/1.0" },
      { "configuration",
        json::object({
          { "chunks_per_shard", shard_shape },
        }) },
    });

    // Serialize with 4-space indentation and write at offset 0.
    std::string metadata_str = metadata.dump(4);
    std::span data = { reinterpret_cast<std::byte*>(metadata_str.data()),
                       metadata_str.size() };

    return metadata_sink_->write(0, data);
}

/// Decide whether a whole shard's worth of frames has been written, i.e.
/// whether the current shard is complete and its index table should be
/// flushed.
bool
zarr::ZarrV3ArrayWriter::should_rollover_() const
{
    const auto& dims = config_.dimensions;
    const auto& append_dim = dims->final_dim();

    // Frames per shard along the append dimension, scaled by the sizes of
    // the interior dimensions (indices 1..ndims-3).
    size_t frames_per_shard =
      append_dim.chunk_size_px * append_dim.shard_size_chunks;
    for (auto i = 1; i < dims->ndims() - 2; ++i) {
        frames_per_shard *= dims->at(i).array_size_px;
    }
    CHECK(frames_per_shard > 0);

    return frames_written_ % frames_per_shard == 0;
}
25 changes: 25 additions & 0 deletions src/streaming/zarrv3.array.writer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "array.writer.hh"

namespace zarr {
/// Array writer for the Zarr v3 storage format: chunks are grouped into
/// shards, and each shard carries a trailing index table of
/// {offset, size} entry pairs, one pair per chunk.
struct ZarrV3ArrayWriter : public ArrayWriter
{
  public:
    /// Filesystem-only writer (no S3 connection pool).
    ZarrV3ArrayWriter(ArrayWriterConfig&& config,
                      std::shared_ptr<ThreadPool> thread_pool);
    /// Writer with an optional S3 connection pool (may be null for
    /// filesystem output).
    ZarrV3ArrayWriter(
      ArrayWriterConfig&& config,
      std::shared_ptr<ThreadPool> thread_pool,
      std::shared_ptr<S3ConnectionPool> s3_connection_pool);

  private:
    // Current byte offset into each shard, indexed by shard number.
    std::vector<size_t> shard_file_offsets_;
    // Per-shard chunk index tables: two uint64 entries ({offset, size})
    // per chunk; uint64 max is the "not written" sentinel.
    std::vector<std::vector<uint64_t>> shard_tables_;

    ZarrVersion version_() const override { return ZarrVersion_3; }
    bool flush_impl_() override;
    bool write_array_metadata_() override;
    bool should_rollover_() const override;
};
} // namespace zarr
3 changes: 3 additions & 0 deletions tests/unit-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ set(tests
zarrv2-writer-write-even
zarrv2-writer-write-ragged-append-dim
zarrv2-writer-write-ragged-internal-dim
zarrv3-writer-write-even
zarrv3-writer-write-ragged-append-dim
zarrv3-writer-write-ragged-internal-dim
)

foreach (name ${tests})
Expand Down
2 changes: 0 additions & 2 deletions tests/unit-tests/array-dimensions-chunk-internal-offset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ main()
retval = 0;
} catch (const std::exception& exc) {
LOG_ERROR("Exception: ", exc.what());
} catch (...) {
LOG_ERROR("Exception: (unknown)");
}

return retval;
Expand Down
2 changes: 0 additions & 2 deletions tests/unit-tests/array-dimensions-chunk-lattice-index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ main()
retval = 0;
} catch (const std::exception& exc) {
LOG_ERROR("Exception: ", exc.what());
} catch (...) {
LOG_ERROR("Exception: (unknown)");
}

return retval;
Expand Down
2 changes: 0 additions & 2 deletions tests/unit-tests/array-dimensions-shard-index-for-chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,6 @@ main()
retval = 0;
} catch (const std::exception& exc) {
LOG_ERROR("Exception: ", exc.what());
} catch (...) {
LOG_ERROR("Exception: (unknown)");
}

return retval;
Expand Down
2 changes: 0 additions & 2 deletions tests/unit-tests/array-dimensions-shard-internal-index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ main()
retval = 0;
} catch (const std::exception& exc) {
LOG_ERROR("Exception: ", exc.what());
} catch (...) {
LOG_ERROR("Exception: (unknown)");
}

return retval;
Expand Down
Loading

0 comments on commit aa59079

Please sign in to comment.