Skip to content

Commit

Permalink
trivial destructor req
Browse files Browse the repository at this point in the history
  • Loading branch information
ConorWilliams committed Sep 10, 2021
1 parent d810109 commit 50d93cb
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 338 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.14 FATAL_ERROR)
# Note: update this to your new project's name and version
project(
RiftenDeque
VERSION 1.2.0
VERSION 2.0.0
LANGUAGES CXX
)

Expand All @@ -29,7 +29,7 @@ CPMAddPackage("gh:TheLartians/PackageProject.cmake@1.6.0")

add_library(RiftenDeque INTERFACE "include/riften/deque.hpp")

target_compile_features(RiftenDeque INTERFACE cxx_std_17)
target_compile_features(RiftenDeque INTERFACE cxx_std_20)

# Enforce standards conformance on MSVC
target_compile_options(RiftenDeque INTERFACE "$<$<COMPILE_LANG_AND_ID:CXX,MSVC>:/permissive>")
Expand Down
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@ This implementation is based on:
- https://github.com/taskflow/work-stealing-queue
- https://github.com/ssbl/concurrent-deque

`riften::Deque` places no constraint on the types which can be placed in the deque and has no memory overhead associated with buffer recycling. Furthermore, when possible
`riften::Deque` places very few constraints on the types which can be placed in the deque (they must be trivially destructible and have nothrow move constructor/assignment operators) and has no memory overhead associated with buffer recycling.

## Usage

