diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp
index 8406e5dbd1..fff3b1e4aa 100644
--- a/libraries/custom_appbase/include/eosio/chain/application.hpp
+++ b/libraries/custom_appbase/include/eosio/chain/application.hpp
@@ -2,9 +2,8 @@
 #include
 #include
+#include
 #include
-#include
-#include
 
 /*
  * Customize appbase to support two-queue executor.
@@ -47,10 +46,15 @@ class priority_queue_executor {
 
    template <typename Func>
    void post( handler_id id, int priority, exec_queue q, Func&& func ) {
-      if (q == exec_queue::read_exclusive) {
+      if (q == exec_queue::read_exclusive || q == exec_queue::trx_read_write) {
          // no reason to post to io_context which then places this in the read_exclusive_handlers queue.
          // read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
-         pri_queue_.add(id, priority, q, --order_, std::forward<Func>(func));
+         // similarly, trx_read_write trxs are pulled off the queue directly to be executed
+         assert(id == handler_id::unique);
+         if (!pri_queue_.add(priority, q, --order_, std::forward<Func>(func))) {
+            // post required to trigger io_ctx_ run_one
+            boost::asio::post(io_ctx_, pri_queue_.wrap(id, priority, q, order_, std::forward<Func>(func)));
+         }
       } else {
          // post to io_context as the main thread may be blocked on io_context.run_one() in application::exec()
         boost::asio::post(io_ctx_, pri_queue_.wrap(id, priority, q, --order_, std::forward<Func>(func)));
@@ -59,14 +63,7 @@ class priority_queue_executor {
 
    template <typename Func>
    void post( int priority, exec_queue q, Func&& func ) {
-      if (q == exec_queue::read_exclusive) {
-         // no reason to post to io_context which then places this in the read_exclusive_handlers queue.
-         // read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
-         pri_queue_.add(priority, q, --order_, std::forward<Func>(func));
-      } else {
-         // post to io_context as the main thread may be blocked on io_context.run_one() in application::exec()
-         boost::asio::post(io_ctx_, pri_queue_.wrap(priority, q, --order_, std::forward<Func>(func)));
-      }
+      post( handler_id::unique, priority, q, std::forward<Func>(func) );
    }
 
    // Legacy and deprecated.
   // To be removed after cleaning up its uses in base appbase
@@ -74,12 +71,12 @@ class priority_queue_executor {
    auto post( int priority, Func&& func ) {
       // safer to use read_write queue for unknown type of operation since operations
       // from read_write queue are not executed in parallel with read-only operations
-      return boost::asio::post(io_ctx_, pri_queue_.wrap(priority, exec_queue::read_write, --order_, std::forward<Func>(func)));
+      return boost::asio::post(io_ctx_, pri_queue_.wrap(handler_id::unique, priority, exec_queue::read_write, --order_, std::forward<Func>(func)));
    }
 
    boost::asio::io_context& get_io_context() { return io_ctx_; }
 
-   // called from main thread, highest read_only and read_write
+   // called from main thread, highest read_only or read_write or trx_read_write
    bool execute_highest() {
       // execute for at least minimum runtime
       const auto end = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(minimum_runtime_ms);
@@ -88,7 +85,13 @@
       while (true) {
          if ( exec_window_ == exec_window::write ) {
            // During write window only main thread is accessing anything in priority_queue_executor, no locking required
-           more = pri_queue_.execute_highest(exec_queue::read_write, exec_queue::read_only);
+           size_t before = pri_queue_.execute_highest(exec_queue::read_write, exec_queue::read_only);
+           if (before == 0 && trx_read_write_queue_enabled_.load(std::memory_order_acquire)) {
+              // locked because any thread can push to the trx_read_write queue
+              more = pri_queue_.execute_highest_locked(exec_queue::trx_read_write);
+           } else {
+              more = before > 0;
+           }
         } else {
            // When in read window, multiple threads including main app thread are accessing priority_queue_executor, locking required
            more = pri_queue_.execute_highest_locked(exec_queue::read_only);
@@ -116,7 +119,7 @@
    template <typename Function>
    boost::asio::executor_binder<Function, appbase::exec_pri_queue::executor>
    wrap(int priority, exec_queue q, Function&& func ) {
-      return pri_queue_.wrap(priority, q, --order_, std::forward<Function>(func));
+      return pri_queue_.wrap(handler_id::unique, priority, q, --order_, std::forward<Function>(func));
    }
 
    void stop() {
@@ -145,19 +148,30 @@
       return exec_window_ == exec_window::write;
    }
 
+   void enable_trx_read_write_queue(bool enable) {
+      trx_read_write_queue_enabled_.store(enable, std::memory_order_release);
+   }
+
    size_t read_only_queue_size() { return pri_queue_.size(exec_queue::read_only); }
    size_t read_write_queue_size() { return pri_queue_.size(exec_queue::read_write); }
+   size_t trx_read_write_queue_size() { return pri_queue_.size(exec_queue::trx_read_write); }
    size_t read_exclusive_queue_size() { return pri_queue_.size(exec_queue::read_exclusive); }
    bool read_only_queue_empty() { return pri_queue_.empty(exec_queue::read_only); }
    bool read_write_queue_empty() { return pri_queue_.empty(exec_queue::read_write); }
+   bool trx_read_write_queue_empty() { return pri_queue_.empty(exec_queue::trx_read_write); }
    bool read_exclusive_queue_empty() { return pri_queue_.empty(exec_queue::read_exclusive); }
 
+   [[nodiscard]] exec_pri_queue::read_view readable_queue() const { return pri_queue_.readable(); }
+
    // members are ordered taking into account that the last one is destructed first
 private:
    std::thread::id           main_thread_id_{ std::this_thread::get_id() };
    boost::asio::io_context   io_ctx_;
    appbase::exec_pri_queue   pri_queue_;
-   std::atomic<size_t>       order_{ std::numeric_limits<size_t>::max() }; // to maintain FIFO ordering in all queues within priority
+   // to maintain FIFO ordering in all queues within priority
+   std::atomic<size_t>       order_{ std::numeric_limits<size_t>::max() };
+   // Prevent spinning on popping a trx from the queue only to put it back in because there is no pending block
+   std::atomic<bool>         trx_read_write_queue_enabled_{true};
    exec_window               exec_window_{ exec_window::write };
};
diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp
index 782a4abda6..4579bbd6d4 100644
--- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp
+++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp
@@ -2,9 +2,15 @@
 #include
 #include
+#include
+#include
 #include
+#include
+#include
 #include
-#include
+#include
+#include
+#include
 
 namespace appbase {
 // adapted from: https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/example/cpp11/invocation/prioritised_handlers.cpp
@@ -18,20 +24,26 @@ enum class handler_id {
    process_incoming_block // process blocks already added to fork_db
 };
 
-enum class exec_queue {
-   read_only,       // the queue storing tasks which are safe to execute
+enum class exec_queue : uint8_t {
+   read_only = 0,   // The queue storing tasks which are safe to execute
                     // in parallel with other read-only & read_exclusive tasks in the read-only
                     // thread pool as well as on the main app thread.
                     // Multi-thread safe as long as nothing is executed from the read_write queue.
-   read_write,      // the queue storing tasks which can only be executed
+   read_write,      // The queue storing tasks which can only be executed
                     // on the app thread while read-only tasks are
                    // not being executed in read-only threads. Single threaded.
-   read_exclusive   // the queue storing tasks which should only be executed
+   trx_read_write,  // The queue storing trx tasks which can only be executed on the app thread while
+                    // read-only tasks are not being executed in read-only threads. Single threaded.
+                    // The trxs have a separate queue so they can be individually prioritized. They only
+                    // execute when there are no read_write tasks queued. The priority is only relative
+                    // to other trx tasks in the trx_read_write queue.
+   read_exclusive,  // The queue storing tasks which should only be executed
                     // in parallel with other read_exclusive or read_only tasks in the
                     // read-only thread pool. Will never be executed on the main thread.
                     // If no read-only thread pool is available that calls one of the execute_* with
                    // read_exclusive then this queue grows unbounded. exec_pri_queue asserts
                    // if asked to queue a read_exclusive task when init'ed with 0 read-only threads.
+   size             // Placeholder for number of elements in enum
};

// Locking has to be coordinated by caller, use with care.
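The `size` sentinel exists so the four queues can live in one enum-indexed array instead of three named members (see the `queues_` change further down). A minimal standalone sketch of that pattern, with a plain `std::deque` of functions standing in for the real `prio_queue`:

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>

// hypothetical simplified mirror of the enum above
enum class exec_queue : uint8_t { read_only = 0, read_write, trx_read_write, read_exclusive, size };

struct queue_set {
   // one slot per enumerator; exec_queue::size keeps the array bound in sync with the enum
   std::array<std::deque<std::function<void()>>, static_cast<std::size_t>(exec_queue::size)> queues_;

   auto& priority_que(exec_queue q) { return queues_[static_cast<std::size_t>(q)]; }

   void clear() {
      for (auto& q : queues_) // iterate every queue without naming each member
         q.clear();
   }
};

int main() {
   queue_set qs;
   qs.priority_que(exec_queue::trx_read_write).push_back([]{});
   qs.clear();
}
```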
@@ -40,9 +52,8 @@ class exec_pri_queue : public boost::asio::execution_context
 public:
 
    ~exec_pri_queue() {
-      clear(read_only_handlers_);
-      clear(read_write_handlers_);
-      clear(read_exclusive_handlers_);
+      for (auto& q : queues_)
+         clear(q);
    }
 
    // inform how many read_threads will be calling read_only/read_exclusive queues
@@ -58,7 +69,7 @@
    }
 
    void stop() {
-      std::lock_guard g( mtx_ );
+      std::scoped_lock g( mtx_ );
       exiting_blocking_ = true;
       cond_.notify_all();
    }
@@ -76,19 +87,68 @@
       should_exit_ = [](){ assert(false); return true; }; // should not be called when locking is disabled
    }
 
+   // holds lock until destroyed
+   class read_view {
+      const std::lock_guard<std::mutex> lock_;
+      const exec_pri_queue&             q_;
+   public:
+      read_view(const exec_pri_queue& q, std::mutex& mtx) : lock_(mtx), q_(q) {}
+      read_view(const read_view&) = delete;
+      read_view& operator=(const read_view&) = delete;
+      read_view(read_view&&) noexcept = delete;
+      read_view& operator=(read_view&&) noexcept = delete;
+
+      bool empty(exec_queue q) const { return q_.empty(q); }
+      size_t size(exec_queue q) const { return q_.size(q); }
+      auto begin(exec_queue q) const { return q_.priority_que(q).ordered_begin(); }
+      auto end(exec_queue q) const { return q_.priority_que(q).ordered_end(); }
+
+      template <typename Function>
+      static const auto& function_from_iter(const auto& iter) {
+         queued_handler_base* ptr = *iter;
+         assert(dynamic_cast<queued_handler<Function>*>(ptr) != nullptr);
+         return static_cast<queued_handler<Function>&>(*ptr).function();
+      }
+   };
+
+   [[nodiscard]] read_view readable() const { return read_view(*this, mtx_); }
+
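`read_view` is a lock-holding view: the mutex is acquired in its constructor and released only when the view is destroyed, so `begin()`/`end()` iteration cannot race with concurrent writers. A minimal sketch of the same RAII pattern with a hypothetical `locked_view` over a plain vector (not the real class):

```cpp
#include <mutex>
#include <vector>

// a view that holds the lock for as long as it is alive, so iteration over
// the guarded container cannot race with concurrent writers
template <typename T>
class locked_view {
   const std::lock_guard<std::mutex> lock_;
   const std::vector<T>&             data_;
public:
   locked_view(const std::vector<T>& d, std::mutex& m) : lock_(m), data_(d) {}
   locked_view(const locked_view&) = delete;
   auto begin() const { return data_.begin(); }
   auto end() const { return data_.end(); }
};

std::mutex       mtx;
std::vector<int> values{1, 2, 3};

int sum_all() {
   locked_view<int> view(values, mtx); // lock taken here
   int s = 0;
   for (int v : view)
      s += v;
   return s;                           // lock released when view is destroyed
}

int main() { return sum_all() == 6 ? 0 : 1; }
```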
+private:
+
+   // returns false if the task should be posted instead; with force=true, add() always queues the task and returns true
    template <typename Function>
-   void add(int priority, exec_queue q, size_t order, Function&& function) {
+   bool add(int priority, exec_queue q, size_t order, Function&& function, bool force) {
       assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
       prio_queue& que = priority_que(q);
-      std::unique_ptr<queued_handler_base> handler(new queued_handler<Function>(handler_id::unique, priority, order, std::forward<Function>(function)));
-      if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive
-         std::lock_guard g( mtx_ );
+      auto create_handler = [&]{
+         return std::make_unique<queued_handler<Function>>(handler_id::unique, priority, order, std::forward<Function>(function));
+      };
+      if (q == exec_queue::read_exclusive || lock_enabled_) {
+         std::unique_ptr<queued_handler_base> handler = create_handler();
+         // called directly from any thread for read_exclusive
+         std::scoped_lock g( mtx_ );
          que.push( handler.release() );
          if (num_waiting_)
            cond_.notify_one();
+      } else if (q == exec_queue::trx_read_write) {
+         std::scoped_lock g( mtx_ );
+         if (!force && empty(exec_queue::trx_read_write)) {
+            // Since empty, post so that io context will queue and call execute_highest; otherwise would not be processed
+            // until something else is posted that triggers io context run_one.
+            return false;
+         }
+         que.push( create_handler().release() );
      } else {
-         que.push( handler.release() );
+         // no lock required, called only from main thread
+         que.push( create_handler().release() );
      }
+      return true;
+   }
+
+public:
+
+   // returns false if the task should be posted instead; with force=true, add() always queues the task and returns true
+   template <typename Function>
+   bool add(int priority, exec_queue q, size_t order, Function&& function) {
+      return add(priority, q, order, std::forward<Function>(function), false);
    }
 
    // called from appbase::application_base::exec poll_one() or run_one()
@@ -96,11 +156,13 @@
    template <typename Function>
    void add(handler_id id, int priority, exec_queue q, size_t order, Function&& function) {
       assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
       if (id == handler_id::unique) {
-         return add(priority, q, order, std::forward<Function>(function));
+         constexpr bool force = true; // no need to check return of add, passing force=true
+         add(priority, q, order, std::forward<Function>(function), force);
+         return;
       }
       prio_queue& que = priority_que(q);
       std::unique_lock g( mtx_, std::defer_lock );
-      if (lock_enabled_ || q == exec_queue::read_exclusive) {
+      if (q == exec_queue::read_exclusive || q == exec_queue::trx_read_write || lock_enabled_) {
          // called directly from any thread for read_exclusive
          g.lock();
       }
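The empty-queue special case implements a wake-up protocol: handlers pulled straight off a side queue are invisible to `io_context::run_one()`, so the first handler enqueued while the queue is empty must also go through the io_context to wake the main thread. A standalone sketch of that rationale under assumed simplified types (a hypothetical `side_queue` drained by one posted handler, not the nodeos wrapping machinery):

```cpp
#include <boost/asio.hpp>
#include <deque>
#include <functional>
#include <iostream>

int main() {
   boost::asio::io_context io;
   std::deque<std::function<void()>> side_queue; // invisible to io.run_one()

   auto push = [&](std::function<void()> f) {
      bool was_empty = side_queue.empty();
      side_queue.push_back(std::move(f));
      if (was_empty) // mirrors add() returning false: something must trigger run_one
         boost::asio::post(io, [&] {
            while (!side_queue.empty()) {
               auto fn = std::move(side_queue.front());
               side_queue.pop_front();
               fn();
            }
         });
   };

   push([] { std::cout << "first\n"; });
   push([] { std::cout << "second\n"; }); // queue already non-empty: no extra post needed
   io.run_one();                          // a single wake-up drains both tasks
}
```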
@@ -124,9 +186,8 @@
 
    // only call when no lock required
    void clear() {
-      read_only_handlers_ = prio_queue();
-      read_write_handlers_ = prio_queue();
-      read_exclusive_handlers_ = prio_queue();
+      for (auto& q : queues_)
+         q = prio_queue();
    }
 
    bool execute_highest_locked(exec_queue q) {
@@ -141,12 +202,16 @@
    }
 
    // only call when no lock required
-   bool execute_highest(exec_queue lhs, exec_queue rhs) {
+   // returns the number of tasks in the queues before the highest-priority one is executed:
+   //   0 means none were executed
+   //   1 means one was executed and none remain
+   //   > 1 means one was executed and (return value - 1) remain
+   size_t execute_highest(exec_queue lhs, exec_queue rhs) {
       prio_queue& lhs_que = priority_que(lhs);
       prio_queue& rhs_que = priority_que(rhs);
       size_t size = lhs_que.size() + rhs_que.size();
       if (size == 0)
-         return false;
+         return size;
       exec_queue q = rhs;
       if (!lhs_que.empty() && (rhs_que.empty() || *rhs_que.top() < *lhs_que.top()))
          q = lhs;
@@ -155,8 +220,7 @@
       // pop, then execute since read_write queue is used to switch to read window and the pop needs to happen before that lambda starts
       auto t = pop(que);
       t->execute();
-      --size;
-      return size > 0;
+      return size;
    }
 
    bool execute_highest_blocking_locked(exec_queue lhs, exec_queue rhs) {
@@ -192,7 +256,9 @@
    // Only call when locking disabled
    size_t size(exec_queue q) const { return priority_que(q).size(); }
-   size_t size() const { return read_only_handlers_.size() + read_write_handlers_.size() + read_exclusive_handlers_.size(); }
+   size_t size() const {
+      return std::accumulate(queues_.begin(), queues_.end(), size_t(0), [](size_t s, const prio_queue& q) { return s + q.size(); });
+   }
 
    // Only call when locking disabled
    bool empty(exec_queue q) const { return priority_que(q).empty(); }
@@ -213,22 +279,38 @@
       return context_;
    }
 
+   template <typename T>
+   decltype(auto) unwrap(T&& t) const
+   {
+      if constexpr (requires{ t.get(); } && requires{ std::is_invocable_v<decltype(t.get())>; }) {
+         // modern executor_binder
+         return unwrap(std::move(t.get()));
+      } else if constexpr (requires{ t.handler_.get(); } && requires{ std::is_invocable_v<decltype(t.handler_.get())>; }) {
+         // legacy detail::binder (0, 1, 2, etc.)
+         return unwrap(std::move(t.handler_.get()));
+      } else {
+         // looks like a simple unwrapped function
+         static_assert(std::is_invocable_v<T>, "unwrap requires a callable type");
+         return std::forward<T>(t);
+      }
+   }
+
    template <typename Function, typename Allocator>
    void dispatch(Function f, const Allocator&) const
    {
-      context_.add(id_, priority_, que_, order_, std::move(f));
+      context_.add(id_, priority_, que_, order_, unwrap(std::move(f)));
    }
 
    template <typename Function, typename Allocator>
    void post(Function f, const Allocator&) const
    {
-      context_.add(id_, priority_, que_, order_, std::move(f));
+      context_.add(id_, priority_, que_, order_, unwrap(std::move(f)));
    }
 
    template <typename Function, typename Allocator>
    void defer(Function f, const Allocator&) const
    {
-      context_.add(id_, priority_, que_, order_, std::move(f));
+      context_.add(id_, priority_, que_, order_, unwrap(std::move(f)));
    }
 
    void on_work_started() const noexcept {}
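`unwrap()` peels asio's handler wrappers off before the callable is stored, so the queue holds the original function object type, which `read_view::function_from_iter()` can later downcast to. A compressed sketch of the `if constexpr` + `requires` detection idiom it relies on, with a hypothetical `wrapper` type in place of `executor_binder` (C++20):

```cpp
#include <type_traits>
#include <utility>

// hypothetical stand-in for boost::asio::executor_binder
template <typename F>
struct wrapper { F f; F& get() { return f; } };

template <typename T>
decltype(auto) unwrap(T&& t) {
   if constexpr (requires { t.get(); }) {
      // looks like a wrapper: recurse into the wrapped handler
      return unwrap(std::move(t.get()));
   } else {
      // bottomed out at the callable itself
      static_assert(std::is_invocable_v<T>, "unwrap requires a callable");
      return std::forward<T>(t);
   }
}

int main() {
   auto fn = [] { return 42; };
   wrapper w{fn};
   return unwrap(w)() == unwrap(fn)() ? 0 : 1; // both yield the raw lambda
}
```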
@@ -259,13 +341,6 @@
       return boost::asio::bind_executor( executor(*this, id, priority, order, q), std::forward<Function>(func) );
    }
 
-   template <typename Function>
-   boost::asio::executor_binder<Function, executor>
-   wrap(int priority, exec_queue q, size_t order, Function&& func)
-   {
-      return boost::asio::bind_executor( executor(*this, handler_id::unique, priority, order, q), std::forward<Function>(func) );
-   }
-
 private:
 
    class queued_handler_base
@@ -310,6 +385,10 @@
         function_();
      }
 
+     const Function& function() const {
+        return function_;
+     }
+
    private:
       Function function_;
    };
@@ -326,29 +405,11 @@
    using prio_queue = boost::heap::binomial_heap<queued_handler_base*, boost::heap::compare<deref_less>>;
 
    prio_queue& priority_que(exec_queue q) {
-      switch (q) {
-      case exec_queue::read_only:
-         return read_only_handlers_;
-      case exec_queue::read_write:
-         return read_write_handlers_;
-      case exec_queue::read_exclusive:
-         return read_exclusive_handlers_;
-      }
-      assert(false);
-      return read_only_handlers_;
+      return queues_[static_cast<size_t>(q)];
    }
 
    const prio_queue& priority_que(exec_queue q) const {
-      switch (q) {
-      case exec_queue::read_only:
-         return read_only_handlers_;
-      case exec_queue::read_write:
-         return read_write_handlers_;
-      case exec_queue::read_exclusive:
-         return read_exclusive_handlers_;
-      }
-      assert(false);
-      return read_only_handlers_;
+      return queues_[static_cast<size_t>(q)];
    }
 
    static std::unique_ptr<queued_handler_base> pop(prio_queue& que) {
@@ -372,9 +433,7 @@
    uint32_t max_waiting_{0};
    bool exiting_blocking_{false};
    std::function<bool()> should_exit_; // called holding mtx_
-   prio_queue read_only_handlers_;
-   prio_queue read_write_handlers_;
-   prio_queue read_exclusive_handlers_;
+   std::array<prio_queue, static_cast<size_t>(exec_queue::size)> queues_;
};

} // appbase
diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp
index b6c1ae0413..85ab219106 100644
--- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp
+++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp
@@ -350,9 +350,6 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {
 BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {
    scoped_app_thread app;
 
-   // set to run functions from both queues
-   app->executor().is_write_window();
-
    // post functions
    std::map<int, int> rslts {};
    int seq_num = 0;
@@ -411,6 +408,69 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {
    BOOST_CHECK_LT( rslts[6], rslts[11] );
 }
 
+// verify functions from queues (read_only, read_write, trx_read_write) are processed in write window, but not read_exclusive
+// trx_read_write tasks are processed after all read_only and read_write tasks
+BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_and_trx_read_write_queues ) {
+   scoped_app_thread app(true);
+
+   // post functions
+   std::map<int, int> rslts {};
+   int seq_num = 0;
+   const int trx_priority = priority::high+100; // trx priority set via configuration
+   app->executor().post( priority::medium, exec_queue::read_only,      [&]() { rslts[0]=seq_num;  ++seq_num; } );
+   app->executor().post( trx_priority-1,   exec_queue::trx_read_write, [&]() { rslts[1]=seq_num;  ++seq_num; } );
+   app->executor().post( trx_priority+5,   exec_queue::trx_read_write, [&]() { rslts[2]=seq_num;  ++seq_num; } );
+   app->executor().post( trx_priority+1,   exec_queue::trx_read_write, [&]() { rslts[3]=seq_num;  ++seq_num; } );
+   app->executor().post( priority::medium, exec_queue::read_write,     [&]() { rslts[4]=seq_num;  ++seq_num; } );
+   app->executor().post( priority::high,   exec_queue::read_write,     [&]() { rslts[5]=seq_num;  ++seq_num; } );
+   app->executor().post( priority::lowest, exec_queue::read_only,      [&]() { rslts[6]=seq_num;  ++seq_num; } );
+   app->executor().post( trx_priority+2,   exec_queue::trx_read_write, [&]() { rslts[7]=seq_num;  ++seq_num; } );
+   app->executor().post( priority::low,    exec_queue::read_write,     [&]() { rslts[8]=seq_num;  ++seq_num; } );
+   app->executor().post( priority::low,    exec_queue::read_only,      [&]() { rslts[9]=seq_num;  ++seq_num; } );
+   app->executor().post( priority::highest,exec_queue::read_only,      [&]() { rslts[10]=seq_num; ++seq_num; } );
+   app->executor().post( priority::low,    exec_queue::read_write,     [&]() { rslts[11]=seq_num; ++seq_num; } );
+   app->executor().post( priority::lowest, exec_queue::read_only,      [&]() { rslts[12]=seq_num; ++seq_num; } );
+   app->executor().post( trx_priority+3,   exec_queue::trx_read_write, [&]() { rslts[13]=seq_num; ++seq_num; } );
+   app->executor().post( priority::low,    exec_queue::read_write,     [&]() { rslts[14]=seq_num; ++seq_num; } );
+   app->executor().post( priority::lowest, exec_queue::read_only,      [&]() { rslts[15]=seq_num; ++seq_num; } );
+   app->executor().post( priority::medium, exec_queue::read_write,     [&]() { rslts[16]=seq_num; ++seq_num; } );
+
+   // stop application; use lowest priority at the end to make sure this executes last
+   app->executor().post( priority::lowest, exec_queue::trx_read_write, [&]() {
+      // all queues should be empty; the currently executing function was pop()ed before execution
+      BOOST_REQUIRE_EQUAL( app->executor().trx_read_write_queue_size(), 0u); // pop()s before execute
+      BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u);
+      BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u );
+      BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u);
+      app->quit();
+   } );
+
+   app.start_exec();
+   app.join();
+
+   // queues are emptied after exec
+   BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
+   BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true);
+   BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true);
+   BOOST_REQUIRE_EQUAL( app->executor().trx_read_write_queue_empty(), true);
+
+   // exactly the number of posts processed
+   BOOST_REQUIRE_EQUAL( rslts.size(), 17u );
+
+   // trx_read_write execute after everything else
+   BOOST_CHECK_LT( rslts[15], rslts[1] );
+   BOOST_CHECK_LT( rslts[15], rslts[2] );
+   BOOST_CHECK_LT( rslts[15], rslts[3] );
+   BOOST_CHECK_LT( rslts[15], rslts[7] );
+   BOOST_CHECK_LT( rslts[15], rslts[13] );
+
+   // trx_read_write execute in order of priority
+   BOOST_CHECK_LT( rslts[2], rslts[13] );
+   BOOST_CHECK_LT( rslts[13], rslts[7] );
+   BOOST_CHECK_LT( rslts[7], rslts[3] );
+   BOOST_CHECK_LT( rslts[3], rslts[1] );
+}
+
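The ordering the test asserts follows from two comparator rules: higher priority runs first, and equal priorities run FIFO thanks to the monotonically decreasing order stamp (`--order_` in the executor). A standalone sketch with simplified task types (not the real `queued_handler` comparator):

```cpp
#include <cstdio>
#include <queue>
#include <vector>

// hypothetical simplified task: (priority, order stamp, id)
struct task { int priority; long order; int id; };
struct cmp {
   bool operator()(const task& a, const task& b) const {
      if (a.priority != b.priority) return a.priority < b.priority; // max-heap on priority
      return a.order < b.order; // larger (earlier) stamp wins => FIFO within a priority
   }
};

int main() {
   long order = 1000; // stands in for std::numeric_limits<size_t>::max()
   std::priority_queue<task, std::vector<task>, cmp> q;
   q.push({5, --order, 1});
   q.push({9, --order, 2});
   q.push({5, --order, 3});
   while (!q.empty()) { std::printf("%d ", q.top().id); q.pop(); } // prints: 2 1 3
}
```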
 // verify tasks from both queues (read_only, read_exclusive) are processed in read window
 BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
    scoped_app_thread app(true);
@@ -579,4 +639,59 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
    BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected);
 }
 
+BOOST_AUTO_TEST_CASE( test_read_view_iteration ) {
+   scoped_app_thread app(true);
+
+   // post functions
+   const int trx_priority = priority::high+100; // trx priority set via configuration
+   struct functor {
+      void operator()() {}
+      int  some_other_function() const { return i; }
+      int  i;
+   };
+   app->executor().post( priority::medium, exec_queue::read_only,      functor{ 0 } );
+   app->executor().post( trx_priority+1,   exec_queue::trx_read_write, functor{ 1 } );
+   app->executor().post( trx_priority+5,   exec_queue::trx_read_write, functor{ 2 } );
+   app->executor().post( trx_priority+1,   exec_queue::trx_read_write, functor{ 3 } );
+   app->executor().post( priority::medium, exec_queue::read_write,     functor{ 4 } );
+   app->executor().post( priority::high,   exec_queue::read_write,     functor{ 5 } );
+   app->executor().post( priority::lowest, exec_queue::read_only,      functor{ 6 } );
+   app->executor().post( trx_priority+2,   exec_queue::trx_read_write, functor{ 7 } );
+
+   auto work = make_work_guard(app->get_io_context());
+   while( true ) {
+      app->get_io_context().poll();
+      size_t s = app->executor().read_only_queue_size() +
+                 app->executor().read_exclusive_queue_size() +
+                 app->executor().read_write_queue_size() +
+                 app->executor().trx_read_write_queue_size();
+      if (s == 8)
+         break;
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+   }
+
+   {
+      auto read_queue = app->executor().readable_queue();
+      auto b = read_queue.begin(exec_queue::trx_read_write);
+      auto e = read_queue.end(exec_queue::trx_read_write);
+      BOOST_REQUIRE(b != e);
+      BOOST_TEST(std::distance(b, e) == 4);
+      BOOST_REQUIRE_EQUAL( app->executor().trx_read_write_queue_size(), 4u );
+      BOOST_REQUIRE_EQUAL( read_queue.size(exec_queue::trx_read_write), 4u );
+      int v = read_queue.function_from_iter<functor>(b).some_other_function();
+      BOOST_CHECK_EQUAL(v, 2);
+      v = read_queue.function_from_iter<functor>(++b).some_other_function();
+      BOOST_CHECK_EQUAL(v, 7);
+      v = read_queue.function_from_iter<functor>(++b).some_other_function();
+      BOOST_CHECK_EQUAL(v, 1);
+      v = read_queue.function_from_iter<functor>(++b).some_other_function();
+      BOOST_CHECK_EQUAL(v, 3);
+   }
+
+   app.start_exec();
+   work.reset();
+   app->quit();
+   app.join();
+}
+
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp
index b2b713acb2..0bba03edc4 100644
--- a/plugins/chain_plugin/chain_plugin.cpp
+++ b/plugins/chain_plugin/chain_plugin.cpp
@@ -2125,7 +2125,8 @@ void read_write::push_transaction(const read_write::push_transaction_params& par
       abi_serializer::from_variant(params, *pretty_input, resolver, abi_serializer_max_time);
    } EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction")
 
-   app().get_method<incoming::methods::transaction_async>()(pretty_input, true, transaction_metadata::trx_type::input, false,
+   static incoming::methods::transaction_async::method_type& on_incoming = app().get_method<incoming::methods::transaction_async>();
+   on_incoming(pretty_input, true, transaction_metadata::trx_type::input, false,
       [this, next](const next_function_variant<transaction_trace_ptr>& result) -> void {
          if (std::holds_alternative<fc::exception_ptr>(result)) {
            next(std::get<fc::exception_ptr>(result));
@@ -2261,7 +2262,8 @@ void api_base::send_transaction_gen(API &api, send_transaction_params_t params,
             ("e", ptrx->expiration())("m", api.trx_retry->get_max_expiration_time()) );
       }
 
-      app().get_method<incoming::methods::transaction_async>()(ptrx, true, params.trx_type, params.return_failure_trace,
+      static incoming::methods::transaction_async::method_type& on_incoming = app().get_method<incoming::methods::transaction_async>();
+      on_incoming(ptrx, true, params.trx_type, params.return_failure_trace,
         [&api, ptrx, next, retry, retry_num_blocks](const next_function_variant<transaction_trace_ptr>& result) -> void {
            if( std::holds_alternative<fc::exception_ptr>( result ) ) {
               next( std::get<fc::exception_ptr>( result ) );
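The `static` local reference caches the result of `get_method()` on first use, so the method-registry lookup happens once per process rather than on every request. A minimal sketch of the idiom with a hypothetical `registry` in place of appbase's method registry:

```cpp
#include <iostream>

// hypothetical registry whose lookup is comparatively expensive
struct registry {
   static int& lookup() {
      static int value = (std::cout << "lookup\n", 42);
      return value;
   }
};

int handle_request() {
   // initialized exactly once, on first call; thread-safe since C++11
   static int& cached = registry::lookup();
   return cached;
}

int main() {
   handle_request();
   handle_request(); // "lookup" is printed only once
}
```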
diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp
index 99671001bc..3e5ba0d0ca 100644
--- a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp
+++ b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp
@@ -134,10 +134,11 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
    };
 
    struct get_unapplied_transactions_result {
-      size_t                     size = 0;
-      size_t                     incoming_size = 0;
-      std::vector<unapplied_trx> trxs;
-      string                     more; ///< fill lower_bound with trx id to fetch next set of transactions
+      size_t                     unapplied_size = 0;
+      size_t                     queued_size = 0;
+      std::vector<unapplied_trx> unapplied_trxs;
+      std::vector<unapplied_trx> queued_trxs; ///< returned in priority order
+      string                     more; ///< fill lower_bound with trx id to fetch next set of unapplied trxs
    };
 
    get_unapplied_transactions_result get_unapplied_transactions( const get_unapplied_transactions_params& params, const fc::time_point& deadline ) const;
@@ -177,5 +178,5 @@ FC_REFLECT(eosio::producer_plugin::get_account_ram_corrections_params, (lower_bo
 FC_REFLECT(eosio::producer_plugin::get_account_ram_corrections_result, (rows)(more))
 FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_params, (lower_bound)(limit)(time_limit_ms))
 FC_REFLECT(eosio::producer_plugin::unapplied_trx, (trx_id)(expiration)(trx_type)(first_auth)(first_receiver)(first_action)(total_actions)(billed_cpu_time_us)(size))
-FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_result, (size)(incoming_size)(trxs)(more))
+FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_result, (unapplied_size)(queued_size)(unapplied_trxs)(queued_trxs)(more))
 FC_REFLECT(eosio::producer_plugin::pause_at_block_params, (block_num));
diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp
index 6cc26d2375..a7f851c0a7 100644
--- a/plugins/producer_plugin/producer_plugin.cpp
+++ b/plugins/producer_plugin/producer_plugin.cpp
@@ -694,6 +694,10 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl>
       chain().get_greylist_limit()};
    }
 
+   producer_plugin::get_unapplied_transactions_result
+   get_unapplied_transactions(const producer_plugin::get_unapplied_transactions_params& p,
+                              const fc::time_point& deadline);
+
    void schedule_protocol_feature_activations(const producer_plugin::scheduled_protocol_feature_activations& schedule);
 
    void plugin_shutdown();
@@ -1003,12 +1007,57 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl>
 
+   struct trx_executor {
+      trx_executor(producer_plugin_impl* self,
+                   transaction_metadata_ptr trx_meta,
+                   bool is_transient,
+                   next_function<transaction_trace_ptr> next,
+                   bool api_trx,
+                   bool return_failure_traces)
+         : self(self)
+         , trx_meta(std::move(trx_meta))
+         , is_transient(is_transient)
+         , next(std::move(next))
+         , api_trx(api_trx)
+         , return_failure_traces(return_failure_traces)
+      {}
+
+      const transaction_metadata_ptr& get_trx_meta() const { return trx_meta; }
+
+      void operator()() {
+         auto start = fc::time_point::now();
+         auto idle_time = self->_time_tracker.add_idle_time(start);
+         fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));
+
+         auto exception_handler = [this](fc::exception_ptr ex) {
+            self->log_trx_results(trx_meta->packed_trx(), nullptr, ex, 0, is_transient);
+            next(std::move(ex));
+         };
+         try {
+            if (!self->process_incoming_transaction_async(trx_meta, api_trx, start, return_failure_traces, next)) {
+               if (self->in_producing_mode()) {
+                  self->schedule_maybe_produce_block(true);
+               } else {
+                  self->restart_speculative_block();
+               }
+            }
+         }
+         CATCH_AND_CALL(exception_handler);
+      }
+
+   private:
+      producer_plugin_impl*                self;
+      transaction_metadata_ptr             trx_meta;
+      bool                                 is_transient;
+      next_function<transaction_trace_ptr> next;
+      bool                                 api_trx;
+      bool                                 return_failure_traces;
+   };
+
    void on_incoming_transaction_async(const packed_transaction_ptr& trx,
                                       bool api_trx,
                                       transaction_metadata::trx_type trx_type,
                                       bool return_failure_traces,
                                       next_function<transaction_trace_ptr> next) {
-
       const transaction& t = trx->get_transaction();
       EOS_ASSERT( t.delay_sec.value == 0, transaction_exception, "transaction cannot be delayed" );
@@ -1070,29 +1119,10 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl>
-         auto start = fc::time_point::now();
-         auto idle_time = _time_tracker.add_idle_time(start);
-         fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));
-
-         auto exception_handler = [this, is_transient, &next, &trx_meta](fc::exception_ptr ex) {
-            log_trx_results(trx_meta->packed_trx(), nullptr, ex, 0, is_transient);
-            next(std::move(ex));
-         };
-         try {
-            if (!process_incoming_transaction_async(trx_meta, api_trx, start, return_failure_traces, next)) {
-               if (in_producing_mode()) {
-                  schedule_maybe_produce_block(true);
-               } else {
-                  restart_speculative_block();
-               }
-            }
-         }
-         CATCH_AND_CALL(exception_handler);
-      });
+         trx_executor executor{this, std::move(trx_meta), is_transient, std::move(next), api_trx, return_failure_traces};
+
+         // key recovery complete, post to the trx queue
+         app().executor().post(priority::low, exec_queue::trx_read_write, std::move(executor));
      });
   }
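Hoisting the lambda body into the named `trx_executor` struct is what enables the queued-transaction introspection elsewhere in this PR: `read_view::function_from_iter<trx_executor>()` can downcast a queued task to a nameable type and call `get_trx_meta()`, which a closure type would not allow. A simplified sketch of that design choice with hypothetical types:

```cpp
#include <cstdio>
#include <utility>

// hypothetical queued task; the payload accessor is only reachable because
// the type has a name that other code can refer to (unlike a lambda's type)
struct trx_task {
   int  trx_id; // stands in for transaction_metadata_ptr
   int  get_id() const { return trx_id; }
   void operator()() const { std::printf("executing trx %d\n", trx_id); }
};

template <typename F>
void inspect_and_run(F&& f) {
   // queue introspection: possible because trx_task is a nameable type
   std::printf("queued trx %d\n", f.get_id());
   std::forward<F>(f)();
}

int main() { inspect_and_run(trx_task{7}); }
```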
@@ -1125,6 +1155,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl>
-   unapplied_transaction_queue& ua = my->_unapplied_transactions;
+   auto& ua = _unapplied_transactions;
    auto itr = ([&]() {
       if (!p.lower_bound.empty()) {
@@ -2015,15 +2047,12 @@ producer_plugin::get_unapplied_transactions_result producer_plugin::get_unapplie
       return "unknown type";
    };
 
-   get_unapplied_transactions_result result;
-   result.size = ua.size();
-   result.incoming_size = ua.incoming_size();
+   producer_plugin::get_unapplied_transactions_result result;
+   result.unapplied_size = ua.size();
 
    uint32_t remaining = p.limit ? *p.limit : std::numeric_limits<uint32_t>::max();
-   if (deadline != fc::time_point::maximum() && remaining > 1000)
-      remaining = 1000;
    while (itr != ua.end() && remaining > 0) {
-      auto& r = result.trxs.emplace_back();
+      auto& r = result.unapplied_trxs.emplace_back();
       r.trx_id = itr->id();
       r.expiration = itr->expiration();
       const auto& pt = itr->trx_meta->packed_trx();
@@ -2039,18 +2068,56 @@
       r.size = pt->get_estimated_size();
 
       ++itr;
-      remaining--;
+      --remaining;
       if (fc::time_point::now() >= params_deadline)
         break;
    }
 
+   auto readable_queue = app().executor().readable_queue();
+   result.queued_size = readable_queue.size(exec_queue::trx_read_write);
+
    if (itr != ua.end()) {
      result.more = itr->id();
+     return result;
+   }
+
+   auto qitr = readable_queue.begin(exec_queue::trx_read_write);
+   auto qend = readable_queue.end(exec_queue::trx_read_write);
+   for (; qitr != qend && remaining > 0; ++qitr) {
+      auto& r = result.queued_trxs.emplace_back();
+      const auto& f = readable_queue.function_from_iter<trx_executor>(qitr);
+      const auto& trx_meta = f.get_trx_meta();
+      const auto& pt = trx_meta->packed_trx();
+      r.trx_id = pt->id();
+      r.expiration = pt->expiration();
+      r.trx_type = "input";
+      r.first_auth = pt->get_transaction().first_authorizer();
+      const auto& actions = pt->get_transaction().actions;
+      if (!actions.empty()) {
+         r.first_receiver = actions[0].account;
+         r.first_action = actions[0].name;
+      }
+      r.total_actions = pt->get_transaction().total_actions();
+      r.billed_cpu_time_us = trx_meta->billed_cpu_time_us;
+      r.size = pt->get_estimated_size();
+
+      --remaining;
+      if (fc::time_point::now() >= params_deadline)
+         break;
+   }
+
+   if (qitr != qend) {
+      result.more = readable_queue.function_from_iter<trx_executor>(qitr).get_trx_meta()->id();
   }
 
   return result;
}
 
+producer_plugin::get_unapplied_transactions_result
+producer_plugin::get_unapplied_transactions(const get_unapplied_transactions_params& p, const fc::time_point& deadline) const {
+   return my->get_unapplied_transactions(p, deadline);
+}
+
 block_timestamp_type producer_plugin_impl::calculate_pending_block_time() const {
    const chain::controller& chain = chain_plug->chain();
    // on speculative nodes, always use next block time. On producers, honor current clock time
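The handler now pages over two lists under a single `limit`/`more` cursor: unapplied transactions first, then the queued (trx_read_write) transactions, with `more` carrying the id to use as `lower_bound` on the next request whenever either list is cut short. A simplified sketch of that two-phase paging with hypothetical id/list types:

```cpp
#include <cstdio>
#include <string>
#include <vector>

// hypothetical result shape: two pages plus a continuation cursor
struct page { std::vector<int> unapplied, queued; std::string more; };

page get_page(const std::vector<int>& unapplied, const std::vector<int>& queued, unsigned remaining) {
   page result;
   size_t i = 0;
   for (; i < unapplied.size() && remaining > 0; ++i, --remaining)
      result.unapplied.push_back(unapplied[i]);
   if (i < unapplied.size()) {            // first list cut short: do not start the second
      result.more = std::to_string(unapplied[i]);
      return result;
   }
   size_t j = 0;
   for (; j < queued.size() && remaining > 0; ++j, --remaining)
      result.queued.push_back(queued[j]);
   if (j < queued.size())
      result.more = std::to_string(queued[j]);
   return result;
}

int main() {
   auto p = get_page({1, 2, 3}, {4, 5}, 4);
   std::printf("%zu %zu %s\n", p.unapplied.size(), p.queued.size(), p.more.c_str()); // 3 1 5
}
```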
@@ -2286,8 +2353,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
    const block_num_type head_block_num = head.block_num();
    const uint32_t pending_block_num = head_block_num + 1;
 
-   fc_dlog(_log, "Starting block #${n} ${bt} producer ${p}, deadline ${d}",
-           ("n", pending_block_num)("bt", block_time)("p", scheduled_producer.producer_name)("d", _pending_block_deadline));
+   fc_dlog(_log, "Starting block #${n} ${bt} producer ${p}, deadline ${d}, unapplied trxs ${u}, queued trxs ${q}, queued tasks ${t}",
+           ("n", pending_block_num)("bt", block_time)("p", scheduled_producer.producer_name)("d", _pending_block_deadline)
+           ("u", _unapplied_transactions.size())("q", app().executor().readable_queue().size(exec_queue::trx_read_write))
+           ("t", app().executor().read_write_queue_size()));
 
    try {
@@ -2360,6 +2429,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
    LOG_AND_DROP();
 
    if (chain.is_building_block()) {
+      app().executor().enable_trx_read_write_queue(true);
       const auto& pending_block_signing_authority = chain.pending_block_signing_authority();
 
       if (in_producing_mode() && pending_block_signing_authority != scheduled_producer.authority) {
@@ -2991,6 +3061,7 @@ void producer_plugin_impl::produce_block() {
       _protocol_features_signaled = false;
    }
 
+   app().executor().enable_trx_read_write_queue(false);
    // idump( (fc::time_point::now() - chain.pending_block_time()) );
    chain.assemble_and_complete_block([&](const digest_type& d) {
       auto debug_logger = maybe_make_debug_time_logger();
diff --git a/tests/plugin_http_api_test.py b/tests/plugin_http_api_test.py
index 7a9d5805bc..42667af032 100755
--- a/tests/plugin_http_api_test.py
+++ b/tests/plugin_http_api_test.py
@@ -1148,12 +1148,12 @@ def test_ProducerApi(self) :
         # get_unapplied_transactions with empty parameter
         command = "get_unapplied_transactions"
         ret_json = self.nodeos.processUrllibRequest(resource, command, endpoint=endpoint)
-        self.assertIn("size", ret_json["payload"])
-        self.assertIn("incoming_size", ret_json["payload"])
+        self.assertIn("unapplied_size", ret_json["payload"])
+        self.assertIn("queued_size", ret_json["payload"])
         # get_unapplied_transactions with empty content parameter
         ret_json = self.nodeos.processUrllibRequest(resource, command, self.empty_content_dict, endpoint=endpoint)
-        self.assertIn("size", ret_json["payload"])
-        self.assertIn("incoming_size", ret_json["payload"])
+        self.assertIn("unapplied_size", ret_json["payload"])
+        self.assertIn("queued_size", ret_json["payload"])
         # get_unapplied_transactions with invalid parameter
         ret_json = self.nodeos.processUrllibRequest(resource, command, self.http_post_invalid_param, endpoint=endpoint)
         self.assertEqual(ret_json["code"], 400)
@@ -1161,7 +1161,8 @@ def test_ProducerApi(self) :
         # get_unapplied_transactions with valid parameter
         payload = {"lower_bound":"", "limit":1, "time_limit_ms":500}
         ret_json = self.nodeos.processUrllibRequest(resource, command, payload, endpoint=endpoint)
-        self.assertIn("trxs", ret_json["payload"])
+        self.assertIn("unapplied_trxs", ret_json["payload"])
+        self.assertIn("queued_trxs", ret_json["payload"])
 
         # place pause and resume tests at the end such that they are not impacted
         # by update_runtime_options
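`enable_trx_read_write_queue(true)` in `start_block()` and `enable_trx_read_write_queue(false)` in `produce_block()` bracket the window in which queued trxs are actually runnable; outside that window `execute_highest()` skips the queue entirely rather than popping a trx it cannot apply and re-queuing it. A minimal sketch of that gate under assumed simplified scheduler state (hypothetical names, not the nodeos types):

```cpp
#include <atomic>
#include <cstdio>

// hypothetical scheduler state: pending_trxs counts queued trxs; the gate
// mirrors trx_read_write_queue_enabled_
std::atomic<bool> trx_queue_enabled{true};
int pending_trxs = 2;

bool try_execute_one() {
   if (pending_trxs == 0 || !trx_queue_enabled.load(std::memory_order_acquire))
      return false; // nothing runnable: the executor can block instead of spinning
   --pending_trxs;
   return true;
}

int main() {
   trx_queue_enabled.store(false, std::memory_order_release); // produce_block(): block completed
   std::printf("%d\n", try_execute_one());                    // 0 - gated off, trx stays queued
   trx_queue_enabled.store(true, std::memory_order_release);  // start_block(): building again
   std::printf("%d\n", try_execute_one());                    // 1 - trx executed
}
```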