Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ set(TARGET_LIBRARY beman_${TARGET_NAME})
set(TARGET_ALIAS beman::${TARGET_NAME})
set(TARGETS_EXPORT_NAME ${CMAKE_PROJECT_NAME})

option(BEMAN_NET_WITH_URING "Enable liburing io context" OFF)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On Linux there are at least 4 sequencers: select, poll, epoll_*, and iouring_*. On other systems there are additional sequencers, e.g. kqueue* and IOCP. The interface context_base and io_context are set up to make it an object creation choice which sequencer is used. My vision on setting this up is to have cmake-level checks to determine what implementations are being built. There should be a default sequencer and that may either be prioritized based on what sequencers can be built, a cmake-option, or a combination thereof.

I can provide examples of how I think that could look.

Copy link
Author

@cbodley cbodley Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @dietmarkuehl, thanks for the feedback here - and for your standardization efforts as well

my initial goal was just to treat existing functionality as the default to avoid breaking things that are already stable. but i'm happy to explore a better design for this part


coming from an asio background, i'm used to the epoll backend as default but with preprocessor macros to customize. regarding uring, from https://www.boost.org/doc/libs/latest/doc/html/boost_asio/using.html:

The backend is disabled by default, and must be enabled by defining both BOOST_ASIO_HAS_IO_URING and BOOST_ASIO_DISABLE_EPOLL.

while BOOST_ASIO_DISABLE_EPOLL alone "disables epoll support on Linux, forcing the use of a select-based implementation."

as a header-only library, asio relies on the application to link against liburing so the application has to opt in by defining BOOST_ASIO_HAS_IO_URING


my intuition for linux is that the preference should be uring -> epoll -> poll. while asio is concerned about supporting older kernels without the required uring functionality, that may not be necessary for a library targeting a future standard

because this isn't a header-only library, selection of uring does need to happen during the configure step. so i'm thinking this cmake variable should default to ON where support is expected:

cmake_dependent_option(BEMAN_NET_WITH_URING "Enable liburing io context" ON "CMAKE_SYSTEM_NAME MATCHES Linux" OFF)

when enabled, this option would inject a compile definition like BEMAN_NET_HAS_URING:

target_compile_definitions(${TARGET_LIBRARY} PUBLIC BEMAN_NET_HAS_URING)

allowing io_context to select the backend with:

#if defined(BEMAN_NET_HAS_URING) && !defined(BEMAN_NET_DISABLE_URING)
    ::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::uring_context()};
#elif defined(BEMAN_NET_HAS_EPOLL) && !defined(BEMAN_NET_DISABLE_EPOLL)
    ::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::epoll_context()};
#else
    ::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::poll_context()};
#endif

so to summarize: the library defines the order of preference for each platform, and the application can customize using the DISABLE defines


include(FetchContent)
FetchContent_Declare(
execution
Expand Down
6 changes: 6 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ set(xEXAMPLES taps)
foreach(EXAMPLE ${EXAMPLES})
set(EXAMPLE_TARGET ${TARGET_PREFIX}.examples.${EXAMPLE})
add_executable(${EXAMPLE_TARGET})
if(BEMAN_NET_WITH_URING)
target_compile_definitions(
${EXAMPLE_TARGET}
PRIVATE BEMAN_NET_USE_URING
)
endif()
target_sources(${EXAMPLE_TARGET} PRIVATE ${EXAMPLE}.cpp)
target_link_libraries(${EXAMPLE_TARGET} PRIVATE ${TARGET_LIBRARY})
target_link_libraries(${EXAMPLE_TARGET} PRIVATE beman::task)
Expand Down
6 changes: 5 additions & 1 deletion examples/demo_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,12 @@ struct task {
state->callback.reset();
state->handle->stop_state = task::stop_state::stopping;
state->handle->stop_source.request_stop();
if (state->handle->stop_state == task::stop_state::stopped)
if (state->handle->stop_state == task::stop_state::stopped) {
this->object->handle->state->complete_stopped();
} else {
// transition back to running so sender_awaiter::stop() can safely complete later
state->handle->stop_state = task::stop_state::running;
}
}
};
using stop_token = decltype(ex::get_stop_token(ex::get_env(::std::declval<Receiver>())));
Expand Down
11 changes: 9 additions & 2 deletions include/beman/net/detail/io_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
#include <beman/net/detail/container.hpp>
#include <beman/net/detail/context_base.hpp>
#include <beman/net/detail/io_context_scheduler.hpp>
#ifdef BEMAN_NET_USE_URING
#include <beman/net/detail/uring_context.hpp>
#else
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As commented above, I don’t think these should be exclusive.

