diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 400d6a265d..bf00ec1f4a 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -65,6 +65,9 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executor.cpp src/rclcpp/executor_options.cpp src/rclcpp/executors.cpp + src/rclcpp/executors/events_cbg_executor/events_cbg_executor.cpp + src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.cpp + src/rclcpp/executors/events_cbg_executor/global_event_id_provider.cpp src/rclcpp/executors/executor_entities_collection.cpp src/rclcpp/executors/executor_entities_collector.cpp src/rclcpp/executors/executor_notify_waitable.cpp diff --git a/rclcpp/include/rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp b/rclcpp/include/rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp new file mode 100644 index 0000000000..42510c565e --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp @@ -0,0 +1,320 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ +namespace executors +{ + +namespace cbg_executor +{ +class TimerManager; +struct RegisteredEntityCache; +class CBGScheduler; +struct GlobalWeakExecutableCache; +} + +class EventsCBGExecutor : public rclcpp::Executor +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsCBGExecutor) + + /** + * For the yield_before_execute option, when true std::this_thread::yield() + * will be called after acquiring work (as an AnyExecutable) and + * releasing the spinning lock, but before executing the work. + * This is useful for reproducing some bugs related to taking work more than + * once. + * + * \param options common options for all executors + * \param number_of_threads number of threads to have in the thread pool, + * the default 0 will use the number of cpu cores found (minimum of 2) + * \param yield_before_execute if true std::this_thread::yield() is called + * \param timeout maximum time to wait + */ + RCLCPP_PUBLIC + explicit EventsCBGExecutor( + const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions(), + size_t number_of_threads = 0, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + RCLCPP_PUBLIC + virtual ~EventsCBGExecutor(); + + RCLCPP_PUBLIC + void + add_callback_group( + const rclcpp::CallbackGroup::SharedPtr & group_ptr, + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node_ptr, + bool notify = true) override; + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups() override; + + RCLCPP_PUBLIC + void + remove_callback_group( + const rclcpp::CallbackGroup::SharedPtr & group_ptr, + bool notify = true) override; + + RCLCPP_PUBLIC + void + add_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \see rclcpp::Executor::add_node + */ + RCLCPP_PUBLIC + void + add_node(const std::shared_ptr & node_ptr, bool notify = true) override; + + RCLCPP_PUBLIC + void + remove_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \see rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node(const std::shared_ptr & node_ptr, bool notify = true) override; + + // add a callback group to the executor, not bound to any node + void add_callback_group_only(const rclcpp::CallbackGroup::SharedPtr & group_ptr); + + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + */ + RCLCPP_PUBLIC + void + spin() override; + + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + void + spin(const std::function & exception_handler); + + void + spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)) override; + + RCLCPP_PUBLIC + void + spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override; + + /** + * @return true if work was available and executed + */ + bool collect_and_execute_ready_events( + std::chrono::nanoseconds max_duration, + bool recollect_if_no_work_available); + + void + spin_all(std::chrono::nanoseconds max_duration) override; + + /// Cancel any running spin* function, causing it to return. + /** + * This function can be called asynchonously from any thread. + * \throws std::runtime_error if there is an issue triggering the guard condition + */ + RCLCPP_PUBLIC + void + cancel() override; + + RCLCPP_PUBLIC + size_t + get_number_of_threads() const; + + bool + is_spinning() + { + return spinning; + } + + template + FutureReturnCode + spin_until_future_complete( + const FutureT & future, + std::chrono::duration timeout = std::chrono::duration(-1)) + { + // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete + // inside a callback executed by an executor. + + // Check the future before entering the while loop. + // If the future is already complete, don't try to spin. + std::future_status status = future.wait_for(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_until_future_complete() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + // Do one item of work. + spin_once_internal(timeout_left); + + // Check if the future is set, return SUCCESS if it is. + status = future.wait_for(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return FutureReturnCode::INTERRUPTED; + } + + /// We need these fuction to be public, as we use them in the callback_group_scheduler + using rclcpp::Executor::execute_subscription; + using rclcpp::Executor::execute_timer; + using rclcpp::Executor::execute_service; + using rclcpp::Executor::execute_client; + +protected: + RCLCPP_PUBLIC + void + run(size_t this_thread_number, bool block_initially); + + void + run( + size_t this_thread_number, + const std::function & exception_handler); + + /** + * Te be called in termination case. E.g. destructor of shutdown callback. + * Stops the scheduler and cleans up the internal data structures. + */ + void shutdown(); + + std::unique_ptr scheduler; + + std::thread rcl_polling_thread; + + struct CallbackGroupData + { + CallbackGroup::WeakPtr callback_group; + + std::unique_ptr registered_entities; + }; + + void set_callbacks(CallbackGroupData & cgd); + + /** + * This function will execute all available executables, + * that were ready, before this function was called. + */ + bool execute_previous_ready_executables_until( + const std::chrono::time_point & stop_time); + + void unregister_event_callbacks(const rclcpp::CallbackGroup::SharedPtr & cbg) const; + +private: + void remove_all_nodes_and_callback_groups(); + + void sync_callback_groups(); + + void spin_once_internal(std::chrono::nanoseconds timeout); + + RCLCPP_DISABLE_COPY(EventsCBGExecutor) + + std::mutex added_callback_groups_mutex_; + std::vector added_callback_groups; + + std::mutex added_nodes_mutex_; + std::vector added_nodes; + + std::mutex callback_groups_mutex; + + std::vector callback_groups; + + size_t number_of_threads_; + + std::chrono::nanoseconds next_exec_timeout_; + + std::atomic_bool needs_callback_group_resync = false; + + /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. + std::atomic_bool spinning; + + /// Guard condition for signaling the rmw layer to wake up for special events. + std::shared_ptr interrupt_guard_condition_; + + /// Guard condition for signaling the rmw layer to wake up for system shutdown. + std::shared_ptr shutdown_guard_condition_; + + /// shutdown callback handle registered to Context + rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_; + + /// The context associated with this executor. + std::shared_ptr context_; + + std::unique_ptr timer_manager; + + /// Stores the executables for the internal guard conditions + /// e.g. interrupt_guard_condition_ and shutdown_guard_condition_ + std::unique_ptr global_executable_cache; + + /// Stores the executables for guard conditions of the nodes + std::unique_ptr nodes_executable_cache; +}; + +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/events_cbg_executor.cpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/events_cbg_executor.cpp new file mode 100644 index 0000000000..15bff2af86 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/events_cbg_executor.cpp @@ -0,0 +1,729 @@ +// Copyright 2024 Cellumation GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "rcpputils/scope_exit.hpp" +#include "rclcpp/exceptions/exceptions.hpp" +#include "rclcpp/node.hpp" + +#include "first_in_first_out_scheduler.hpp" +#include "timer_manager.hpp" +#include "registered_entity_cache.hpp" +#include "rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp" + + +namespace rclcpp::executors +{ +namespace cbg_executor +{ +struct GlobalWeakExecutableCache +{ + std::vector guard_conditions; + + GlobalWeakExecutableCache() = default; + GlobalWeakExecutableCache(const GlobalWeakExecutableCache &) = default; + GlobalWeakExecutableCache(GlobalWeakExecutableCache &&) = default; + ~GlobalWeakExecutableCache() + { + for (const auto & gc_ref : guard_conditions) { + gc_ref.guard_condition->set_on_trigger_callback(nullptr); + } + } + + GlobalWeakExecutableCache & operator=(const GlobalWeakExecutableCache &) = default; + GlobalWeakExecutableCache & operator=(GlobalWeakExecutableCache &&) = default; + + void add_guard_condition_event( + const rclcpp::GuardCondition::SharedPtr & ptr, + std::function fun) + { + guard_conditions.emplace_back(ptr, std::move(fun)); + + + for (auto & entry : guard_conditions) { + entry.guard_condition->set_on_trigger_callback( + [ptr = &entry](size_t nr_events) { + for (size_t i = 0; i < nr_events; i++) { + if (ptr->handle_guard_condition_fun) { + ptr->handle_guard_condition_fun(); + } + } + }); + } + } + + void clear() + { + guard_conditions.clear(); + } +}; +} // namespace cbg_executor + +EventsCBGExecutor::EventsCBGExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + std::chrono::nanoseconds next_exec_timeout) +: scheduler(std::make_unique()), + next_exec_timeout_(next_exec_timeout), + spinning(false), + interrupt_guard_condition_(std::make_shared(options.context) ), + shutdown_guard_condition_(std::make_shared(options.context) ), + context_(options.context), + timer_manager(std::make_unique(context_)), + global_executable_cache(std::make_unique() ), + nodes_executable_cache(std::make_unique() ) +{ + global_executable_cache->add_guard_condition_event ( + interrupt_guard_condition_, + std::function() ); + + global_executable_cache->add_guard_condition_event( + shutdown_guard_condition_, [this]() { + shutdown(); + }); + + number_of_threads_ = number_of_threads > 0 ? + number_of_threads : + std::max(std::thread::hardware_concurrency(), 2U); + + shutdown_callback_handle_ = context_->add_on_shutdown_callback( + [weak_gc = std::weak_ptr {shutdown_guard_condition_}]() { + auto strong_gc = weak_gc.lock(); + if (strong_gc) { + strong_gc->trigger(); + } + }); +} + +EventsCBGExecutor::~EventsCBGExecutor() +{ + shutdown(); +} + +void EventsCBGExecutor::shutdown() +{ + if(!timer_manager) { + // already shut down + return; + } + + // we need to shut down the timer manager first, as it might access the Schedulers + timer_manager->stop(); + + bool was_spining = spinning; + + // signal all processing threads to shut down + spinning = false; + + if(was_spining) { + scheduler->release_all_worker_threads(); + } + + remove_all_nodes_and_callback_groups(); + + { + std::scoped_lock l(callback_groups_mutex); + callback_groups.clear(); + } + + // Remove shutdown callback handle registered to Context + if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_) ) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "failed to remove registered on_shutdown callback"); + rcl_reset_error(); + } + + // now we may release the memory of the timer_manager, + // as we know no thread is working on it any more + timer_manager.reset(); +} + +void EventsCBGExecutor::remove_all_nodes_and_callback_groups() +{ + std::vector added_nodes_cpy; + { + std::lock_guard lock{added_nodes_mutex_}; + added_nodes_cpy = added_nodes; + } + + for (const node_interfaces::NodeBaseInterface::WeakPtr & node_weak_ptr : added_nodes_cpy) { + const node_interfaces::NodeBaseInterface::SharedPtr & node_ptr = node_weak_ptr.lock(); + if (node_ptr) { + remove_node(node_ptr, false); + } + } + + std::vector added_cbgs_cpy; + { + std::lock_guard lock{added_callback_groups_mutex_}; + added_cbgs_cpy = added_callback_groups; + } + + for (const auto & weak_ptr : added_cbgs_cpy) { + auto shr_ptr = weak_ptr.lock(); + if (shr_ptr) { + remove_callback_group(shr_ptr, false); + } + } +} + +bool EventsCBGExecutor::execute_previous_ready_executables_until( + const std::chrono::time_point & stop_time) +{ + bool found_work = false; + + const uint64_t last_ready_id = cbg_executor::GlobalEventIdProvider::get_last_id(); + + while(true) { + auto ready_entity = scheduler->get_next_ready_entity(last_ready_id); + if(!ready_entity.entitiy) { + break; + } + + found_work = true; + + ready_entity.entitiy->execute_function(); + + scheduler->mark_entity_as_executed(*ready_entity.entitiy); + + if(std::chrono::steady_clock::now() >= stop_time) { + break; + } + } + + return found_work; +} + + +size_t +EventsCBGExecutor::get_number_of_threads() const +{ + return number_of_threads_; +} + +void EventsCBGExecutor::sync_callback_groups() +{ + if (!needs_callback_group_resync.exchange(false) ) { + return; + } + + std::scoped_lock l(callback_groups_mutex); + + + std::vector> cur_group_data; + cur_group_data.reserve(callback_groups.size() ); + + for (CallbackGroupData & d : callback_groups) { + auto p = d.callback_group.lock(); + if (p) { + cur_group_data.emplace_back(&d, std::move(p) ); + } + } + + std::vector next_group_data; + + std::set added_cbgs; + + auto insert_data = + [&cur_group_data, &next_group_data, &added_cbgs, + this](rclcpp::CallbackGroup::SharedPtr && cbg) { + // nodes may share callback groups, therefore we need to make sure we only add them once + if (added_cbgs.find(cbg.get() ) != added_cbgs.end() ) { + return; + } + + added_cbgs.insert(cbg.get() ); + + for (const auto & pair : cur_group_data) { + if (pair.second == cbg) { + next_group_data.push_back(std::move(*pair.first) ); + // call regenerate, in case something changed in the group + next_group_data.back().registered_entities->regenerate_events(); + return; + } + } + + + CallbackGroupData new_entry; + new_entry.registered_entities = + std::make_unique(*scheduler, + *timer_manager, cbg); + new_entry.callback_group = cbg; + new_entry.registered_entities->regenerate_events(); + next_group_data.push_back(std::move(new_entry) ); + }; + + { + std::vector added_cbgs_cpy; + { + std::lock_guard lock{added_callback_groups_mutex_}; + added_cbgs_cpy = added_callback_groups; + } + + std::vector added_nodes_cpy; + { + std::lock_guard lock{added_nodes_mutex_}; + added_nodes_cpy = added_nodes; + } + + // *3 is a rough estimate of how many callback_group a node may have + next_group_data.reserve(added_cbgs_cpy.size() + (added_nodes_cpy.size() * 3)); + + nodes_executable_cache->clear(); + + for (const node_interfaces::NodeBaseInterface::WeakPtr & node_weak_ptr : added_nodes_cpy) { + auto node_ptr = node_weak_ptr.lock(); + if (node_ptr) { + node_ptr->for_each_callback_group( + [&insert_data](rclcpp::CallbackGroup::SharedPtr cbg) { + if (cbg->automatically_add_to_executor_with_node() ) { + insert_data(std::move(cbg) ); + } + }); + + // register node guard condition, and trigger + // resync on node change event + nodes_executable_cache->add_guard_condition_event( + node_ptr->get_shared_notify_guard_condition(), + [this]() { + needs_callback_group_resync.store(true); + }); + } + } + + for (const rclcpp::CallbackGroup::WeakPtr & cbg : added_cbgs_cpy) { + auto p = cbg.lock(); + if (p) { + insert_data(std::move(p) ); + } + } + } + + // FIXME inform scheduler about remove cbgs + + callback_groups.swap(next_group_data); +} + +void +EventsCBGExecutor::run(size_t this_thread_number, bool block_initially) +{ + (void) this_thread_number; + + while (rclcpp::ok(this->context_) && spinning.load() ) { + if(block_initially) { + block_initially = false; + scheduler->block_worker_thread(); + } + + sync_callback_groups(); + + auto ready_entity = scheduler->get_next_ready_entity(); + if(!ready_entity.entitiy) { + scheduler->block_worker_thread(); + continue; + } + + if(ready_entity.moreEntitiesReady) { + scheduler->unblock_one_worker_thread(); + } + + ready_entity.entitiy->execute_function(); + + scheduler->mark_entity_as_executed(*ready_entity.entitiy); + } +} + +void +EventsCBGExecutor::run( + size_t this_thread_number, + const std::function & exception_handler) +{ + (void) this_thread_number; + + while (rclcpp::ok(this->context_) && spinning.load() ) { + sync_callback_groups(); + + auto ready_entity = scheduler->get_next_ready_entity(); + if(!ready_entity.entitiy) { + scheduler->block_worker_thread(); + continue; + } + + try { + ready_entity.entitiy->execute_function(); + } catch (const std::exception & e) { + exception_handler(e); + } + + scheduler->mark_entity_as_executed(*ready_entity.entitiy); + } +} + + +void EventsCBGExecutor::spin_once_internal(std::chrono::nanoseconds timeout) +{ + if (!rclcpp::ok(this->context_) || !spinning.load() ) { + return; + } + + auto ready_entity = scheduler->get_next_ready_entity(); + if(!ready_entity.entitiy) { + if (timeout < std::chrono::nanoseconds::zero()) { + // can't use std::chrono::nanoseconds::max, as wait_for + // internally computes end time by using ::now() + timeout + // as a workaround, we use some absurd high timeout + timeout = std::chrono::hours(10000); + } + + scheduler->block_worker_thread_for(timeout); + + ready_entity = scheduler->get_next_ready_entity(); + + if (!ready_entity.entitiy) { + return; + } + } + + ready_entity.entitiy->execute_function(); + + scheduler->mark_entity_as_executed(*ready_entity.entitiy); +} + +void +EventsCBGExecutor::spin_once(std::chrono::nanoseconds timeout) +{ + if (spinning.exchange(true) ) { + throw std::runtime_error("spin_once() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + sync_callback_groups(); + + spin_once_internal(timeout); +} + + +void +EventsCBGExecutor::spin_some(std::chrono::nanoseconds max_duration) +{ + collect_and_execute_ready_events(max_duration, false); +} + +void EventsCBGExecutor::spin_all(std::chrono::nanoseconds max_duration) +{ + if (max_duration < std::chrono::nanoseconds::zero() ) { + throw std::invalid_argument("max_duration must be greater than or equal to 0"); + } + + collect_and_execute_ready_events(max_duration, true); +} + +bool EventsCBGExecutor::collect_and_execute_ready_events( + std::chrono::nanoseconds max_duration, + bool recollect_if_no_work_available) +{ + if (spinning.exchange(true) ) { + throw std::runtime_error("collect_and_execute_ready_events() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + sync_callback_groups(); + + const auto start = std::chrono::steady_clock::now(); + const auto end_time = start + max_duration; + auto cur_time = start; + + bool had_work = false; + + while (rclcpp::ok(this->context_) && spinning && cur_time <= end_time) { + if (!execute_previous_ready_executables_until(end_time) ) { + return had_work; + } + + had_work = true; + + if (!recollect_if_no_work_available) { + // we are done + return had_work; + } + + cur_time = std::chrono::steady_clock::now(); + } + + return had_work; +} +void +EventsCBGExecutor::spin() +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + std::vector threads; + size_t thread_id = 0; + for ( ; thread_id < number_of_threads_ - 1; ++thread_id) { + threads.emplace_back([this, thread_id]() + { + run(thread_id, true); + }); + } + + run(thread_id, false); + for (auto & thread : threads) { + thread.join(); + } +} + +void EventsCBGExecutor::spin( + const std::function & exception_handler) +{ + if (spinning.exchange(true) ) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + std::vector threads; + size_t thread_id = 0; + for ( ; thread_id < number_of_threads_ - 1; ++thread_id) { + threads.emplace_back([this, thread_id, exception_handler]() + { + run(thread_id, exception_handler); + } + ); + } + + run(thread_id, exception_handler); + for (auto & thread : threads) { + thread.join(); + } +} + +void +EventsCBGExecutor::add_callback_group( + const rclcpp::CallbackGroup::SharedPtr & group_ptr, + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & /*node_ptr*/, + bool notify) +{ + { + std::lock_guard lock{added_callback_groups_mutex_}; + added_callback_groups.push_back(group_ptr); + } + needs_callback_group_resync = true; + scheduler->unblock_one_worker_thread(); + + if (notify) { + // Interrupt waiting to handle new node + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what() ); + } + } +} + +void +EventsCBGExecutor::cancel() +{ + bool was_spinning = spinning; + + spinning.store(false); + + if(was_spinning) { + scheduler->release_all_worker_threads(); + } + + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string("Failed to trigger guard condition in cancel: ") + ex.what() ); + } +} + +std::vector +EventsCBGExecutor::get_all_callback_groups() +{ + std::lock_guard lock{added_callback_groups_mutex_}; + return added_callback_groups; +} + +void EventsCBGExecutor::unregister_event_callbacks(const rclcpp::CallbackGroup::SharedPtr & cbg) +const +{ + const auto remove_sub = [](const rclcpp::SubscriptionBase::SharedPtr & s) { + s->clear_on_new_message_callback(); + }; + const auto remove_timer = [this](const rclcpp::TimerBase::SharedPtr & s) { + timer_manager->remove_timer(s); + }; + + const auto remove_client = [](const rclcpp::ClientBase::SharedPtr & s) { + s->clear_on_new_response_callback(); + }; + + const auto remove_service = [](const rclcpp::ServiceBase::SharedPtr & s) { + s->clear_on_new_request_callback(); + }; + + auto gc_ptr = cbg->get_notify_guard_condition(); + if (gc_ptr) { + gc_ptr->set_on_trigger_callback(std::function()); + } + + const auto remove_waitable = [](const rclcpp::Waitable::SharedPtr & s) { + s->clear_on_ready_callback(); + }; + + + cbg->collect_all_ptrs(remove_sub, remove_service, remove_client, remove_timer, remove_waitable); +} + + +void +EventsCBGExecutor::remove_callback_group( + const rclcpp::CallbackGroup::SharedPtr & group_ptr, + bool notify) +{ + bool found = false; + { + std::lock_guard lock{added_callback_groups_mutex_}; + added_callback_groups.erase( + std::remove_if( + added_callback_groups.begin(), added_callback_groups.end(), + [&group_ptr, &found](const auto & weak_ptr) { + auto shr_ptr = weak_ptr.lock(); + if (!shr_ptr) { + return true; + } + + if (group_ptr == shr_ptr) { + found = true; + return true; + } + return false; + }), added_callback_groups.end() ); + added_callback_groups.push_back(group_ptr); + } + + // we need to unregister all callbacks + unregister_event_callbacks(group_ptr); + + if (found) { + needs_callback_group_resync = true; + scheduler->unblock_one_worker_thread(); + } + + if (notify) { + // Interrupt waiting to handle new node + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what() ); + } + } +} + +void +EventsCBGExecutor::add_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node_ptr, + bool /*notify*/) +{ + // If the node already has an executor + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true) ) { + throw std::runtime_error( + std::string("Node '") + node_ptr->get_fully_qualified_name() + + "' has already been added to an executor."); + } + + { + std::lock_guard lock{added_nodes_mutex_}; + added_nodes.push_back(node_ptr); + } + + needs_callback_group_resync = true; + scheduler->unblock_one_worker_thread(); + + + if (!spinning) { + sync_callback_groups(); + } +} + +void +EventsCBGExecutor::add_node(const std::shared_ptr & node_ptr, bool notify) +{ + add_node(node_ptr->get_node_base_interface(), notify); +} + +void +EventsCBGExecutor::remove_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node_ptr, + bool notify) +{ + { + std::lock_guard lock{added_nodes_mutex_}; + added_nodes.erase( + std::remove_if( + added_nodes.begin(), added_nodes.end(), [&node_ptr](const auto & weak_ptr) { + const auto shr_ptr = weak_ptr.lock(); + return shr_ptr && shr_ptr == node_ptr; + }), added_nodes.end()); + } + + node_ptr->for_each_callback_group( + [this](const rclcpp::CallbackGroup::SharedPtr & cbg) + { + unregister_event_callbacks(cbg); + } + ); + + needs_callback_group_resync = true; + + if (notify) { + scheduler->unblock_one_worker_thread(); + // Interrupt waiting to handle new node + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what() ); + } + } + + node_ptr->get_associated_with_executor_atomic().store(false); +} + +void +EventsCBGExecutor::remove_node(const std::shared_ptr & node_ptr, bool notify) +{ + remove_node(node_ptr->get_node_base_interface(), notify); +} + +// add a callback group to the executor, not bound to any node +void EventsCBGExecutor::add_callback_group_only(const rclcpp::CallbackGroup::SharedPtr & group_ptr) +{ + add_callback_group(group_ptr, nullptr, true); +} +} // namespace rclcpp::executors diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.cpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.cpp new file mode 100644 index 0000000000..ffafd2cb6e --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.cpp @@ -0,0 +1,200 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "first_in_first_out_scheduler.hpp" +#include + +namespace rclcpp +{ +namespace executors +{ +namespace cbg_executor +{ +std::function FirstInFirstOutCallbackGroupHandle::get_ready_callback_for_entity( + const rclcpp::SubscriptionBase::WeakPtr & entity) +{ + return [weak_ptr = entity, this](size_t nr_msg) { + add_ready_entity([&] () { + for (size_t i = 0; i < nr_msg; i++) { + ready_entities.emplace_back(weak_ptr); + } + }); + }; +} + +std::function executed_callback)> FirstInFirstOutCallbackGroupHandle:: +get_ready_callback_for_entity(const rclcpp::TimerBase::WeakPtr & entity) +{ + return [weak_ptr = entity, this](std::function executed_callback) { + add_ready_entity([&] () { + ready_entities.emplace_back(ReadyEntity::ReadyTimerWithExecutedCallback{weak_ptr, + executed_callback}); + }); + }; +} + +std::function FirstInFirstOutCallbackGroupHandle::get_ready_callback_for_entity( + const rclcpp::ClientBase::WeakPtr & entity) +{ + return [weak_ptr = entity, this](size_t nr_msg) { + add_ready_entity([&] () { + for (size_t i = 0; i < nr_msg; i++) { + ready_entities.emplace_back(weak_ptr); + } + }); + }; +} + +std::function FirstInFirstOutCallbackGroupHandle::get_ready_callback_for_entity( + const rclcpp::ServiceBase::WeakPtr & entity) +{ + return [weak_ptr = entity, this](size_t nr_msg) { + add_ready_entity([&] () { + for (size_t i = 0; i < nr_msg; i++) { + ready_entities.emplace_back(weak_ptr); + } + }); + }; +} + +std::function FirstInFirstOutCallbackGroupHandle::get_ready_callback_for_entity( + const rclcpp::Waitable::WeakPtr & entity) +{ + return [weak_ptr = entity, this](size_t nr_msg, int event_type) { + add_ready_entity([&] () { + for (size_t i = 0; i < nr_msg; i++) { + ready_entities.emplace_back(CBGScheduler::WaitableWithEventType({weak_ptr, + event_type})); + } + }); + }; +} +std::function FirstInFirstOutCallbackGroupHandle::get_ready_callback_for_entity( + const CBGScheduler::CallbackEventType & entity) +{ + return [weak_ptr = entity, this](size_t nr_msg) { + add_ready_entity([&] () { + for (size_t i = 0; i < nr_msg; i++) { + ready_entities.emplace_back(weak_ptr); + } + }); + }; +} + +std::optional FirstInFirstOutCallbackGroupHandle:: +get_next_ready_entity() +{ + std::lock_guard l(ready_mutex); + + while(!ready_entities.empty()) { + auto & first = ready_entities.front(); + + std::function exec_fun = first.get_execute_function(); + ready_entities.pop_front(); + if(!exec_fun) { + // was deleted, or in case of timer was canceled + continue; + } + + mark_as_executing(); + + return CBGScheduler::ExecutableEntity{exec_fun, this}; + } + + mark_as_skiped(); + + return std::nullopt; +} + +std::optional FirstInFirstOutCallbackGroupHandle:: +get_next_ready_entity(GlobalEventIdProvider::MonotonicId max_id) +{ + std::lock_guard l(ready_mutex); + + while(!ready_entities.empty()) { + auto & first = ready_entities.front(); + if(first.id > max_id) { + return std::nullopt; + } + + std::function exec_fun = first.get_execute_function(); + ready_entities.pop_front(); + if(!exec_fun) { + // was deleted, or in case of timer was canceled + continue; + } + + mark_as_executing(); + + return CBGScheduler::ExecutableEntity{exec_fun, this}; + } + + mark_as_skiped(); + + return std::nullopt; +} + +std::unique_ptr FirstInFirstOutScheduler:: +get_handle_for_callback_group(const rclcpp::CallbackGroup::SharedPtr &/*callback_group*/) +{ + return std::make_unique(*this); +} + +CBGScheduler::ExecutableEntityWithInfo FirstInFirstOutScheduler::get_next_ready_entity() +{ + std::lock_guard l(ready_callback_groups_mutex); + + while(!ready_callback_groups.empty()) { + FirstInFirstOutCallbackGroupHandle *ready_cbg = + static_cast(ready_callback_groups.front()); + ready_callback_groups.pop_front(); + + std::optional ret = + ready_cbg->get_next_ready_entity(); + if(ret) { + return CBGScheduler::ExecutableEntityWithInfo{.entitiy = std::move(ret), + .moreEntitiesReady = !ready_callback_groups.empty()}; + } + } + + return CBGScheduler::ExecutableEntityWithInfo{.entitiy = std::nullopt, + .moreEntitiesReady = false}; +} + +CBGScheduler::ExecutableEntityWithInfo FirstInFirstOutScheduler::get_next_ready_entity( + GlobalEventIdProvider::MonotonicId max_id) +{ + std::lock_guard l(ready_callback_groups_mutex); + + // as, we remove an reappend ready callback_groups during execution, + // the first ready cbg may not contain the lowest id. Therefore we + // need to search the whole deque + for(auto it = ready_callback_groups.begin(); it != ready_callback_groups.end(); it++) { + FirstInFirstOutCallbackGroupHandle *ready_cbg( + static_cast(*it)); + std::optional ret = + ready_cbg->get_next_ready_entity(max_id); + if(ret) { + ready_callback_groups.erase(it); + return CBGScheduler::ExecutableEntityWithInfo{.entitiy = std::move(ret), + .moreEntitiesReady = !ready_callback_groups.empty()}; + } + } + + return CBGScheduler::ExecutableEntityWithInfo{.entitiy = std::nullopt, + .moreEntitiesReady = false}; +} +} // namespace cbg_executor +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.hpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.hpp new file mode 100644 index 0000000000..412f637175 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.hpp @@ -0,0 +1,79 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include + +#include "ready_entity.hpp" +#include "scheduler.hpp" +#include "global_event_id_provider.hpp" + +namespace rclcpp +{ +namespace executors +{ +namespace cbg_executor +{ +struct FirstInFirstOutCallbackGroupHandle final : public CBGScheduler::CallbackGroupHandle +{ +public: + explicit FirstInFirstOutCallbackGroupHandle(CBGScheduler & scheduler) + : CallbackGroupHandle(scheduler) {} + + std::function get_ready_callback_for_entity( + const rclcpp::SubscriptionBase::WeakPtr & entity) final; + std::function executed_callback)> get_ready_callback_for_entity( + const rclcpp::TimerBase::WeakPtr & entity) final; + std::function get_ready_callback_for_entity( + const rclcpp::ClientBase::WeakPtr & entity) final; + std::function get_ready_callback_for_entity( + const rclcpp::ServiceBase::WeakPtr & entity) final; + std::function get_ready_callback_for_entity(const rclcpp::Waitable::WeakPtr & entity) final; + std::function get_ready_callback_for_entity( + const CBGScheduler::CallbackEventType & entity) final; + + std::optional get_next_ready_entity(); + std::optional get_next_ready_entity( + GlobalEventIdProvider::MonotonicId max_id); + + bool has_ready_entities() const final + { + return !ready_entities.empty(); + } + +private: + std::deque ready_entities; +}; + +class FirstInFirstOutScheduler : public CBGScheduler +{ +public: + ExecutableEntityWithInfo get_next_ready_entity() final; + ExecutableEntityWithInfo get_next_ready_entity( + GlobalEventIdProvider::MonotonicId max_id) final; + +private: + std::unique_ptr get_handle_for_callback_group( + const rclcpp::CallbackGroup::SharedPtr & callback_group) final; + + std::vector> callback_group_handles; +}; +} // namespace cbg_executor +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/global_event_id_provider.cpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/global_event_id_provider.cpp new file mode 100644 index 0000000000..22346e819b --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/global_event_id_provider.cpp @@ -0,0 +1,26 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "global_event_id_provider.hpp" + +namespace rclcpp +{ +namespace executors +{ +namespace cbg_executor +{ +std::atomic GlobalEventIdProvider::last_event_id = 1; +} // namespace cbg_executor +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/global_event_id_provider.hpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/global_event_id_provider.hpp new file mode 100644 index 0000000000..5377cd04be --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/global_event_id_provider.hpp @@ -0,0 +1,47 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace rclcpp +{ +namespace executors +{ +namespace cbg_executor +{ +class GlobalEventIdProvider +{ + static std::atomic last_event_id; + +public: + using MonotonicId = uint64_t; + + // Returns the last id, returnd by getNextId + static uint64_t get_last_id() + { + return last_event_id; + } + + // increases the Id by one and returns the Id + static uint64_t get_next_id() + { + return ++last_event_id; + } +}; +} // namespace cbg_executor +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/ready_entity.hpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/ready_entity.hpp new file mode 100644 index 0000000000..698b829967 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/ready_entity.hpp @@ -0,0 +1,136 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "scheduler.hpp" +#include "global_event_id_provider.hpp" +#include "rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp" + +namespace rclcpp +{ +namespace executors +{ +namespace cbg_executor +{ +struct ReadyEntity +{ + struct ReadyTimerWithExecutedCallback + { + rclcpp::TimerBase::WeakPtr timer_ptr; + // must be called by the after executing the timer callback + std::function timer_was_executed; + + bool expired() const + { + return timer_ptr.expired(); + } + }; + + std::variant entity; + + explicit ReadyEntity(const rclcpp::SubscriptionBase::WeakPtr ptr) + : entity(ptr), id(GlobalEventIdProvider::get_next_id()) {} + explicit ReadyEntity(const ReadyTimerWithExecutedCallback & timer) + : entity(timer), id(GlobalEventIdProvider::get_next_id()) {} + explicit ReadyEntity(const rclcpp::ServiceBase::WeakPtr ptr) + : entity(ptr), id(GlobalEventIdProvider::get_next_id()) {} + explicit ReadyEntity(const rclcpp::ClientBase::WeakPtr ptr) + : entity(ptr), id(GlobalEventIdProvider::get_next_id()) {} + explicit ReadyEntity(const CBGScheduler::WaitableWithEventType & ev) + : entity(ev), id(GlobalEventIdProvider::get_next_id()) {} + explicit ReadyEntity(const CBGScheduler::CallbackEventType & ev) + : entity(ev), id(GlobalEventIdProvider::get_next_id()) {} + + std::function get_execute_function() const + { + return std::visit([](auto && entity) -> std::function { + using T = std::decay_t; + if constexpr (std::is_same_v) { + rclcpp::SubscriptionBase::SharedPtr shr_ptr = entity.lock(); + if (!shr_ptr) { + return std::function(); + } + return [shr_ptr = std::move(shr_ptr)]() { + rclcpp::executors::EventsCBGExecutor::execute_subscription(shr_ptr); + }; + } else if constexpr (std::is_same_v) { + auto shr_ptr = entity.timer_ptr.lock(); + if (!shr_ptr) { + return std::function(); + } + + return [shr_ptr = std::move(shr_ptr), + timer_executed_cb = entity.timer_was_executed]() { + auto data = shr_ptr->call(); + if (!data) { + // timer was cancelled, skip it. + return; + } + + rclcpp::executors::EventsCBGExecutor::execute_timer(shr_ptr, data); + + // readd the timer to the timers manager + timer_executed_cb(); + }; + } else if constexpr (std::is_same_v) { + auto shr_ptr = entity.lock(); + if (!shr_ptr) { + return std::function(); + } + return [shr_ptr = std::move(shr_ptr)]() { + rclcpp::executors::EventsCBGExecutor::execute_service(shr_ptr); + }; + } else if constexpr (std::is_same_v) { + auto shr_ptr = entity.lock(); + if (!shr_ptr) { + return std::function(); + } + return [shr_ptr = std::move(shr_ptr)]() { + rclcpp::executors::EventsCBGExecutor::execute_client(shr_ptr); + }; + } else if constexpr (std::is_same_v) { + auto shr_ptr_in = entity.waitable.lock(); + if (!shr_ptr_in) { + return std::function(); + } + + return [shr_ptr = std::move(shr_ptr_in), + event_type = entity.internal_event_type]() { + auto data = shr_ptr->take_data_by_entity_id(event_type); + shr_ptr->execute(data); + }; + } else if constexpr (std::is_same_v) { + return entity.callback; + } + }, entity); + } + + GlobalEventIdProvider::MonotonicId id; + + /** + * Returns true if the event has expired / does not need to be executed any more + */ + bool expired() const + { + return std::visit([](const auto & entity) {return entity.expired();}, entity); + } +}; +} // namespace cbg_executor +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/registered_entity_cache.hpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/registered_entity_cache.hpp new file mode 100644 index 0000000000..49b0cdf79a --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/registered_entity_cache.hpp @@ -0,0 +1,293 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include "scheduler.hpp" +#include "timer_manager.hpp" +#include + +namespace rclcpp +{ +namespace executors +{ +namespace cbg_executor +{ +template +struct WeakEntityPtrWithRemoveFunction +{ + WeakEntityPtrWithRemoveFunction( + const std::shared_ptr & shr_ptr, + const std::function & ptr)> & destruction_callback) + :executable(shr_ptr), + ptr(shr_ptr.get()), + destruction_callback(destruction_callback) + { + } + + ~WeakEntityPtrWithRemoveFunction() + { + std::shared_ptr shr_ptr = executable.lock(); + if(shr_ptr) { + destruction_callback(shr_ptr); + } + } + + std::weak_ptr executable; + + // May be used as key, to identify the removed element + // after the weak pointer went out of scope + EntityType_T *ptr; + + std::function & ptr)> destruction_callback; +}; + +struct GuardConditionWithFunction +{ + GuardConditionWithFunction(rclcpp::GuardCondition::SharedPtr gc, std::function fun) + : guard_condition(std::move(gc)), handle_guard_condition_fun(std::move(fun)) + { + } + + rclcpp::GuardCondition::SharedPtr guard_condition; + + // A function that should be executed if the guard_condition is ready + std::function handle_guard_condition_fun; +}; + +template +class EntityCache +{ + using CacheType = WeakEntityPtrWithRemoveFunction; + + std::unordered_map> entities; + +public: + void update( + const std::vector> & entityList, + const std::function &)> & on_add, + const std::function &)> & on_remove) + { + std::unordered_map> nextEntities; + for(const std::shared_ptr & shr_ptr : entityList) { + auto it = entities.find(shr_ptr.get()); + if(it != entities.end()) { + nextEntities.insert(std::move(entities.extract(it))); + } else { + // new entry + nextEntities.emplace(std::make_pair(shr_ptr.get(), + std::make_unique(shr_ptr, on_remove))); + + on_add(shr_ptr); + } + } + + entities.swap(nextEntities); + + // NOTE, at this point the non moved unique_ptrs get destructed, and the + // remove callbacks are called + } + + void clear() + { + entities.clear(); + } +}; + +struct RegisteredEntityCache +{ + RegisteredEntityCache( + CBGScheduler & scheduler, TimerManager & timer_manager, + const rclcpp::CallbackGroup::SharedPtr & callback_group) + : callback_group_weak_ptr(callback_group), + scheduler_cbg_handle(*scheduler.add_callback_group(callback_group)), + timer_manager(timer_manager) + { + // register guard condition for the callback group with the handler + // this makes sure, that we pick up new entities in case the guard + // condition is triggered + add_guard_condition_event( + callback_group->get_notify_guard_condition(), [this]() { + handle_callback_group_guard_condition(); + } + ); + } + + EntityCache timers_cache; + EntityCache subscribers_cache; + EntityCache clients_cache; + EntityCache services_cache; + EntityCache waitables_cache; + + std::vector guard_conditions; + + rclcpp::CallbackGroup::WeakPtr callback_group_weak_ptr; + CBGScheduler::CallbackGroupHandle & scheduler_cbg_handle; + + TimerManager & timer_manager; + + // This function will be called whenever the guard condition + // if the callback group was triggered. In this case, we + // query the callback group for added or removed entites + void handle_callback_group_guard_condition() + { + if (!rclcpp::contexts::get_global_default_context()->shutdown_reason().empty()) { + return; + } + + if (!rclcpp::ok(rclcpp::contexts::get_global_default_context())) { + return; + } + + regenerate_events(); + } + + + ~RegisteredEntityCache() + { + for (const auto & gc_ref : guard_conditions) { + gc_ref.guard_condition->set_on_trigger_callback(nullptr); + } + } + + void clear_caches() + { + timers_cache.clear(); + subscribers_cache.clear(); + clients_cache.clear(); + services_cache.clear(); + waitables_cache.clear(); + } + + + bool regenerate_events() + { + rclcpp::CallbackGroup::SharedPtr callback_group = callback_group_weak_ptr.lock(); + + if(!callback_group) { + clear_caches(); + return false; + } + + std::vector timers; + std::vector subscribers; + std::vector clients; + std::vector services; + std::vector waitables; + + // we reserve to much memory here, but this should be fine + const size_t max_size = callback_group->size(); + timers.reserve(max_size); + subscribers.reserve(max_size); + clients.reserve(max_size); + services.reserve(max_size); + waitables.reserve(max_size); + + const auto add_sub = [&subscribers](const rclcpp::SubscriptionBase::SharedPtr & s) { + subscribers.push_back(s); + }; + const auto add_timer = [&timers](const rclcpp::TimerBase::SharedPtr & s) { + timers.push_back(s); + }; + const auto add_client = [&clients](const rclcpp::ClientBase::SharedPtr & s) { + clients.push_back(s); + }; + const auto add_service = [&services](const rclcpp::ServiceBase::SharedPtr & s) { + services.push_back(s); + }; + const auto add_waitable = [&waitables](const rclcpp::Waitable::SharedPtr & s) { + waitables.push_back(s); + }; + + // populate all vectors + callback_group->collect_all_ptrs(add_sub, add_service, add_client, add_timer, add_waitable); + + timers_cache.update(timers, + [this](const rclcpp::TimerBase::SharedPtr & s) { + timer_manager.add_timer(s, scheduler_cbg_handle.get_ready_callback_for_entity(s)); + }, + [this](const rclcpp::TimerBase::SharedPtr & s) { + timer_manager.remove_timer(s); + }); + + subscribers_cache.update(subscribers, + [this](const rclcpp::SubscriptionBase::SharedPtr & s) { + s->set_on_new_message_callback( + scheduler_cbg_handle.get_ready_callback_for_entity(s)); + }, + [] (const rclcpp::SubscriptionBase::SharedPtr & shr_ptr) { + shr_ptr->clear_on_new_message_callback(); + }); + + clients_cache.update(clients, + [this](const rclcpp::ClientBase::SharedPtr & s) { + s->set_on_new_response_callback( + scheduler_cbg_handle.get_ready_callback_for_entity(s)); + }, + [] (const rclcpp::ClientBase::SharedPtr & shr_ptr) { + shr_ptr->clear_on_new_response_callback(); + }); + services_cache.update(services, + [this](const rclcpp::ServiceBase::SharedPtr & s) { + s->set_on_new_request_callback( + scheduler_cbg_handle.get_ready_callback_for_entity(s)); + }, + [] (const rclcpp::ServiceBase::SharedPtr & shr_ptr) { + shr_ptr->clear_on_new_request_callback(); + }); + waitables_cache.update(waitables, + [this](const rclcpp::Waitable::SharedPtr & s) { + s->set_on_ready_callback( + scheduler_cbg_handle.get_ready_callback_for_entity(s)); + for (const auto & t : s->get_timers()) { + timer_manager.add_timer(t, scheduler_cbg_handle.get_ready_callback_for_entity(t)); + } + }, + [this] (const rclcpp::Waitable::SharedPtr & s) { + s->clear_on_ready_callback(); + for (const auto & t : s->get_timers()) { + timer_manager.remove_timer(t); + } + }); + + return true; + } + + /** + * Register special case guard conditions. + * These guard conditions can be registered to a callback, that will + * be executed within the executor worker context on trigger. + * + * Note, there is currently no way, to deregister these guard conditions + */ + void add_guard_condition_event( + rclcpp::GuardCondition::SharedPtr ptr, + std::function fun) + { + auto & new_entry = guard_conditions.emplace_back(std::move(ptr), std::move(fun)); + + if (new_entry.handle_guard_condition_fun) { + new_entry.guard_condition->set_on_trigger_callback( + scheduler_cbg_handle.get_ready_callback_for_entity(CBGScheduler::CallbackEventType(fun))); + } + } +}; +} // namespace cbg_executor +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/scheduler.hpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/scheduler.hpp new file mode 100644 index 0000000000..fa139fe253 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/scheduler.hpp @@ -0,0 +1,314 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include +#include +#include + +#include +#include "global_event_id_provider.hpp" + +namespace rclcpp +{ +namespace executors +{ +namespace cbg_executor +{ +class CallbackGroupSchedulerEv; +struct WeakExecutableCache; +struct AnyExecutableCbgEv; +class TimerManager; +struct GloablaWeakExecutableCache; + +class CBGScheduler +{ +public: + struct WaitableWithEventType + { + rclcpp::Waitable::WeakPtr waitable; + int internal_event_type; + + bool expired() const + { + return waitable.expired(); + } + }; + + struct CallbackEventType + { + explicit CallbackEventType(std::function callback) + : callback(std::move(callback)) + { + } + + std::function callback; + + bool expired() const + { + return false; + } + }; + + struct CallbackGroupHandle + { + explicit CallbackGroupHandle(CBGScheduler & scheduler) + : scheduler(scheduler) {} + + CallbackGroupHandle(const CallbackGroupHandle &) = delete; + CallbackGroupHandle(CallbackGroupHandle &&) = delete; + + virtual ~CallbackGroupHandle() = default; + + CallbackGroupHandle & operator=(const CallbackGroupHandle &) = delete; + CallbackGroupHandle & operator=(CallbackGroupHandle &&) = delete; + + virtual std::function get_ready_callback_for_entity( + const rclcpp::SubscriptionBase::WeakPtr & entity) = 0; + virtual std::function executed_callback)> + get_ready_callback_for_entity(const rclcpp::TimerBase::WeakPtr & entity) = 0; + virtual std::function get_ready_callback_for_entity( + const rclcpp::ClientBase::WeakPtr & entity) = 0; + virtual std::function get_ready_callback_for_entity( + const rclcpp::ServiceBase::WeakPtr & entity) = 0; + virtual std::function get_ready_callback_for_entity(const rclcpp::Waitable::WeakPtr & entity) = 0; + virtual std::function get_ready_callback_for_entity( + const CallbackEventType & entity) = 0; + + /** + * Marks the last removed ready entitiy as executed. + */ + void mark_as_executed() + { + { + std::lock_guard l(ready_mutex); + not_ready = false; + + if(!has_ready_entities()) { + idle = true; + return; + } + } + // inform scheduler that we have more work + scheduler.callback_group_ready(this, false); + } + + bool is_ready(); + +protected: + CBGScheduler & scheduler; + + /** + * Will always be called under lock of ready_mutex + */ + virtual bool has_ready_entities() const = 0; + + /** + * Executes the given function to add an entity + * under the ready mutex. Afterwards the function + * checks if the scheduler needs to be informed + * that the callback group got ready and informs + * it if needed. + */ + template + void add_ready_entity(const add_fun & fun) + { + { + std::lock_guard l(ready_mutex); + + fun(); + + if(not_ready || !idle) { + return; + } + + idle = false; + } + + // If we reached this point, we were idle and now have work, + // therefore we need to move this callback group into the list + // of ready callback groups. + scheduler.callback_group_ready(this, true); + } + + void mark_as_skiped() + { + if(!has_ready_entities()) { + idle = true; + } + } + + /** + * Must be called by derived classes if a ready entity is + * returned. This call must happen under a lock holding the + * ready_mutex. + */ + void mark_as_executing() + { + not_ready = true; + } + + std::mutex ready_mutex; + +private: + // will be set if cbg is mutual exclusive and something is executing + bool not_ready = false; + + // true, if nothing is beeing executed, and there are no pending events + bool idle = true; + }; + + struct ExecutableEntity + { + // if called executes the entitiy + std::function execute_function; + // The callback group associated with the entity. Can be nullptr. + CallbackGroupHandle *callback_handle = nullptr; + }; + + CBGScheduler() = default; + CBGScheduler(const CBGScheduler &) = delete; + CBGScheduler(CBGScheduler &&) = delete; + virtual ~CBGScheduler() = default; + + CBGScheduler & operator=(const CBGScheduler &) = delete; + CBGScheduler & operator=(CBGScheduler &&) = delete; + + CallbackGroupHandle * add_callback_group(const rclcpp::CallbackGroup::SharedPtr & callback_group) + { + auto uPtr = get_handle_for_callback_group(callback_group); + CallbackGroupHandle * ret = uPtr.get(); + + std::lock_guard lk(ready_callback_groups_mutex); + + callback_groups.push_back(std::move(uPtr)); + return ret; + } + + void remove_callback_group(const CallbackGroupHandle *callback_handle) + { + std::lock_guard lk(ready_callback_groups_mutex); + ready_callback_groups.erase(std::find(ready_callback_groups.begin(), + ready_callback_groups.end(), callback_handle)); + + callback_groups.remove_if([&callback_handle] (const auto & e) { + return e.get() == callback_handle; + }); + } + + /** Will be called, by CallbackGroupHandle if any entity in the cb group is ready for execution + * and the cb group was idle before + * @param callback_group_was_idle Is false, if no entity of the callback group was executed, + * before this call was made. This means we need to wakeup a + * a new thread. + */ + void callback_group_ready(CallbackGroupHandle *handle, bool callback_group_was_idle) + { + { + std::lock_guard l(ready_callback_groups_mutex); + ready_callback_groups.push_back(handle); + } + + if(callback_group_was_idle) { + unblock_one_worker_thread(); + } + } + + struct ExecutableEntityWithInfo + { + std::optional entitiy; + bool moreEntitiesReady{}; + }; + + /** + * Returns the next ready entity that shall be executed. + * If a entity is removed here, the scheduler may assume that + * it will be executed, and that the function mark_entity_as_executed + * will be called afterwards. + */ + virtual ExecutableEntityWithInfo get_next_ready_entity() = 0; + virtual ExecutableEntityWithInfo get_next_ready_entity( + GlobalEventIdProvider::MonotonicId max_id) = 0; + + /** + * Must be called, after a entity was executed. This function will + * normally be used, to mark the associated callback group as ready + * again. + */ + void mark_entity_as_executed(const ExecutableEntity & e) + { + if(e.callback_handle != nullptr) { + e.callback_handle->mark_as_executed(); + } + } + + /** + * Wakes up a worker thread + */ + void unblock_one_worker_thread() + { + { + std::lock_guard lk(ready_callback_groups_mutex); + release_worker_once = true; + } + work_ready_conditional.notify_one(); + } + + void block_worker_thread() + { + std::unique_lock lk(ready_callback_groups_mutex); + work_ready_conditional.wait(lk, [this]() -> bool { + return !ready_callback_groups.empty() || release_worker_once || release_workers; + }); + release_worker_once = false; + } + + void block_worker_thread_for(std::chrono::nanoseconds timeout) + { + std::unique_lock lk(ready_callback_groups_mutex); + work_ready_conditional.wait_for(lk, timeout, [this]() -> bool { + return !ready_callback_groups.empty() || release_worker_once || release_workers; + }); + release_worker_once = false; + } + + void release_all_worker_threads() + { + { + std::lock_guard lk(ready_callback_groups_mutex); + release_workers = true; + } + work_ready_conditional.notify_all(); + } + +protected: + virtual std::unique_ptr get_handle_for_callback_group( + const rclcpp::CallbackGroup::SharedPtr & callback_group) = 0; + + std::mutex ready_callback_groups_mutex; + std::deque ready_callback_groups; + + bool release_workers = false; + bool release_worker_once = false; + + std::condition_variable work_ready_conditional; + + std::list> callback_groups; +}; +} // namespace cbg_executor +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/timer_manager.hpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/timer_manager.hpp new file mode 100644 index 0000000000..3ec8d20262 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/timer_manager.hpp @@ -0,0 +1,536 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace rclcpp::executors::cbg_executor +{ + +/** + * Specialized version of rclcpp::ClockConditionalVariable + * + * This version accepts the clock on waits instead of on construction. + * This is needed, as clocks may be deleted during normal operation, + * and be don't have a way to create a permanent ros time clock. + */ +class ClockConditionalVariable +{ + std::mutex pred_mutex_; + bool shutdown_ = false; + rclcpp::Context::SharedPtr context_; + rclcpp::OnShutdownCallbackHandle shutdown_cb_handle_; + ClockWaiter::UniquePtr clock_; + +public: + explicit ClockConditionalVariable(rclcpp::Context::SharedPtr context) + :context_(std::move(context)) + { + if (!context_ || !context_->is_valid()) { + throw std::runtime_error("context cannot be slept with because it's invalid"); + } + // Wake this thread if the context is shutdown + shutdown_cb_handle_ = context_->add_on_shutdown_callback( + [this]() { + std::unique_lock lock(pred_mutex_); + shutdown_ = true; + if(clock_) { + clock_->notify_one(); + } + }); + } + + ~ClockConditionalVariable() + { + context_->remove_on_shutdown_callback(shutdown_cb_handle_); + } + + bool + wait_until( + std::unique_lock & lock, const rclcpp::Clock::SharedPtr & clock, + const rclcpp::Time & until, + const std::function & pred) + { + if(lock.mutex() != &pred_mutex_) { + throw std::runtime_error( + "ClockConditionalVariable::wait_until: Internal error, given lock does not use" + " mutex returned by this->mutex()"); + } + + if(shutdown_) { + return false; + } + + clock_ = std::make_unique(clock); + + clock_->wait_until(lock, until, [this, &pred] () -> bool { + return shutdown_ || pred(); + }); + + clock_.reset(); + + return true; + } + + void + notify_one() + { + std::unique_lock lock(pred_mutex_); + + if(clock_) { + clock_->notify_one(); + } + } + + std::mutex & + mutex() + { + return pred_mutex_; + } +}; + +/** + * @brief A class for managing a queue of timers + * + * This class holds a queue of timers of one type (RCL_ROS_TIME, RCL_SYSTEM_TIME or RCL_STEADY_TIME). + * The queue itself manages an internal map of the timers, orders by the next time a timer will be + * ready. Each time a timer is ready, a callback will be called from the internal thread. + */ +class TimerQueue +{ + struct TimerData + { + std::shared_ptr rcl_ref; + std::weak_ptr timer_ref; + rclcpp::Clock::SharedPtr clock; + bool in_running_list = false; + std::function & executed_cb)> timer_ready_callback; + }; + + class GetClockHelper : public rclcpp::TimerBase + { +public: + static rclcpp::Clock::SharedPtr get_clock(const rclcpp::TimerBase & timer) + { + // SUPER ugly hack, but we need the correct clock + return static_cast(&timer)->clock_; + } + }; + +public: + TimerQueue(rcl_clock_type_t timer_type, const rclcpp::Context::SharedPtr & context) + : timer_type(timer_type), clock_waiter(context) + { + // must be initialized here so that all class members + // are initialized + trigger_thread = std::thread([this]() { + timer_thread(); + }); + } + + ~TimerQueue() + { + stop(); + } + + void stop() + { + running = false; + { + std::scoped_lock l(mutex); + wakeup_timer_thread(); + } + if(trigger_thread.joinable()) { + trigger_thread.join(); + } + + std::scoped_lock l(mutex); + + for (auto & tData : all_timers) { + if (auto shrPtr = tData->timer_ref.lock()) { + shrPtr->clear_on_reset_callback(); + } + } + } + + /** + * @brief Removes a new timer from the queue. + * This function is thread safe. + * + * Removes a timer, if it was added to this queue. + * Ignores timers that are not part of this queue + * + * @param timer the timer to remove. + */ + void remove_timer(const rclcpp::TimerBase::SharedPtr & timer) + { + rcl_clock_t * clock_type_of_timer{}; + + std::shared_ptr handle = timer->get_timer_handle(); + + if (rcl_timer_clock( + const_cast(handle.get()), + &clock_type_of_timer) != RCL_RET_OK) + { + assert(false); + } + + if (clock_type_of_timer->type != timer_type) { + // timer is handled by another queue + return; + } + + std::scoped_lock l(mutex); + + // clear the timer under lock, as the underlying rcl function + // is not thread safe + timer->clear_on_reset_callback(); + + auto it = std::find_if( + all_timers.begin(), all_timers.end(), + [rcl_ref = timer->get_timer_handle()](const std::unique_ptr & d) + { + return d->rcl_ref == rcl_ref; + }); + + if (it != all_timers.end()) { + const TimerData * data_ptr = it->get(); + + auto it2 = std::find_if( + running_timers.begin(), running_timers.end(), [data_ptr](const auto & e) { + return e.second == data_ptr; + }); + + if(it2 != running_timers.end()) { + running_timers.erase(it2); + } + all_timers.erase(it); + } + + wakeup_timer_thread(); + } + + /** + * @brief Adds a new timer to the queue. + * This function is thread safe. + * + * This function will ignore any timer, that has not a matching type + * + * @param timer the timer to add. + * @param timer_ready_callback callback that should be called when the timer is ready. + */ + void add_timer( + const rclcpp::TimerBase::SharedPtr & timer, + const std::function executed_cb)> & timer_ready_callback) + { + rcl_clock_t * clock_type_of_timer{}; + + std::shared_ptr handle = timer->get_timer_handle(); + + if (rcl_timer_clock( + const_cast(handle.get()), + &clock_type_of_timer) != RCL_RET_OK) + { + assert(false); + } + + if (clock_type_of_timer->type != timer_type) { + // timer is handled by another queue + return; + } + + std::unique_ptr data = std::make_unique(TimerData{std::move(handle), + timer, + GetClockHelper::get_clock(*timer), false, timer_ready_callback}); + + timer->set_on_reset_callback( + [data_ptr = data.get(), this](size_t) { + std::scoped_lock l(mutex); + if (!remove_if_dropped(data_ptr)) { + add_timer_to_running_map(data_ptr); + } + }); + + { + std::scoped_lock l(mutex); + // this will wake up the timer thread if needed + add_timer_to_running_map(data.get()); + + all_timers.emplace_back(std::move(data) ); + } + } + +private: + /** + * Wakes the timer thread. Must be called under lock + * by mutex + */ + void wakeup_timer_thread() + { + if(used_clock_for_timers) { + { + std::unique_lock l(clock_waiter.mutex()); + wake_up = true; + } + clock_waiter.notify_one(); + } else { + thread_conditional.notify_all(); + } + } + + /** + * Checks if the timer is still referenced if not deletes it from the queue + * + * @param timer_data The timer to check + * @return true if removed / invalid + */ + bool remove_if_dropped(const TimerData * timer_data) + { + if (timer_data->rcl_ref.unique()) { + // clear on reset callback + if(rcl_timer_set_on_reset_callback(timer_data->rcl_ref.get(), nullptr, + nullptr) != RCL_RET_OK) + { + assert(false); + } + + // timer was deleted + auto it = std::find_if( + all_timers.begin(), all_timers.end(), [timer_data](const std::unique_ptr & e) { + return timer_data == e.get(); + } + ); + + if (it != all_timers.end()) { + all_timers.erase(it); + } + return true; + } + return false; + } + + /** + * @brief adds the given timer_data to the map of running timers, if valid. + * + * Advances the rcl timer. + * Computes the next call time of the timer. + * readds the timer to the map of running timers + */ + void add_timer_to_running_map(TimerData * timer_data) + { + bool wasEmpty = running_timers.empty(); + std::chrono::nanoseconds old_next_call_time(-1); + if(!wasEmpty) { + old_next_call_time = running_timers.begin()->first; + } + + // timer can already be in the running list, if + // e.g. reset was called on a running timer + if(timer_data->in_running_list) { + for(auto it = running_timers.begin() ; it != running_timers.end(); it++) { + if(it->second == timer_data) { + running_timers.erase(it); + break; + } + } + timer_data->in_running_list = false; + } + + int64_t next_call_time{}; + rcl_ret_t ret = rcl_timer_get_next_call_time(timer_data->rcl_ref.get(), &next_call_time); + + if (ret != RCL_RET_OK) { + return; + } + + running_timers.emplace(next_call_time, timer_data); + timer_data->in_running_list = true; + + if(wasEmpty || running_timers.begin()->first < old_next_call_time) { + // the next wakeup is now earlier, wake up the timer thread so that it can pick up the timer + wakeup_timer_thread(); + } + } + + std::vector> get_ready_timer_callbacks() + { + std::vector> ready_timer_callbacks; + ready_timer_callbacks.reserve(running_timers.size()); + while (!running_timers.empty()) { + if(remove_if_dropped(running_timers.begin()->second)) { + running_timers.erase(running_timers.begin()); + continue; + } + + int64_t time_until_call{}; + TimerData *timer_data(running_timers.begin()->second); + + const rcl_timer_t * rcl_timer_ref = timer_data->rcl_ref.get(); + auto ret = rcl_timer_get_time_until_next_call(rcl_timer_ref, &time_until_call); + if (ret == RCL_RET_TIMER_CANCELED) { + timer_data->in_running_list = false; + running_timers.erase(running_timers.begin()); + continue; + } + + if (time_until_call <= 0) { + auto timer_done_callback = [timer_data = timer_data, this] () + { + // Note, we have the guarantee, that the shared_ptr to this timer is + // valid in case this callback is executed, as the executor holds a + // reference to the timer during execution and at the time of this callback. + // Therefore timer_data is valid. + { + std::scoped_lock l(mutex); + add_timer_to_running_map(timer_data); + } + }; + + ready_timer_callbacks.push_back([ready_callback = + timer_data->timer_ready_callback, + done_callback = std::move(timer_done_callback)] () { + ready_callback(done_callback); + }); + + // remove timer from, running list, until it was executed + // the scheduler will readd the timer after execution + timer_data->in_running_list = false; + running_timers.erase(running_timers.begin()); + + continue; + } + break; + } + + return ready_timer_callbacks; + } + + void timer_thread() + { + while (running && rclcpp::ok()) { + std::chrono::nanoseconds next_wakeup_time; + std::vector> ready_timer_callbacks; + rclcpp::Clock::SharedPtr used_clock; + { + std::scoped_lock l(mutex); + ready_timer_callbacks = get_ready_timer_callbacks(); + + if(running_timers.empty()) { + used_clock_for_timers.reset(); + } else { + used_clock_for_timers = running_timers.begin()->second->clock; + next_wakeup_time = running_timers.begin()->first; + used_clock = used_clock_for_timers; + } + } + + for(const std::function & timer_ready_fun : ready_timer_callbacks) { + // inform the timer that it is ready. We need to do this out of the scope + // of the mutex, to avoid a deadlock, as the timer_ready function will need + // to acquire the callback group mutex + timer_ready_fun(); + } + + if(used_clock) { + try { + used_clock->wait_until_started(); + + std::unique_lock l(clock_waiter.mutex()); + clock_waiter.wait_until(l, used_clock, + rclcpp::Time(next_wakeup_time.count(), timer_type), [this] () -> bool { + return wake_up || !running || !rclcpp::ok(); + }); + wake_up = false; + } catch (const std::runtime_error &) { + // there is a race on shutdown, were the context may + // become invalid, while we call sleep_until + running = false; + } + } else { + std::unique_lock l(mutex); + thread_conditional.wait(l, [this]() { + return !running_timers.empty() || !running || !rclcpp::ok(); + }); + } + } + thread_terminated = true; + } + + rcl_clock_type_t timer_type; + + rclcpp::Clock::SharedPtr used_clock_for_timers; + + ClockConditionalVariable clock_waiter; + bool wake_up = false; + + std::mutex mutex; + + std::atomic_bool running = true; + std::atomic_bool thread_terminated = false; + + std::vector> all_timers; + + using TimerMap = std::multimap; + TimerMap running_timers; + + std::thread trigger_thread; + + std::condition_variable thread_conditional; +}; + +class TimerManager +{ + std::array timer_queues; + +public: + explicit TimerManager(const rclcpp::Context::SharedPtr & context) + : timer_queues{TimerQueue{RCL_ROS_TIME, context}, TimerQueue{RCL_SYSTEM_TIME, context}, + TimerQueue{RCL_STEADY_TIME, context}} + { + } + + void remove_timer(const rclcpp::TimerBase::SharedPtr & timer) + { + for (TimerQueue & q : timer_queues) { + q.remove_timer(timer); + } + } + + void add_timer( + const rclcpp::TimerBase::SharedPtr & timer, + const std::function executed_cb)> & timer_ready_callback) + { + for (TimerQueue & q : timer_queues) { + q.add_timer(timer, timer_ready_callback); + } + } + + void stop() + { + for (TimerQueue & q : timer_queues) { + q.stop(); + } + } +}; +} // namespace rclcpp::executors::cbg diff --git a/rclcpp/test/benchmark/benchmark_executor.cpp b/rclcpp/test/benchmark/benchmark_executor.cpp index 319a71e175..260f5ba767 100644 --- a/rclcpp/test/benchmark/benchmark_executor.cpp +++ b/rclcpp/test/benchmark/benchmark_executor.cpp @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include #include @@ -21,13 +20,15 @@ #include "performance_test_fixture/performance_test_fixture.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp" #include "rcpputils/scope_exit.hpp" #include "test_msgs/msg/empty.hpp" using namespace std::chrono_literals; using performance_test_fixture::PerformanceTest; -constexpr unsigned int kNumberOfNodes = 10; +constexpr unsigned int kNumberOfNodes = 1; +constexpr unsigned int kNumberOfPubSubs = 10; class PerformanceTestExecutor : public PerformanceTest { @@ -39,14 +40,18 @@ class PerformanceTestExecutor : public PerformanceTest for (unsigned int i = 0u; i < kNumberOfNodes; i++) { nodes.push_back(std::make_shared("my_node_" + std::to_string(i))); - publishers.push_back( - nodes[i]->create_publisher( - "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10))); - - auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;}; - subscriptions.push_back( - nodes[i]->create_subscription( - "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10), std::move(callback))); + for (unsigned int j = 0u; j < kNumberOfPubSubs; j++) { + publishers.push_back( + nodes[i]->create_publisher( + "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + + "_" + std::to_string(j), rclcpp::QoS(10))); + + auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;}; + subscriptions.push_back( + nodes[i]->create_subscription( + "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + + "_" + std::to_string(j), rclcpp::QoS(10), std::move(callback))); + } } PerformanceTest::SetUp(st); } @@ -59,65 +64,534 @@ class PerformanceTestExecutor : public PerformanceTest rclcpp::shutdown(); } + template + void executor_spin_some(benchmark::State & st) + { + Executer executor; + for (unsigned int i = 0u; i < kNumberOfNodes; i++) { + executor.add_node(nodes[i]); + } + while (subscriptions.front()->get_publisher_count() == 0) { + std::this_thread::sleep_for(10ms); + } + + // precompute executer internals + publishers[0]->publish(empty_msgs); + executor.spin_some(100ms); + + if (callback_count == 0) { + st.SkipWithError("No message was received"); + } + + callback_count = 0; + reset_heap_counters(); + + for (auto _ : st) { + (void)_; + st.PauseTiming(); + for (auto & pub : publishers) { + pub->publish(empty_msgs); + } + std::this_thread::yield(); + st.ResumeTiming(); + + callback_count = 0; + executor.spin_some(1000ms); + if (callback_count < publishers.size()) { + st.SkipWithError("Not all messages were received"); + } + } + if (callback_count == 0) { + st.SkipWithError("No message was received"); + } + } + + template + void benchmark_wait_for_work(benchmark::State & st) + { + class ExecuterDerived : public Executer + { +public: + void call_wait_for_work(std::chrono::nanoseconds timeout) + { + Executer::wait_for_work(timeout); + } + }; + + ExecuterDerived executor; + for (unsigned int i = 0u; i < kNumberOfNodes; i++) { + executor.add_node(nodes[i]); + } + while (subscriptions.front()->get_publisher_count() == 0) { + std::this_thread::sleep_for(10ms); + } + // we need one ready event + publishers[0]->publish(empty_msgs); + executor.spin_some(100ms); + if (callback_count == 0) { + st.SkipWithError("No message was received"); + } + + reset_heap_counters(); + + for (auto _ : st) { + (void)_; + + st.PauseTiming(); + // we need one ready event + publishers[0]->publish(empty_msgs); + st.ResumeTiming(); + + + executor.call_wait_for_work(100ms); + st.PauseTiming(); + executor.spin_some(100ms); + st.ResumeTiming(); + } + } + + template + void benchmark_wait_for_work_force_rebuild(benchmark::State & st) + { + class ExecuterDerived : public Executer + { +public: + void call_wait_for_work(std::chrono::nanoseconds timeout) + { + Executer::wait_for_work(timeout); + } + }; + + ExecuterDerived executor; + for (unsigned int i = 0u; i < kNumberOfNodes; i++) { + executor.add_node(nodes[i]); + } + + while (subscriptions.front()->get_publisher_count() == 0) { + std::this_thread::sleep_for(10ms); + } + + // we need one ready event + publishers[0]->publish(empty_msgs); + + executor.spin_some(100ms); + if (callback_count == 0) { + st.SkipWithError("No message was received"); + } + + // only need to force a waitset rebuild + rclcpp::PublisherBase::SharedPtr somePub; + + reset_heap_counters(); + + for (auto _ : st) { + (void)_; + + st.PauseTiming(); + somePub = nodes[0]->create_publisher( + "/foo", rclcpp::QoS(10)); + // we need one ready event + publishers[0]->publish(empty_msgs); + st.ResumeTiming(); + + + executor.call_wait_for_work(std::chrono::nanoseconds::zero()); + st.PauseTiming(); + executor.spin_some(100ms); + st.ResumeTiming(); + } + } test_msgs::msg::Empty empty_msgs; std::vector nodes; std::vector::SharedPtr> publishers; std::vector::SharedPtr> subscriptions; - int callback_count; + uint callback_count; }; + BENCHMARK_F(PerformanceTestExecutor, single_thread_executor_spin_some)(benchmark::State & st) { - rclcpp::executors::SingleThreadedExecutor executor; - for (unsigned int i = 0u; i < kNumberOfNodes; i++) { - executor.add_node(nodes[i]); - publishers[i]->publish(empty_msgs); - executor.spin_some(100ms); + executor_spin_some(st); +} + +BENCHMARK_F(PerformanceTestExecutor, multi_thread_executor_spin_some)(benchmark::State & st) +{ + executor_spin_some(st); +} + +BENCHMARK_F(PerformanceTestExecutor, cbg_executor_spin_some)(benchmark::State & st) +{ + executor_spin_some(st); +} + + +BENCHMARK_F(PerformanceTestExecutor, single_thread_executor_wait_for_work)(benchmark::State & st) +{ + benchmark_wait_for_work(st); +} + +BENCHMARK_F(PerformanceTestExecutor, multi_thread_executor_wait_for_work)(benchmark::State & st) +{ + benchmark_wait_for_work(st); +} + +BENCHMARK_F( + PerformanceTestExecutor, + single_thread_executor_wait_for_work_rebuild)(benchmark::State & st) +{ + benchmark_wait_for_work_force_rebuild(st); +} + +BENCHMARK_F( + PerformanceTestExecutor, + multi_thread_executor_wait_for_work_rebuild)(benchmark::State & st) +{ + benchmark_wait_for_work_force_rebuild(st); +} + +class CallbackWaitable : public rclcpp::Waitable +{ +public: + CallbackWaitable() = default; + + void + add_to_wait_set(rcl_wait_set_t & wait_set) override + { + rcl_ret_t ret = rcl_wait_set_add_guard_condition( + &wait_set, + &gc.get_rcl_guard_condition(), &gc_waitset_idx); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); + } + + if (has_trigger) { + gc.trigger(); + } } - callback_count = 0; - reset_heap_counters(); + void trigger() + { + has_trigger = true; + gc.trigger(); + } - for (auto _ : st) { - (void)_; - st.PauseTiming(); - for (unsigned int i = 0u; i < kNumberOfNodes; i++) { - publishers[i]->publish(empty_msgs); + bool + is_ready(const rcl_wait_set_t & wait_set) override + { + return wait_set.guard_conditions[gc_waitset_idx]; + } + + std::shared_ptr + take_data() override + { + return nullptr; + } + + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void) id; + return nullptr; + } + + void + execute(const std::shared_ptr & data) override + { + has_trigger = false; + (void) data; + if (cb_fun) { + cb_fun(); } - st.ResumeTiming(); + } - executor.spin_some(100ms); + void set_execute_callback_function(std::function cb_fun_in) + { + cb_fun = std::move(cb_fun_in); } - if (callback_count == 0) { - st.SkipWithError("No message was received"); + + size_t + get_number_of_ready_guard_conditions() override {return 1;} + + + void + set_on_ready_callback(std::function callback) override + { + auto gc_callback = [callback](size_t count) { + callback(count, 0); + }; + gc.set_on_trigger_callback(gc_callback); } -} -BENCHMARK_F(PerformanceTestExecutor, multi_thread_executor_spin_some)(benchmark::State & st) + void + clear_on_ready_callback() override + { + gc.set_on_trigger_callback(nullptr); + } + + std::vector> + get_timers() const override + { + return {}; + } + +private: + std::atomic has_trigger = false; + std::function cb_fun; + size_t gc_waitset_idx = 0; + rclcpp::GuardCondition gc; +}; + +class CascadedPerformanceTestExecutor : public PerformanceTest { - rclcpp::executors::MultiThreadedExecutor executor; - for (unsigned int i = 0u; i < kNumberOfNodes; i++) { - executor.add_node(nodes[i]); - publishers[i]->publish(empty_msgs); + std::condition_variable cascase_done; + std::vector> waitables; + std::atomic last_cb_triggered; + +public: + void SetUp(benchmark::State & st) + { + rclcpp::init(0, nullptr); + callback_count = 0; + for (unsigned int i = 0u; i < kNumberOfNodes; i++) { + nodes.push_back(std::make_shared("my_node_" + std::to_string(i))); + + for (unsigned int j = 0u; j < kNumberOfPubSubs; j++) { + publishers.push_back( + nodes[i]->create_publisher( + "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + + "_" + std::to_string(j), rclcpp::QoS(10))); + + auto callback = [this, i](test_msgs::msg::Empty::ConstSharedPtr) { + if (i == kNumberOfNodes - 1) { + this->callback_count++; + cascase_done.notify_all(); + } else { + publishers[i + 1]->publish(empty_msgs); + } + }; + subscriptions.push_back( + nodes[i]->create_subscription( + "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + + "_" + std::to_string(j), rclcpp::QoS(10), std::move(callback))); + } + + auto waitable_interfaces = nodes.back()->get_node_waitables_interface(); + + auto callback_waitable = std::make_shared(); + waitables.push_back(callback_waitable); + waitable_interfaces->add_waitable(callback_waitable, nullptr); + } + for (unsigned int i = 0u; i < waitables.size(); i++) { + if (i == waitables.size() - 1) { + waitables[i]->set_execute_callback_function( + [this]() { + last_cb_triggered = true; + cascase_done.notify_all(); + }); + } else { + waitables[i]->set_execute_callback_function( + [this, i]() { + waitables[i + 1]->trigger(); + }); + } + } + PerformanceTest::SetUp(st); + } + void TearDown(benchmark::State & st) + { + PerformanceTest::TearDown(st); + subscriptions.clear(); + publishers.clear(); + nodes.clear(); + waitables.clear(); + last_cb_triggered = false; + rclcpp::shutdown(); + } + + template + void executor_spin_some(benchmark::State & st) + { + Executer executor; + for (unsigned int i = 0u; i < kNumberOfNodes; i++) { + executor.add_node(nodes[i]); + } + + // precompute the executor internals executor.spin_some(100ms); + + while (subscriptions.front()->get_publisher_count() == 0) { + std::this_thread::sleep_for(10ms); + } + + callback_count = 0; + reset_heap_counters(); + + std::mutex cond_mutex; + std::unique_lock lk(cond_mutex); + auto thread = std::thread( + [&executor] { + executor.spin(); + }); + + for (auto _ : st) { + (void)_; + last_cb_triggered = false; + + // start the cascasde + waitables[0]->trigger(); + + cascase_done.wait_for(lk, 500ms); + + if (!last_cb_triggered) { + st.SkipWithError("No message was received"); + } + } + + executor.cancel(); + + thread.join(); + + if (!last_cb_triggered) { + st.SkipWithError("No message was received"); + } } - callback_count = 0; - reset_heap_counters(); + test_msgs::msg::Empty empty_msgs; + std::vector nodes; + std::vector::SharedPtr> publishers; + std::vector::SharedPtr> subscriptions; + uint callback_count; +}; - for (auto _ : st) { - (void)_; - st.PauseTiming(); + +BENCHMARK_F( + CascadedPerformanceTestExecutor, + single_thread_executor_spin)(benchmark::State & st) +{ + executor_spin_some(st); +} + +BENCHMARK_F(CascadedPerformanceTestExecutor, multi_thread_executor_spin)(benchmark::State & st) +{ + executor_spin_some(st); +} + +BENCHMARK_F(CascadedPerformanceTestExecutor, cbg_executor_spin)(benchmark::State & st) +{ + executor_spin_some(st); +} + +class PerformanceTestExecutorMultipleCallbackGroups : public PerformanceTest +{ +public: +// static constexpr unsigned int kNumberOfNodes = 20; + + void SetUp(benchmark::State & st) + { + rclcpp::init(0, nullptr); + callback_count = 0; for (unsigned int i = 0u; i < kNumberOfNodes; i++) { - publishers[i]->publish(empty_msgs); + nodes.push_back(std::make_shared("my_node_" + std::to_string(i))); + for (unsigned int j = 0u; j < kNumberOfPubSubs; j++) { + callback_groups.push_back( + nodes[i]->create_callback_group( + rclcpp::CallbackGroupType:: + MutuallyExclusive)); + + rclcpp::PublisherOptions pubOps; + pubOps.callback_group = callback_groups.back(); + + publishers.push_back( + nodes[i]->create_publisher( + "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + + "_" + std::to_string(j), rclcpp::QoS(10), pubOps)); + + rclcpp::SubscriptionOptions subOps; + subOps.callback_group = callback_groups.back(); + + auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;}; + subscriptions.push_back( + nodes[i]->create_subscription( + "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + + "_" + std::to_string(j), rclcpp::QoS(10), std::move(callback), subOps)); + } } - st.ResumeTiming(); + PerformanceTest::SetUp(st); + } + void TearDown(benchmark::State & st) + { + PerformanceTest::TearDown(st); + subscriptions.clear(); + publishers.clear(); + callback_groups.clear(); + nodes.clear(); + rclcpp::shutdown(); + } + template + void executor_spin_some(benchmark::State & st) + { + Executer executor; + for (unsigned int i = 0u; i < kNumberOfNodes; i++) { + executor.add_node(nodes[i]); + } + + publishers.front()->publish(empty_msgs); executor.spin_some(100ms); + + callback_count = 0; + reset_heap_counters(); + + for (auto _ : st) { + (void)_; + st.PauseTiming(); + for (auto & pub : publishers) { + pub->publish(empty_msgs); + } + // wait for the messages to arrive + std::this_thread::yield(); + st.ResumeTiming(); + + callback_count = 0; + executor.spin_some(500ms); + if (callback_count != publishers.size()) { + st.SkipWithError("Not all message were received"); + } + } + if (callback_count == 0) { + st.SkipWithError("No message was received"); + } } - if (callback_count == 0) { - st.SkipWithError("No message was received"); - } + + test_msgs::msg::Empty empty_msgs; + std::vector nodes; + std::vector::SharedPtr> publishers; + std::vector::SharedPtr> subscriptions; + std::vector callback_groups; + uint callback_count; +}; + + +BENCHMARK_F( + PerformanceTestExecutorMultipleCallbackGroups, + single_thread_executor_spin_some)(benchmark::State & st) +{ + executor_spin_some(st); +} + +BENCHMARK_F( + PerformanceTestExecutorMultipleCallbackGroups, + multi_thread_executor_spin_some)(benchmark::State & st) +{ + executor_spin_some(st); +} + +BENCHMARK_F( + PerformanceTestExecutorMultipleCallbackGroups, + cbg_executor_spin_some)(benchmark::State & st) +{ + executor_spin_some(st); } class PerformanceTestExecutorSimple : public PerformanceTest @@ -137,117 +611,143 @@ class PerformanceTestExecutorSimple : public PerformanceTest rclcpp::shutdown(); } + template + void add_node(benchmark::State & st) + { + Executor executor; + for (auto _ : st) { + (void)_; + executor.add_node(node); + st.PauseTiming(); + executor.remove_node(node); + st.ResumeTiming(); + } + } + + template + void remove_node(benchmark::State & st) + { + Executor executor; + for (auto _ : st) { + (void)_; + st.PauseTiming(); + executor.add_node(node); + st.ResumeTiming(); + executor.remove_node(node); + } + } + + + template + void spin_until_future_complete(benchmark::State & st) + { + Executor executor; + // test success of an immediately finishing future + std::promise promise; + std::future future = promise.get_future(); + promise.set_value(true); + auto shared_future = future.share(); + + auto ret = executor.spin_until_future_complete(shared_future, 100ms); + if (ret != rclcpp::FutureReturnCode::SUCCESS) { + st.SkipWithError(rcutils_get_error_string().str); + } + + reset_heap_counters(); + + for (auto _ : st) { + (void)_; + ret = executor.spin_until_future_complete(shared_future, 100ms); + if (ret != rclcpp::FutureReturnCode::SUCCESS) { + st.SkipWithError(rcutils_get_error_string().str); + break; + } + } + } + + template + void spin_node_until_future_complete(benchmark::State & st) + { + Executor executor; + // test success of an immediately finishing future + std::promise promise; + std::future future = promise.get_future(); + promise.set_value(true); + auto shared_future = future.share(); + + auto ret = rclcpp::executors::spin_node_until_future_complete( + executor, node, shared_future, 1s); + if (ret != rclcpp::FutureReturnCode::SUCCESS) { + st.SkipWithError(rcutils_get_error_string().str); + } + + reset_heap_counters(); + + for (auto _ : st) { + (void)_; + ret = rclcpp::executors::spin_node_until_future_complete( + executor, node, shared_future, 1s); + if (ret != rclcpp::FutureReturnCode::SUCCESS) { + st.SkipWithError(rcutils_get_error_string().str); + break; + } + } + } + rclcpp::Node::SharedPtr node; }; BENCHMARK_F(PerformanceTestExecutorSimple, single_thread_executor_add_node)(benchmark::State & st) { - rclcpp::executors::SingleThreadedExecutor executor; - for (auto _ : st) { - (void)_; - executor.add_node(node); - st.PauseTiming(); - executor.remove_node(node); - st.ResumeTiming(); - } + add_node(st); } BENCHMARK_F( PerformanceTestExecutorSimple, single_thread_executor_remove_node)(benchmark::State & st) { - rclcpp::executors::SingleThreadedExecutor executor; - for (auto _ : st) { - (void)_; - st.PauseTiming(); - executor.add_node(node); - st.ResumeTiming(); - executor.remove_node(node); - } + remove_node(st); } BENCHMARK_F(PerformanceTestExecutorSimple, multi_thread_executor_add_node)(benchmark::State & st) { - rclcpp::executors::MultiThreadedExecutor executor; - for (auto _ : st) { - (void)_; - executor.add_node(node); - st.PauseTiming(); - executor.remove_node(node); - st.ResumeTiming(); - } + add_node(st); } BENCHMARK_F(PerformanceTestExecutorSimple, multi_thread_executor_remove_node)(benchmark::State & st) { - rclcpp::executors::MultiThreadedExecutor executor; - for (auto _ : st) { - (void)_; - st.PauseTiming(); - executor.add_node(node); - st.ResumeTiming(); - executor.remove_node(node); - } + remove_node(st); +} + +BENCHMARK_F(PerformanceTestExecutorSimple, cbg_executor_add_node)(benchmark::State & st) +{ + add_node(st); +} + +BENCHMARK_F(PerformanceTestExecutorSimple, cbg_executor_remove_node)(benchmark::State & st) +{ + remove_node(st); } BENCHMARK_F( PerformanceTestExecutorSimple, single_thread_executor_spin_node_until_future_complete)(benchmark::State & st) { - rclcpp::executors::SingleThreadedExecutor executor; - // test success of an immediately finishing future - std::promise promise; - std::future future = promise.get_future(); - promise.set_value(true); - auto shared_future = future.share(); - - auto ret = rclcpp::executors::spin_node_until_future_complete( - executor, node, shared_future, 1s); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - } - - reset_heap_counters(); - - for (auto _ : st) { - (void)_; - ret = rclcpp::executors::spin_node_until_future_complete( - executor, node, shared_future, 1s); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - break; - } - } + spin_node_until_future_complete(st); } BENCHMARK_F( PerformanceTestExecutorSimple, multi_thread_executor_spin_node_until_future_complete)(benchmark::State & st) { - rclcpp::executors::MultiThreadedExecutor executor; - // test success of an immediately finishing future - std::promise promise; - std::future future = promise.get_future(); - promise.set_value(true); - auto shared_future = future.share(); - - auto ret = rclcpp::executors::spin_node_until_future_complete( - executor, node, shared_future, 1s); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - } - - reset_heap_counters(); + spin_node_until_future_complete(st); +} - for (auto _ : st) { - (void)_; - ret = rclcpp::executors::spin_node_until_future_complete( - executor, node, shared_future, 1s); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - break; - } - } +BENCHMARK_F( + PerformanceTestExecutorSimple, + cbg_executor_spin_node_until_future_complete)(benchmark::State & st) +{ + spin_node_until_future_complete(st); } BENCHMARK_F(PerformanceTestExecutorSimple, spin_until_future_complete)(benchmark::State & st) diff --git a/rclcpp/test/rclcpp/executors/executor_types.hpp b/rclcpp/test/rclcpp/executors/executor_types.hpp index e1ddffcfb1..f5dd175d2b 100644 --- a/rclcpp/test/rclcpp/executors/executor_types.hpp +++ b/rclcpp/test/rclcpp/executors/executor_types.hpp @@ -20,6 +20,7 @@ #include #include +#include "rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp" #include "rclcpp/experimental/executors/events_executor/events_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/executors/multi_threaded_executor.hpp" @@ -28,6 +29,7 @@ using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, rclcpp::executors::MultiThreadedExecutor, + rclcpp::executors::EventsCBGExecutor, rclcpp::experimental::executors::EventsExecutor>; class ExecutorTypeNames @@ -36,15 +38,19 @@ class ExecutorTypeNames template static std::string GetName([[maybe_unused]] int idx) { - if (std::is_same()) { + if constexpr (std::is_same()) { return "SingleThreadedExecutor"; } - if (std::is_same()) { + if constexpr(std::is_same()) { return "MultiThreadedExecutor"; } - if (std::is_same()) { + if constexpr(std::is_same()) { + return "EventsCBGExecutor"; + } + + if constexpr(std::is_same()) { return "EventsExecutor"; } @@ -56,6 +62,7 @@ using StandardExecutors = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, rclcpp::executors::MultiThreadedExecutor, + rclcpp::executors::EventsCBGExecutor, rclcpp::experimental::executors::EventsExecutor>; #endif // RCLCPP__EXECUTORS__EXECUTOR_TYPES_HPP_