Implement kqueue reactor #13

@sgerbino

Summary

The corosio library currently supports only the epoll (Linux) and IOCP (Windows) backends. Adding a kqueue reactor would provide native support for BSD-derived systems, including macOS, FreeBSD, OpenBSD, NetBSD, and DragonFlyBSD.

Motivation

Platform Coverage

The kqueue API is the native high-performance event notification mechanism on BSD-derived systems:

Platform     | kqueue Available | Notes
-------------|------------------|-----------------------------------
macOS        | Yes              | Default since macOS 10.3
FreeBSD      | Yes              | Origin of kqueue (FreeBSD 4.1)
OpenBSD      | Yes              | Supported
NetBSD       | Yes              | Supported (minor API differences)
DragonFlyBSD | Yes              | Supported
iOS          | Yes              | Same as macOS

Why kqueue Over select on BSD Systems

While the select backend provides a portable fallback, kqueue offers significant advantages:

  1. O(1) event retrieval: Like epoll, kqueue scales to thousands of descriptors
  2. Unified interface: Handles sockets, files, signals, processes, and timers
  3. Edge and level triggered: Level-triggered by default, edge-triggered via the EV_CLEAR flag
  4. Atomic registration: Can add/modify/delete multiple events in one syscall
  5. No descriptor limits: Unlike select's FD_SETSIZE constraint

Use Cases

  1. macOS development: Native performance for Apple platforms
  2. FreeBSD servers: Production deployments on FreeBSD
  3. Cross-platform libraries: Complete coverage of major server platforms

Current State

Existing backends: src/corosio/src/detail/epoll/ and src/corosio/src/detail/iocp/

The epoll backend provides a good reference as kqueue shares similar concepts:

  • Both are O(1) event notification mechanisms
  • Both support edge-triggered operation
  • Both use a file descriptor to represent the event queue

Boost.Asio Reference Implementation

Boost.Asio's kqueue_reactor provides a well-tested reference. Key implementation details:

Location: boost/asio/detail/kqueue_reactor.hpp and kqueue_reactor.ipp

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      kqueue_reactor                         │
├─────────────────────────────────────────────────────────────┤
│  int kqueue_fd_                    (kqueue descriptor)      │
│  select_interrupter interrupter_   (wakeup mechanism)       │
│  object_pool<descriptor_state> registered_descriptors_      │
│  timer_queue_set timer_queues_                              │
│  mutex mutex_                                               │
└─────────────────────────────────────────────────────────────┘

Per-Descriptor State

Each registered descriptor has associated state:

struct descriptor_state
{
    descriptor_state* next_;
    descriptor_state* prev_;
    mutex mutex_;
    int descriptor_;
    int num_kevents_;  // 1 for read-only, 2 for read+write
    op_queue<reactor_op> op_queue_[max_ops];
    bool shutdown_;
};

Operation Types

enum op_types {
    read_op = 0,
    write_op = 1,
    connect_op = write_op,  // Same as write
    except_op = 2,
    max_ops = 3
};

Key API Patterns

  1. kqueue creation:

    kqueue_fd_ = kqueue();
  2. Event registration (kevent structure):

    struct kevent ev;
    EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, udata);
    kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);
  3. Event loop:

    struct kevent events[128];
    struct timespec ts = { timeout_sec, timeout_nsec };
    int n = kevent(kqueue_fd_, nullptr, 0, events, 128, &ts);
    for (int i = 0; i < n; ++i) {
        // Process events[i]
    }

Platform-Specific Considerations

  1. macOS: Older versions may not define EV_OOBAND, requiring:

    #ifndef EV_OOBAND
    #define EV_OOBAND EV_FLAG1
    #endif
  2. NetBSD: Requires cast for udata parameter in EV_SET:

    EV_SET(&ev, fd, filter, flags, fflags, data,
           reinterpret_cast<intptr_t>(udata));

Fork Handling

Unlike epoll, kqueue descriptors are not inherited across fork(). Boost.Asio provides notify_fork() to recreate internal descriptors after forking.
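
As an illustration, a fork hook could recreate the queue and re-register tracked descriptors in the child. The notify_fork() entry point and fork_event enum below are hypothetical names modeled on Boost.Asio, not existing corosio API; descriptors_ is the map sketched under Phase 1 below.

// Hypothetical fork hook, modeled on Boost.Asio's kqueue_reactor::notify_fork().
void kqueue_scheduler::notify_fork(fork_event event)
{
    if (event != fork_event::child)
        return;

    // The child's copy of the kqueue descriptor is unusable after fork(),
    // so discard it and create a fresh queue.
    ::close(kqueue_fd_);
    kqueue_fd_ = ::kqueue();

    // Re-register every descriptor that was tracked before the fork.
    // (A real implementation would also restore each filter's
    // enabled/disabled state.)
    for (auto& [fd, state] : descriptors_)
    {
        struct kevent ev[2];
        EV_SET(&ev[0], fd, EVFILT_READ,  EV_ADD | EV_CLEAR, 0, 0, &state);
        EV_SET(&ev[1], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, &state);
        ::kevent(kqueue_fd_, ev, 2, nullptr, 0, nullptr);
    }
}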

kqueue vs epoll Comparison

Feature          | kqueue                    | epoll
-----------------|---------------------------|---------------------------
Creation         | kqueue()                  | epoll_create1()
Modification     | kevent()                  | epoll_ctl()
Wait             | kevent()                  | epoll_wait()
Event structure  | struct kevent             | struct epoll_event
Filters          | EVFILT_READ, EVFILT_WRITE | EPOLLIN, EPOLLOUT
Edge-triggered   | EV_CLEAR flag             | EPOLLET flag
User data        | void* udata               | epoll_data_t union
Batch operations | Yes (single kevent call)  | No (one epoll_ctl per fd)
Timer support    | Built-in (EVFILT_TIMER)   | Separate timerfd
Signal support   | Built-in (EVFILT_SIGNAL)  | Separate signalfd
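
As an illustration of the built-in timer filter (not necessarily how the corosio timer queue would be wired up), a one-shot timeout can be queued through the same kevent() call used for sockets:

// Illustrative only: arm a one-shot 500 ms timer on the same kqueue.
// The ident (1) is an arbitrary timer id chosen by the caller; data holds
// the period, interpreted as milliseconds when fflags is 0.
struct kevent ev;
EV_SET(&ev, 1, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0, 500, nullptr);
kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);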

Implementation Approach

Directory Structure

src/corosio/src/detail/kqueue/
├── scheduler.hpp
├── scheduler.cpp
├── sockets.hpp
├── sockets.cpp
└── op.hpp

Note: No resolver_service.hpp or signal implementation needed — the kqueue backend reuses the existing POSIX implementations (see "Reusing POSIX Services" section below).

Phase 1: Core Scheduler

Create kqueue_scheduler adapting the epoll scheduler patterns:

  1. kqueue management:

    class kqueue_scheduler
    {
    public:
        kqueue_scheduler();
        ~kqueue_scheduler();
    
    private:
        int kqueue_fd_;
    
        // Interrupter for waking the event loop
        int interrupt_pipe_[2];  // or use EVFILT_USER on supported platforms
    };
  2. Descriptor state tracking:

    struct descriptor_state
    {
        int descriptor_;
        scheduler_op* read_op_;
        scheduler_op* write_op_;
        bool registered_;
    };
    
    std::unordered_map<int, descriptor_state> descriptors_;
  3. Event registration:

    void register_descriptor(int fd, descriptor_state* state)
    {
        struct kevent ev;
        EV_SET(&ev, fd, EVFILT_READ, EV_ADD | EV_CLEAR | EV_DISABLE,
               0, 0, state);
        kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);
    }
    
    void start_read_op(int fd, scheduler_op* op)
    {
        struct kevent ev;
        EV_SET(&ev, fd, EVFILT_READ, EV_ENABLE, 0, 0, op);
        kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);
    }
  4. Event loop:

    void run_reactor(std::unique_lock<std::mutex>& lock)
    {
        struct kevent events[128];
    
        lock.unlock();
    
        struct timespec ts = calculate_timeout();
        int n = kevent(kqueue_fd_, nullptr, 0, events, 128, &ts);
    
        lock.lock();
    
        for (int i = 0; i < n; ++i) {
            auto* state = static_cast<descriptor_state*>(events[i].udata);
    
            if (events[i].filter == EVFILT_READ) {
                // Handle read readiness
            } else if (events[i].filter == EVFILT_WRITE) {
                // Handle write readiness
            }
        }
    }

Phase 2: Socket Operations

Adapt epoll socket operations for kqueue:

  1. Registration: Use EV_ADD with appropriate filter
  2. Modification: Use EV_ENABLE/EV_DISABLE or re-add
  3. Removal: Use EV_DELETE (see the sketch after this list)
  4. Edge-triggered: Use EV_CLEAR so event state is reset after each retrieval; EV_ONESHOT is available where true one-shot delivery is needed
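
A deregistration sketch in the same style as the Phase 1 snippets (error handling omitted; kqueue_fd_ follows the naming assumed above):

// Remove both filters for a descriptor when the socket is closed.
// EV_DELETE for a filter that was never added reports ENOENT; a real
// implementation would decide whether that case can be ignored.
void deregister_descriptor(int fd)
{
    struct kevent ev[2];
    EV_SET(&ev[0], fd, EVFILT_READ,  EV_DELETE, 0, 0, nullptr);
    EV_SET(&ev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
    kevent(kqueue_fd_, ev, 2, nullptr, 0, nullptr);
}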

Phase 3: Integration

Backend selection (config_backend.hpp):

#if defined(__linux__)
#  include "epoll/scheduler.hpp"
   namespace corosio::detail { using scheduler = epoll_scheduler; }
#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || \
      defined(__NetBSD__) || defined(__DragonFly__)
#  include "kqueue/scheduler.hpp"
   namespace corosio::detail { using scheduler = kqueue_scheduler; }
#elif defined(_WIN32)
#  include "iocp/scheduler.hpp"
   namespace corosio::detail { using scheduler = iocp_scheduler; }
#else
   // Fallback for other POSIX platforms
#  include "select/scheduler.hpp"
   namespace corosio::detail { using scheduler = select_scheduler; }
#endif

Reusing POSIX Services

The kqueue backend should reuse the existing POSIX signal and resolver implementations rather than creating backend-specific versions.

Resolver (posix/resolver_service.hpp/cpp)

The POSIX resolver runs getaddrinfo() on worker threads, the standard approach on POSIX systems since there is no portable asynchronous DNS API. The header explicitly states:

// This implementation works for all POSIX backends (epoll, kqueue, io_uring, poll)

During scheduler initialization:

// In kqueue_scheduler constructor
resolver_svc_ = &get_resolver_service(ctx, *this);

Signals (posix/signals.hpp/cpp)

The POSIX signal implementation uses sigaction() with C signal handlers and is scheduler-agnostic via the abstract scheduler interface. During scheduler initialization:

// In kqueue_scheduler constructor
signal_svc_ = &get_signal_service(ctx, *this);

Future Optimization: EVFILT_SIGNAL

kqueue provides native signal handling via EVFILT_SIGNAL, which could offer advantages over the current C signal handler approach:

Aspect        | Current POSIX (sigaction)                                          | kqueue (EVFILT_SIGNAL)
--------------|--------------------------------------------------------------------|--------------------------------------
Delivery      | Async signal handler context                                       | Normal event loop iteration
Thread safety | Acquires mutex in signal handler (not strictly async-signal-safe)  | No signal handler, fully thread-safe
Integration   | Separate wakeup path                                               | Unified with socket/timer events
Complexity    | Works everywhere                                                   | kqueue-specific code path

The current POSIX implementation explicitly documents its async-signal-safety limitation:

"deliver_signal() is called from signal handler context and acquires mutexes. This is NOT strictly async-signal-safe per POSIX."

A kqueue-native implementation using EVFILT_SIGNAL would eliminate this limitation by processing signals as regular kevents in the event loop, avoiding the signal handler context entirely.
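
A rough sketch of what that could look like, assuming the kqueue_fd_ and event loop from Phase 1 (the helper name is illustrative; requires <sys/event.h> and <csignal>):

// Hypothetical EVFILT_SIGNAL registration. The signal's disposition is set
// to SIG_IGN so its default action does not run; kqueue still records each
// delivery attempt and reports it as a normal kevent.
void add_signal_watch(int kqueue_fd, int signo)
{
    std::signal(signo, SIG_IGN);

    struct kevent ev;
    EV_SET(&ev, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, nullptr);
    kevent(kqueue_fd, &ev, 1, nullptr, 0, nullptr);
}

// In run_reactor(): an event with filter == EVFILT_SIGNAL carries the signal
// number in ident and the delivery count since the last retrieval in data,
// handled on the event loop thread like any other kevent.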

Recommendation: Start with the shared POSIX implementation for simplicity. Consider a kqueue-native EVFILT_SIGNAL implementation as a future optimization if signal handling performance or correctness becomes a concern.

Scheduler Interface Requirements

The kqueue scheduler must implement the abstract scheduler interface (include/boost/corosio/detail/scheduler.hpp):

struct scheduler
{
    virtual void post(capy::coro) const = 0;
    virtual void post(scheduler_op*) const = 0;
    virtual void on_work_started() noexcept = 0;
    virtual void on_work_finished() noexcept = 0;
    virtual void work_started() const noexcept = 0;
    virtual void work_finished() const noexcept = 0;
    virtual bool running_in_this_thread() const noexcept = 0;
    virtual void stop() = 0;
    virtual bool stopped() const noexcept = 0;
    virtual void restart() = 0;
    virtual std::size_t run() = 0;
    virtual std::size_t run_one() = 0;
    virtual std::size_t wait_one(long usec) = 0;
    virtual std::size_t poll() = 0;
    virtual std::size_t poll_one() = 0;
};

Implementation Tasks

Core Infrastructure

  • Create kqueue/scheduler.hpp - Scheduler class declaration
  • Create kqueue/scheduler.cpp - Event loop with kevent
  • Implement kqueue creation and cleanup
  • Implement interrupter (pipe or EVFILT_USER; see the sketch after this list)
  • Implement single reactor model (matching epoll design)
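
A possible EVFILT_USER interrupter, on platforms that provide it (sketch only, reusing the kqueue_fd_ name from the Phase 1 snippets):

// One-time setup in the constructor: register a user event. The ident (0)
// is an arbitrary value; EV_CLEAR resets the event after it is retrieved.
struct kevent ev;
EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);

// Wakeup path, e.g. from post() or stop(): trigger the user event so a
// thread blocked in kevent() returns immediately.
EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);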

Socket Support

  • Create kqueue/op.hpp - Operation types (can likely share with epoll)
  • Create kqueue/sockets.hpp/cpp - Socket async operations
  • Implement async_accept using EVFILT_READ
  • Implement async_connect using EVFILT_WRITE
  • Implement async_read/async_write

Platform Handling

  • Handle NetBSD udata cast requirement
  • Handle older macOS EV_OOBAND compatibility
  • Test on multiple BSD variants if available

Integration

  • Update config_backend.hpp with kqueue backend selection
  • Initialize POSIX services in scheduler constructor (get_signal_service, get_resolver_service)

Testing

  • Verify all existing tests pass with kqueue backend
  • Test on macOS
  • Test on FreeBSD if available

Files to Create/Modify

File                                        | Action | Description
--------------------------------------------|--------|---------------------------------------------------
src/corosio/src/detail/kqueue/              | Create | New backend directory
src/corosio/src/detail/kqueue/scheduler.hpp | Create | Scheduler class (implements scheduler interface)
src/corosio/src/detail/kqueue/scheduler.cpp | Create | Event loop, POSIX service initialization
src/corosio/src/detail/kqueue/sockets.hpp   | Create | Socket operation declarations
src/corosio/src/detail/kqueue/sockets.cpp   | Create | Socket operation implementations
src/corosio/src/detail/kqueue/op.hpp        | Create | Operation types
src/corosio/src/detail/config_backend.hpp   | Modify | Add kqueue backend selection

Note: No resolver or signal files needed — reuses posix/resolver_service and posix/signals.

Known Considerations

  1. Fork handling: kqueue descriptors don't survive fork - may need notify_fork() support
  2. EVFILT_USER: Modern BSD systems support user-defined events for efficient wakeup (alternative to pipe)
  3. Batch registration: kqueue can register multiple events atomically - potential optimization (see the sketch after this list)
  4. EVFILT_SIGNAL: Future optimization opportunity for signal handling (see "Reusing POSIX Services" section)
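
For example, the Phase 1 register_descriptor() could add both filters in a single kevent() call (sketch):

// Batch registration: both filters for a new descriptor are added, initially
// disabled, in one syscall instead of two round trips.
void register_descriptor(int fd, descriptor_state* state)
{
    struct kevent ev[2];
    EV_SET(&ev[0], fd, EVFILT_READ,  EV_ADD | EV_CLEAR | EV_DISABLE, 0, 0, state);
    EV_SET(&ev[1], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR | EV_DISABLE, 0, 0, state);
    kevent(kqueue_fd_, ev, 2, nullptr, 0, nullptr);
}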

Note: Re-enable macOS builds in CI after implementation!
