A C++20 coroutine framework for building statically-defined, high-performance concurrent systems
Coflux is a modern concurrency framework built on C++20 Coroutines.
Coflux is designed around the Structured Concurrency task/fork model and the "Task-as-Context" heterogeneous-computing concept, aiming to describe a safe, predictable concurrent system statically, at compile time.
Structured Concurrency and Task-as-Context jointly articulate the core philosophy: the "Static Channel." This ensures that once all asynchronous work is initiated, it executes smoothly along a predetermined path, much like water flow in a channel.
- Structured Concurrency: A generalized RAII-style lifecycle management. Each `task` derives child `fork`s via the `environment protocol`, syntactically eliminating "orphan tasks."
- Task-as-Context: There is no external `context`. Each `task` itself is a complete, isolated execution environment that supports the operation of all `fork`s.
- Heterogeneous Execution: The `scheduler` is designed as a template-based `executor` cluster, enabling tasks within the same concurrent scope to run in different execution contexts, achieving high decoupling from lifecycle management.
- PMR Memory Model: Integration with `std::pmr` via the `environment protocol` allows users to inject custom, high-performance memory allocation strategies for different concurrent scopes at runtime (see the sketch after this list).
- Modern C++ Design: Utilizing modern C++ language features and advanced design, we strive for the elegance of "conciseness and profound meaning" in the source code.
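To make the PMR bullet concrete, here is a minimal sketch of a per-scope allocation strategy built from standard `std::pmr` components. The `std::pmr` calls are standard C++17; the coflux injection point shown in the comment is an assumption (the real hook belongs to the `environment protocol`, see ARCHITECTURE.md):

```cpp
#include <array>
#include <cstddef>
#include <memory_resource>

int main() {
    // A fixed arena backing one concurrent scope.
    std::array<std::byte, 64 * 1024> buffer;
    std::pmr::monotonic_buffer_resource arena{buffer.data(), buffer.size()};
    // Pooled allocation on top of the arena, a good fit for many small,
    // similarly sized coroutine-frame allocations.
    std::pmr::unsynchronized_pool_resource frame_pool{&arena};
    // Hypothetical: hand the resource to the environment so every task/fork
    // in this scope allocates from it (the actual parameter shape may differ):
    // auto env = coflux::make_environment<sche>(pool{4}, timer{}, &frame_pool);
}
```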
Coflux's design is driven by several core concepts.
- Structured Concurrency based on the `task`/`fork` model, eliminating orphan tasks and memory leaks.
- PMR support and Heterogeneous Execution support based on Task-as-Context.
- Static Channel philosophy: pursuing zero-cost abstraction to describe an asynchronous concurrent system to the maximum extent at compile time.
To delve deeper into Structured Concurrency, Task-as-Context, and the introduction of the Static Channel, please refer to the Design and Architecture Document (ARCHITECTURE.md).
Coflux is architected around the philosophy of "Static Channels," aiming to deliver minimal abstraction overhead and highly efficient multi-core scalability. The following key performance metrics validate Coflux's standing in the C++20 coroutine runtime space:
| Key Metric | Result | Significance |
|---|---|---|
| Single-Task Core Overhead | Sub-300 ns per task life-cycle | The core framework mechanism is extremely lightweight, resulting in very low coroutine life-cycle cost. |
| Net M:N Scheduling Overhead | See BENCHMARK.md | The Work-Stealing scheduler scales tasks efficiently across multiple cores at minimal cost. |
| High Concurrency Throughput | 1.95M ops/s under high contention | Sustains robust throughput, demonstrating the stability and robustness of the synchronization mechanism. |
| Sequential Dependency Handling | See BENCHMARK.md | Very low latency for the complete coroutine suspend-schedule-resume cycle, making it ideal for I/O-intensive pipelines. |
For detailed methodology, hardware specifications, and complete data analysis, please refer to BENCHMARK.md.
- Extreme Micro-Overhead: Controls the entire asynchronous task life-cycle cost (including coroutine frame, PMR memory, and structured destruction) at the sub-300 ns level.
- Efficient and Predictable Concurrency: The Work-Stealing scheduler enables safe multi-core scaling at an extremely low cost, while perfect integration with C++ PMR avoids performance bottlenecks common in traditional concurrency models.
- Structured Concurrency Guarantee: Combined with the performance data above, Coflux provides a safe, predictable, and high-throughput C++20 coroutine runtime environment.
The example below demonstrates defining a root task (server_task) that runs on the main thread (noop_executor), which then derives three child task chains running on a thread pool (thread_pool_executor).
Since noop_executor does nothing, the task's coroutine frame resumes on the thread_pool_executor until co_return. The RAII scope on the main thread, however, blocks until all tasks are complete.
#include <chrono>  // for std::chrono durations
#include <cstdlib> // for rand()
#include <iostream>
#include <string>
#include <coflux/scheduler.hpp>
#include <coflux/combiner.hpp>
#include <coflux/task.hpp>
using pool = coflux::thread_pool_executor<>;
using timer = coflux::timer_executor;
using sche = coflux::scheduler<pool, timer>;
// Simulate asynchronous reading of a network request
coflux::fork<std::string, pool> async_read_request(auto&&, int client_id) {
std::cout << "[Client " << client_id << "] Waiting for request..." << std::endl;
co_await std::chrono::milliseconds(200 + client_id * 100);
co_return "Hello from client " + std::to_string(client_id);
}
// Simulate asynchronous writing back of a network response
coflux::fork<void, pool> async_write_response(auto&&, const std::string& response) {
std::cout << " -> Echoing back: '" << response << "'" << std::endl;
co_await std::chrono::milliseconds((rand() % 5) * 100);
co_return;
}
// Handle a single connection using structured concurrency
coflux::fork<void, pool> handle_connection(auto&&, int client_id) {
try {
auto&& env = co_await coflux::context();
auto request = co_await async_read_request(env, client_id);
auto processed_response = request + " [processed by server]";
co_await async_write_response(env, processed_response);
std::cout << "[Client " << client_id << "] Connection handled successfully." << std::endl;
}
catch (const std::exception& e) {
std::cerr << "[Client " << client_id << "] Error: " << e.what() << std::endl;
}
// When handle_connection finishes, all forks it created (read/write) are automatically cleaned up
}
int main() {
std::cout << "--- Demo: Structured Concurrency with when_all ---\n";
{
auto env = coflux::make_environment<sche>(pool{ 4 }, timer{});
auto server_task = [](auto env) -> coflux::task<void, pool, sche> {
std::cout << "Server task starting 3 concurrent connections...\n";
co_await coflux::when_all(
handle_connection(co_await coflux::context(), 1),
handle_connection(co_await coflux::context(), 2),
handle_connection(co_await coflux::context(), 3)
);
std::cout << "All connections handled.\n";
}(env);
// When server_task destructs, RAII automatically blocks the main thread
// until server_task and all its child forks (handle_connection) are complete.
}
std::cout << "\n--- Demo Finished ---\n";
return 0;
}

graph TD
server_task-->handle_connection1
server_task-->handle_connection2
server_task-->handle_connection3
handle_connection1-->async_read_request1
async_read_request1-->async_write_response1
handle_connection2-->async_read_request2
async_read_request2-->async_write_response2
handle_connection3-->async_read_request3
async_read_request3-->async_write_response3
The example below demonstrates asynchronous code using both coroutine style and callback style.
Code of the form auto res = co_await std::move(t).on_xxx().on_xxx() describes a "just-in-time configuration" fast path.
Coflux guarantees that the co_await completes non-blockingly after all on_xxx calls, consistent with the value-semantics model: the value received by on_value is an internal lvalue copy, while res is assigned the rvalue result of t.
To mirror synchronous exception semantics, Coflux guarantees that a specific exception can be caught across threads, or consumed by on_error, exactly once. After that, repeated calls to get_result will throw std::runtime_error("Can't get result because there is an exception.").
#include <atomic>  // for the std::atomic flags used below
#include <cassert> // for assert()
#include <chrono>
#include <iostream>
#include <string>
#include <coflux/scheduler.hpp>
#include <coflux/task.hpp>
using pool = coflux::thread_pool_executor<>;
using timer = coflux::timer_executor;
using sche = coflux::scheduler<pool, timer>;
// Helper function: Simulate async IO
coflux::fork<std::string, pool> async_fetch_data(auto&&, std::string data, std::chrono::milliseconds delay) {
co_await delay;
co_return "Fetched$" + data;
}
// Helper function: Simulate async IO (will fail)
coflux::fork<std::string, pool> async_fetch_data_error(auto&&) {
co_await std::chrono::milliseconds(50);
throw std::runtime_error("Data fetch failed!");
co_return "";
}
int main() {
std::cout << "--- Demo: Mixed Style (co_await + Chaining) ---\n";
{
auto env = coflux::make_environment<sche>(pool{ 2 }, timer{});
auto launch = [&](auto env) -> coflux::task<void, pool, sche> {
auto ctx = co_await coflux::context();
std::atomic<bool> success_called = false;
std::atomic<bool> error_called = false;
// Demonstrate success path
std::cout << "Awaiting success task with .on_value()...\n";
std::string result = co_await async_fetch_data(ctx, "SuccessData", std::chrono::milliseconds(50))
.on_value([&](const std::string& s) {
std::cout << " [on_value callback] Fired for: " << s << "\n";
success_called = true;
})
.on_error([&](auto) { // Will not execute
});
std::cout << " [co_await result] Got: " << result << "\n";
// Demonstrate failure path
std::cout << "Awaiting error task with .on_error()...\n";
try {
// co_await an rvalue task
co_await async_fetch_data_error(ctx)
.on_value([&](auto) { // Will not execute
})
.on_error([&](std::exception_ptr e) {
std::cout << " [on_error callback] Fired! Exception consumed.\n";
error_called = true;
});
}
catch (const std::runtime_error& e) {
// After the exception is consumed by on_error, get_result() throws
// std::runtime_error("Can't get result because there is an exception.")
std::cout << " [co_await catch] Correctly caught: " << e.what() << "\n";
}
}
assert(success_called.load());
assert(error_called.load());
};
auto demo_task = launch(env);
// RAII destruction will wait for demo_task to complete
}
std::cout << "\n--- Demo Finished ---\n";
return 0;
}

The example below demonstrates packaging a synchronous task into a fork and specifying its executor.
fork_view can be passed within the scope of the same task to observe the result of a fork, meaning complex dependency graphs can be formed.
#include <chrono>
#include <iostream>
#include <string>
#include <thread> // for std::this_thread::sleep_for
#include <coflux/scheduler.hpp>
#include <coflux/task.hpp>
#include <coflux/combiner.hpp>
using pool = coflux::thread_pool_executor<>;
using timer = coflux::timer_executor;
using sche = coflux::scheduler<pool, timer>;
int main() {
std::cout << "--- Demo: `make_fork` and `fork_view` Dependency Graph ---\n";
{
auto env = coflux::make_environment<sche>(pool{ 3 }, timer{});
// 1. Define synchronous/asynchronous callables
// Wrap a "std::" function (or similar synchronous lambda)
auto sync_fetch_user_id = [](const std::string& username) -> int {
std::cout << " [Task A] (Sync) Fetching ID for '" << username << "'\n";
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return std::stoi(username.substr(username.find_first_of('$') + 1));
};
// B and C depend on the result of A
auto fetch_user_name = [](auto&&, coflux::fork_view<int> id_view) -> coflux::fork<std::string, pool> {
int id = co_await id_view;
std::cout << " [Task B] (Async) Getting name for ID " << id << "\n";
co_return "Daking";
};
auto fetch_user_perms = [](auto&&, coflux::fork_view<int> id_view) -> coflux::fork<std::string, pool> {
int id = co_await id_view;
std::cout << " [Task C] (Async) Getting perms for ID " << id << "\n";
co_return "Admin";
};
auto launch = [&](auto env) -> coflux::task<void, pool, sche> {
auto ctx = co_await coflux::context();
// 2. Use make_fork to "fork-ify" the synchronous function
auto get_id_fork_factory = coflux::make_fork<pool>(sync_fetch_user_id, ctx);
// 3. Execution Graph
auto id_task = get_id_fork_factory("daking$123");
auto id_view = id_task.get_view(); // Share the result
// 4. B and C launch concurrently
auto name_task = fetch_user_name(ctx, id_view);
auto perms_task = fetch_user_perms(ctx, id_view);
// 5. Wait for the final result
auto [name, perms] = co_await coflux::when_all(name_task, perms_task);
std::cout << " [Result] User: " << name << ", Permissions: " << perms << "\n";
};
auto demo_task = launch(env);
// RAII destruction waits for completion
}
std::cout << "\n--- Demo Finished ---\n";
return 0;
}

graph TD
demo_task-->id_task
demo_task-->name_task
demo_task-->perms_task
name_task-->id_task
perms_task-->id_task
The example below demonstrates how an asynchronous data flow can non-blockingly participate in a synchronous call chain via co_await(vec | when(n)) (integrated with ranges).
co_await acts as a synchronization point, connecting the asynchronous environment with the synchronous environment by resolving asynchronous tasks into synchronous results.
#include <chrono>
#include <iostream>
#include <ranges> // for std::views::transform
#include <string>
#include <vector>
#include <coflux/scheduler.hpp>
#include <coflux/combiner.hpp>
#include <coflux/task.hpp>
using pool = coflux::thread_pool_executor<>;
using timer = coflux::timer_executor;
using sche = coflux::scheduler<pool, timer>;
// Helper function: Simulate async IO
coflux::fork<std::string, pool> async_fetch_data(auto&&, std::string data, std::chrono::milliseconds delay) {
co_await delay;
co_return "Fetched$" + data;
}
int main() {
std::cout << "--- Demo: Async Pipeline with `when(n)` ---\n";
{
auto env = coflux::make_environment<sche>(pool{ 5 }, timer{});
auto launch = [&](auto env) -> coflux::task<void, pool, sche> {
auto ctx = co_await coflux::context();
std::vector<coflux::fork<std::string, pool>> downloads;
// Start 5 downloads with different speeds
downloads.push_back(async_fetch_data(ctx, "File 1 (200ms)", std::chrono::milliseconds(200)));
downloads.push_back(async_fetch_data(ctx, "File 2 (50ms)", std::chrono::milliseconds(50)));
downloads.push_back(async_fetch_data(ctx, "File 3 (300ms)", std::chrono::milliseconds(300)));
downloads.push_back(async_fetch_data(ctx, "File 4 (10ms)", std::chrono::milliseconds(10)));
downloads.push_back(async_fetch_data(ctx, "File 5 (70ms)", std::chrono::milliseconds(70)));
std::cout << "Starting 5 downloads, waiting for the first 3 to complete...\n";
// `co_await(vec | when(n))`
// Wait for the 3 *fastest* tasks out of 5 to complete, and process to remove the "Fetched$" prefix
std::cout << "\n [Result] The first 3 completed files were:\n";
for (const auto& s : co_await(downloads | coflux::when(3)) |
std::views::transform([](auto&& s) { return s.substr(s.find_first_of('$') + 1); }))
{
std::cout << " -> " << s << "\n";
}
};
auto demo_task = launch(env);
// RAII destruction waits for all tasks (including those not co_await-ed) to complete
}
std::cout << "--- Demo Finished ---\n";
return 0;
}

The executor thread group worker_group<N> defines worker_group<N>::worker<M>, which can be used to address a specific worker within the group and pin a task to the corresponding thread.
Users can fully rely on this executor to customize a user-space event loop.
The following demonstrates the working mode of the executor thread group, where the initial task executor is specified as noop purely to showcase the dispatch functionality.
dispatch can forcibly switch the executor, but only temporarily: the next co_await resumes on the default executor unless after is used.
after(..., exec) or ... | after(exec) intercepts the default executor's behavior of injecting execution segments into the task/fork or awaitable_closure; the caller is instead resumed on the user-specified executor.
This only applies if the task/fork or awaitable_closure does not complete instantly (if it completes in an extremely short time, after has no effect).
The role of after is therefore to maintain the relay of the execution context, preventing the default executor from inserting execution segments.
#include <chrono>
#include <iostream>
#include <string>
#include <thread> // for std::this_thread::get_id / sleep_for
#include <coflux/scheduler.hpp>
#include <coflux/combiner.hpp>
#include <coflux/task.hpp>
// Define noop as the starting executor
using noop = coflux::noop_executor;
// Define a worker group with 2 threads
using group = coflux::worker_group<2>;
// Define the scheduler using noop and the 2-thread worker group
using sche = coflux::scheduler<noop, group>;
int main() {
std::cout << "--- Demo: thread executor group (Worker Group) ---\n";
{
// Create the environment with the defined scheduler
auto env = coflux::make_environment<sche>();
// Define the demo task, starting with noop executor
auto demo_task = [](auto env) -> coflux::task<void, noop, sche> {
auto& sch = co_await coflux::get_scheduler();
std::cout << "Initial thread: " << std::this_thread::get_id() << "\n";
// Dispatch to the worker 0 of the worker group
co_await coflux::this_task::dispatch(sch.get<group::worker<0>>());
// Execution context switched to worker thread 0 for subsequent tasks
std::cout << "After dispatch to worker 0, thread: " << std::this_thread::get_id() << "\n";
auto&& ctx = co_await coflux::context();
// Fork that executes on worker 1
auto fork_on_worker1 = [](auto&&, int id) -> coflux::fork<void, group::worker<1>> {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::cout << " [Worker 1] Processing ID: " << id << " on thread " << std::this_thread::get_id() << "\n";
co_return;
};
// Fork that executes on worker 0
auto fork_on_worker0 = [](auto&&, int id) -> coflux::fork<void, group::worker<0>> {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::cout << " [Worker 0] Processing ID: " << id << " on thread " << std::this_thread::get_id() << "\n";
co_return;
};
for (int i = 0; i < 5; i++) {
if (i & 1) {
// Fork runs on worker 1. The 'after' combinator specifies that the main task
// should resume on worker 1 after the fork completes.
co_await(fork_on_worker1(ctx, i) | coflux::after(sch.get<group::worker<1>>()));
std::cout << " [Main Task] on thread " << std::this_thread::get_id() << "\n";
}
else {
// Fork runs on worker 0. The 'after' combinator specifies that the main task
// should resume on worker 0 after the fork completes.
co_await(fork_on_worker0(ctx, i) | coflux::after(sch.get<group::worker<0>>()));
std::cout << " [Main Task] on thread " << std::this_thread::get_id() << "\n";
}
}
}(env);
}
std::cout << "--- Demo Finished ---\n";
return 0;
}

There are two types of channels: channel<T[N]> (buffered) and channel<T[]> (unbuffered).
- channel<T[N]> (Buffered Channel): has a buffer of size N. Coroutines are never suspended, whether the read or write operation succeeds or fails. It is used for efficient information transfer.
- channel<T[]> (Unbuffered / Handshake Channel): has no buffer, so a coroutine suspends until the matching operation (a write for a pending read, or a read for a pending write) occurs. It is used for handshaking/synchronization.
The operations co_await (chan << t) (write) or co_await (chan >> t) (read) return a boolean value, indicating whether the operation was successful.
The example below demonstrates two independently running producers and two consumers running on a thread pool communicating using a channel.
#include <iostream>
#include <string>
#include <vector>
#include <coflux/scheduler.hpp>
#include <coflux/combiner.hpp>
#include <coflux/task.hpp>
#include <coflux/generator.hpp>
using noop = coflux::noop_executor;
using pool = coflux::thread_pool_executor<>;
using group = coflux::worker_group<2>;
using sche = coflux::scheduler<noop, group, pool>;
int main() {
std::cout << "--- Demo: channel<int[N]> ---\n";
{
auto env = coflux::make_environment<sche>();
auto demo_task = [](auto env) -> coflux::task<void, pool, sche> {
auto&& ctx = co_await coflux::context();
coflux::channel<std::string[64]> chan; // Buffered Channel of size 64
// Producer 1: Runs on worker<1> of the worker_group
auto producer1 = [](auto&&, coflux::channel<std::string[64]>& chan) -> coflux::fork<void, group::worker<1>> {
for (int i = 0; i < 5; i++)
co_await(chan << "Message " + std::to_string(i) + " from Worker 1");
co_return;
};
// Producer 2: Runs on worker<0> of the worker_group
auto producer2 = [](auto&&, coflux::channel<std::string[64]>& chan) -> coflux::fork<void, group::worker<0>> {
for (int i = 0; i < 5; i++) {
co_await(chan << "Message " + std::to_string(i) + " from Worker 2");
}
co_return;
};
std::vector<coflux::fork<void, pool>> consumers;
// Create 2 Consumers, running on the generic thread_pool
for (int i = 0; i < 2; i++) {
consumers.push_back([&](auto&&, int consumer_id) -> coflux::fork<void, pool> {
for (int i = 0; i < 5; i++) {
std::string msg;
while (!co_await(chan >> msg)) {
// A busy-wait is avoided by an implicit yield internally
}
std::cout << " [Consumer " << consumer_id << "] Received: " << msg << "\n";
}
co_return;
}(ctx, i + 1));
}
// Await both producers to finish
co_await coflux::when_all(producer1(ctx, chan), producer2(ctx, chan));
// Await all consumers to finish
co_await coflux::when(consumers);
}(env);
}
std::cout << "--- Demo Finished ---\n";
return 0;
}

The example below demonstrates the two generation strategies supported by coflux::generator: looping and recursion.
coflux::generator mimics input_range and can be integrated into std::ranges.
Coflux supports recursive calls between different generators as long as the return types are the same.
#include <iostream>
#include <string>
#include <ranges>
#include <coflux/generator.hpp>
// === Generator Example ===
coflux::generator<int> fibonacci(int n) {
int a = 0, b = 1;
for (int i = 0; i < n; ++i) {
co_yield a;
int next = a + b;
a = b;
b = next;
}
}
coflux::generator<int> recursive_countdown(int n, auto&& fibonacci) {
if (n > 0) {
co_yield fibonacci(n);
co_yield recursive_countdown(n - 1, fibonacci);
}
}
int main() {
std::cout << "--- Demo: Generators (Loop & Recursion) ---\n";
{
// Looping
std::cout << "Looping (Fibonacci):\n ";
auto view = fibonacci(15)
| std::views::filter([](int n) { return n % 2 == 0; })
| std::views::take(5)
| std::views::transform([](int n) { return n * n; });
for (int val : view) { std::cout << val << " "; }
// Recursion
std::cout << "\nRecursion (Countdown):\n ";
for (int val : recursive_countdown(5, fibonacci)) {
std::cout << val << " ";
}
std::cout << "\n";
}
std::cout << "\n--- Demo Finished ---\n";
return 0;
}

- A C++20-compatible compiler (MSVC v19.29+, GCC 11+, Clang 13+).
Coflux is a header-only library. You only need to add the include directory to your project's include path.
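If you vendor the sources instead of fetching them, a small interface target is enough, since there is nothing to build or link. This is a minimal sketch assuming a checkout under third_party/coflux with headers in its include/ directory (both paths are placeholders):

```cmake
# Vendored alternative to FetchContent (paths are placeholders)
add_library(coflux INTERFACE)
target_include_directories(coflux INTERFACE ${CMAKE_SOURCE_DIR}/third_party/coflux/include)
target_compile_features(coflux INTERFACE cxx_std_20)
```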
It is recommended to use CMake's FetchContent to integrate Coflux into your project:
# In your CMakeLists.txt
include(FetchContent)
FetchContent_Declare(
coflux
GIT_REPOSITORY https://github.com/dakingffo/coflux.git
)
FetchContent_MakeAvailable(coflux)
# ... In your target
target_link_libraries(your_target PRIVATE coflux)

For the further development of this framework:
- Expansion into classic asynchronous working environments such as net/rpc.
- Further performance optimizations (more lock-free containers, memory pools with coroutine affinity, etc.).
- More user-friendly API design.
- Further refinement of benchmarks and unit tests.
- Fixing hidden bugs and race conditions.
Contributions of any kind are welcome, whether bug reports, feature suggestions, or pull requests. We will finalize the CONTRIBUTING document in the near future; more information will be available then.
Coflux is licensed under the MIT License.