```C++
// #include <string>
// #include <thread>

// #include "riften/deque.hpp"

// Work-stealing deque of strings
riften::Deque<std::string> deque;
// Work-stealing deque of ints
riften::Deque<int> deque;

// One thread can push and pop items from one end (like a stack)
std::thread owner([&]() {
for (int i = 0; i < 10000; i = i + 1) {
deque.emplace(std::to_string(i));
deque.emplace(i);
}
while (!deque.empty()) {
std::optional item = deque.pop();
Expand Down
170 changes: 74 additions & 96 deletions include/riften/deque.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <new>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

// This (standalone) file implements the deque described in the papers, "Correct and Efficient
// Work-Stealing for Weak Memory Models," and "Dynamic Circular Work-Stealing Deque". Both are avaliable
// in 'reference/'.
// This (stand-alone) file implements the deque described in the papers, "Correct and Efficient
// Work-Stealing for Weak Memory Models," and "Dynamic Circular Work-Stealing Deque". Both are
// available in 'reference/'.

namespace riften {

Expand All @@ -27,11 +29,15 @@ template <typename T> struct RingBuff {

std::int64_t capacity() const noexcept { return _cap; }

// Relaxed store at modulo index
void store(std::int64_t i, T x) noexcept { _buff[i & _mask].store(x, std::memory_order_relaxed); }
// Store at modulo index
void store(std::int64_t i, T&& x) noexcept requires std::is_nothrow_move_assignable_v<T> {
_buff[i & _mask] = std::move(x);
}

// Relaxed load at modulo index
T load(std::int64_t i) const noexcept { return _buff[i & _mask].load(std::memory_order_relaxed); }
// Load at modulo index
T load(std::int64_t i) const noexcept requires std::is_nothrow_move_constructible_v<T> {
return _buff[i & _mask];
}

// Allocates and returns a new ring buffer, copies elements in range [b, t) into the new buffer.
RingBuff<T>* resize(std::int64_t b, std::int64_t t) const {
Expand All @@ -44,34 +50,28 @@ template <typename T> struct RingBuff {

private:
std::int64_t _cap; // Capacity of the buffer
std::int64_t _mask; // Bitmask to perform modulo capacity operations
std::int64_t _mask; // Bit mask to perform modulo capacity operations

#if !__cpp_lib_smart_ptr_for_overwrite
std::unique_ptr<std::atomic<T>[]> _buff = std::make_unique<std::atomic<T>[]>(_cap);
#else
std::unique_ptr<std::atomic<T>[]> _buff = std::make_unique_for_overwrite<std::atomic<T>[]>(_cap);
#endif
std::unique_ptr<T[]> _buff = std::make_unique_for_overwrite<T[]>(_cap);
};

template <typename T> struct is_always_lock_free {
static constexpr bool value = std::atomic<T>::is_always_lock_free;
};
} // namespace detail

template <typename T> static constexpr bool lock_free_v
= std::conjunction_v<std::is_trivially_copyable<T>,
std::is_copy_constructible<T>,
std::is_move_constructible<T>,
std::is_copy_assignable<T>,
std::is_move_assignable<T>,
is_always_lock_free<T>>;
#ifdef __cpp_lib_hardware_interference_size
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
inline constexpr std::size_t hardware_destructive_interference_size = 2 * sizeof(std::max_align_t);
#endif

} // namespace detail
template <typename T>
concept trivially_destructible = std::is_trivially_destructible_v<T>;

// Lock-free single-producer multiple-consumer deque. There are no constraints on the type `T` that can
// be stored. Only the deque owner can perform pop and push operations where the deque behaves like a
// stack. Others can (only) steal data from the deque, they see a FIFO queue. All threads must have
// finished using the deque before it is destructed.
template <typename T> class Deque {
// Lock-free single-producer multiple-consumer deque. Only the deque owner can perform pop and push
// operations where the deque behaves like a stack. Others can (only) steal data from the deque, they see
// a FIFO queue. All threads must have finished using the deque before it is destructed. T must be
// trivially destructible and have nothrow move constructor/assignment operators.
template <trivially_destructible T> class Deque {
public:
// Constructs the deque with a given capacity the capacity of the deque (must be power of 2)
explicit Deque(std::int64_t cap = 1024);
Expand All @@ -89,155 +89,133 @@ template <typename T> class Deque {
// Test if empty at instance of call
bool empty() const noexcept;

// Emplace an item to the deque. Only the owner thread can insert an item to the deque. The operation
// can trigger the deque to resize its cap if more space is required. Provides the strong exception
// garantee.
// Emplace an item to the deque. Only the owner thread can insert an item to the deque. The
// operation can trigger the deque to resize its cap if more space is required. Provides the
// strong exception guarantee.
template <typename... Args> void emplace(Args&&... args);

// Pops out an item from the deque. Only the owner thread can pop out an item from the deque. The
// return can be a std::nullopt if this operation failed (empty deque).
// Pops out an item from the deque. Only the owner thread can pop out an item from the deque.
// The return can be a std::nullopt if this operation fails (empty deque).
std::optional<T> pop() noexcept;

// Steals an item from the deque Any threads can try to steal an item from the deque. The return can
// be a std::nullopt if this operation failed (not necessary empty).
// Steals an item from the deque Any threads can try to steal an item from the deque. The return
// can be a std::nullopt if this operation failed (not necessarily empty).
std::optional<T> steal() noexcept;

// Destruct the deque, all threads must have finished using the deque.
~Deque() noexcept;

// If true elements of type `T` are stored directly in the ring buffer.
static constexpr bool no_alloc = std::is_trivially_destructible_v<T> && detail::lock_free_v<T>;

private:
using buffer_t = detail::RingBuff<std::conditional_t<no_alloc, T, T*>>;
alignas(hardware_destructive_interference_size) std::atomic<std::int64_t> _top;
alignas(hardware_destructive_interference_size) std::atomic<std::int64_t> _bottom;
alignas(hardware_destructive_interference_size) std::atomic<detail::RingBuff<T>*> _buffer;

std::atomic<std::int64_t> _top; // Top of deque
std::atomic<std::int64_t> _bottom; // Bottom of deque.
std::atomic<buffer_t*> _buffer; // Current buffer.
std::vector<std::unique_ptr<buffer_t>> _garbage; // Store old buffers here.
std::vector<std::unique_ptr<detail::RingBuff<T>>> _garbage; // Store old buffers here.

// Convinience aliases.
// Convenience aliases.
static constexpr std::memory_order relaxed = std::memory_order_relaxed;
static constexpr std::memory_order consume = std::memory_order_consume;
static constexpr std::memory_order acquire = std::memory_order_acquire;
static constexpr std::memory_order release = std::memory_order_release;
static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;
};

template <typename T> Deque<T>::Deque(std::int64_t cap)
: _top(0), _bottom(0), _buffer(new buffer_t{cap}) {
template <trivially_destructible T> Deque<T>::Deque(std::int64_t cap)
: _top(0), _bottom(0), _buffer(new detail::RingBuff<T>{cap}) {
_garbage.reserve(32);
}

template <typename T> std::size_t Deque<T>::size() const noexcept {
template <trivially_destructible T> std::size_t Deque<T>::size() const noexcept {
int64_t b = _bottom.load(relaxed);
int64_t t = _top.load(relaxed);
return static_cast<std::size_t>(b >= t ? b - t : 0);
}

template <typename T> int64_t Deque<T>::capacity() const noexcept {
template <trivially_destructible T> int64_t Deque<T>::capacity() const noexcept {
return _buffer.load(relaxed)->capacity();
}

template <typename T> bool Deque<T>::empty() const noexcept { return !size(); }
template <trivially_destructible T> bool Deque<T>::empty() const noexcept { return !size(); }

template <trivially_destructible T> template <typename... Args> void Deque<T>::emplace(Args&&... args) {
// Construct before acquiring slot in-case constructor throws
T object(std::forward<Args>(args)...);

template <typename T> template <typename... Args> void Deque<T>::emplace(Args&&... args) {
std::int64_t b = _bottom.load(relaxed);
std::int64_t t = _top.load(acquire);
buffer_t* buf = _buffer.load(relaxed);
detail::RingBuff<T>* buf = _buffer.load(relaxed);

if (buf->capacity() < (b - t) + 1) {
// Queue is full, build a new one
_garbage.emplace_back(std::exchange(buf, buf->resize(b, t)));
_buffer.store(buf, relaxed);
}

// Construct new object
if constexpr (no_alloc) {
buf->store(b, {std::forward<Args>(args)...});
} else {
buf->store(b, new T{std::forward<Args>(args)...});
}
// Construct new object, this does not have to be atomic as no one can steal this item until after we
// store the new value of bottom, ordering is maintained by surrounding atomics.
buf->store(b, std::move(object));

std::atomic_thread_fence(release);
_bottom.store(b + 1, relaxed);
}

template <typename T> std::optional<T> Deque<T>::pop() noexcept {
template <trivially_destructible T> std::optional<T> Deque<T>::pop() noexcept {
std::int64_t b = _bottom.load(relaxed) - 1;
buffer_t* buf = _buffer.load(relaxed);
_bottom.store(b, relaxed);
detail::RingBuff<T>* buf = _buffer.load(relaxed);

_bottom.store(b, relaxed); // Stealers can no longer steal

std::atomic_thread_fence(seq_cst);
std::int64_t t = _top.load(relaxed);

if (t <= b) {
// Non-empty deque
if (t == b) {
// The last item could get stolen
// The last item could get stolen, by a stealer that loaded bottom before our write above
if (!_top.compare_exchange_strong(t, t + 1, seq_cst, relaxed)) {
// Failed race.
// Failed race, thief got the last item.
_bottom.store(b + 1, relaxed);
return std::nullopt;
}
_bottom.store(b + 1, relaxed);
}

// Can delay load until after aquiring slot as only this thread can push()
auto x = buf->load(b);

if constexpr (no_alloc) {
return x;
} else {
std::optional tmp{std::move(*x)};
delete x;
return tmp;
}
// Can delay load until after acquiring slot as only this thread can push(), this load is not
// required to be atomic as we are the exclusive writer.
return buf->load(b);

} else {
_bottom.store(b + 1, relaxed);
return std::nullopt;
}
}

template <typename T> std::optional<T> Deque<T>::steal() noexcept {
template <trivially_destructible T> std::optional<T> Deque<T>::steal() noexcept {
std::int64_t t = _top.load(acquire);
std::atomic_thread_fence(seq_cst);
std::int64_t b = _bottom.load(acquire);

if (t < b) {
// Must load *before* aquiring the slot as slot may be overwritten immidiatly after aquiring.
auto x = _buffer.load(consume)->load(t);
// Must load *before* acquiring the slot as slot may be overwritten immediately after acquiring.
// This load is NOT required to be atomic even-though it may race with an overrite as we only
// return the value if we win the race below garanteeing we had no race during our read. If we
// loose the race then 'x' could be corrupt due to read-during-write race but as T is trivially
// destructible this does not matter.
T x = _buffer.load(consume)->load(t);

if (!_top.compare_exchange_strong(t, t + 1, seq_cst, relaxed)) {
// Failed race.
return std::nullopt;
}

if constexpr (no_alloc) {
return x;
} else {
std::optional tmp{std::move(*x)};
delete x;
return tmp;
}
return x;

} else {
// Empty deque.
return std::nullopt;
}
}

template <typename T> Deque<T>::~Deque() noexcept {
if constexpr (!no_alloc) {
// Clean up all remaining items in the deque.
while (!empty()) {
pop();
}

assert(empty() && "Busy during destruction"); // Check for interupts.
}

delete _buffer.load();
}
template <trivially_destructible T> Deque<T>::~Deque() noexcept { delete _buffer.load(); }

} // namespace riften
} // namespace riften
3 changes: 1 addition & 2 deletions test/deque_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ TEST_CASE("emplace against steals, [deque]") {
// Dummy work struct.
struct work {
int label;
std::string path;
};

TEST_CASE("pop and steal, [deque]") {
Expand All @@ -104,7 +103,7 @@ TEST_CASE("pop and steal, [deque]") {
std::vector<std::thread> threads;
std::atomic<int> remaining(max);

for (auto i = 0; i < max; ++i) worker.emplace(work{1, "/some/random/path"});
for (auto i = 0; i < max; ++i) worker.emplace(work{1});

for (auto i = 0; i < nthreads; ++i) {
threads.emplace_back([&stealer, &remaining]() {
Expand Down
12 changes: 5 additions & 7 deletions test/example.cpp
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
#include <string>
#include <thread>

#include "doctest/doctest.h"
#include "riften/deque.hpp"

TEST_CASE("Examples") {
// #include <string>
// #include <thread>

// #include "riften/deque.hpp"

// Work-stealing deque of strings
riften::Deque<std::string> deque;
// Work-stealing deque of ints
riften::Deque<int> deque;

// One thread can push and pop items from one end (like a stack)
std::thread owner([&]() {
for (int i = 0; i < 10000; i = i + 1) {
deque.emplace(std::to_string(i));
deque.emplace(i);
}
while (!deque.empty()) {
std::optional item = deque.pop();
[[maybe_unused]] std::optional item = deque.pop();
}
});

// While multiple (any) threads can steal items from the other end
std::thread thief([&]() {
while (!deque.empty()) {
std::optional item = deque.steal();
[[maybe_unused]] std::optional item = deque.steal();
}
});

Expand Down
Loading

0 comments on commit 50d93cb

Please # to comment.