#include <beman/net/detail/poll_context.hpp>
#endif
#include <beman/net/detail/repeat_effect_until.hpp>
#include <beman/execution/execution.hpp>

#include <cstdint>
#include <sys/socket.h>
#include <unistd.h>
#include <poll.h>
#include <cerrno>
#include <csignal>
#include <limits>
Expand All @@ -33,8 +36,12 @@ class io_context;

class beman::net::io_context {
private:
#ifdef BEMAN_NET_USE_URING
::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::uring_context()};
#else
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is fine, though (in principle; I’d think the details may be different): here the the default is chosen.

::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::poll_context()};
::beman::net::detail::context_base& d_context{*this->d_owned};
#endif
::beman::net::detail::context_base& d_context{*this->d_owned};

public:
using scheduler_type = ::beman::net::detail::io_context_scheduler;
Expand Down
321 changes: 321 additions & 0 deletions include/beman/net/detail/uring_context.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
// include/beman/net/detail/uring_context.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_BEMAN_NET_DETAIL_URING_CONTEXT
#define INCLUDED_BEMAN_NET_DETAIL_URING_CONTEXT

#include <beman/net/detail/container.hpp>
#include <beman/net/detail/context_base.hpp>

#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <system_error>
#include <tuple>
#include <liburing.h>
#include <sys/socket.h>
#include <unistd.h>

namespace beman::net::detail {

// io_context implementation based on liburing
// io_context backend built on liburing. Submission queue entries (sqes) are
// prepared lazily and flushed in batches: either when the submission queue
// fills up (see get_sqe()) or together with the wait in run_one(), so most
// completions are reaped with a single system call.
struct uring_context final : context_base {
    static constexpr unsigned QUEUE_DEPTH = 128; // sqes requested from the kernel at init
    ::io_uring ring;
    container<native_handle_type> sockets; // maps socket_id -> native fd
    task* tasks = nullptr;                 // intrusive LIFO of scheduled tasks
    ::std::size_t submitting = 0;          // sqes prepared but not yet submitted
    ::std::size_t outstanding = 0;         // cqes still expected from the kernel

    // Initializes the ring; throws std::system_error if the kernel rejects it.
    uring_context() {
        int flags = 0;
        int r = ::io_uring_queue_init(QUEUE_DEPTH, &ring, flags);
        if (r < 0) {
            throw ::std::system_error(-r, ::std::system_category(), "io_uring_queue_init failed");
        }
    }
    ~uring_context() override { ::io_uring_queue_exit(&ring); }

    // Registers an already-open file descriptor and returns its id.
    auto make_socket(int fd) -> socket_id override { return sockets.insert(fd); }

    // Creates a new socket; on failure sets 'error' and returns socket_id::invalid.
    auto make_socket(int d, int t, int p, ::std::error_code& error) -> socket_id override {
        int fd(::socket(d, t, p));
        if (fd < 0) {
            error = ::std::error_code(errno, ::std::system_category());
            return socket_id::invalid;
        }
        return make_socket(fd);
    }

    // Closes the descriptor registered for 'id' and removes it from the table.
    auto release(socket_id id, ::std::error_code& error) -> void override {
        const native_handle_type handle = sockets[id];
        sockets.erase(id);
        if (::close(handle) < 0) {
            error = ::std::error_code(errno, ::std::system_category());
        }
    }

    auto native_handle(socket_id id) -> native_handle_type override { return sockets[id]; }

    auto set_option(socket_id id, int level, int name, const void* data, ::socklen_t size, ::std::error_code& error)
        -> void override {
        if (::setsockopt(native_handle(id), level, name, data, size) < 0) {
            error = ::std::error_code(errno, ::std::system_category());
        }
    }

    auto bind(socket_id id, const endpoint& ep, ::std::error_code& error) -> void override {
        if (::bind(native_handle(id), ep.data(), ep.size()) < 0) {
            error = ::std::error_code(errno, ::std::system_category());
        }
    }

    auto listen(socket_id id, int no, ::std::error_code& error) -> void override {
        if (::listen(native_handle(id), no) < 0) {
            error = ::std::error_code(errno, ::std::system_category());
        }
    }

    // Flushes all prepared sqes to the kernel; throws on failure.
    auto submit() -> void {
        int r = ::io_uring_submit(&ring);
        if (r < 0) {
            throw ::std::system_error(-r, ::std::system_category(), "io_uring_submit failed");
        }
        // r is non-negative here: the number of sqes the kernel consumed
        // (cast avoids a signed/unsigned comparison in the assert)
        const auto consumed = static_cast<::std::size_t>(r);
        assert(submitting >= consumed);
        submitting -= consumed;
        outstanding += consumed;
    }

