Skip to content

Commit

Permalink
Move to sink.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
aliddell committed Jan 22, 2025
1 parent 1903840 commit d072255
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 147 deletions.
145 changes: 144 additions & 1 deletion src/streaming/sink.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
#include "sink.hh"
#include "file.sink.hh"
#include "s3.sink.hh"
#include "macros.hh"

#include <algorithm>
#include <filesystem>
#include <latch>
#include <stdexcept>
#include <unordered_set>

namespace fs = std::filesystem;

bool
zarr::finalize_sink(std::unique_ptr<zarr::Sink>&& sink)
{
Expand All @@ -15,4 +25,137 @@ zarr::finalize_sink(std::unique_ptr<zarr::Sink>&& sink)

sink.reset();
return true;
}
}

std::vector<std::string>
zarr::construct_data_paths(std::string_view base_path,
const ArrayDimensions& dimensions,
const DimensionPartsFun& parts_along_dimension)
{
std::queue<std::string> paths_queue;
paths_queue.emplace(base_path);

// create intermediate paths
for (auto i = 1; // skip the last dimension
i < dimensions.ndims() - 1; // skip the x dimension
++i) {
const auto& dim = dimensions.at(i);
const auto n_parts = parts_along_dimension(dim);
CHECK(n_parts);

auto n_paths = paths_queue.size();
for (auto j = 0; j < n_paths; ++j) {
const auto path = paths_queue.front();
paths_queue.pop();

for (auto k = 0; k < n_parts; ++k) {
const auto kstr = std::to_string(k);
paths_queue.push(path + (path.empty() ? kstr : "/" + kstr));
}
}
}

// create final paths
std::vector<std::string> paths_out;
paths_out.reserve(paths_queue.size() *
parts_along_dimension(dimensions.width_dim()));
{
const auto& dim = dimensions.width_dim();
const auto n_parts = parts_along_dimension(dim);
CHECK(n_parts);

auto n_paths = paths_queue.size();
for (auto i = 0; i < n_paths; ++i) {
const auto path = paths_queue.front();
paths_queue.pop();
for (auto j = 0; j < n_parts; ++j)
paths_out.push_back(path + "/" + std::to_string(j));
}
}

return paths_out;
}

std::vector<std::string>
zarr::get_parent_paths(const std::vector<std::string>& file_paths)
{
std::unordered_set<std::string> unique_paths;
for (const auto& file_path : file_paths) {
unique_paths.emplace(fs::path(file_path).parent_path().string());
}

return { unique_paths.begin(), unique_paths.end() };
}

bool
zarr::make_dirs(const std::vector<std::string>& dir_paths,
std::shared_ptr<ThreadPool> thread_pool)
{
if (dir_paths.empty()) {
return true;
}
EXPECT(thread_pool, "Thread pool not provided.");

std::atomic<char> all_successful = 1;

std::unordered_set<std::string> unique_paths(dir_paths.begin(),
dir_paths.end());

std::latch latch(unique_paths.size());
for (const auto& path : unique_paths) {
auto job = [&path, &latch, &all_successful](std::string& err) {
bool success = true;
if (fs::is_directory(path)) {
latch.count_down();
return success;
}

std::error_code ec;
if (!fs::create_directories(path, ec)) {
err =
"Failed to create directory '" + path + "': " + ec.message();
success = false;
}

latch.count_down();
all_successful.fetch_and(static_cast<char>(success));

return success;
};

if (!thread_pool->push_job(std::move(job))) {
LOG_ERROR("Failed to push job to thread pool.");
return false;
}
}

latch.wait();

return static_cast<bool>(all_successful);
}

std::unique_ptr<zarr::Sink>
zarr::make_file_sink(std::string_view file_path)
{
if (file_path.starts_with("file://")) {
file_path = file_path.substr(7);
}

EXPECT(!file_path.empty(), "File path must not be empty.");

fs::path path(file_path);
EXPECT(!path.empty(), "Invalid file path: ", file_path);

fs::path parent_path = path.parent_path();

if (!fs::is_directory(parent_path)) {
std::error_code ec;
if (!fs::create_directories(parent_path, ec)) {
LOG_ERROR("Failed to create directory '", parent_path, "': ",
ec.message());
return nullptr;
}
}

return std::make_unique<FileSink>(file_path);
}
34 changes: 34 additions & 0 deletions src/streaming/sink.hh
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#pragma once

#include "thread.pool.hh"
#include "zarr.dimension.hh"

#include <cstddef> // size_t, std::byte
#include <memory> // std::unique_ptr
#include <span> // std::span
Expand Down Expand Up @@ -28,4 +31,35 @@ class Sink

bool
finalize_sink(std::unique_ptr<Sink>&& sink);

/**
* @brief Construct paths for data sinks, given the dimensions and a function
* to determine the number of parts along a dimension.
* @param base_path The base path for the dataset.
* @param dimensions The dimensions of the dataset.
* @param parts_along_dimension Function to determine the number of parts
*/
std::vector<std::string>
construct_data_paths(std::string_view base_path,
const ArrayDimensions& dimensions,
const DimensionPartsFun& parts_along_dimension);

/**
* @brief Get unique paths to the parent directories of each file in @p
* file_paths.
* @param file_paths Collection of paths to files.
* @return Collection of unique parent directories.
*/
std::vector<std::string>
get_parent_paths(const std::vector<std::string>& file_paths);

