asyncpp is a C++20 coroutine library that enables you to write asynchronous and parallel code with a clean syntax.
Supported compilers:
- MSVC (tested with 19.3+)
- Clang (tested with 17+)
- GCC (tested with 13+)
You can probably use any compiler that properly supports C++20, but the CI runs on fairly recent versions of the three major compilers.
- Coroutines: task, shared_task, generator, stream
- Synchronization: event, broadcast_event, mutex, shared_mutex, semaphores
- Utilities: join, sleep_for, sleep_until
- Schedulers: thread_pool
- Other: allocator awareness, interoperation with other coroutine libraries
If you're new to coroutines, the rest of the documentation may be difficult to follow. This crash course on C++20 coroutines should give you enough of a basis to get started with this library.
In C++, coroutine support is split into three distinct layers:
- Compiler layer: what asyncpp is built on
- Library layer: asyncpp itself
- Application layer: your application, built using asyncpp
C++20 introduces three new keywords into the language:
co_await <awaitable>;
co_yield <value>;
co_return <value>;
These keywords are different from the ones you're used to, as their behaviour can be "scripted" by implementing a few interfaces. Imagine the compiler exposing an API for throw and leaving you to implement stack unwinding yourself. As you can guess, using the compiler layer directly would be really cumbersome; that's why we have coroutine libraries.
asyncpp (and other coroutine libraries) implements the interfaces of the compiler layer and defines higher-level primitives such as task<T>, generator<T>, or mutex. These primitives are intuitive, and therefore enable practical applications of coroutines.
A small excerpt of what the library layer code might look like:
template <class T>
class task {
awaitable operator co_await() { /* ... */ }
};
You can use the higher level primitives of the coroutine libraries to implement intuitive asynchronous and parallel code:
// With coroutines:
asyncpp::task<int> add(int a, int b) {
co_return a + b;
}
// With OS threading:
std::future<int> add(int a, int b) {
return std::async([a, b]{ return a + b; });
}
Consider the following code using OS threading:
void work(std::mutex& mtx) {
std::cout << "init section" << std::endl;
mtx.lock();
std::cout << "critical section" << std::endl;
mtx.unlock();
}
void my_main(std::mutex& mtx) {
work(mtx);
std::cout << "post section" << std::endl;
}
When you run my_main, the following happens, in order:
- Entering my_main
- Entering work
- Executing init section
- Blocking the current thread (assuming the mutex is not free)
- ... another thread unlocks the mutex
- Resuming the current thread
- Executing critical section
- Exiting work
- Executing post section
- Exiting my_main
As such, the text printed will in all cases be:
init section
critical section
post section
Let's look at a very similar piece of code implemented using coroutines:
example::task<void> work(example::mutex& mtx) {
std::cout << "init section" << std::endl;
co_await mtx;
std::cout << "critical section" << std::endl;
mtx.unlock();
}
example::task<void> my_main(example::mutex& mtx) {
example::task<void> tsk = work(mtx);
std::cout << "post section" << std::endl;
co_await tsk;
}
In this case, executing my_main results in the following:

Thread #1:
- Entering my_main
- Entering work @ function preamble
- Executing init section
- Suspending coroutine work (assuming the mutex is not free)
- Returning control to my_main
- Executing post section
- Suspending coroutine my_main (assuming work has not finished yet on another thread)
- Returning control to the caller of my_main

Thread #2 (some time later...):
- Unlocking the mutex
- Entering work @ after co_await mtx
- Executing critical section
- Returning control to the caller and specifying the next coroutine to run
- Entering my_main @ after co_await tsk
- Returning control to the caller in thread #2
The application may now print (as described above):
init section
post section
critical section
But it could also print, given a different thread interleaving:
init section
critical section
post section
This looks rather complicated, but the key idea is that when you call work(mtx), it does not run immediately in the current thread like a function would, but runs asynchronously on any thread. Consequently, you also can't just access its return value: you must synchronize using co_await.
Note that co_await is not a thread synchronization primitive like std::condition_variable. co_await does not actually block the current thread; it merely suspends the coroutine at a so-called suspension point. Later, the thread that completes the coroutine being co_awaited (in this case, work) is responsible for continuing the suspended coroutine (in this case, my_main). You can view this as cooperative multitasking, as opposed to the operating system's preemptive multitasking. Suspension points are introduced by co_await and co_yield statements, though coroutines also have an initial and a final suspension point. The latter two are more important for library developers.
When approaching coroutines from a procedural programming standpoint, it's natural to think of coroutines as special functions that can be suspended mid-execution. I'd suggest the opposite view: consider functions to be special coroutines that have zero suspension points. This is especially useful in C++, where we can implement different types of coroutines via coroutine libraries, because it lets us regard plain old functions, as well as tasks, generators, streams, etc., as "subclasses" of a general coroutine.
Tasks are coroutines that can co_await other tasks and co_return a single value:
// A compute-heavy asynchronous computation.
task<float> det(float a, float b, float c) {
co_return std::sqrt(b*b - 4*a*c);
}
// An asynchronous computation that uses the previous one.
task<float> solve(float a, float b, float c) {
task<float> computation = det(a, b, c); // Launch the async computation.
const float d = co_await computation; // Wait for the computation to complete.
co_return (-b + d) / (2 * a); // Once complete, process the results.
}
In asyncpp, tasks are lazy, meaning that they don't start executing until you co_await them. You can also force a task to start executing using launch:
task<void> t1 = work(); // Lazy, does not start.
task<void> t2 = launch(work()); // Eager, does start.
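A small sketch of what laziness means in practice (the work coroutine here is hypothetical, and whether launch may start the body immediately on the current thread is an assumption):

task<void> work() {
    std::cout << "body running" << std::endl;
    co_return;
}

task<void> demo() {
    task<void> t1 = work();         // Lazy: nothing printed yet.
    task<void> t2 = launch(work()); // Eager: the body may already be running.
    co_await t1;                    // "body running" is printed here for t1.
    co_await t2;                    // Synchronize with the eagerly started task.
}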
Tasks come in two flavours:
- task:
  - Movable
  - Not copyable
  - Can only be co_awaited once
- shared_task:
  - Movable
  - Copyable: does not launch multiple coroutines, just gives you multiple handles to the result
  - Can be co_awaited any number of times (see the sketch after this list):
    - Repeatedly from the same thread
    - Simultaneously from multiple threads: each thread must have its own copy!
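A sketch of the difference (this assumes shared_task can be used directly as a coroutine return type, like task):

task<int> once() { co_return 1; }
shared_task<int> many() { co_return 2; }

task<void> consume() {
    task<int> t = once();
    const int a = co_await t;  // OK, but a task may only be awaited once.

    shared_task<int> s1 = many();
    shared_task<int> s2 = s1;  // Copy: another handle to the same result.
    const int b = co_await s1; // A shared_task may be awaited repeatedly,
    const int c = co_await s2; // ...including through different copies.
    std::cout << a + b + c << std::endl;
}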
Generators can produce multiple values using co_yield; this is what sets them apart from tasks, which can produce only one result. The generator below generates an infinite sequence:
generator<int> iota() {
int value = 0;
while (true) {
co_yield value++;
}
}
To access the sequence of generated values, you can use iterators:
// Prints numbers 0 to 99
generator<int> g = iota();
auto it = g.begin();
for (int index = 0; index < 100; ++index, ++it) {
std::cout << *it << std::endl;
}
You can also take advantage of C++20 ranges:
// Prints numbers 0 to 99
for (const auto value : iota() | std::views::take(100)) {
std::cout << value << std::endl;
}
Unlike tasks, generators are always synchronous, so they run on the current thread when you advance the iterator, and you don't have to await them.
Streams are a mix of tasks and generators: like generators, they can yield multiple values, but like tasks, you have to await each value. You can think of a stream as a generator<task<T>>, but with a simpler interface and a more efficient implementation. Like tasks, streams are asynchronous and can be set to run on another thread.
The iota function looks the same when implemented as a stream:
stream<int> iota() {
int value = 0;
while (true) {
co_yield value++;
}
}
The difference is in how you access the yielded elements:
// Inside a coroutine:
auto s = iota();
while (const auto it = co_await s) {
    std::cout << *it << std::endl;
}
You can co_await the stream object multiple times, and each time it will give you an "iterator" to the latest yielded value. Before retrieving the value from the iterator, you have to verify that it's valid: an invalid iterator signals the end of the stream.
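A finite stream makes the end-of-stream behaviour visible (a sketch):

stream<int> one_two() {
    co_yield 1;
    co_yield 2;
    // Reaching the end of the body ends the stream.
}

task<void> consume() {
    auto s = one_two();
    // After 2 has been consumed, co_await yields an invalid
    // iterator and the loop exits.
    while (const auto it = co_await s) {
        std::cout << *it << std::endl;
    }
}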
Events allow you to signal the completion of an operation in one task to another task:
task<void> producer(std::shared_ptr<event<int>> evt) {
    evt->set_value(100);
    co_return; // Needed so that this function is compiled as a coroutine.
}
task<void> consumer(std::shared_ptr<event<int>> evt) {
std::cout << (co_await *evt) << std::endl;
}
const auto evt = std::make_shared<event<int>>();
launch(producer(evt));
launch(consumer(evt));
The relationship between event and broadcast_event is similar to that of task and shared_task: you can await an event only once, and only from one thread at a time, but you can await a broadcast_event multiple times without thread-safety concerns. It's important to note that both event and broadcast_event are neither movable nor copyable.
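A sketch of awaiting a broadcast_event from multiple tasks (assuming it shares event's set_value interface, which is an assumption):

task<void> consumer_a(std::shared_ptr<broadcast_event<int>> evt) {
    std::cout << "A: " << (co_await *evt) << std::endl;
}

task<void> consumer_b(std::shared_ptr<broadcast_event<int>> evt) {
    std::cout << "B: " << (co_await *evt) << std::endl;
}

const auto evt = std::make_shared<broadcast_event<int>>();
launch(consumer_a(evt)); // Both consumers can await the same broadcast_event...
launch(consumer_b(evt));
evt->set_value(7);       // ...and both get woken when the value is set.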
While events can be useful on their own, they can also be used to implement higher-level primitives, such as the usual std::promise / std::future pair. In fact, task and shared_task are implemented using event and broadcast_event, respectively.
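As a sketch of that idea, here is a toy promise/future pair built on event (the my_promise/my_future names are hypothetical and not part of asyncpp):

template <class T>
struct my_future {
    std::shared_ptr<event<T>> m_evt;
    task<T> get() { co_return co_await *m_evt; } // Completes once the value is set.
};

template <class T>
struct my_promise {
    std::shared_ptr<event<T>> m_evt = std::make_shared<event<T>>();
    my_future<T> get_future() const { return { m_evt }; }
    void set_value(T value) { m_evt->set_value(std::move(value)); }
};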
Mutexes and shared mutexes in asyncpp are virtually equivalent to their standard library counterparts, but their interfaces have been slightly modified to make them suitable for coroutine contexts:
task<void> lock_mutex(mutex& mtx) {
co_await mtx; // Locks the mutex.
mtx.unlock();
}
task<void> lock_shared_mutex(shared_mutex& mtx) {
// Lock exclusively:
co_await mtx.exclusive();
mtx.unlock();
// Lock shared:
co_await mtx.shared();
mtx.unlock_shared();
}
asyncpp also comes with its own unique_lock and shared_lock that help you manage mutexes in an RAII fashion:
task<void> lock_mutex(mutex& mtx) {
unique_lock lk(co_await mtx); // Locks the mutex.
}
task<void> lock_shared_mutex(shared_mutex& mtx) {
{
unique_lock lk(co_await mtx.exclusive());
}
{
shared_lock lk(co_await mtx.shared());
}
}
Semaphores are also very similar to their standard library counterparts. There is a counting_semaphore variant, where you can specify the maximum value of the semaphore's counter, and a binary_semaphore variant, which fixes the maximum value at one.
Usage is similar to the standard library equivalents, but you need to use co_await to acquire the semaphore:
counting_semaphore sema(0, 16); // Counter may go up to 16, current value at 0.
// A coroutine
launch([](counting_semaphore& sema){
co_await sema;
std::cout << "This runs second." << std::endl;
}(sema));
// Another coroutine
launch([](counting_semaphore& sema){
std::cout << "This runs first." << std::endl;
sema.release();
}(sema));
Unlike the standard library semaphores, semaphores in asyncpp don't have an implementation-defined upper limit for the counter, so you can go up to std::numeric_limits<ptrdiff_t>::max(). In asyncpp, semaphores will also complain (via std::terminate) if you exceed the maximum value of the counter by releasing too many times.
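A binary_semaphore is used the same way, with its counter capped at one (a sketch; the single-argument constructor taking the initial value is an assumption):

binary_semaphore sema(1); // Counter starts at 1: the first co_await succeeds.

task<void> critical(binary_semaphore& sema) {
    co_await sema;  // Acquire: decrements the counter, or suspends at 0.
    // ...exclusive work...
    sema.release(); // Release: releasing past the maximum of 1 would terminate.
}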
To retrieve the result of a coroutine, we must co_await it; however, only a coroutine can co_await another one. How, then, is it possible to wait for a coroutine's completion from a plain old function? For this purpose, asyncpp provides join:
#include <asyncpp/join.hpp>
using namespace asyncpp;
task<int> coroutine() {
co_return 0;
}
int main() {
return join(coroutine());
}
join uses OS thread-synchronization primitives to block the current thread until the coroutine is finished. join can be used with anything that can be co_awaited: tasks, streams, events, and even mutexes.
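For example, a sketch of draining a stream from main (assuming join returns exactly what co_await would, here the stream's iterator):

#include <asyncpp/join.hpp>
using namespace asyncpp;

stream<int> numbers() {
    co_yield 1;
    co_yield 2;
}

int main() {
    auto s = numbers();
    while (const auto it = join(s)) { // Blocks until the next value is yielded.
        std::cout << *it << std::endl;
    }
}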
Just like with mutexes, you shouldn't use blocking primitives like std::this_thread::sleep_for inside coroutines. As a replacement, asyncpp provides coroutine-friendly sleep_for and sleep_until functions:
using namespace std::chrono_literals;

task<void> sleepy() {
    co_await sleep_for(20ms);
}
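sleep_until works analogously, taking a time point instead of a duration (a sketch; that it accepts a std::chrono time point is an assumption):

task<void> sleepy_until() {
    co_await sleep_until(std::chrono::steady_clock::now() + 20ms);
}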
Sleeping is implemented by a background thread that manages a priority queue of coroutines that have been put to sleep. The background thread uses the operating system's sleep functions to wait until the next coroutine has to be woken, wakes that coroutine, and goes back to sleep until the next one. There is no busy loop wasting CPU power.
Think about the following code:
task<measurement> measure() {
co_return heavy_computation();
}
task<report> analyze() {
    auto t = measure(); // This line doesn't do any heavy computation, because tasks are lazy.
const auto m = co_await t; // This line does the heavy computation.
co_return make_report(m);
}
This code may be asynchronous, but it does not use multiple threads. To achieve that, we are going to need schedulers, for example a thread_pool
:
task<measurement> measure() {
co_return heavy_computation();
}
task<report> analyze(thread_pool& sched) {
    auto t = launch(measure(), sched); // Execution of the heavy computation starts on the thread pool ASAP.
    const auto m = co_await t; // We suspend until the thread pool finishes the heavy computation.
    co_return make_report(m); // We continue as soon as the results are available.
}
launch can optionally take a second argument: the scheduler that the coroutine will run on. In this case it's the thread pool, and the coroutine will be assigned to any of its threads.
Let's assume that we did not specify a scheduler for analyze, even though measure is running on the thread pool. In this case, when measure finishes, the thread pool's thread returns control to the caller of measure, that is, analyze, which causes analyze's second part to also run on the thread pool.
However, if you specify a scheduler for both analyze and measure, they will at all times respect their target schedulers. As such, analyze's second part is guaranteed to run on analyze's scheduler, even if measure runs on the thread pool.
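A sketch of pinning both coroutines (the two pools are hypothetical; this relies only on launch's scheduler argument and join, as shown elsewhere in this document):

task<report> analyze(thread_pool& measure_pool) {
    auto t = launch(measure(), measure_pool); // measure runs on measure_pool.
    const auto m = co_await t;                // analyze resumes on its own scheduler.
    co_return make_report(m);
}

int main() {
    thread_pool measure_pool;
    thread_pool analyze_pool;
    // analyze's continuation after co_await is guaranteed to run on
    // analyze_pool, even though measure ran on measure_pool.
    const report r = join(launch(analyze(measure_pool), analyze_pool));
}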
The thread pool is currently the only scheduler in asyncpp. It's a traditional thread pool with multiple threads that wait for coroutines and execute them when they become ready. The thread pool uses work stealing to dynamically distribute the workload. When saturated with tasks, the thread pool incurs no synchronization overhead and is extremely fast.
asyncpp's coroutines are allocator-aware in the same fashion as described in the proposal for std::generator. This means that you can specify the allocator used for dynamically allocating the coroutine's promise using a template parameter:
namespace asyncpp::pmr {
    template <class T>
    using task = asyncpp::task<T, std::pmr::polymorphic_allocator<>>;
}
Since there is no way to pass data to the coroutine's creation other than the parameter list, you have to include the allocator there. To differentiate the allocator argument from the other, generic arguments, you have to pass it first, preceded by std::allocator_arg, like so:
pmr::task<int> coroutine(std::allocator_arg_t, std::pmr::polymorphic_allocator<>, int a, int b) {
co_return a + b;
}
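Calling such a coroutine with a specific memory resource might look like this (a sketch using the standard std::pmr facilities):

std::pmr::monotonic_buffer_resource arena;
pmr::task<int> t = coroutine(std::allocator_arg,
                             std::pmr::polymorphic_allocator<>(&arena),
                             1, 2);
// The coroutine's state is allocated from the arena.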
Just like in the generator proposal, type-erased allocators are supported by setting the allocator template parameter to void. The code below functions exactly the same way as the previous snippet:
task<int, /* Alloc = */ void> coroutine(std::allocator_arg_t, std::pmr::polymorphic_allocator<>, int a, int b) {
co_return a + b;
}
The need for specifying allocators comes from the fact that coroutines have to allocate dynamically on creation: their body's state cannot be placed on the stack, as it must survive suspension and possible moves to another thread, so it is placed on the heap. Allocators let you control exactly how the coroutine's state is allocated. (Note: compilers may perform heap allocation elision optimization (HALO) to avoid allocations altogether, but asyncpp's coroutines use a design for parallelism that is difficult for compilers to optimize this way.)
If asyncpp does not provide everything you need, but neither does another coroutine library you considered using, it might seem like a good idea to combine the two.
Combining coroutine libraries is surprisingly easy: since they all have to conform to the strict interface provided by the compiler, they are largely interoperable, and you can mix and match primitives. While mixing is unlikely to render your code completely broken, it can introduce unexpected side effects.
asyncpp::task<void> coroutine(coro::mutex& mutex) {
coro::scoped_lock lk = co_await mutex;
// Do work inside critical section...
}
This code will execute correctly, but with a caveat: if you have bound coroutine to an asyncpp::scheduler, libcoro will not respect that, and coroutine will not be continued on the bound scheduler, but rather on whichever thread unlocked the mutex.
coro::task<void> coroutine(asyncpp::event<void>& evt) {
co_await evt;
}
This code will not actually compile, because asyncpp's primitives can only be awaited by coroutines that implement asyncpp's resumable_promise interface, which libcoro's task obviously does not. While this limitation (or feature, whichever you think it is) could be relaxed, there are no current plans to do so. If this code did compile, it would likely work exactly as expected.
As asyncpp has its own little ecosystem consisting of a few interfaces, extending asyncpp is as easy as implementing those interfaces, and your additions will cooperate seamlessly with the rest of asyncpp.
To implement a new coroutine like task, generator, or stream, your coroutine's promise has to satisfy the following interfaces:
#include <asyncpp/promise.hpp>

template <class Alloc>
class my_coroutine {
    struct my_promise
        : asyncpp::resumable_promise,
          asyncpp::schedulable_promise,
          asyncpp::allocator_aware_promise<Alloc>
    {
        void resume() override;     // resumable_promise
        void resume_now() override; // schedulable_promise
    };
    // ...
};
While all these interfaces are optional to implement, you may miss out on specific functionality if you don't implement them:
- resumable_promise gives you control over how your coroutine is resumed from a suspended state. Typically, you either resume the coroutine immediately or enqueue it on a scheduler. This interface is required for your coroutine to be able to co_await asyncpp primitives.
- schedulable_promise enables schedulers to resume a coroutine immediately on the current thread. You have to implement this interface in order to bind your coroutine to an asyncpp scheduler.
- allocator_aware_promise makes your promise support allocators. It's more of a mixin than an interface: you don't have to implement anything. It's also completely optional.
#include <asyncpp/scheduler.hpp>
class my_scheduler : public asyncpp::scheduler {
public:
void schedule(asyncpp::schedulable_promise& promise) override {
promise.resume_now();
}
};
This scheduler will just resume the scheduled promise on the current thread immediately.
#include <asyncpp/promise.hpp>

#include <concepts>
#include <coroutine>

struct my_primitive {
    asyncpp::resumable_promise* m_enclosing = nullptr;

    bool await_ready(); // Implement as you wish.

    // Make sure you give special treatment to resumable_promises.
    template <std::convertible_to<const asyncpp::resumable_promise&> Promise>
    bool await_suspend(std::coroutine_handle<Promise> enclosing) {
        // Store the promise that's awaiting my_primitive.
        // Later, you should resume it using resumable_promise::resume().
        m_enclosing = &enclosing.promise();
        return true; // Keep the awaiting coroutine suspended.
    }

    void await_resume(); // Implement as you wish.
};
When you await a synchronization primitive like my_primitive, you have the opportunity to store the awaiting coroutine's handle in await_suspend. If the awaiting coroutine is an asyncpp coroutine, its promise will implement the resumable_promise interface, and you should give it special treatment: instead of simply resuming the coroutine handle, use resumable_promise's resume method, which takes care of using the proper scheduler.
Check out the section of the documentation about extending asyncpp. It should make adding a new feature fairly straightforward.
Multi-threaded code is notoriously error-prone, therefore proper tests are expected for every new feature. asyncpp comes with a mini framework to exhaustively test multi-threaded code by running all possible thread interleavings. Interleaved tests should be present where applicable. Look at the existing tests for reference.
Strive to make code as short and simple as possible, and break it down into the smallest pieces possible. This helps with testing and makes the code easier to reason about, hopefully leaving it less riddled with bugs despite the complicated nature of multi-threading.
You can always open an issue if you find a bug. Please provide as much detail as you can, including source code to reproduce the problem, if possible. Multi-threaded bugs are difficult to reproduce, so all available information will likely be necessary.
I've used libcoro as well as cppcoro for inspiration.
asyncpp is distributed under the MIT license, and can therefore be used in commercial and non-commercial projects alike, with very few restrictions.