    // Returns a fresh sqe whose user data points at 'completion' (may be
    // nullptr for fire-and-forget entries such as cancellation requests).
    auto get_sqe(io_base* completion) -> ::io_uring_sqe* {
        auto sqe = ::io_uring_get_sqe(&ring);
        while (sqe == nullptr) {
            // if the submission queue is full, flush and try again
            submit();
            sqe = ::io_uring_get_sqe(&ring);
        }
        ::io_uring_sqe_set_data(sqe, completion);
        ++submitting;
        return sqe;
    }

    // Blocks for the next completion; returns its result code together with
    // the associated completion object (nullptr for untracked sqes).
    auto wait() -> ::std::tuple<int, io_base*> {
        ::io_uring_cqe* cqe = nullptr;
        int r = ::io_uring_wait_cqe(&ring, &cqe);
        if (r < 0) {
            throw ::std::system_error(-r, ::std::system_category(), "io_uring_wait_cqe failed");
        }

        assert(outstanding > 0);
        --outstanding;

        // copy out everything we need before marking the cqe as seen
        const int res = cqe->res;
        const auto completion = ::io_uring_cqe_get_data(cqe);
        ::io_uring_cqe_seen(&ring, cqe);

        return {res, static_cast<io_base*>(completion)};
    }

    // Runs one unit of work: a scheduled task if any is queued, otherwise a
    // single io completion. Returns the number of items processed (0 = idle).
    auto run_one() -> ::std::size_t override {
        if (auto count = process_task(); count) {
            return count;
        }

        if (submitting) {
            // if we have anything to submit, batch the submit and wait in a
            // single system call. this allows io_uring_wait_cqe() below to be
            // served directly from memory
            unsigned wait_nr = 1;
            int r = ::io_uring_submit_and_wait(&ring, wait_nr);
            if (r < 0) {
                throw ::std::system_error(-r, ::std::system_category(), "io_uring_submit_and_wait failed");
            }
            const auto consumed = static_cast<::std::size_t>(r);
            assert(submitting >= consumed);
            submitting -= consumed;
            outstanding += consumed;
        }

        if (!outstanding) {
            // nothing to submit and nothing to wait on, we're done
            return 0;
        }

        // read the next completion, waiting if necessary
        auto [res, completion] = wait();

        if (completion) {
            // work() functions depend on res, so pass it in via 'extra'.
            // NOTE(review): 'res' lives on this stack frame and 'extra' still
            // points at it after work() returns — this assumes io_base::extra
            // does not own or dereference the pointer beyond the work() call;
            // confirm against io_base's declaration of 'extra'
            completion->extra.reset(&res);
            completion->work(*this, completion);
        }

        return 1;
    }

    // Requests asynchronous cancellation of 'op' and completes 'cancel_op'
    // immediately (see the lifetime rationale below).
    auto cancel(io_base* cancel_op, io_base* op) -> void override {
        auto sqe = get_sqe(nullptr);
        int flags = 0;
        ::io_uring_prep_cancel(sqe, op, flags);

        // use io_uring_prep_cancel() for asynchronous cancellation of op.
        // cancel_op, aka sender_state::cancel_callback, lives inside of op's
        // operation state. op's completion may race with this cancellation,
        // causing that sender_state and its cancel_callback to be destroyed.
        // so we can't pass cancel_op to io_uring_sqe_set_data() and attach a
        // cancel_op->work() function to handle its completion in run_one().
        // instead, we just complete it here without waiting for the result
        cancel_op->complete();
    }

    // Pushes a task onto the LIFO run queue consumed by process_task().
    auto schedule(task* t) -> void override {
        t->next = tasks;
        tasks = t;
    }

    // Completes at most one queued task; returns how many ran (0 or 1).
    auto process_task() -> ::std::size_t {
        if (tasks) {
            auto* t = tasks;
            tasks = t->next;
            t->complete();
            return 1u;
        }
        return 0u;
    }

    // Prepares an async accept; the new connection's fd is registered and its
    // socket_id stored into the operation on completion.
    auto accept(accept_operation* op) -> submit_result override {
        op->work = [](context_base& ctx, io_base* io) {
            // 'extra' carries a pointer to the cqe result (set in run_one())
            auto res = *static_cast<int*>(io->extra.get());
            if (res == -ECANCELED) {
                io->cancel();
                return submit_result::ready;
            } else if (res < 0) {
                io->error(::std::error_code(-res, ::std::system_category()));
                return submit_result::error;
            }
            auto op = static_cast<accept_operation*>(io);
            // set socket: a non-negative res is the accepted fd
            ::std::get<2>(*op) = ctx.make_socket(res);
            io->complete();
            return submit_result::ready;
        };

        auto sqe = get_sqe(op);
        auto fd = native_handle(op->id);
        auto addr = ::std::get<0>(*op).data();
        auto addrlen = &::std::get<1>(*op);
        int flags = 0;
        ::io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
        return submit_result::submit;
    }

