diff --git a/CMakeLists.txt b/CMakeLists.txt index 972731d..436670a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.14 FATAL_ERROR) # Note: update this to your new project's name and version project( RiftenDeque - VERSION 1.2.0 + VERSION 2.0.0 LANGUAGES CXX ) @@ -29,7 +29,7 @@ CPMAddPackage("gh:TheLartians/PackageProject.cmake@1.6.0") add_library(RiftenDeque INTERFACE "include/riften/deque.hpp") -target_compile_features(RiftenDeque INTERFACE cxx_std_17) +target_compile_features(RiftenDeque INTERFACE cxx_std_20) # Enforce standards conformance on MSVC target_compile_options(RiftenDeque INTERFACE "$<$:/permissive>") diff --git a/README.md b/README.md index d8ab51a..2308ec5 100644 --- a/README.md +++ b/README.md @@ -7,23 +7,22 @@ This implementation is based on: - https://github.com/taskflow/work-stealing-queue - https://github.com/ssbl/concurrent-deque -`riften::Deque` places no constraint on the types which can be placed in the deque and has no memory overhead associated with buffer recycling. Furthermore, when possible +`riften::Deque` places very few constraints on the types which can be placed in the deque (they must be trivially destructible and have nothrow move constructor/assignment operators) and has no memory overhead associated with buffer recycling. ## Usage ```C++ - // #include // #include // #include "riften/deque.hpp" - // Work-stealing deque of strings - riften::Deque deque; + // Work-stealing deque of ints + riften::Deque deque; // One thread can push and pop items from one end (like a stack) std::thread owner([&]() { for (int i = 0; i < 10000; i = i + 1) { - deque.emplace(std::to_string(i)); + deque.emplace(i); } while (!deque.empty()) { std::optional item = deque.pop(); diff --git a/include/riften/deque.hpp b/include/riften/deque.hpp index d0e6fb0..dcd062f 100644 --- a/include/riften/deque.hpp +++ b/include/riften/deque.hpp @@ -2,16 +2,18 @@ #include #include +#include #include #include +#include #include #include #include #include -// This (standalone) file implements the deque described in the papers, "Correct and Efficient -// Work-Stealing for Weak Memory Models," and "Dynamic Circular Work-Stealing Deque". Both are avaliable -// in 'reference/'. +// This (stand-alone) file implements the deque described in the papers, "Correct and Efficient +// Work-Stealing for Weak Memory Models," and "Dynamic Circular Work-Stealing Deque". Both are +// available in 'reference/'. namespace riften { @@ -27,11 +29,15 @@ template struct RingBuff { std::int64_t capacity() const noexcept { return _cap; } - // Relaxed store at modulo index - void store(std::int64_t i, T x) noexcept { _buff[i & _mask].store(x, std::memory_order_relaxed); } + // Store at modulo index + void store(std::int64_t i, T&& x) noexcept requires std::is_nothrow_move_assignable_v { + _buff[i & _mask] = std::move(x); + } - // Relaxed load at modulo index - T load(std::int64_t i) const noexcept { return _buff[i & _mask].load(std::memory_order_relaxed); } + // Load at modulo index + T load(std::int64_t i) const noexcept requires std::is_nothrow_move_constructible_v { + return _buff[i & _mask]; + } // Allocates and returns a new ring buffer, copies elements in range [b, t) into the new buffer. RingBuff* resize(std::int64_t b, std::int64_t t) const { @@ -44,34 +50,28 @@ template struct RingBuff { private: std::int64_t _cap; // Capacity of the buffer - std::int64_t _mask; // Bitmask to perform modulo capacity operations + std::int64_t _mask; // Bit mask to perform modulo capacity operations -#if !__cpp_lib_smart_ptr_for_overwrite - std::unique_ptr[]> _buff = std::make_unique[]>(_cap); -#else - std::unique_ptr[]> _buff = std::make_unique_for_overwrite[]>(_cap); -#endif + std::unique_ptr _buff = std::make_unique_for_overwrite(_cap); }; -template struct is_always_lock_free { - static constexpr bool value = std::atomic::is_always_lock_free; -}; +} // namespace detail -template static constexpr bool lock_free_v - = std::conjunction_v, - std::is_copy_constructible, - std::is_move_constructible, - std::is_copy_assignable, - std::is_move_assignable, - is_always_lock_free>; +#ifdef __cpp_lib_hardware_interference_size +using std::hardware_destructive_interference_size; +#else +// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ... +inline constexpr std::size_t hardware_destructive_interference_size = 2 * sizeof(std::max_align_t); +#endif -} // namespace detail +template +concept trivially_destructible = std::is_trivially_destructible_v; -// Lock-free single-producer multiple-consumer deque. There are no constraints on the type `T` that can -// be stored. Only the deque owner can perform pop and push operations where the deque behaves like a -// stack. Others can (only) steal data from the deque, they see a FIFO queue. All threads must have -// finished using the deque before it is destructed. -template class Deque { +// Lock-free single-producer multiple-consumer deque. Only the deque owner can perform pop and push +// operations where the deque behaves like a stack. Others can (only) steal data from the deque, they see +// a FIFO queue. All threads must have finished using the deque before it is destructed. T must be +// trivially destructible and have nothrow move constructor/assignment operators. +template class Deque { public: // Constructs the deque with a given capacity the capacity of the deque (must be power of 2) explicit Deque(std::int64_t cap = 1024); @@ -89,34 +89,30 @@ template class Deque { // Test if empty at instance of call bool empty() const noexcept; - // Emplace an item to the deque. Only the owner thread can insert an item to the deque. The operation - // can trigger the deque to resize its cap if more space is required. Provides the strong exception - // garantee. + // Emplace an item to the deque. Only the owner thread can insert an item to the deque. The + // operation can trigger the deque to resize its cap if more space is required. Provides the + // strong exception guarantee. template void emplace(Args&&... args); - // Pops out an item from the deque. Only the owner thread can pop out an item from the deque. The - // return can be a std::nullopt if this operation failed (empty deque). + // Pops out an item from the deque. Only the owner thread can pop out an item from the deque. + // The return can be a std::nullopt if this operation fails (empty deque). std::optional pop() noexcept; - // Steals an item from the deque Any threads can try to steal an item from the deque. The return can - // be a std::nullopt if this operation failed (not necessary empty). + // Steals an item from the deque Any threads can try to steal an item from the deque. The return + // can be a std::nullopt if this operation failed (not necessarily empty). std::optional steal() noexcept; // Destruct the deque, all threads must have finished using the deque. ~Deque() noexcept; - // If true elements of type `T` are stored directly in the ring buffer. - static constexpr bool no_alloc = std::is_trivially_destructible_v && detail::lock_free_v; - private: - using buffer_t = detail::RingBuff>; + alignas(hardware_destructive_interference_size) std::atomic _top; + alignas(hardware_destructive_interference_size) std::atomic _bottom; + alignas(hardware_destructive_interference_size) std::atomic*> _buffer; - std::atomic _top; // Top of deque - std::atomic _bottom; // Bottom of deque. - std::atomic _buffer; // Current buffer. - std::vector> _garbage; // Store old buffers here. + std::vector>> _garbage; // Store old buffers here. - // Convinience aliases. + // Convenience aliases. static constexpr std::memory_order relaxed = std::memory_order_relaxed; static constexpr std::memory_order consume = std::memory_order_consume; static constexpr std::memory_order acquire = std::memory_order_acquire; @@ -124,27 +120,30 @@ template class Deque { static constexpr std::memory_order seq_cst = std::memory_order_seq_cst; }; -template Deque::Deque(std::int64_t cap) - : _top(0), _bottom(0), _buffer(new buffer_t{cap}) { +template Deque::Deque(std::int64_t cap) + : _top(0), _bottom(0), _buffer(new detail::RingBuff{cap}) { _garbage.reserve(32); } -template std::size_t Deque::size() const noexcept { +template std::size_t Deque::size() const noexcept { int64_t b = _bottom.load(relaxed); int64_t t = _top.load(relaxed); return static_cast(b >= t ? b - t : 0); } -template int64_t Deque::capacity() const noexcept { +template int64_t Deque::capacity() const noexcept { return _buffer.load(relaxed)->capacity(); } -template bool Deque::empty() const noexcept { return !size(); } +template bool Deque::empty() const noexcept { return !size(); } + +template template void Deque::emplace(Args&&... args) { + // Construct before acquiring slot in-case constructor throws + T object(std::forward(args)...); -template template void Deque::emplace(Args&&... args) { std::int64_t b = _bottom.load(relaxed); std::int64_t t = _top.load(acquire); - buffer_t* buf = _buffer.load(relaxed); + detail::RingBuff* buf = _buffer.load(relaxed); if (buf->capacity() < (b - t) + 1) { // Queue is full, build a new one @@ -152,46 +151,38 @@ template template void Deque::emplace(Args&&. _buffer.store(buf, relaxed); } - // Construct new object - if constexpr (no_alloc) { - buf->store(b, {std::forward(args)...}); - } else { - buf->store(b, new T{std::forward(args)...}); - } + // Construct new object, this does not have to be atomic as no one can steal this item until after we + // store the new value of bottom, ordering is maintained by surrounding atomics. + buf->store(b, std::move(object)); std::atomic_thread_fence(release); _bottom.store(b + 1, relaxed); } -template std::optional Deque::pop() noexcept { +template std::optional Deque::pop() noexcept { std::int64_t b = _bottom.load(relaxed) - 1; - buffer_t* buf = _buffer.load(relaxed); - _bottom.store(b, relaxed); + detail::RingBuff* buf = _buffer.load(relaxed); + + _bottom.store(b, relaxed); // Stealers can no longer steal + std::atomic_thread_fence(seq_cst); std::int64_t t = _top.load(relaxed); if (t <= b) { // Non-empty deque if (t == b) { - // The last item could get stolen + // The last item could get stolen, by a stealer that loaded bottom before our write above if (!_top.compare_exchange_strong(t, t + 1, seq_cst, relaxed)) { - // Failed race. + // Failed race, thief got the last item. _bottom.store(b + 1, relaxed); return std::nullopt; } _bottom.store(b + 1, relaxed); } - // Can delay load until after aquiring slot as only this thread can push() - auto x = buf->load(b); - - if constexpr (no_alloc) { - return x; - } else { - std::optional tmp{std::move(*x)}; - delete x; - return tmp; - } + // Can delay load until after acquiring slot as only this thread can push(), this load is not + // required to be atomic as we are the exclusive writer. + return buf->load(b); } else { _bottom.store(b + 1, relaxed); @@ -199,27 +190,25 @@ template std::optional Deque::pop() noexcept { } } -template std::optional Deque::steal() noexcept { +template std::optional Deque::steal() noexcept { std::int64_t t = _top.load(acquire); std::atomic_thread_fence(seq_cst); std::int64_t b = _bottom.load(acquire); if (t < b) { - // Must load *before* aquiring the slot as slot may be overwritten immidiatly after aquiring. - auto x = _buffer.load(consume)->load(t); + // Must load *before* acquiring the slot as slot may be overwritten immediately after acquiring. + // This load is NOT required to be atomic even-though it may race with an overrite as we only + // return the value if we win the race below garanteeing we had no race during our read. If we + // loose the race then 'x' could be corrupt due to read-during-write race but as T is trivially + // destructible this does not matter. + T x = _buffer.load(consume)->load(t); if (!_top.compare_exchange_strong(t, t + 1, seq_cst, relaxed)) { // Failed race. return std::nullopt; } - if constexpr (no_alloc) { - return x; - } else { - std::optional tmp{std::move(*x)}; - delete x; - return tmp; - } + return x; } else { // Empty deque. @@ -227,17 +216,6 @@ template std::optional Deque::steal() noexcept { } } -template Deque::~Deque() noexcept { - if constexpr (!no_alloc) { - // Clean up all remaining items in the deque. - while (!empty()) { - pop(); - } - - assert(empty() && "Busy during destruction"); // Check for interupts. - } - - delete _buffer.load(); -} +template Deque::~Deque() noexcept { delete _buffer.load(); } -} // namespace riften +} // namespace riften \ No newline at end of file diff --git a/test/deque_test.cpp b/test/deque_test.cpp index 027d83f..e6908a9 100644 --- a/test/deque_test.cpp +++ b/test/deque_test.cpp @@ -90,7 +90,6 @@ TEST_CASE("emplace against steals, [deque]") { // Dummy work struct. struct work { int label; - std::string path; }; TEST_CASE("pop and steal, [deque]") { @@ -104,7 +103,7 @@ TEST_CASE("pop and steal, [deque]") { std::vector threads; std::atomic remaining(max); - for (auto i = 0; i < max; ++i) worker.emplace(work{1, "/some/random/path"}); + for (auto i = 0; i < max; ++i) worker.emplace(work{1}); for (auto i = 0; i < nthreads; ++i) { threads.emplace_back([&stealer, &remaining]() { diff --git a/test/example.cpp b/test/example.cpp index fabbf13..8a160da 100644 --- a/test/example.cpp +++ b/test/example.cpp @@ -1,32 +1,30 @@ -#include #include #include "doctest/doctest.h" #include "riften/deque.hpp" TEST_CASE("Examples") { - // #include // #include // #include "riften/deque.hpp" - // Work-stealing deque of strings - riften::Deque deque; + // Work-stealing deque of ints + riften::Deque deque; // One thread can push and pop items from one end (like a stack) std::thread owner([&]() { for (int i = 0; i < 10000; i = i + 1) { - deque.emplace(std::to_string(i)); + deque.emplace(i); } while (!deque.empty()) { - std::optional item = deque.pop(); + [[maybe_unused]] std::optional item = deque.pop(); } }); // While multiple (any) threads can steal items from the other end std::thread thief([&]() { while (!deque.empty()) { - std::optional item = deque.steal(); + [[maybe_unused]] std::optional item = deque.steal(); } }); diff --git a/test/wsq_alloc.cpp b/test/wsq_alloc.cpp deleted file mode 100644 index 82093b1..0000000 --- a/test/wsq_alloc.cpp +++ /dev/null @@ -1,226 +0,0 @@ -// The contents of this file are from: https://github.com/taskflow/work-stealing-queue - -#include -#include -#include -#include -#include -#include - -#include "doctest/doctest.h" -#include "riften/deque.hpp" - -// ============================================================================ -// riften::Deque tests -// ============================================================================ - -struct LikeAnInt { - int i; - - operator int() { return i; } - operator int() const { return i; } - - ~LikeAnInt() {} -}; - -// Procedure: wsq_test_owner -void wsq_test_owner_alloc() { - int64_t cap = 2; - - riften::Deque queue(cap); - std::deque gold; - - REQUIRE(queue.capacity() == 2); - REQUIRE(queue.empty()); - - for (int i = 2; i <= (1 << 16); i <<= 1) { - REQUIRE(queue.empty()); - - for (int j = 0; j < i; ++j) { - queue.emplace(j); - } - - for (int j = 0; j < i; ++j) { - auto item = queue.pop(); - REQUIRE((item && *item == i - j - 1)); - } - REQUIRE(!queue.pop()); - - REQUIRE(queue.empty()); - for (int j = 0; j < i; ++j) { - queue.emplace(j); - } - - for (int j = 0; j < i; ++j) { - auto item = queue.steal(); - REQUIRE((item && *item == j)); - } - REQUIRE(!queue.pop()); - - REQUIRE(queue.empty()); - - for (int j = 0; j < i; ++j) { - // enqueue - if (auto dice = ::rand() % 3; dice == 0) { - queue.emplace(j); - gold.push_back(j); - } - // pop back - else if (dice == 1) { - auto item = queue.pop(); - if (gold.empty()) { - REQUIRE(!item); - } else { - REQUIRE(*item == gold.back()); - gold.pop_back(); - } - } - // pop front - else { - auto item = queue.steal(); - if (gold.empty()) { - REQUIRE(!item); - } else { - REQUIRE(*item == gold.front()); - gold.pop_front(); - } - } - - REQUIRE(queue.size() == (int)gold.size()); - } - - while (!queue.empty()) { - auto item = queue.pop(); - REQUIRE((item && *item == gold.back())); - gold.pop_back(); - } - - REQUIRE(gold.empty()); - - REQUIRE(queue.capacity() == i); - } -} - -// Procedure: wsq_test_n_thieves -void wsq_test_n_thieves_alloc(int N) { - int64_t cap = 2; - - riften::Deque queue(cap); - - REQUIRE(queue.capacity() == 2); - REQUIRE(queue.empty()); - - for (int i = 2; i <= (1 << 16); i <<= 1) { - REQUIRE(queue.empty()); - - int p = 0; - - std::vector> cdeqs(N); - std::vector consumers; - std::deque pdeq; - - auto num_stolen = [&]() { - int total = 0; - for (const auto& cdeq : cdeqs) { - total += static_cast(cdeq.size()); - } - return total; - }; - - for (int n = 0; n < N; n++) { - consumers.emplace_back([&, n]() { - while (num_stolen() + (int)pdeq.size() != i) { - if (auto dice = ::rand() % 4; dice == 0) { - if (auto item = queue.steal(); item) { - cdeqs[n].push_back(*item); - } - } - } - }); - } - - std::thread producer([&]() { - while (p < i) { - if (auto dice = ::rand() % 4; dice == 0) { - queue.emplace(p++); - } else if (dice == 1) { - if (auto item = queue.pop(); item) { - pdeq.push_back(*item); - } - } - } - }); - - producer.join(); - - for (auto& c : consumers) { - c.join(); - } - - REQUIRE(queue.empty()); - REQUIRE(queue.capacity() <= i); - - std::set set; - - for (const auto& cdeq : cdeqs) { - for (auto k : cdeq) { - set.insert(k); - } - } - - for (auto k : pdeq) { - set.insert(k); - } - - for (int j = 0; j < i; ++j) { - REQUIRE(set.find(j) != set.end()); - } - - REQUIRE((int)set.size() == i); - } -} - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.Owner -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.Owner.alloc" * doctest::timeout(300)) { wsq_test_owner_alloc(); } - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.1Thief -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.1Thief.alloc" * doctest::timeout(300)) { wsq_test_n_thieves_alloc(1); } - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.2Thieves -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.2Thieves.alloc" * doctest::timeout(300)) { wsq_test_n_thieves_alloc(2); } - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.3Thieves -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.3Thieves.alloc" * doctest::timeout(300)) { wsq_test_n_thieves_alloc(3); } - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.4Thieves -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.4Thieves.alloc" * doctest::timeout(300)) { wsq_test_n_thieves_alloc(4); } - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.5Thieves -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.5Thieves.alloc" * doctest::timeout(300)) { wsq_test_n_thieves_alloc(5); } - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.6Thieves -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.6Thieves.alloc" * doctest::timeout(300)) { wsq_test_n_thieves_alloc(6); } - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.7Thieves -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.7Thieves.alloc" * doctest::timeout(300)) { wsq_test_n_thieves_alloc(7); } - -// ---------------------------------------------------------------------------- -// Testcase: WSQTest.8Thieves -// ---------------------------------------------------------------------------- -TEST_CASE("WSQ.8Thieves.alloc" * doctest::timeout(300)) { wsq_test_n_thieves_alloc(8); } \ No newline at end of file