/**
* @brief Parallel create directories for a collection of paths.
* @param dir_paths The directories to create.
* @param thread_pool The thread pool to use for parallel creation.
* @return True iff all directories were created successfully.
*/
bool
make_dirs(const std::vector<std::string>& dir_paths,
std::shared_ptr<ThreadPool> thread_pool);
} // namespace zarr
113 changes: 0 additions & 113 deletions src/streaming/zarr.common.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
#include "macros.hh"
#include "zarr.common.hh"

#include <algorithm>
#include <filesystem>
#include <latch>
#include <queue>
#include <stdexcept>
#include <unordered_set>

namespace fs = std::filesystem;

std::string
zarr::trim(std::string_view s)
Expand Down Expand Up @@ -97,110 +91,3 @@ zarr::shards_along_dimension(const ZarrDimension& dimension)
const auto n_chunks = chunks_along_dimension(dimension);
return (n_chunks + shard_size - 1) / shard_size;
}

std::vector<std::string>
zarr::construct_data_paths(std::string_view base_path,
const ArrayDimensions& dimensions,
const DimensionPartsFun& parts_along_dimension)
{
std::queue<std::string> paths_queue;
paths_queue.emplace(base_path);

// create intermediate paths
for (auto i = 1; // skip the last dimension
i < dimensions.ndims() - 1; // skip the x dimension
++i) {
const auto& dim = dimensions.at(i);
const auto n_parts = parts_along_dimension(dim);
CHECK(n_parts);

auto n_paths = paths_queue.size();
for (auto j = 0; j < n_paths; ++j) {
const auto path = paths_queue.front();
paths_queue.pop();

for (auto k = 0; k < n_parts; ++k) {
const auto kstr = std::to_string(k);
paths_queue.push(path + (path.empty() ? kstr : "/" + kstr));
}
}
}

// create final paths
std::vector<std::string> paths_out;
paths_out.reserve(paths_queue.size() *
parts_along_dimension(dimensions.width_dim()));
{
const auto& dim = dimensions.width_dim();
const auto n_parts = parts_along_dimension(dim);
CHECK(n_parts);

auto n_paths = paths_queue.size();
for (auto i = 0; i < n_paths; ++i) {
const auto path = paths_queue.front();
paths_queue.pop();
for (auto j = 0; j < n_parts; ++j)
paths_out.push_back(path + "/" + std::to_string(j));
}
}

return paths_out;
}

std::vector<std::string>
zarr::get_parent_paths(const std::vector<std::string>& file_paths)
{
std::unordered_set<std::string> unique_paths;
for (const auto& file_path : file_paths) {
unique_paths.emplace(fs::path(file_path).parent_path().string());
}

return std::vector<std::string>(unique_paths.begin(), unique_paths.end());
}

bool
zarr::make_dirs(const std::vector<std::string>& dir_paths,
std::shared_ptr<ThreadPool> thread_pool)
{
if (dir_paths.empty()) {
return true;
}
EXPECT(thread_pool, "Thread pool not provided.");

std::atomic<char> all_successful = 1;

std::unordered_set<std::string> unique_paths(dir_paths.begin(),
dir_paths.end());

std::latch latch(unique_paths.size());
for (const auto& path : unique_paths) {
auto job = [&path, &latch, &all_successful](std::string& err) {
bool success = true;
if (fs::is_directory(path)) {
latch.count_down();
return success;
}

std::error_code ec;
if (!fs::create_directories(path, ec)) {
err =
"Failed to create directory '" + path + "': " + ec.message();
success = false;
}

latch.count_down();
all_successful.fetch_and(static_cast<char>(success));

return success;
};

if (!thread_pool->push_job(std::move(job))) {
LOG_ERROR("Failed to push job to thread pool.");
return false;
}
}

latch.wait();

return static_cast<bool>(all_successful);
}
31 changes: 0 additions & 31 deletions src/streaming/zarr.common.hh
Original file line number Diff line number Diff line change
Expand Up @@ -61,35 +61,4 @@ chunks_along_dimension(const ZarrDimension& dimension);
*/
uint32_t
shards_along_dimension(const ZarrDimension& dimension);

/**
* @brief Construct paths for data sinks, given the dimensions and a function
* to determine the number of parts along a dimension.
* @param base_path The base path for the dataset.
* @param dimensions The dimensions of the dataset.
* @param parts_along_dimension Function to determine the number of parts
*/
std::vector<std::string>
construct_data_paths(std::string_view base_path,
const ArrayDimensions& dimensions,
const DimensionPartsFun& parts_along_dimension);

/**
* @brief Get unique paths to the parent directories of each file in @p
* file_paths.
* @param file_paths Collection of paths to files.
* @return Collection of unique parent directories.
*/
std::vector<std::string>
get_parent_paths(const std::vector<std::string>& file_paths);

/**
* @brief Parallel create directories for a collection of paths.
* @param dir_paths The directories to create.
* @param thread_pool The thread pool to use for parallel creation.
* @return True iff all directories were created successfully.
*/
bool
make_dirs(const std::vector<std::string>& dir_paths,
std::shared_ptr<ThreadPool> thread_pool);
} // namespace zarr
2 changes: 1 addition & 1 deletion tests/unit-tests/construct-data-paths.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "unit.test.macros.hh"
#include "zarr.common.hh"
#include "sink.hh"
#include "zarr.dimension.hh"

#include <string>
Expand Down
2 changes: 1 addition & 1 deletion tests/unit-tests/make-dirs.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "unit.test.macros.hh"
#include "zarr.common.hh"
#include "sink.hh"

#include <filesystem>

Expand Down

0 comments on commit d072255

Please # to comment.