Commit

Standalone 9/N: Implement and test Sink and SinkCreator types (#301)
aliddell authored Oct 2, 2024
1 parent 8b20b10 commit c9746e1
Showing 30 changed files with 2,175 additions and 442 deletions.
8 changes: 8 additions & 0 deletions src/streaming/CMakeLists.txt
@@ -15,6 +15,14 @@ add_library(${tgt}
thread.pool.cpp
s3.connection.hh
s3.connection.cpp
+sink.hh
+sink.cpp
+file.sink.hh
+file.sink.cpp
+s3.sink.hh
+s3.sink.cpp
+sink.creator.hh
+sink.creator.cpp
)

target_include_directories(${tgt}
6 changes: 3 additions & 3 deletions src/streaming/acquire.zarr.cpp
@@ -34,7 +34,7 @@ extern "C"
try {
Logger::set_log_level(level);
} catch (const std::exception& e) {
LOG_ERROR("Error setting log level: %s", e.what());
LOG_ERROR("Error setting log level: ", e.what());
return ZarrStatusCode_InternalError;
}
return ZarrStatusCode_Success;
@@ -94,7 +94,7 @@ extern "C"
{
EXPECT_VALID_ARGUMENT(settings, "Null pointer: settings");
EXPECT_VALID_ARGUMENT(dimension_count >= 3,
"Invalid dimension count: %zu",
"Invalid dimension count: ",
dimension_count);

ZarrDimensionProperties* dimensions = nullptr;
@@ -160,7 +160,7 @@ extern "C"
try {
*bytes_out = stream->append(data, bytes_in);
} catch (const std::exception& e) {
LOG_ERROR("Error appending data: %s", e.what());
LOG_ERROR("Error appending data: ", e.what());
return ZarrStatusCode_InternalError;
}

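The changed calls above sit inside the pattern visible in both hunks: run the C++ body, log any exception, and translate it into a ZarrStatusCode instead of letting it escape the extern "C" boundary. A minimal sketch of that pattern; the helper name and parameters are stand-ins, not code from this commit:

#include <exception>
#include <utility>

// Hypothetical helper (not part of this commit): invoke a callable and map
// success/failure onto integer status codes, the way the extern "C"
// functions above return ZarrStatusCode_Success or ZarrStatusCode_InternalError.
template <typename Fn>
int
call_with_status(Fn&& fn, int success_code, int error_code) noexcept
{
    try {
        std::forward<Fn>(fn)();
        return success_code;
    } catch (const std::exception& e) {
        // The real code logs e.what() here via LOG_ERROR, as in the hunks above.
        return error_code;
    } catch (...) {
        return error_code;
    }
}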
30 changes: 30 additions & 0 deletions src/streaming/file.sink.cpp
@@ -0,0 +1,30 @@
#include "file.sink.hh"

#include <filesystem>

namespace fs = std::filesystem;

zarr::FileSink::FileSink(std::string_view filename)
: file_(filename.data(), std::ios::binary | std::ios::trunc)
{
}

bool
zarr::FileSink::write(size_t offset, std::span<std::byte> data)
{
const auto bytes_of_buf = data.size();
if (data.data() == nullptr || bytes_of_buf == 0) {
return true;
}

file_.seekp(offset);
file_.write(reinterpret_cast<const char*>(data.data()), bytes_of_buf);
return true;
}

bool
zarr::FileSink::flush_()
{
file_.flush();
return true;
}
22 changes: 22 additions & 0 deletions src/streaming/file.sink.hh
@@ -0,0 +1,22 @@
#pragma once

#include "sink.hh"

#include <fstream>
#include <string_view>

namespace zarr {
class FileSink : public Sink
{
public:
explicit FileSink(std::string_view filename);

bool write(size_t offset, std::span<std::byte> data) override;

protected:
bool flush_() override;

private:
std::ofstream file_;
};
} // namespace zarr
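For orientation, a short usage sketch of the FileSink declared above; the file name and payload are illustrative, and error handling plus the protected flush path (driven through the Sink base class) are left out:

#include "file.sink.hh"

#include <cstddef>
#include <vector>

void
file_sink_example()
{
    zarr::FileSink sink("chunk.0.0.bin"); // illustrative path

    std::vector<std::byte> payload(1024, std::byte{ 0xFF });

    // write() seeks to the offset and writes the span, as implemented in
    // file.sink.cpp above; a null or empty span is a successful no-op.
    sink.write(0, payload);
    sink.write(payload.size(), payload);
}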
42 changes: 22 additions & 20 deletions src/streaming/s3.connection.cpp
@@ -63,18 +63,19 @@ zarr::S3Connection::put_object(std::string_view bucket_name,
data.size());
std::basic_istream stream(&buffer);

-LOG_DEBUG(
-"Putting object %s in bucket %s", object_name.data(), bucket_name.data());
+LOG_DEBUG("Putting object ", object_name, " in bucket ", bucket_name);
minio::s3::PutObjectArgs args(stream, static_cast<long>(data.size()), 0);
args.bucket = bucket_name;
args.object = object_name;

auto response = client_->PutObject(args);
if (!response) {
LOG_ERROR("Failed to put object %s in bucket %s: %s",
object_name.data(),
bucket_name.data(),
response.Error().String().c_str());
LOG_ERROR("Failed to put object ",
object_name,
" in bucket ",
bucket_name,
": ",
response.Error().String());
return {};
}

@@ -88,19 +89,19 @@ zarr::S3Connection::delete_object(std::string_view bucket_name,
EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
EXPECT(!object_name.empty(), "Object name must not be empty.");

LOG_DEBUG("Deleting object %s from bucket %s",
object_name.data(),
bucket_name.data());
LOG_DEBUG("Deleting object ", object_name, " from bucket ", bucket_name);
minio::s3::RemoveObjectArgs args;
args.bucket = bucket_name;
args.object = object_name;

auto response = client_->RemoveObject(args);
if (!response) {
LOG_ERROR("Failed to delete object %s from bucket %s: %s",
object_name.data(),
bucket_name.data(),
response.Error().String().c_str());
LOG_ERROR("Failed to delete object ",
object_name,
" from bucket ",
bucket_name,
": ",
response.Error().String());
return false;
}

@@ -191,9 +192,8 @@ zarr::S3Connection::complete_multipart_object(
EXPECT(!upload_id.empty(), "Upload id must not be empty.");
EXPECT(!parts.empty(), "Parts list must not be empty.");

LOG_DEBUG("Completing multipart object %s in bucket %s",
object_name.data(),
bucket_name.data());
LOG_DEBUG(
"Completing multipart object ", object_name, " in bucket ", bucket_name);
minio::s3::CompleteMultipartUploadArgs args;
args.bucket = bucket_name;
args.object = object_name;
@@ -202,10 +202,12 @@

auto response = client_->CompleteMultipartUpload(args);
if (!response) {
LOG_ERROR("Failed to complete multipart object %s in bucket %s: %s",
object_name.data(),
bucket_name.data(),
response.Error().String().c_str());
LOG_ERROR("Failed to complete multipart object ",
object_name,
" in bucket ",
bucket_name,
": ",
response.Error().String());
return false;
}

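The LOG_DEBUG/LOG_ERROR changes in this file (and in acquire.zarr.cpp above) drop printf-style format strings in favor of passing the arguments directly, which suggests a variadic, stream-style logging backend. A minimal sketch of that style, with names that are assumptions rather than the project's actual Logger API:

#include <iostream>
#include <sstream>
#include <utility>

// Sketch only: concatenate every argument with operator<< so call sites can
// write log_error("Failed to put object ", object_name, ...) without format
// specifiers that could drift out of sync with the argument list.
template <typename... Args>
void
log_error(Args&&... args)
{
    std::ostringstream oss;
    (oss << ... << std::forward<Args>(args)); // C++17 fold expression
    std::cerr << "[ERROR] " << oss.str() << '\n';
}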
2 changes: 1 addition & 1 deletion src/streaming/s3.connection.hh
@@ -6,7 +6,7 @@
#include <list>
#include <memory>
#include <mutex>
-#include <string_view>
+#include <string>
#include <span>
#include <vector>

205 changes: 205 additions & 0 deletions src/streaming/s3.sink.cpp
@@ -0,0 +1,205 @@
#include "macros.hh"
#include "s3.sink.hh"

#include <miniocpp/client.h>

#ifdef min
#undef min
#endif

zarr::S3Sink::S3Sink(std::string_view bucket_name,
std::string_view object_key,
std::shared_ptr<S3ConnectionPool> connection_pool)
: bucket_name_{ bucket_name }
, object_key_{ object_key }
, connection_pool_{ connection_pool }
{
EXPECT(!bucket_name_.empty(), "Bucket name must not be empty");
EXPECT(!object_key_.empty(), "Object key must not be empty");
EXPECT(connection_pool_, "Null pointer: connection_pool");
}

bool
zarr::S3Sink::flush_()
{
if (is_multipart_upload_()) {
const auto& parts = multipart_upload_->parts;
if (nbytes_buffered_ > 0 && !flush_part_()) {
LOG_ERROR("Failed to upload part ",
parts.size() + 1,
" of object ",
object_key_);
return false;
}
if (!finalize_multipart_upload_()) {
LOG_ERROR("Failed to finalize multipart upload of object ",
object_key_);
return false;
}
} else if (nbytes_buffered_ > 0) {
if (!put_object_()) {
LOG_ERROR("Failed to upload object: ", object_key_);
return false;
}
}

// cleanup
nbytes_buffered_ = 0;

return true;
}

bool
zarr::S3Sink::write(size_t offset, std::span<std::byte> data)
{
if (data.data() == nullptr || data.empty()) {
return true;
}

if (offset < nbytes_flushed_) {
LOG_ERROR("Cannot write data at offset ",
offset,
", already flushed to ",
nbytes_flushed_);
return false;
}
nbytes_buffered_ = offset - nbytes_flushed_;

size_t bytes_of_data = data.size();
std::byte* data_ptr = data.data();
while (bytes_of_data > 0) {
const auto bytes_to_write =
std::min(bytes_of_data, part_buffer_.size() - nbytes_buffered_);

if (bytes_to_write) {
std::copy_n(data_ptr,
bytes_to_write,
part_buffer_.begin() + nbytes_buffered_);
nbytes_buffered_ += bytes_to_write;
data_ptr += bytes_to_write;
bytes_of_data -= bytes_to_write;
}

if (nbytes_buffered_ == part_buffer_.size() && !flush_part_()) {
return false;
}
}

return true;
}

bool
zarr::S3Sink::put_object_()
{
if (nbytes_buffered_ == 0) {
return false;
}

auto connection = connection_pool_->get_connection();
std::span data(reinterpret_cast<uint8_t*>(part_buffer_.data()),
nbytes_buffered_);

bool retval = false;
try {
std::string etag =
connection->put_object(bucket_name_, object_key_, data);
EXPECT(!etag.empty(), "Failed to upload object: ", object_key_);

retval = true;

nbytes_flushed_ = nbytes_buffered_;
nbytes_buffered_ = 0;
} catch (const std::exception& exc) {
LOG_ERROR("Error: ", exc.what());
}

// cleanup
connection_pool_->return_connection(std::move(connection));

return retval;
}

bool
zarr::S3Sink::is_multipart_upload_() const
{
return multipart_upload_.has_value();
}

void
zarr::S3Sink::create_multipart_upload_()
{
if (!is_multipart_upload_()) {
multipart_upload_ = {};
}

if (!multipart_upload_->upload_id.empty()) {
return;
}

multipart_upload_->upload_id =
connection_pool_->get_connection()->create_multipart_object(bucket_name_,
object_key_);
}

bool
zarr::S3Sink::flush_part_()
{
if (nbytes_buffered_ == 0) {
return false;
}

auto connection = connection_pool_->get_connection();

create_multipart_upload_();

bool retval = false;
try {
auto& parts = multipart_upload_->parts;

minio::s3::Part part;
part.number = static_cast<unsigned int>(parts.size()) + 1;

std::span data(reinterpret_cast<uint8_t*>(part_buffer_.data()),
nbytes_buffered_);
part.etag =
connection->upload_multipart_object_part(bucket_name_,
object_key_,
multipart_upload_->upload_id,
data,
part.number);
EXPECT(!part.etag.empty(),
"Failed to upload part ",
part.number,
" of object ",
object_key_);

parts.push_back(part);

retval = true;
} catch (const std::exception& exc) {
LOG_ERROR("Error: ", exc.what());
}

// cleanup
connection_pool_->return_connection(std::move(connection));
nbytes_flushed_ += nbytes_buffered_;
nbytes_buffered_ = 0;

return retval;
}

bool
zarr::S3Sink::finalize_multipart_upload_()
{
auto connection = connection_pool_->get_connection();

const auto& upload_id = multipart_upload_->upload_id;
const auto& parts = multipart_upload_->parts;

bool retval = connection->complete_multipart_object(
bucket_name_, object_key_, upload_id, parts);

connection_pool_->return_connection(std::move(connection));

return retval;
}
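To restate the flow above: write() copies incoming bytes into a fixed-size part buffer and uploads a part each time the buffer fills, while flush_() either finalizes the multipart upload or falls back to a single put_object_() when no part was ever flushed. A standalone sketch of that fill-and-flush loop, with the buffer and upload callback as stand-ins rather than the classes in this commit:

#include <algorithm>
#include <cstddef>
#include <functional>
#include <span>
#include <vector>

// Illustrative restatement of the S3Sink::write() loop: copy into a part
// buffer and trigger an upload whenever the buffer becomes full.
bool
buffer_and_flush_parts(std::span<const std::byte> data,
                       std::vector<std::byte>& part_buffer,
                       size_t& nbytes_buffered,
                       const std::function<bool()>& flush_part)
{
    const std::byte* ptr = data.data();
    size_t remaining = data.size();

    while (remaining > 0) {
        const size_t n =
          std::min(remaining, part_buffer.size() - nbytes_buffered);
        std::copy_n(ptr, n, part_buffer.begin() + nbytes_buffered);
        nbytes_buffered += n;
        ptr += n;
        remaining -= n;

        // A full buffer means a part is ready, mirroring flush_part_() above,
        // which likewise resets the buffered byte count after a successful upload.
        if (nbytes_buffered == part_buffer.size()) {
            if (!flush_part()) {
                return false;
            }
            nbytes_buffered = 0;
        }
    }
    return true;
}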