    // Prepares an async connect to the endpoint stored in the operation.
    auto connect(connect_operation* op) -> submit_result override {
        op->work = [](context_base&, io_base* io) {
            auto res = *static_cast<int*>(io->extra.get());
            if (res == -ECANCELED) {
                io->cancel();
                return submit_result::ready;
            } else if (res < 0) {
                io->error(::std::error_code(-res, ::std::system_category()));
                return submit_result::error;
            }
            io->complete();
            return submit_result::ready;
        };

        auto sqe = get_sqe(op);
        auto fd = native_handle(op->id);
        auto& addr = ::std::get<0>(*op);
        ::io_uring_prep_connect(sqe, fd, addr.data(), addr.size());
        return submit_result::submit;
    }

    // Prepares an async recvmsg; the byte count is stored on completion.
    auto receive(receive_operation* op) -> submit_result override {
        op->work = [](context_base&, io_base* io) {
            auto res = *static_cast<int*>(io->extra.get());
            if (res == -ECANCELED) {
                io->cancel();
                return submit_result::ready;
            } else if (res < 0) {
                io->error(::std::error_code(-res, ::std::system_category()));
                return submit_result::error;
            }
            auto op = static_cast<receive_operation*>(io);
            // set bytes received
            ::std::get<2>(*op) = res;
            io->complete();
            return submit_result::ready;
        };

        auto sqe = get_sqe(op);
        auto fd = native_handle(op->id);
        auto msg = &::std::get<0>(*op);
        auto flags = ::std::get<1>(*op);
        ::io_uring_prep_recvmsg(sqe, fd, msg, flags);
        return submit_result::submit;
    }

    // Prepares an async sendmsg; the byte count is stored on completion.
    auto send(send_operation* op) -> submit_result override {
        op->work = [](context_base&, io_base* io) {
            auto res = *static_cast<int*>(io->extra.get());
            if (res == -ECANCELED) {
                io->cancel();
                return submit_result::ready;
            } else if (res < 0) {
                io->error(::std::error_code(-res, ::std::system_category()));
                return submit_result::error;
            }
            auto op = static_cast<send_operation*>(io);
            // set bytes sent
            ::std::get<2>(*op) = res;
            io->complete();
            return submit_result::ready;
        };

        auto sqe = get_sqe(op);
        auto fd = native_handle(op->id);
        auto msg = &::std::get<0>(*op);
        auto flags = ::std::get<1>(*op);
        ::io_uring_prep_sendmsg(sqe, fd, msg, flags);
        return submit_result::submit;
    }

    // Converts a chrono duration into the kernel's timespec representation.
    static auto make_timespec(auto dur) -> __kernel_timespec {
        auto sec = ::std::chrono::duration_cast<::std::chrono::seconds>(dur);
        dur -= sec;
        auto nsec = ::std::chrono::duration_cast<::std::chrono::nanoseconds>(dur);
        __kernel_timespec ts;
        ts.tv_sec = sec.count();
        ts.tv_nsec = nsec.count();
        return ts;
    }

    // Prepares an absolute timeout that completes at the operation's deadline.
    auto resume_at(resume_at_operation* op) -> submit_result override {
        auto at = ::std::get<0>(*op);
        op->work = [](context_base&, io_base* io) {
            auto res = *static_cast<int*>(io->extra.get());
            if (res == -ECANCELED) {
                io->cancel();
                return submit_result::ready;
            } else if (res == -ETIME) {
                // -ETIME is the expected result of an expired io_uring timeout
                io->complete();
                return submit_result::ready;
            }
            io->error(::std::error_code(-res, ::std::system_category()));
            return submit_result::error;
        };

        auto sqe = get_sqe(op);
        auto ts = make_timespec(at.time_since_epoch());
        unsigned count = 0;
        unsigned flags = IORING_TIMEOUT_ABS | IORING_TIMEOUT_REALTIME;
        ::io_uring_prep_timeout(sqe, &ts, count, flags);

        // unlike other operations whose submissions can be batched in run_one(),
        // the timeout argument to io_uring_prep_timeout() is a pointer to memory
        // on the stack. this memory must remain valid until submit, so we either
        // have to call submit here or allocate heap memory to store it
        submit();
        return submit_result::submit;
    }
};

} // namespace beman::net::detail

#endif
Loading