Skip to content
Open
48 changes: 31 additions & 17 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

#include <appbase/application_base.hpp>
#include <eosio/chain/exec_pri_queue.hpp>
#include <cassert>
#include <chrono>
#include <optional>
#include <mutex>

/*
* Customize appbase to support two-queue executor.
Expand Down Expand Up @@ -47,10 +46,15 @@ class priority_queue_executor {

template <typename Func>
void post( handler_id id, int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
if (q == exec_queue::read_exclusive || q == exec_queue::trx_read_write) {
// no reason to post to io_context which then places this in the read_exclusive_handlers queue.
// read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
pri_queue_.add(id, priority, q, --order_, std::forward<Func>(func));
// similarly, trx_read_write trxs are pulled off the queue directly to be executed
assert(id == handler_id::unique);
if (!pri_queue_.add(priority, q, --order_, std::forward<Func>(func))) {
// post required to trigger io_ctx_ run_one
boost::asio::post(io_ctx_, pri_queue_.wrap(id, priority, q, order_, std::forward<Func>(func)));
}
} else {
// post to io_context as the main thread may be blocked on io_context.run_one() in application::exec()
boost::asio::post(io_ctx_, pri_queue_.wrap(id, priority, q, --order_, std::forward<Func>(func)));
Expand All @@ -59,27 +63,20 @@ class priority_queue_executor {

template <typename Func>
void post( int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
// no reason to post to io_context which then places this in the read_exclusive_handlers queue.
// read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
pri_queue_.add(priority, q, --order_, std::forward<Func>(func));
} else {
// post to io_context as the main thread may be blocked on io_context.run_one() in application::exec()
boost::asio::post(io_ctx_, pri_queue_.wrap(priority, q, --order_, std::forward<Func>(func)));
}
post( handler_id::unique, priority, q, std::forward<Func>(func) );
}

// Legacy and deprecated. To be removed after cleaning up its uses in base appbase
template <typename Func>
auto post( int priority, Func&& func ) {
// safer to use read_write queue for unknown type of operation since operations
// from read_write queue are not executed in parallel with read-only operations
return boost::asio::post(io_ctx_, pri_queue_.wrap(priority, exec_queue::read_write, --order_, std::forward<Func>(func)));
return boost::asio::post(io_ctx_, pri_queue_.wrap(handler_id::unique, priority, exec_queue::read_write, --order_, std::forward<Func>(func)));
}

boost::asio::io_context& get_io_context() { return io_ctx_; }

// called from main thread, highest read_only and read_write
// called from main thread, highest read_only or read_write or trx_read_write
bool execute_highest() {
// execute for at least minimum runtime
const auto end = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(minimum_runtime_ms);
Expand All @@ -88,7 +85,13 @@ class priority_queue_executor {
while (true) {
if ( exec_window_ == exec_window::write ) {
// During write window only main thread is accessing anything in priority_queue_executor, no locking required
more = pri_queue_.execute_highest(exec_queue::read_write, exec_queue::read_only);
size_t before = pri_queue_.execute_highest(exec_queue::read_write, exec_queue::read_only);
if (before == 0 && trx_read_write_queue_enabled_.load(std::memory_order_acquire)) {
// locked because any thread can push to the trx_read_write queue
more = pri_queue_.execute_highest_locked(exec_queue::trx_read_write);
} else {
more = before > 0;
}
} else {
// When in read window, multiple threads including main app thread are accessing priority_queue_executor, locking required
more = pri_queue_.execute_highest_locked(exec_queue::read_only);
Expand Down Expand Up @@ -116,7 +119,7 @@ class priority_queue_executor {
template <typename Function>
boost::asio::executor_binder<Function, appbase::exec_pri_queue::executor>
wrap(int priority, exec_queue q, Function&& func ) {
return pri_queue_.wrap(priority, q, --order_, std::forward<Function>(func));
return pri_queue_.wrap(handler_id::unique, priority, q, --order_, std::forward<Function>(func));
}

void stop() {
Expand Down Expand Up @@ -145,19 +148,30 @@ class priority_queue_executor {
return exec_window_ == exec_window::write;
}

void enable_trx_read_write_queue(bool enable) {
trx_read_write_queue_enabled_.store(enable, std::memory_order_release);
}

size_t read_only_queue_size() { return pri_queue_.size(exec_queue::read_only); }
size_t read_write_queue_size() { return pri_queue_.size(exec_queue::read_write); }
size_t trx_read_write_queue_size() { return pri_queue_.size(exec_queue::trx_read_write); }
size_t read_exclusive_queue_size() { return pri_queue_.size(exec_queue::read_exclusive); }
bool read_only_queue_empty() { return pri_queue_.empty(exec_queue::read_only); }
bool read_write_queue_empty() { return pri_queue_.empty(exec_queue::read_write); }
bool trx_read_write_queue_empty() { return pri_queue_.empty(exec_queue::trx_read_write); }
bool read_exclusive_queue_empty() { return pri_queue_.empty(exec_queue::read_exclusive); }

[[nodiscard]] exec_pri_queue::read_view readable_queue() const { return pri_queue_.readable(); }

// members are ordered taking into account that the last one is destructed first
private:
std::thread::id main_thread_id_{ std::this_thread::get_id() };
boost::asio::io_context io_ctx_;
appbase::exec_pri_queue pri_queue_;
std::atomic<std::size_t> order_{ std::numeric_limits<size_t>::max() }; // to maintain FIFO ordering in all queues within priority
// to maintain FIFO ordering in all queues within priority
std::atomic<std::size_t> order_{ std::numeric_limits<size_t>::max() };
// Prevent spinning on popping a trx from the queue only to put it back in because there is no pending block
std::atomic<bool> trx_read_write_queue_enabled_{true};
exec_window exec_window_{ exec_window::write };
};

Expand Down
Loading
Loading