refactor: EventLoop locking cleanups #160

Open · wants to merge 5 commits into base: master
91 changes: 42 additions & 49 deletions include/mp/proxy-io.h
@@ -14,9 +14,10 @@

#include <assert.h>
#include <functional>
#include <optional>
#include <kj/function.h>
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <string>

@@ -129,6 +130,25 @@ std::string LongThreadName(const char* exe_name);

//! Event loop implementation.
//!
//! Cap'n Proto's threading model is very simple: all I/O operations are
//! asynchronous and must be performed on a single thread. This includes:
//!
//! - Code starting an asynchronous operation (calling a function that returns a
//! promise object)
//! - Code notifying that an asynchronous operation is complete (code using a
//! fulfiller object)
//! - Code handling a completed operation (code chaining or waiting for a promise)
//!
//! All this code needs to run on one thread, and the EventLoop::loop() method
//! is the entry point for this thread. ProxyClient and ProxyServer objects that
//! use other threads and need to perform I/O operations post work to this thread
//! using the EventLoop::post() and EventLoop::sync() methods.
//!
//! Specifically, because ProxyClient methods can be called from arbitrary
//! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
//! methods use the EventLoop thread to send requests, and ProxyServer methods
//! use the thread to return results.
//!
//! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
class EventLoop
{
@@ -144,15 +164,15 @@ class EventLoop

//! Run function on event loop thread. Does not return until function completes.
//! Must be called while the loop() function is active.
void post(const std::function<void()>& fn);
void post(kj::Function<void()> fn);

//! Wrapper around EventLoop::post that takes advantage of the fact that the
//! callable will not go out of scope, avoiding the requirement that it be
//! copyable.
template <typename Callable>
void sync(Callable&& callable)
{
post(std::ref(callable));
post(std::forward<Callable>(callable));
}
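The doc comment above and the post()/sync() changes in this hunk describe the core pattern: code on other threads hands work to the event loop thread and blocks until it completes. A minimal usage sketch, assuming an EventLoop instance named `loop` whose loop() method is already running on its own dedicated thread (the names are illustrative, not taken from this diff):

```cpp
// Run work on the event loop thread from some other thread.
int result = 0;
loop.sync([&] {
    // This lambda executes on the EventLoop thread, so it can safely
    // start capnp I/O, chain promises, use fulfillers, etc.
    result = 42;
});
// sync() blocks until the lambda has run, so `result` is valid here.
```

Switching post() from `const std::function<void()>&` to `kj::Function<void()>` means the callable no longer needs to be copyable; sync() simply forwards it, and because sync() blocks until the call completes, a reference to the caller's callable cannot dangle.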

//! Start asynchronous worker thread if necessary. This is only done if
@@ -166,13 +186,10 @@
//! is important that ProxyServer::m_impl destructors do not run on the
//! event loop thread because they may need it to do I/O if they perform
//! other IPC calls.
void startAsyncThread(std::unique_lock<std::mutex>& lock);
void startAsyncThread() MP_REQUIRES(m_mutex);

//! Add/remove remote client reference counts.
void addClient(std::unique_lock<std::mutex>& lock);
bool removeClient(std::unique_lock<std::mutex>& lock);
//! Check if loop should exit.
bool done(std::unique_lock<std::mutex>& lock);
bool done() MP_REQUIRES(m_mutex);
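The MP_REQUIRES and MP_GUARDED_BY annotations used throughout this diff, along with the Lock/Mutex types that replace std::mutex below, follow the style of Clang's -Wthread-safety analysis. Their definitions are not part of this header diff; the following is only a plausible sketch of how such macros are commonly implemented, not the library's actual code:

```cpp
// Hypothetical wrappers around Clang's thread safety attributes; they
// expand to nothing on compilers without the analysis.
#if defined(__clang__)
#define MP_TSA(x) __attribute__((x))
#else
#define MP_TSA(x)
#endif

#define MP_GUARDED_BY(mutex) MP_TSA(guarded_by(mutex))
#define MP_REQUIRES(...)     MP_TSA(requires_capability(__VA_ARGS__))
```

With annotations like these the compiler can warn at build time if, for example, done() is called or m_post_fn is accessed without m_mutex held, which is what makes dropping the explicit std::unique_lock& parameters safe.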

Logger log()
{
@@ -195,10 +212,10 @@
std::thread m_async_thread;

//! Callback function to run on event loop thread during post() or sync() call.
const std::function<void()>* m_post_fn = nullptr;
kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;

//! Callback functions to run on async thread.
CleanupList m_async_fns;
CleanupList m_async_fns MP_GUARDED_BY(m_mutex);

//! Pipe read handle used to wake up the event loop thread.
int m_wait_fd = -1;
@@ -208,11 +225,11 @@

//! Number of clients holding references to ProxyServerBase objects that
//! reference this event loop.
int m_num_clients = 0;
int m_num_clients MP_GUARDED_BY(m_mutex) = 0;

//! Mutex and condition variable used to post tasks to event loop and async
//! thread.
std::mutex m_mutex;
Mutex m_mutex;
std::condition_variable m_cv;

//! Capnp IO context.
@@ -263,11 +280,9 @@ struct Waiter
// in the case where a capnp response is sent and a brand new
// request is immediately received.
while (m_fn) {
auto fn = std::move(m_fn);
m_fn = nullptr;
lock.unlock();
fn();
lock.lock();
auto fn = std::move(*m_fn);
m_fn.reset();
Unlock(lock, fn);
}
const bool done = pred();
return done;
@@ -276,7 +291,7 @@

std::mutex m_mutex;
std::condition_variable m_cv;
std::function<void()> m_fn;
std::optional<kj::Function<void()>> m_fn;
};
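Two things about the Waiter change are worth noting: kj::Function, unlike std::function, has no empty/boolean-testable state, which is presumably why m_fn becomes a std::optional; and the manual unlock/call/relock sequence is replaced by an Unlock() helper defined elsewhere in the library. A rough sketch of what such a helper does, shown only to illustrate the pattern (the real implementation may differ):

```cpp
// Sketch: temporarily release a held lock while running a callback,
// reacquiring it before returning even if the callback throws.
template <typename L, typename Callback>
void Unlock(L& lock, Callback&& callback)
{
    lock.unlock();
    try {
        callback();
    } catch (...) {
        lock.lock();
        throw;
    }
    lock.lock();
}
```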

//! Object holding network & rpc state associated with either an incoming server
@@ -290,21 +305,13 @@ class Connection
Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
: m_loop(loop), m_stream(kj::mv(stream_)),
m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
m_rpc_system(::capnp::makeRpcClient(m_network))
{
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.addClient(lock);
}
m_rpc_system(::capnp::makeRpcClient(m_network)) {}
Connection(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream_,
const std::function<::capnp::Capability::Client(Connection&)>& make_client)
: m_loop(loop), m_stream(kj::mv(stream_)),
m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
{
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.addClient(lock);
}
m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}

//! Run cleanup functions. Must be called from the event loop thread. First
//! calls synchronous cleanup functions while blocked (to free capnp
@@ -333,12 +340,12 @@ class Connection
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
// error in cases where f deletes this Connection object.
m_on_disconnect.add(m_network.onDisconnect().then(
[f = std::move(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
[f = std::move(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
}

EventLoop& m_loop;
EventLoopRef m_loop;
kj::Own<kj::AsyncIoStream> m_stream;
LoggingErrorHandler m_error_handler{m_loop};
LoggingErrorHandler m_error_handler{*m_loop};
kj::TaskSet m_on_disconnect{m_error_handler};
::capnp::TwoPartyVatNetwork m_network;
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -381,21 +388,15 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
: m_client(std::move(client)), m_context(connection)

{
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}
EventLoopRef loop{*m_context.loop};

// Handler for the connection getting destroyed before this client object.
auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(m_client));
}
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
m_context.loop.reset();
m_context.connection = nullptr;
});

@@ -423,16 +424,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
Sub::destroy(*this);

// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
m_context.connection->m_loop.sync([&]() {
m_context.loop->sync([&]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(m_client));
}
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}

m_context.loop.reset();
if (destroy_connection) {
delete m_context.connection;
m_context.connection = nullptr;
@@ -454,8 +451,6 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
: m_impl(std::move(impl)), m_context(&connection)
{
assert(m_impl);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}

//! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
@@ -489,8 +484,6 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
});
}
assert(m_context.cleanup_fns.empty());
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}

//! If the capnp interface defined a special "destroy" method, as described the
28 changes: 14 additions & 14 deletions include/mp/proxy-types.h
@@ -558,7 +558,7 @@ template <typename Client>
void clientDestroy(Client& client)
{
if (client.m_context.connection) {
client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
} else {
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
}
@@ -567,7 +567,7 @@ void clientDestroy(Client& client)
template <typename Server>
void serverDestroy(Server& server)
{
server.m_context.connection->m_loop.log() << "IPC server destroy " << typeid(server).name();
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
}

//! Entry point called by generated client code that looks like:
@@ -587,7 +587,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
}
if (!g_thread_context.waiter) {
assert(g_thread_context.thread_name.empty());
g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name);
// If next assert triggers, it means clientInvoke is being called from
// the capnp event loop thread. This can happen when a ProxyServer
// method implementation that runs synchronously on the event loop
@@ -598,26 +598,26 @@
// declaration so the server method runs in a dedicated thread.
assert(!g_thread_context.loop_thread);
g_thread_context.waiter = std::make_unique<Waiter>();
proxy_client.m_context.connection->m_loop.logPlain()
proxy_client.m_context.loop->logPlain()
<< "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter";
}
ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
std::exception_ptr exception;
std::string kj_exception;
bool done = false;
proxy_client.m_context.connection->m_loop.sync([&]() {
proxy_client.m_context.loop->sync([&]() {
auto request = (proxy_client.m_client.*get_request)(nullptr);
using Request = CapRequestTraits<decltype(request)>;
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_context.connection->m_loop.logPlain()
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());

proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
proxy_client.m_context.loop->m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_context.connection->m_loop.logPlain()
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
try {
@@ -632,7 +632,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
},
[&](const ::kj::Exception& e) {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.connection->m_loop.logPlain()
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
done = true;
@@ -643,7 +643,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
}

//! Invoke callable `fn()` that may return void. If it does return void, replace
@@ -682,7 +682,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds;

int req = ++server_reqs;
server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
server.m_context.loop->log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());

try {
@@ -699,14 +699,14 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString());
});
} catch (const std::exception& e) {
server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
throw;
} catch (...) {
server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
server.m_context.loop->log() << "IPC server unhandled exception";
throw;
}
}
24 changes: 23 additions & 1 deletion include/mp/proxy.h
@@ -8,6 +8,7 @@
#include <mp/util.h>

#include <array>
#include <cassert>
#include <functional>
#include <list>
#include <stddef.h>
@@ -47,13 +48,34 @@ inline void CleanupRun(CleanupList& fns) {
}
}

//! Event loop smart pointer automatically managing the EventLoop::m_num_clients
//! reference count. If a Lock pointer is passed to the constructor, the provided
//! lock is used; otherwise EventLoop::m_mutex is locked internally.
class EventLoopRef
{
public:
explicit EventLoopRef(EventLoop& loop, Lock* lock = nullptr);
EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop) { other.m_loop = nullptr; }
EventLoopRef(const EventLoopRef&) = delete;
EventLoopRef& operator=(const EventLoopRef&) = delete;
EventLoopRef& operator=(EventLoopRef&&) = delete;
~EventLoopRef() { reset(); }
EventLoop& operator*() const { assert(m_loop); return *m_loop; }
EventLoop* operator->() const { assert(m_loop); return m_loop; }
bool reset(Lock* lock = nullptr);

EventLoop* m_loop{nullptr};
Lock* m_lock{nullptr};
};
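EventLoopRef replaces the manual addClient()/removeClient() bookkeeping removed from the constructors and destructors in proxy-io.h above: constructing one increments EventLoop::m_num_clients under m_mutex, and reset() (or destruction) decrements it again. A short usage sketch under that assumption, with `loop` standing in for an EventLoop running on its own thread:

```cpp
{
    EventLoopRef ref{loop};   // takes a client reference on the loop
    ref->log() << "the loop is guaranteed to stay alive here";
}                             // ~EventLoopRef() calls reset(), releasing the reference
```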

//! Context data associated with proxy client and server classes.
struct ProxyContext
{
Connection* connection;
EventLoopRef loop;
CleanupList cleanup_fns;

ProxyContext(Connection* connection) : connection(connection) {}
ProxyContext(Connection* connection);
};
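The ProxyContext constructor moves out of line, presumably so it can initialize the new EventLoopRef member from the connection it is given. A hypothetical definition, not shown in this diff:

```cpp
// Assumed out-of-line definition (e.g. in a .cpp file):
ProxyContext::ProxyContext(Connection* connection)
    : connection(connection), loop(*connection->m_loop)
{
}
```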

//! Base class for generated ProxyClient classes that implement a C++ interface