diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 5770718b..4c557174 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -128,4 +128,80 @@ bool Payload::empty() const return std::holds_alternative(bytes_); } +///============================================================================= +BufferPool::BufferPool() +: buffers_(), mutex_() +{ + const char * env_value; + const char * error_str = rcutils_get_env("RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES", &env_value); + if (error_str != nullptr) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Unable to read maximum buffer pool size, falling back to default."); + max_size_ = DEFAULT_MAX_SIZE; + } else if (strcmp(env_value, "") == 0) { + max_size_ = DEFAULT_MAX_SIZE; + } else { + max_size_ = std::atoll(env_value); + } + size_ = 0; +} + +///============================================================================= +BufferPool::~BufferPool() +{ + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + for (Buffer & buffer : buffers_) { + allocator.deallocate(buffer.data, allocator.state); + } +} + +///============================================================================= +BufferPool::Buffer BufferPool::allocate(size_t size) +{ + std::lock_guard guard(mutex_); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + if (buffers_.empty()) { + if (size_ + size > max_size_) { + return {}; + } else { + size_ += size; + } + uint8_t * data = static_cast(allocator.allocate(size, allocator.state)); + if (data == nullptr) { + return {}; + } else { + return Buffer {data, size}; + } + } else { + Buffer buffer = buffers_.back(); + buffers_.pop_back(); + if (buffer.size < size) { + size_t size_diff = size - buffer.size; + if (size_ + size_diff > max_size_) { + return {}; + } + uint8_t * data = static_cast(allocator.reallocate( + buffer.data, size, allocator.state)); + if (data == nullptr) { + return {}; + } + size_ += size_diff; + buffer.data = data; + buffer.size = size; + } + return buffer; + } +} + +///============================================================================= +void BufferPool::deallocate(BufferPool::Buffer buffer) +{ + std::lock_guard guard(mutex_); + buffers_.push_back(buffer); +} + } // namespace rmw_zenoh_cpp