diff --git a/.gitignore b/.gitignore index 4933a2d..70315af 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,7 @@ .idea /build +/build-ex /cmake-build-* /.build diff --git a/CMakeLists.txt b/CMakeLists.txt index a84f22b..7bc2e53 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,7 @@ set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) set(CMAKE_POSITION_INDEPENDENT_CODE ON) +set(CMAKE_EXPERIMENTAL_CXX_MODULE_CMAKE_API ON) project(cpp-jam VERSION 0.0.1 @@ -40,6 +41,12 @@ find_package(Boost.DI CONFIG REQUIRED) find_package(qtils CONFIG REQUIRED) find_package(prometheus-cpp CONFIG REQUIRED) +if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + add_compile_options(-fmodules-ts) +elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang|AppleClang") + add_compile_options(-fmodules) +endif() + add_library(headers INTERFACE) target_include_directories(headers INTERFACE $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src_> diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 08f0356..87a038b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -24,3 +24,8 @@ add_subdirectory(metrics) # Clocks and time subsystem add_subdirectory(clock) +# Subscription Engine subsystem +add_subdirectory(se) + +# Modules subsystem +add_subdirectory(modules) diff --git a/src/app/CMakeLists.txt b/src/app/CMakeLists.txt index 61fd884..3fc38cd 100644 --- a/src/app/CMakeLists.txt +++ b/src/app/CMakeLists.txt @@ -28,7 +28,9 @@ add_library(build_version add_library(app_configuration SHARED configuration.cpp) target_link_libraries(app_configuration - Boost::boost) + Boost::boost + fmt::fmt +) add_library(app_configurator SHARED configurator.cpp) target_link_libraries(app_configurator diff --git a/src/app/impl/application_impl.cpp b/src/app/impl/application_impl.cpp index 2f29cb1..b5a8181 100644 --- a/src/app/impl/application_impl.cpp +++ b/src/app/impl/application_impl.cpp @@ -20,12 +20,12 @@ namespace jam::app { ApplicationImpl::ApplicationImpl( - std::shared_ptr<log::LoggingSystem> logsys, - std::shared_ptr<Configuration> config, - std::shared_ptr<StateManager> state_manager, - std::shared_ptr<Watchdog> watchdog, - std::shared_ptr<metrics::Exposer> metrics_exposer, - std::shared_ptr<clock::SystemClock> system_clock) + qtils::StrictSharedPtr<log::LoggingSystem> logsys, + qtils::StrictSharedPtr<Configuration> config, + qtils::StrictSharedPtr<StateManager> state_manager, + qtils::StrictSharedPtr<Watchdog> watchdog, + qtils::StrictSharedPtr<metrics::Exposer> metrics_exposer, + qtils::StrictSharedPtr<clock::SystemClock> system_clock) : logger_(logsys->getLogger("Application", "application")), app_config_(std::move(config)), state_manager_(std::move(state_manager)), @@ -34,15 +34,13 @@ namespace jam::app { system_clock_(std::move(system_clock)), metrics_registry_(metrics::createRegistry()) { // Metric for exposing name and version of node - constexpr auto buildInfoMetricName = "jam_build_info"; - metrics_registry_->registerGaugeFamily( - buildInfoMetricName, - "A metric with a constant '1' value labeled by name, version"); - auto metric_build_info = metrics_registry_->registerGaugeMetric( - buildInfoMetricName, - {{"name", app_config_->nodeName()}, - {"version", app_config_->nodeVersion()}}); - metric_build_info->set(1); + metrics::GaugeHelper( + "jam_build_info", + "A metric with a constant '1' value labeled by name, version", + std::map<std::string, std::string>{ + {"name", app_config_->nodeName()}, + {"version", app_config_->nodeVersion()}}) + ->set(1); } void ApplicationImpl::run() { diff --git a/src/app/impl/application_impl.hpp b/src/app/impl/application_impl.hpp index 43a4d38..105a1df 100644 --- a/src/app/impl/application_impl.hpp +++ b/src/app/impl/application_impl.hpp @@ -10,6 +10,8 @@ #include <memory> +#include <qtils/strict_sptr.hpp> + #include <metrics/registry.hpp> namespace jam { @@ -43,22 +45,22 @@ namespace jam::app { class ApplicationImpl final : public Application { public: - ApplicationImpl(std::shared_ptr<log::LoggingSystem> logsys, - std::shared_ptr<Configuration> config, - std::shared_ptr<StateManager> state_manager, - std::shared_ptr<Watchdog> watchdog, - std::shared_ptr<metrics::Exposer> metrics_exposer, - std::shared_ptr<clock::SystemClock> system_clock); + ApplicationImpl(qtils::StrictSharedPtr<log::LoggingSystem> logsys, + qtils::StrictSharedPtr<Configuration> config, + qtils::StrictSharedPtr<StateManager> state_manager, + qtils::StrictSharedPtr<Watchdog> watchdog, + qtils::StrictSharedPtr<metrics::Exposer> metrics_exposer, + qtils::StrictSharedPtr<clock::SystemClock> system_clock); void run() override; private: - std::shared_ptr<soralog::Logger> logger_; - std::shared_ptr<Configuration> app_config_; - std::shared_ptr<StateManager> state_manager_; - std::shared_ptr<Watchdog> watchdog_; - std::shared_ptr<metrics::Exposer> metrics_exposer_; - std::shared_ptr<clock::SystemClock> system_clock_; + qtils::StrictSharedPtr<soralog::Logger> logger_; + qtils::StrictSharedPtr<Configuration> app_config_; + qtils::StrictSharedPtr<StateManager> state_manager_; + qtils::StrictSharedPtr<Watchdog> watchdog_; + qtils::StrictSharedPtr<metrics::Exposer> metrics_exposer_; + qtils::StrictSharedPtr<clock::SystemClock> system_clock_; // Metrics std::unique_ptr<metrics::Registry> metrics_registry_; diff --git a/src/app/impl/state_manager_impl.cpp b/src/app/impl/state_manager_impl.cpp index f5e2b73..0d43e14 100644 --- a/src/app/impl/state_manager_impl.cpp +++ b/src/app/impl/state_manager_impl.cpp @@ -92,7 +92,7 @@ namespace jam::app { } StateManagerImpl::StateManagerImpl( - std::shared_ptr<log::LoggingSystem> logging_system) + qtils::StrictSharedPtr<log::LoggingSystem> logging_system) : logger_(logging_system->getLogger("StateManager", "application")), logging_system_(std::move(logging_system)) { shuttingDownSignalsEnable(); diff --git a/src/app/impl/state_manager_impl.hpp b/src/app/impl/state_manager_impl.hpp index 13f5f26..50cbf54 100644 --- a/src/app/impl/state_manager_impl.hpp +++ b/src/app/impl/state_manager_impl.hpp @@ -9,10 +9,11 @@ #include "app/state_manager.hpp" #include <condition_variable> -#include <csignal> #include <mutex> #include <queue> +#include <qtils/strict_sptr.hpp> + #include "utils/ctor_limiters.hpp" namespace soralog { @@ -29,7 +30,7 @@ namespace jam::app { public StateManager, public std::enable_shared_from_this<StateManagerImpl> { public: - StateManagerImpl(std::shared_ptr<log::LoggingSystem> logging_system); + StateManagerImpl(qtils::StrictSharedPtr<log::LoggingSystem> logging_system); ~StateManagerImpl() override; @@ -66,8 +67,8 @@ namespace jam::app { void shutdownRequestWaiting(); - std::shared_ptr<soralog::Logger> logger_; - std::shared_ptr<log::LoggingSystem> logging_system_; + qtils::StrictSharedPtr<soralog::Logger> logger_; + qtils::StrictSharedPtr<log::LoggingSystem> logging_system_; std::atomic<State> state_ = State::Init; diff --git a/src/injector/node_injector.cpp b/src/injector/node_injector.cpp index 48ef7d5..0b5ca2f 100644 --- a/src/injector/node_injector.cpp +++ b/src/injector/node_injector.cpp @@ -29,7 +29,7 @@ namespace { template <typename C> auto useConfig(C c) { - return boost::di::bind<std::decay_t<C> >().to( + return boost::di::bind<std::decay_t<C>>().to( std::move(c))[boost::di::override]; } @@ -96,6 +96,6 @@ namespace jam::injector { std::shared_ptr<app::Application> NodeInjector::injectApplication() { return pimpl_->injector_ - .template create<std::shared_ptr<app::Application> >(); + .template create<std::shared_ptr<app::Application>>(); } } // namespace jam::injector diff --git a/src/loaders/README b/src/loaders/README new file mode 100644 index 0000000..31be259 --- /dev/null +++ b/src/loaders/README @@ -0,0 +1 @@ +# loaders are locating here \ No newline at end of file diff --git a/src/loaders/loader.hpp b/src/loaders/loader.hpp new file mode 100644 index 0000000..8d75e93 --- /dev/null +++ b/src/loaders/loader.hpp @@ -0,0 +1,34 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "injector/node_injector.hpp" +#include "modules/module.hpp" + +namespace jam::loaders { + + class Loader { + public: + Loader(const Loader &) = delete; + Loader &operator=(const Loader &) = delete; + + virtual ~Loader() = default; + virtual void start() = 0; + + std::optional<std::string> module_info() { + auto result = module_.getFunctionFromLibrary<const char*()>("module_info"); + if (result) { + return std::string((*result)()); + } + return std::nullopt; + } + + protected: + injector::NodeInjector injector_; + modules::Module module_; + }; +} // namespace jam::loaders diff --git a/src/log/logger.hpp b/src/log/logger.hpp index 5cce5fd..c35952e 100644 --- a/src/log/logger.hpp +++ b/src/log/logger.hpp @@ -9,19 +9,22 @@ #include <memory> #include <sstream> -#include <qtils/outcome.hpp> #include <qtils/enum_error_code.hpp> +#include <qtils/outcome.hpp> +#include <qtils/strict_sptr.hpp> #include <soralog/level.hpp> #include <soralog/logger.hpp> #include <soralog/logging_system.hpp> #include <soralog/macro.hpp> -#include "utils/ctor_limiters.hpp" #include "injector/dont_inject.hpp" +#include "utils/ctor_limiters.hpp" namespace jam::log { using soralog::Level; + using Logger = qtils::StrictSharedPtr<soralog::Logger>; + enum class Error : uint8_t { WRONG_LEVEL = 1, WRONG_GROUP, WRONG_LOGGER }; outcome::result<Level> str2lvl(std::string_view str); @@ -58,23 +61,22 @@ namespace jam::log { return logging_system_->getLogger(logger_name, group_name, level); } - [[nodiscard]] - bool setLevelOfGroup(const std::string &group_name, Level level) const { + [[nodiscard]] bool setLevelOfGroup(const std::string &group_name, + Level level) const { return logging_system_->setLevelOfGroup(group_name, level); } - [[nodiscard]] - bool resetLevelOfGroup(const std::string &group_name) const { + [[nodiscard]] bool resetLevelOfGroup(const std::string &group_name) const { return logging_system_->resetLevelOfGroup(group_name); } - [[nodiscard]] - bool setLevelOfLogger(const std::string &logger_name, Level level) const { + [[nodiscard]] bool setLevelOfLogger(const std::string &logger_name, + Level level) const { return logging_system_->setLevelOfLogger(logger_name, level); } - [[nodiscard]] - bool resetLevelOfLogger(const std::string &logger_name) const { + [[nodiscard]] bool resetLevelOfLogger( + const std::string &logger_name) const { return logging_system_->resetLevelOfLogger(logger_name); } diff --git a/src/metrics/histogram_timer.hpp b/src/metrics/histogram_timer.hpp index e907969..446cc78 100644 --- a/src/metrics/histogram_timer.hpp +++ b/src/metrics/histogram_timer.hpp @@ -27,6 +27,12 @@ namespace jam::metrics { registry_->registerGaugeFamily(name, help); metric_ = registry_->registerGaugeMetric(name); } + GaugeHelper(const std::string &name, + const std::string &help, + const std::map<std::string, std::string> &labels) { + registry_->registerGaugeFamily(name, help); + metric_ = registry_->registerGaugeMetric(name, labels); + } auto *operator->() const { return metric_; diff --git a/src/modules/CMakeLists.txt b/src/modules/CMakeLists.txt new file mode 100644 index 0000000..cd74ca1 --- /dev/null +++ b/src/modules/CMakeLists.txt @@ -0,0 +1,87 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +function(add_jam_module NAME) + set(MODULE_NAME ${NAME}) + + set(MODULE "${MODULE_NAME}_module") + + # Parse named arguments + cmake_parse_arguments( + # Prefix for parsed argument variables + MODULE + # List of flags (boolean arguments without values) + "" + # List of named arguments with a single value + "" + # List of named arguments with multiple values + "SOURCE;INCLUDE_DIRS;LIBRARIES;DEFINITIONS" + # Input arguments + ${ARGN} + ) + + if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/module.cpp) + message(FATAL_ERROR "Not found `module.cpp` file (main file of module)") + endif () + if (NOT EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${MODULE_NAME}.hpp" OR NOT EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${MODULE_NAME}.cpp") + message(FATAL_ERROR "Not found `${MODULE_NAME}.hpp` nor `${MODULE_NAME}.cpp` file (class of module)") + endif () + + # Create a shared module library + add_library(${MODULE} MODULE # or SHARED + module.cpp + ${MODULE_NAME}.cpp + ${MODULE_SOURCE} + ) + + # Set exported symbols visibility + set_target_properties(${MODULE} PROPERTIES + CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN ON + ) + + # Set include directories + if (MODULE_INCLUDE_DIRS) + target_include_directories(${MODULE} PRIVATE + ${MODULE_INCLUDE_DIRS} + ) + endif () + + # Set definitions specified for module + if (MODULE_DEFINITIONS) + target_compile_definitions(${MODULE} PRIVATE + ${MODULE_DEFINITIONS} + ) + endif () + + # Link with libs + if (MODULE_LIBRARIES) + target_link_libraries(${MODULE} + ${MODULE_LIBRARIES} + ) + endif () + + # Set C++ standard + target_compile_features(${MODULE} PRIVATE + cxx_std_20 + ) + +endfunction() + +# -------------- Core-part of module subsystem -------------- + +add_library(modules + module_loader.cpp +) + +target_link_libraries(modules + qtils::qtils +) + +# -------------- Modules -------------- + +# Example module +add_subdirectory(example) \ No newline at end of file diff --git a/src/modules/README b/src/modules/README new file mode 100644 index 0000000..a7c5cfe --- /dev/null +++ b/src/modules/README @@ -0,0 +1 @@ +# modules are locating here \ No newline at end of file diff --git a/src/modules/example/CMakeLists.txt b/src/modules/example/CMakeLists.txt new file mode 100644 index 0000000..7ff734f --- /dev/null +++ b/src/modules/example/CMakeLists.txt @@ -0,0 +1,17 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_jam_module(example + SOURCE + example.cpp + INCLUDE_DIRS + ${CMAKE_SOURCE_DIR}/src + DEFINITIONS + SOME_FLAG=1 + LIBRARIES + qtils::qtils + soralog::soralog +) \ No newline at end of file diff --git a/src/modules/example/example.cpp b/src/modules/example/example.cpp new file mode 100644 index 0000000..87f7e2f --- /dev/null +++ b/src/modules/example/example.cpp @@ -0,0 +1,20 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "modules/example/example.hpp" + +namespace jam::modules { + std::shared_ptr<ExampleModule> ExampleModule::instance; + + ExampleModule::ExampleModule( + qtils::StrictSharedPtr<ExampleModuleLoader> loader, + qtils::StrictSharedPtr<log::LoggingSystem> logging_system) + : loader_(loader), + logger_(logging_system->getLogger("ExampleModule", "example_module")) + + {} + +} // namespace jam::modules diff --git a/src/modules/example/example.hpp b/src/modules/example/example.hpp new file mode 100644 index 0000000..8e2958a --- /dev/null +++ b/src/modules/example/example.hpp @@ -0,0 +1,34 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <metrics/impl/session_impl.hpp> +#include <modules/module_loader.hpp> +#include <qtils/strict_sptr.hpp> + +namespace jam::modules { + class ExampleModule; + class ExampleModuleLoader; +} + +class BlockTree; + +namespace jam::modules { + + class ExampleModule : public Singleton<ExampleModule> { + public: + static std::shared_ptr<ExampleModule> instance; + CREATE_SHARED_METHOD(ExampleModule); + + ExampleModule(qtils::StrictSharedPtr<ExampleModuleLoader> loader, + qtils::StrictSharedPtr<log::LoggingSystem> logging_system); + + qtils::StrictSharedPtr<ExampleModuleLoader> loader_; + log::Logger logger_; + }; + +} // namespace jam::modules diff --git a/src/modules/example/module.cpp b/src/modules/example/module.cpp new file mode 100644 index 0000000..e4afe0a --- /dev/null +++ b/src/modules/example/module.cpp @@ -0,0 +1,31 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <modules/example/example.hpp> + +#define MODULE_C_API extern "C" __attribute__((visibility("default"))) +#define MODULE_API __attribute__((visibility("default"))) + +MODULE_C_API const char *loader_id() { + return "ExampleLoader"; +} + +MODULE_C_API const char *module_info() { + return "ExampleModule v1.0"; +} + +MODULE_API std::weak_ptr<jam::modules::ExampleModule> query_module_instance( + std::shared_ptr<jam::modules::ExampleModuleLoader> loader, + std::shared_ptr<jam::log::LoggingSystem> block_tree) { + return jam::modules::ExampleModule::instance + ? jam::modules::ExampleModule::instance + : (jam::modules::ExampleModule::instance = jam::modules::ExampleModule::create_shared( + loader, block_tree)); +} + +MODULE_API void release_module_instance() { + jam::modules::ExampleModule::instance.reset(); +} diff --git a/src/modules/module.hpp b/src/modules/module.hpp new file mode 100644 index 0000000..4279464 --- /dev/null +++ b/src/modules/module.hpp @@ -0,0 +1,69 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <cerrno> +#include <cstring> +#include <dlfcn.h> +#include <memory> +#include <optional> +#include <string> + +#include <qtils/create_smart_pointer_macros.hpp> + +namespace jam::modules { + + class Module final { + public: + CREATE_SHARED_METHOD(Module) + + // Static method for Module object creation + static std::shared_ptr<Module> create( + const std::string &path, + std::unique_ptr<void, decltype(&dlclose)> handle, + const std::string &loader_id) { + return std::shared_ptr<Module>( + new Module(path, std::move(handle), loader_id)); + } + + // Getter for library path + const std::string &get_path() const { + return path_; + } + + // Getter for loader Id + const std::string &get_loader_id() const { + return loader_id_; + } + + // Get function address from library + template <typename ReturnType, typename... ArgTypes> + std::optional<ReturnType> getFunctionFromLibrary(const char *funcName) { + void *funcAddr = dlsym(handle_.get(), funcName); + if (!funcAddr) { + return std::nullopt; + } + return reinterpret_cast<ReturnType (*)(ArgTypes...)>(funcAddr); + } + + private: + Module(const std::string &path, + std::unique_ptr<void, decltype(&dlclose)> handle, + const std::string &loader_id) + : path_(path), handle_(std::move(handle)), loader_id_(loader_id) {} + + std::string path_; // Library path + std::unique_ptr<void, decltype(&dlclose)> handle_; // Library handle + std::string loader_id_; // Loader ID + + Module(const Module &) = delete; + Module &operator=(const Module &) = delete; + Module(Module &&) = delete; + Module &operator=(Module &&) = delete; + }; + +} // namespace jam::modules diff --git a/src/modules/module_loader.cpp b/src/modules/module_loader.cpp new file mode 100644 index 0000000..b7c9da0 --- /dev/null +++ b/src/modules/module_loader.cpp @@ -0,0 +1,80 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "modules/module_loader.hpp" + +#define COMPONENT_NAME "ModuleLoader" + +OUTCOME_CPP_DEFINE_CATEGORY(jam::modules, ModuleLoader::Error, e) { + using E = jam::modules::ModuleLoader::Error; + switch (e) { + case E::PathIsNotADir: + return COMPONENT_NAME ": path is not a directory"; + case E::OpenLibraryFailed: + return COMPONENT_NAME ": open library failed"; + case E::NoLoaderIdExport: + return COMPONENT_NAME ": library doesn't provide loader_id function"; + case E::UnexpectedLoaderId: + return COMPONENT_NAME ": unexpected loader id"; + } + return COMPONENT_NAME ": unknown error"; +} + +namespace jam::modules { + + Result<void> ModuleLoader::recursive_search( + const fs::path &dir_path, std::deque<std::shared_ptr<Module>> &modules) { + if (!fs::exists(dir_path) || !fs::is_directory(dir_path)) { + return Error::PathIsNotADir; + } + + for (const auto &entry : fs::directory_iterator(dir_path)) { + const auto &entry_path = entry.path(); + const auto &entry_name = entry.path().filename().string(); + + if (entry_name[0] == '.' || entry_name[0] == '_') { + continue; + } + + if (fs::is_directory(entry)) { + OUTCOME_TRY(recursive_search(entry_path, modules)); + } else if (fs::is_regular_file(entry) + && entry_path.extension() == ".so") { + OUTCOME_TRY(load_module(entry_path.string(), modules)); + } + } + return outcome::success(); + } + + Result<void> ModuleLoader::load_module( + const std::string &module_path, + std::deque<std::shared_ptr<Module>> &modules) { + std::unique_ptr<void, decltype(&dlclose)> handle( + dlopen(module_path.c_str(), RTLD_LAZY), dlclose); + if (!handle) { + return Error::OpenLibraryFailed; + } + + typedef const char *(*LoaderIdFunc)(); + LoaderIdFunc loader_id_func = + (LoaderIdFunc)dlsym(handle.get(), "loader_id"); + + if (!loader_id_func) { + return Error::NoLoaderIdExport; + } + + const char *loader_id = loader_id_func(); + if (!loader_id) { + return Error::UnexpectedLoaderId; + } + + auto module = Module::create_shared(module_path, std::move(handle), loader_id); + modules.push_back(module); + return outcome::success(); + } + + +} // namespace jam::modules diff --git a/src/modules/module_loader.hpp b/src/modules/module_loader.hpp new file mode 100644 index 0000000..daae0d8 --- /dev/null +++ b/src/modules/module_loader.hpp @@ -0,0 +1,56 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <cerrno> +#include <cstring> +#include <deque> +#include <dlfcn.h> +#include <filesystem> +#include <iostream> +#include <string> + +#include <qtils/enum_error_code.hpp> +#include <qtils/outcome.hpp> + +#include "modules/module.hpp" + +namespace fs = std::filesystem; +template <typename T> +using Result = outcome::result<T>; + +namespace jam::modules { + + class ModuleLoader { + public: + enum class Error : uint8_t { + PathIsNotADir, + OpenLibraryFailed, + NoLoaderIdExport, + UnexpectedLoaderId, + }; + + explicit ModuleLoader(const std::string &dir_path) : dir_path_(dir_path) {} + + Result<std::deque<std::shared_ptr<Module>>> get_modules() { + std::deque<std::shared_ptr<Module>> modules; + OUTCOME_TRY(recursive_search(fs::path(dir_path_), modules)); + return modules; + } + + private: + std::string dir_path_; + + Result<void> recursive_search(const fs::path &dir_path, + std::deque<std::shared_ptr<Module>> &modules); + Result<void> load_module(const std::string &module_path, + std::deque<std::shared_ptr<Module>> &modules); + }; + +} // namespace jam::modules + +OUTCOME_HPP_DECLARE_ERROR(jam::modules, ModuleLoader::Error); diff --git a/src/se/CMakeLists.txt b/src/se/CMakeLists.txt new file mode 100644 index 0000000..fe524f6 --- /dev/null +++ b/src/se/CMakeLists.txt @@ -0,0 +1,21 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_library(se_async + async_dispatcher.cpp + subscription.cpp + ) + +target_link_libraries(se_async + ) + +add_library(se_sync + sync_dispatcher.cpp + subscription.cpp + ) + +target_link_libraries(se_sync + ) diff --git a/src/se/async_dispatcher.cpp b/src/se/async_dispatcher.cpp new file mode 100644 index 0000000..a742632 --- /dev/null +++ b/src/se/async_dispatcher.cpp @@ -0,0 +1,20 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <memory> + +#include "impl/async_dispatcher_impl.hpp" +#include "subscription.hpp" + +namespace jam::se { + + std::shared_ptr<Dispatcher> getDispatcher() { + return std::make_shared< + AsyncDispatcher<SubscriptionEngineHandlers::kTotalCount, + kThreadPoolSize>>(); + } + +} // namespace jam::se diff --git a/src/se/impl/async_dispatcher_impl.hpp b/src/se/impl/async_dispatcher_impl.hpp new file mode 100644 index 0000000..8169cca --- /dev/null +++ b/src/se/impl/async_dispatcher_impl.hpp @@ -0,0 +1,160 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "common.hpp" +#include "dispatcher.hpp" +#include "thread_handler.hpp" + +namespace jam::se { + + template <uint32_t kCount, uint32_t kPoolSize> + class AsyncDispatcher final : public IDispatcher, + utils::NoCopy, + utils::NoMove { + public: + static constexpr uint32_t kHandlersCount = kCount; + static constexpr uint32_t kPoolThreadsCount = kPoolSize; + + private: + using Parent = IDispatcher; + + struct SchedulerContext { + /// Scheduler to execute tasks + std::shared_ptr<IScheduler> handler; + }; + + SchedulerContext handlers_[kHandlersCount]; + SchedulerContext pool_[kPoolThreadsCount]; + + std::atomic_int64_t temporary_handlers_tasks_counter_; + std::atomic<bool> is_disposed_; + + struct BoundContexts { + typename Parent::Tid next_tid_offset = 0u; + std::unordered_map<typename Parent::Tid, SchedulerContext> contexts; + }; + utils::ReadWriteObject<BoundContexts> bound_; + + void uploadToHandler(const typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task, + typename Parent::Predicate &&pred) { + assert(tid != kExecuteInPool || !pred); + if (is_disposed_.load()) { + return; + } + + if (tid < kHandlersCount) { + pred ? handlers_[tid].handler->repeat( + timeout, std::move(task), std::move(pred)) + : handlers_[tid].handler->addDelayed(timeout, std::move(task)); + return; + } + + if (auto context = + bound_.sharedAccess([tid](const BoundContexts &bound) + -> std::optional<SchedulerContext> { + if (auto it = bound.contexts.find(tid); + it != bound.contexts.end()) { + return it->second; + } + return std::nullopt; + })) { + pred ? context->handler->repeat( + timeout, std::move(task), std::move(pred)) + : context->handler->addDelayed(timeout, std::move(task)); + return; + } + + std::optional<typename Parent::Task> opt_task = std::move(task); + for (auto &handler : pool_) { + if (opt_task = + handler.handler->uploadIfFree(timeout, std::move(*opt_task)); + !opt_task) { + return; + } + } + + auto h = std::make_shared<ThreadHandler>(); + ++temporary_handlers_tasks_counter_; + h->addDelayed(timeout, [this, h, task{std::move(*opt_task)}]() mutable { + if (!is_disposed_.load()) { + task(); + } + --temporary_handlers_tasks_counter_; + h->dispose(false); + }); + } + + public: + AsyncDispatcher() { + temporary_handlers_tasks_counter_.store(0); + is_disposed_ = false; + for (auto &h : handlers_) { + h.handler = std::make_shared<ThreadHandler>(); + } + for (auto &h : pool_) { + h.handler = std::make_shared<ThreadHandler>(); + } + } + + void dispose() override { + is_disposed_ = true; + for (auto &h : handlers_) { + h.handler->dispose(); + } + for (auto &h : pool_) { + h.handler->dispose(); + } + + while (temporary_handlers_tasks_counter_.load() != 0) { + std::this_thread::sleep_for(std::chrono::microseconds(0ull)); + } + } + + void add(typename Parent::Tid tid, typename Parent::Task &&task) override { + uploadToHandler( + tid, std::chrono::microseconds(0ull), std::move(task), nullptr); + } + + void addDelayed(typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task) override { + uploadToHandler(tid, timeout, std::move(task), nullptr); + } + + void repeat(typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task, + typename Parent::Predicate &&pred) override { + uploadToHandler(tid, timeout, std::move(task), std::move(pred)); + } + + std::optional<Tid> bind(std::shared_ptr<IScheduler> scheduler) override { + if (!scheduler) { + return std::nullopt; + } + + return bound_.exclusiveAccess( + [scheduler(std::move(scheduler))](BoundContexts &bound) { + const auto execution_tid = kHandlersCount + bound.next_tid_offset; + assert(bound.contexts.find(execution_tid) == bound.contexts.end()); + bound.contexts[execution_tid] = SchedulerContext{scheduler}; + ++bound.next_tid_offset; + return execution_tid; + }); + } + + bool unbind(Tid tid) override { + return bound_.exclusiveAccess([tid](BoundContexts &bound) { + return bound.contexts.erase(tid) == 1; + }); + } + }; + +} // namespace jam::se diff --git a/src/se/impl/common.hpp b/src/se/impl/common.hpp new file mode 100644 index 0000000..6e53912 --- /dev/null +++ b/src/se/impl/common.hpp @@ -0,0 +1,108 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <chrono> +#include <condition_variable> +#include <mutex> +#include <shared_mutex> + +namespace jam::se::utils { + + template <typename To, typename From> + inline std::shared_ptr<To> reinterpret_pointer_cast( + const std::shared_ptr<From> &ptr) noexcept { + return std::shared_ptr<To>(ptr, reinterpret_cast<To *>(ptr.get())); + } + + template <typename T> + inline std::weak_ptr<T> make_weak(const std::shared_ptr<T> &ptr) noexcept { + return ptr; + } + + struct NoCopy { + NoCopy(const NoCopy &) = delete; + NoCopy &operator=(const NoCopy &) = delete; + NoCopy() = default; + }; + + struct NoMove { + NoMove(NoMove &&) = delete; + NoMove &operator=(NoMove &&) = delete; + NoMove() = default; + }; + + template <typename T, typename M = std::shared_mutex> + struct SafeObject { + using Type = T; + + template <typename... Args> + SafeObject(Args &&...args) : t_(std::forward<Args>(args)...) {} + + template <typename F> + inline auto exclusiveAccess(F &&f) { + std::unique_lock lock(cs_); + return std::forward<F>(f)(t_); + } + + template <typename F> + inline auto sharedAccess(F &&f) const { + std::shared_lock lock(cs_); + return std::forward<F>(f)(t_); + } + + T &unsafeGet() { + return t_; + } + + const T &unsafeGet() const { + return t_; + } + + private: + T t_; + mutable M cs_; + }; + + template <typename T, typename M = std::shared_mutex> + using ReadWriteObject = SafeObject<T, M>; + + class WaitForSingleObject final : NoMove, NoCopy { + std::condition_variable wait_cv_; + std::mutex wait_m_; + bool flag_; + + public: + WaitForSingleObject() : flag_{true} {} + + bool wait(std::chrono::microseconds wait_timeout) { + std::unique_lock<std::mutex> _lock(wait_m_); + return wait_cv_.wait_for(_lock, wait_timeout, [&]() { + auto prev = !flag_; + flag_ = true; + return prev; + }); + } + + void wait() { + std::unique_lock<std::mutex> _lock(wait_m_); + wait_cv_.wait(_lock, [&]() { + auto prev = !flag_; + flag_ = true; + return prev; + }); + } + + void set() { + { + std::unique_lock<std::mutex> _lock(wait_m_); + flag_ = false; + } + wait_cv_.notify_one(); + } + }; +} // namespace jam::se::utils diff --git a/src/se/impl/compile-time_murmur2.hpp b/src/se/impl/compile-time_murmur2.hpp new file mode 100644 index 0000000..4aa1beb --- /dev/null +++ b/src/se/impl/compile-time_murmur2.hpp @@ -0,0 +1,102 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <cstdint> + +namespace jam::se::utils { + + class Hasher { + static constexpr /* h */ uint32_t __init__(uint32_t len) { + return 0 ^ len; + } + + template <typename __T> + static constexpr uint32_t __load__(__T &data, uint32_t offset) { + return data[offset + 0] | (data[offset + 1] << 8) + | (data[offset + 2] << 16) | (data[offset + 3] << 24); + } + + static constexpr uint32_t __mul__(uint32_t val1, uint32_t val2) { + return val1 * val2; + } + + static constexpr uint32_t __sl__(uint32_t value, uint32_t count) { + return (value << count); + } + + static constexpr uint32_t __sr__(uint32_t value, uint32_t count) { + return (value >> count); + } + + static constexpr uint32_t __xor__(uint32_t h, uint32_t k) { + return h ^ k; + } + + static constexpr uint32_t __xor_with_sr__(uint32_t k, uint32_t r) { + return __xor__(k, __sr__(k, r)); + } + + template <typename __Type> + static constexpr /* h */ uint32_t __proc__(__Type &data, + uint32_t len, + uint32_t offset, + uint32_t h, + uint32_t m, + uint32_t r) { + return len >= 4 + ? __proc__( + data, + len - 4, + offset + 4, + __xor__(__mul__(h, m), + __mul__(__xor_with_sr__( + __mul__(__load__(data, offset), m), r), + m)), + m, + r) + : len == 3 ? __proc__(data, + len - 1, + offset, + __xor__(h, __sl__(data[offset + 2], 16)), + m, + r) + : len == 2 ? __proc__(data, + len - 1, + offset, + __xor__(h, __sl__(data[offset + 1], 8)), + m, + r) + : len == 1 + ? __proc__( + data, len - 1, offset, __xor__(h, data[offset]) * m, m, r) + : __xor__(__mul__(__xor_with_sr__(h, 13), m), + __sr__(__mul__(__xor_with_sr__(h, 13), m), 15)); + } + + public: + template <typename __Type> + static constexpr uint32_t murmur2(__Type &data, uint32_t len) { + return __proc__(data, len, 0, __init__(len), 0x5bd1e995, 24); + } + }; + +} // namespace jam::se::utils + +#ifndef CT_MURMUR2 +#define CT_MURMUR2(x) \ + ::jam::se::utils::Hasher::murmur2(x, (sizeof(x) / sizeof(x[0])) - 1) +#endif // CT_MURMUR2 + +static_assert(CT_MURMUR2("Called the One Ring, or the Ruling Ring.") + == 1333588607); +static_assert( + CT_MURMUR2("Fashioned by Sauron a decade after the making of the Elven " + "rings in the fires of Mount Doom in Mordor and which") + == 1319897327); +static_assert(CT_MURMUR2("could only be destroyed in that same fire.") + == 702138758); diff --git a/src/se/impl/dispatcher.hpp b/src/se/impl/dispatcher.hpp new file mode 100644 index 0000000..5dda15c --- /dev/null +++ b/src/se/impl/dispatcher.hpp @@ -0,0 +1,39 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <optional> +#include <memory> + +#include "scheduler.hpp" + +namespace jam::se { + + struct IDispatcher { + using Tid = uint32_t; + using Task = IScheduler::Task; + using Predicate = IScheduler::Predicate; + + static constexpr Tid kExecuteInPool = std::numeric_limits<Tid>::max(); + + virtual ~IDispatcher() = default; + + virtual std::optional<Tid> bind(std::shared_ptr<IScheduler> scheduler) = 0; + virtual bool unbind(Tid tid) = 0; + + virtual void dispose() = 0; + virtual void add(Tid tid, Task &&task) = 0; + virtual void addDelayed(Tid tid, + std::chrono::microseconds timeout, + Task &&task) = 0; + virtual void repeat(Tid tid, + std::chrono::microseconds timeout, + Task &&task, + Predicate &&pred) = 0; + }; + +} // namespace jam::se diff --git a/src/se/impl/scheduler.hpp b/src/se/impl/scheduler.hpp new file mode 100644 index 0000000..cdc141a --- /dev/null +++ b/src/se/impl/scheduler.hpp @@ -0,0 +1,42 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <functional> +#include <optional> +#include <chrono> + +namespace jam::se { + + class IScheduler { + public: + using Task = std::function<void()>; + using Predicate = std::function<bool()>; + virtual ~IScheduler() {} + + /// Stops sheduler work and tasks execution + virtual void dispose(bool wait_for_release = true) = 0; + + /// Checks if current scheduler executes task + virtual bool isBusy() const = 0; + + /// If scheduller is not busy it takes task for execution. Otherwise it + /// returns it back. + virtual std::optional<Task> uploadIfFree(std::chrono::microseconds timeout, + Task &&task) = 0; + + /// Adds delayed task to execution queue + virtual void addDelayed(std::chrono::microseconds timeout, Task &&t) = 0; + + /// Adds task that will be periodicaly called with timeout period after + /// timeout, until predicate return true + virtual void repeat(std::chrono::microseconds timeout, + Task &&t, + Predicate &&pred) = 0; + }; + +} // namespace jam::se diff --git a/src/se/impl/scheduler_impl.hpp b/src/se/impl/scheduler_impl.hpp new file mode 100644 index 0000000..d2c9841 --- /dev/null +++ b/src/se/impl/scheduler_impl.hpp @@ -0,0 +1,185 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <algorithm> +#include <assert.h> +#include <atomic> +#include <chrono> +#include <deque> +#include <functional> +#include <mutex> +#include <optional> +#include <thread> + +#include "common.hpp" +#include "scheduler.hpp" + +namespace jam::se { + + class SchedulerBase : public IScheduler, utils::NoCopy, utils::NoMove { + private: + using Time = std::chrono::high_resolution_clock; + using Timepoint = std::chrono::time_point<Time>; + + struct TimedTask { + Timepoint created; + std::chrono::microseconds timeout; + Predicate predic; + Task task; + }; + using TaskContainer = std::deque<TimedTask>; + + /// Flag shows if thread loop should continue processing or exit + std::atomic_flag proceed_; + + mutable std::mutex tasks_cs_; + + /// List of tasks to be performed + TaskContainer tasks_; + + /// Event that is set when loop should make some work or exit + utils::WaitForSingleObject event_; + + /// Flag that shows if current handler is in task execution state + bool is_busy_; + + std::thread::id id_; + + private: + inline void checkLocked() { + /// Need to check that we are locked in debug. + assert(!tasks_cs_.try_lock()); + } + + inline Timepoint now() const { + return Time::now(); + } + + TaskContainer::const_iterator after(const Timepoint &tp) { + checkLocked(); + return std::upper_bound( + tasks_.begin(), tasks_.end(), tp, [](const auto &l, const auto &r) { + return l < (r.created + r.timeout); + }); + } + + void insert(TaskContainer::const_iterator after, TimedTask &&t) { + checkLocked(); + tasks_.insert(after, std::move(t)); + } + + bool extractExpired(TimedTask &task) { + std::lock_guard lock(tasks_cs_); + const Timepoint before = now(); + if (!tasks_.empty()) { + auto &first_task = tasks_.front(); + const auto timepoint = first_task.created + first_task.timeout; + if (timepoint <= before) { + task = std::move(first_task); + tasks_.pop_front(); + is_busy_ = true; + return true; + } + } + is_busy_ = false; + return false; + } + + ///@returns time duration from now till first task will be executed + std::chrono::microseconds untilFirst() const { + std::lock_guard lock(tasks_cs_); + const auto before = now(); + if (!tasks_.empty()) { + const auto &first = tasks_.front(); + const auto timepoint = first.created + first.timeout; + if (timepoint > before) { + return std::chrono::duration_cast<std::chrono::microseconds>( + timepoint - before); + } + + return std::chrono::microseconds(0ull); + } + return std::chrono::minutes(10ull); + } + + void add(TimedTask &&task) { + assert(!tasks_cs_.try_lock()); + if (task.timeout == std::chrono::microseconds(0ull)) { + is_busy_ = true; + } + + insert(after(task.created + task.timeout), std::move(task)); + event_.set(); + } + + public: + SchedulerBase() : is_busy_(false) { + proceed_.test_and_set(); + } + + uint32_t process() { + id_ = std::this_thread::get_id(); + TimedTask task{}; + do { + if (extractExpired(task)) { + try { + if (task.task) { + if (!task.predic) { + task.task(); + } else if (task.predic()) { + task.task(); + std::lock_guard lock(tasks_cs_); + task.created = now(); + add(std::move(task)); + } + } + } catch (...) { + } + } else { + event_.wait(untilFirst()); + } + + } while (proceed_.test_and_set()); + return 0; + } + + void dispose(bool wait_for_release = true) override { + proceed_.clear(); + event_.set(); + } + + bool isBusy() const override { + std::lock_guard lock(tasks_cs_); + return is_busy_; + } + + std::optional<Task> uploadIfFree(std::chrono::microseconds timeout, + Task &&task) override { + std::lock_guard lock(tasks_cs_); + if (is_busy_) { + return std::move(task); + } + + add(TimedTask{now(), timeout, nullptr, std::move(task)}); + return std::nullopt; + } + + void addDelayed(std::chrono::microseconds timeout, Task &&t) override { + std::lock_guard lock(tasks_cs_); + add(TimedTask{now(), timeout, nullptr, std::move(t)}); + } + + void repeat(std::chrono::microseconds timeout, + Task &&t, + Predicate &&pred) override { + std::lock_guard lock(tasks_cs_); + add(TimedTask{now(), timeout, std::move(pred), std::move(t)}); + } + }; + +} // namespace jam::se diff --git a/src/se/impl/subscriber.hpp b/src/se/impl/subscriber.hpp new file mode 100644 index 0000000..1d62d3c --- /dev/null +++ b/src/se/impl/subscriber.hpp @@ -0,0 +1,43 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <memory> + +#include "common.hpp" + +namespace jam::se { + + using SubscriptionSetId = uint32_t; + + /** + * Base class that determines the subscriber. + * @tparam EventKey type of listening event + * @tparam Dispatcher thread dispatcher to execute tasks + * @tparam Arguments list of event arguments + */ + template <typename EventKey, typename Dispatcher, typename... Arguments> + class Subscriber : public std::enable_shared_from_this< + Subscriber<EventKey, Dispatcher, Arguments...>>, + utils::NoMove, + utils::NoCopy { + public: + using EventType = EventKey; + virtual ~Subscriber() = default; + + /** + * Notification callback function + * @param set_id the id of the subscription set + * @param key notified event + * @param args event data + */ + virtual void on_notify(SubscriptionSetId set_id, + const EventType &key, + Arguments &&...args) = 0; + }; + +} // namespace jam::se diff --git a/src/se/impl/subscriber_impl.hpp b/src/se/impl/subscriber_impl.hpp new file mode 100644 index 0000000..c4d2481 --- /dev/null +++ b/src/se/impl/subscriber_impl.hpp @@ -0,0 +1,196 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <atomic> +#include <functional> +#include <memory> +#include <mutex> + +#include "subscriber.hpp" +#include "subscription_engine.hpp" + +namespace jam::se { + + /** + * Is a wrapper class, which provides subscription to events from + * SubscriptionEngine + * @tparam EventKey is a type of a particular subscription event (might be a + * key from an observed storage or a specific event type from an enumeration). + * @tparam Dispatcher thread dispatcher + * @tparam Receiver is a type of an object which is a part of Subscriber's + * internal state and can be accessed on every event notification. + * @tparam Arguments is a set of types of objects that are passed on every + * event notification. + */ + template <typename EventKey, + typename Dispatcher, + typename Receiver, + typename... Arguments> + class SubscriberImpl : public Subscriber<EventKey, Dispatcher, Arguments...> { + public: + using ReceiverType = Receiver; + using Hash = size_t; + using Parent = Subscriber<EventKey, Dispatcher, Arguments...>; + + using SubscriptionEngineType = SubscriptionEngine< + typename Parent::EventType, + Dispatcher, + Subscriber<typename Parent::EventType, Dispatcher, Arguments...>>; + using SubscriptionEnginePtr = std::shared_ptr<SubscriptionEngineType>; + using SubscriptionEngineWPtr = std::weak_ptr<SubscriptionEngineType>; + + using CallbackFnType = + std::function<void(SubscriptionSetId, + ReceiverType &, + const typename Parent::EventType &, + Arguments &&...)>; + + private: + using SubscriptionsContainer = + std::unordered_map<typename Parent::EventType, + typename SubscriptionEngineType::IteratorType>; + using SubscriptionsSets = + std::unordered_map<SubscriptionSetId, SubscriptionsContainer>; + + std::atomic<SubscriptionSetId> next_id_; + + /// Subscription engine weak pointer + SubscriptionEngineWPtr engine_; + + /// Internal object stored in subscriber and available in notification + /// call + ReceiverType object_; + + std::mutex subscriptions_cs_; + + /// Associative container with all active subscriptions: + /// subscription set_id -> notification event -> subscription iterator + SubscriptionsSets subscriptions_sets_; + + /// Stored notification callback + CallbackFnType on_notify_callback_; + + template <typename... SubscriberConstructorArgs> + SubscriberImpl(const SubscriptionEnginePtr &ptr, + SubscriberConstructorArgs &&...args) + : next_id_(0ull), + engine_(ptr), + object_(std::forward<SubscriberConstructorArgs>(args)...) {} + + public: + template <typename... SubscriberConstructorArgs> + static std::shared_ptr<SubscriberImpl> create( + const SubscriptionEnginePtr &ptr, SubscriberConstructorArgs &&...args) { + struct Resolver : SubscriberImpl { + Resolver(const SubscriptionEnginePtr &ptr, + SubscriberConstructorArgs &&...args) + : SubscriberImpl( + ptr, std::forward<SubscriberConstructorArgs>(args)...) {} + }; + return std::make_shared<Resolver>( + ptr, std::forward<SubscriberConstructorArgs>(args)...); + } + + ~SubscriberImpl() {} + + void setCallback(CallbackFnType &&f) { + on_notify_callback_ = std::move(f); + } + + SubscriptionSetId generateSubscriptionSetId() { + return ++next_id_; + } + + void subscribe(SubscriptionSetId id, + const typename Parent::EventType &key, + typename Dispatcher::Tid tid = Dispatcher::kExecuteInPool) { + if (auto engine = engine_.lock()) { + std::lock_guard lock(subscriptions_cs_); + auto &&[it, inserted] = subscriptions_sets_[id].emplace( + key, typename SubscriptionEngineType::IteratorType{}); + + /// Here we check first local subscriptions because of strong connection + /// with SubscriptionEngine. + if (inserted) { + it->second = + engine->subscribe(tid, id, key, Parent::weak_from_this()); + } + } + } + + /** + * @param id -- subscription set id that unsubscribes from \arg key + * @param key -- event key to unsubscribe from + * @return true if was subscribed to \arg key, false otherwise + */ + bool unsubscribe(SubscriptionSetId id, + const typename Parent::EventType &key) { + std::lock_guard<std::mutex> lock(subscriptions_cs_); + if (auto set_it = subscriptions_sets_.find(id); + set_it != subscriptions_sets_.end()) { + auto &subscriptions = set_it->second; + auto it = subscriptions.find(key); + if (subscriptions.end() != it) { + if (auto engine = engine_.lock()) { + engine->unsubscribe(key, it->second); + } + subscriptions.erase(it); + return true; + } + } + return false; + } + + /** + * @param id -- subscription set id to unsubscribe from + * @return true if was subscribed to \arg id, false otherwise + */ + bool unsubscribe(SubscriptionSetId id) { + std::lock_guard<std::mutex> lock(subscriptions_cs_); + if (auto set_it = subscriptions_sets_.find(id); + set_it != subscriptions_sets_.end()) { + if (auto engine = engine_.lock()) { + auto &subscriptions = set_it->second; + for (auto &[key, it] : subscriptions) { + engine->unsubscribe(key, it); + } + } + + subscriptions_sets_.erase(set_it); + return true; + } + return false; + } + + void unsubscribe() { + std::lock_guard<std::mutex> lock(subscriptions_cs_); + if (auto engine = engine_.lock()) { + for (auto &[_, subscriptions] : subscriptions_sets_) { + for (auto &[key, it] : subscriptions) { + engine->unsubscribe(key, it); + } + } + } + + subscriptions_sets_.clear(); + } + + void on_notify(SubscriptionSetId set_id, + const typename Parent::EventType &key, + Arguments &&...args) override { + if (nullptr != on_notify_callback_) { + on_notify_callback_(set_id, object_, key, std::move(args)...); + } + } + + ReceiverType &get() { + return object_; + } + }; + +} // namespace jam::se diff --git a/src/se/impl/subscription_engine.hpp b/src/se/impl/subscription_engine.hpp new file mode 100644 index 0000000..218283b --- /dev/null +++ b/src/se/impl/subscription_engine.hpp @@ -0,0 +1,219 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <assert.h> +#include <list> +#include <memory> +#include <shared_mutex> +#include <unordered_map> + +#include "common.hpp" +#include "subscriber.hpp" + +namespace jam::se { + + struct IDisposable { + virtual void dispose() = 0; + }; + + /** + * @tparam EventKey - the type of a specific event from event set (e. g. a key + * from a storage or a particular kind of event from an enumeration) + * @tparam Dispatcher - thread handler + * @tparam Receiver - the type of an object that is a part of a Subscriber + * internal state and can be accessed on every event + */ + template <typename EventKey, typename Dispatcher, typename Receiver> + class SubscriptionEngine final + : public IDisposable, + public std::enable_shared_from_this< + SubscriptionEngine<EventKey, Dispatcher, Receiver>>, + utils::NoMove, + utils::NoCopy { + public: + using EventKeyType = EventKey; + using ReceiverType = Receiver; + using SubscriberType = Receiver; + using SubscriberWeakPtr = std::weak_ptr<SubscriberType>; + using DispatcherType = typename std::decay<Dispatcher>::type; + using DispatcherPtr = std::shared_ptr<DispatcherType>; + + /// List is preferable here because this container iterators remain + /// alive after removal from the middle of the container + /// using custom allocator + using SubscribersContainer = std::list<std::tuple<typename Dispatcher::Tid, + SubscriptionSetId, + SubscriberWeakPtr>>; + using IteratorType = typename SubscribersContainer::iterator; + + public: + explicit SubscriptionEngine(const DispatcherPtr &dispatcher) + : dispatcher_(dispatcher) { + assert(dispatcher_); + } + ~SubscriptionEngine() = default; + + void dispose() override { + dispatcher_.reset(); + } + + private: + /// List of subscribers for a single event key + struct SubscriptionContext final { + std::mutex subscribers_list_cs; + SubscribersContainer subscribers_list; + }; + using KeyValueContainer = + std::unordered_map<EventKeyType, SubscriptionContext>; + + mutable std::shared_mutex subscribers_map_cs_; + + /// Associative container with lists of subscribers by the event key + KeyValueContainer subscribers_map_; + + /// Thread handlers dispatcher + DispatcherPtr dispatcher_; + + public: + /** + * Stores Subscriber object to retrieve later notifications + * @param tid Thread ID in which subscribers callback will be executed + * @param set_id subscription set id is a group identifier in multiple + * subscriptions + * @param key notification event key that this subscriber will listen to + * @param ptr subscriber weak pointer + * @return a position in an internal container with subscribers(!!! it must + * be kept valid in case the other subscriber will be deleted from this + * container) + */ + IteratorType subscribe(typename Dispatcher::Tid tid, + SubscriptionSetId set_id, + const EventKeyType &key, + SubscriberWeakPtr ptr) { + std::unique_lock lock(subscribers_map_cs_); + auto &subscribers_context = subscribers_map_[key]; + + std::lock_guard l(subscribers_context.subscribers_list_cs); + return subscribers_context.subscribers_list.emplace( + subscribers_context.subscribers_list.end(), + std::make_tuple(tid, set_id, std::move(ptr))); + } + + /** + * Stops the subscriber from listening to events + * @param key notification event that must be unsubscribed + * @param it_remove iterator to the subscribers position + */ + void unsubscribe(const EventKeyType &key, const IteratorType &it_remove) { + std::unique_lock lock(subscribers_map_cs_); + auto it = subscribers_map_.find(key); + if (subscribers_map_.end() != it) { + auto &subscribers_context = it->second; + std::lock_guard l(subscribers_context.subscribers_list_cs); + subscribers_context.subscribers_list.erase(it_remove); + if (subscribers_context.subscribers_list.empty()) { + subscribers_map_.erase(it); + } + } + } + + /** + * Number of subscribers which listen to the current notification event + * @param key notification event + * @return number of subscribers + */ + size_t size(const EventKeyType &key) const { + std::shared_lock lock(subscribers_map_cs_); + if (auto it = subscribers_map_.find(key); it != subscribers_map_.end()) { + auto &subscribers_context = it->second; + std::lock_guard l(subscribers_context.subscribers_list_cs); + return subscribers_context.subscribers_list.size(); + } + return 0ull; + } + + /** + * Number of subscribers which listen to all notification events + * @return number of subscribers + */ + size_t size() const { + std::shared_lock lock(subscribers_map_cs_); + size_t count = 0ull; + for (auto &it : subscribers_map_) { + auto &subscribers_context = it->second; + std::lock_guard l(subscribers_context.subscribers_list_cs); + count += subscribers_context.subscribers_list.size(); + } + return count; + } + + /** + * Notify the event subscribers without delay + * @tparam EventParams notification event type + * @param key notification event to be executed + * @param args event data to transmit + */ + template <typename... EventParams> + void notify(const EventKeyType &key, const EventParams &...args) { + notifyDelayed(std::chrono::microseconds(0ull), key, args...); + } + + /** + * Notify the event subscribers after a specified delay + * @tparam EventParams notification event type + * @param timeout delay before subscribers will be notified + * @param key notification event to be executed + * @param args event data to transmit + */ + template <typename... EventParams> + void notifyDelayed(std::chrono::microseconds timeout, + const EventKeyType &key, + const EventParams &...args) { + auto dispatcher = dispatcher_; + if (!dispatcher) { + return; + } + + std::shared_lock lock(subscribers_map_cs_); + auto it = subscribers_map_.find(key); + if (subscribers_map_.end() == it) { + return; + } + + auto &subscribers_container = it->second; + std::lock_guard l(subscribers_container.subscribers_list_cs); + for (auto it_sub = subscribers_container.subscribers_list.begin(); + it_sub != subscribers_container.subscribers_list.end();) { + auto wsub = std::get<2>(*it_sub); + auto id = std::get<1>(*it_sub); + + if (!wsub.expired()) { + dispatcher->addDelayed(std::get<0>(*it_sub), + timeout, + [wsub(std::move(wsub)), + id(id), + key(key), + args = std::make_tuple(args...)]() mutable { + if (auto sub = wsub.lock()) { + std::apply( + [&](auto &&...args) { + sub->on_notify( + id, key, std::move(args)...); + }, + std::move(args)); + } + }); + ++it_sub; + } else { + it_sub = subscribers_container.subscribers_list.erase(it_sub); + } + } + } + }; + +} // namespace jam::se diff --git a/src/se/impl/subscription_manager.hpp b/src/se/impl/subscription_manager.hpp new file mode 100644 index 0000000..1c1f8a8 --- /dev/null +++ b/src/se/impl/subscription_manager.hpp @@ -0,0 +1,172 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <assert.h> +#include <memory> +#include <shared_mutex> +#include <unordered_map> + +#include "common.hpp" +#include "compile-time_murmur2.hpp" +#include "dispatcher.hpp" +#include "subscriber.hpp" +#include "subscription_engine.hpp" + +namespace jam::se { + + /** + * Class-aggregator that keeps all event engines inside. On notification it + * selects the appropriate engine and calls notification in it. + * @tparam kHandlersCount number of supported thread handlers + * @tparam kPoolSize number of threads in thread pool + */ + template <uint32_t kHandlersCount, uint32_t kPoolSize> + class SubscriptionManager final + : public std::enable_shared_from_this< + SubscriptionManager<kHandlersCount, kPoolSize>>, + utils::NoMove, + utils::NoCopy { + public: + using Dispatcher = jam::se::IDispatcher; + + private: + using EngineHash = uint64_t; + using DispatcherPtr = std::shared_ptr<Dispatcher>; + using EnginesList = std::unordered_map<EngineHash, std::shared_ptr<void>>; + + private: + /// Thread handlers dispatcher + DispatcherPtr dispatcher_; + std::shared_mutex engines_cs_; + /// Engines container + EnginesList engines_; + std::atomic_flag disposed_; + + private: + template <typename... Args> + static constexpr EngineHash getSubscriptionHash() { +#ifdef _WIN32 + constexpr EngineHash value = CT_MURMUR2(__FUNCSIG__); +#else //_WIN32 + constexpr EngineHash value = CT_MURMUR2(__PRETTY_FUNCTION__); +#endif //_WIN32 + return value; + } + + public: + SubscriptionManager(DispatcherPtr dispatcher) + : dispatcher_(std::move(dispatcher)) { + disposed_.clear(); + } + + /** + * Detaches the dispatcher from all engines and stops thread handlers + * execution. + */ + void dispose() { + if (!disposed_.test_and_set()) { + { + std::shared_lock lock(engines_cs_); + for (auto &descriptor : engines_) { + utils::reinterpret_pointer_cast<IDisposable>(descriptor.second) + ->dispose(); + } + } + dispatcher_->dispose(); + } + } + + /** + * Method returns the engine corresponding to current arguments set + * transmission. + * @tparam EventKey typeof event enum + * @tparam Args arguments list of transmitted event data types + * @return engine object + */ + template <typename EventKey, typename... Args> + auto getEngine() { + using EngineType = + SubscriptionEngine<EventKey, + Dispatcher, + Subscriber<EventKey, Dispatcher, Args...>>; + constexpr auto engineId = getSubscriptionHash<Args...>(); + { + std::shared_lock lock(engines_cs_); + if (auto it = engines_.find(engineId); it != engines_.end()) { + return utils::reinterpret_pointer_cast<EngineType>(it->second); + } + } + std::unique_lock lock(engines_cs_); + if (auto it = engines_.find(engineId); it != engines_.end()) { + return utils::reinterpret_pointer_cast<EngineType>(it->second); + } + + /// To be sure IDisposable is the first base class, because of later cast + static_assert(std::is_base_of_v<IDisposable, EngineType>, + "Engine type must be derived from IDisposable."); + assert(uintptr_t(reinterpret_cast<EngineType *>(0x1)) + == uintptr_t(static_cast<IDisposable *>( + reinterpret_cast<EngineType *>(0x1)))); + + auto obj = std::make_shared<EngineType>(dispatcher_); + engines_[engineId] = utils::reinterpret_pointer_cast<void>(obj); + return obj; + } + + /** + * Make event notification to subscribers that are listening to this event + * @tparam EventKey typeof event enum + * @tparam Args arguments list of transmitted event data types + * @param key event key + * @param args transmitted data + */ + template <typename EventKey, typename... Args> + void notify(const EventKey &key, const Args &...args) { + notifyDelayed(std::chrono::microseconds(0ull), key, args...); + } + + /** + * Make event notification to subscribers that are listening this event + * after a delay + * @tparam EventKey typeof event enum + * @tparam Args arguments list of transmitted event data types + * @param timeout delay before subscribers will be notified + * @param key event key + * @param args transmitted data + */ + template <typename EventKey, typename... Args> + void notifyDelayed(std::chrono::microseconds timeout, + const EventKey &key, + const Args &...args) { + using EngineType = + SubscriptionEngine<EventKey, + Dispatcher, + Subscriber<EventKey, Dispatcher, Args...>>; + constexpr auto engineId = getSubscriptionHash<Args...>(); + std::shared_ptr<EngineType> engine; + { + std::shared_lock lock(engines_cs_); + if (auto it = engines_.find(engineId); it != engines_.end()) { + engine = utils::reinterpret_pointer_cast<EngineType>(it->second); + } else { + return; + } + } + assert(engine); + engine->notifyDelayed(timeout, key, args...); + } + + /** + * Getter to retrieve a dispatcher. + * @return dispatcher object + */ + DispatcherPtr dispatcher() const { + return dispatcher_; + } + }; +} // namespace jam::se diff --git a/src/se/impl/sync_dispatcher_impl.hpp b/src/se/impl/sync_dispatcher_impl.hpp new file mode 100644 index 0000000..b59aa97 --- /dev/null +++ b/src/se/impl/sync_dispatcher_impl.hpp @@ -0,0 +1,59 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "common.hpp" +#include "dispatcher.hpp" + +namespace jam::se { + + template <uint32_t kCount, uint32_t kPoolSize> + class SyncDispatcher final : public IDispatcher, + utils::NoCopy, + utils::NoMove { + private: + using Parent = IDispatcher; + + public: + SyncDispatcher() = default; + + void dispose() override {} + + void add(typename Parent::Tid /*tid*/, + typename Parent::Task &&task) override { + task(); + } + + void addDelayed(typename Parent::Tid /*tid*/, + std::chrono::microseconds /*timeout*/, + typename Parent::Task &&task) override { + task(); + } + + void repeat(Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task, + typename Parent::Predicate &&pred) override { + if (!pred || pred()) { + task(); + } + } + + std::optional<Tid> bind(std::shared_ptr<IScheduler> scheduler) override { + if (!scheduler) { + return std::nullopt; + } + + return kCount; + } + + bool unbind(Tid tid) override { + return tid == kCount; + } + }; + +} // namespace jam::se diff --git a/src/se/impl/thread_handler.hpp b/src/se/impl/thread_handler.hpp new file mode 100644 index 0000000..c1bd83d --- /dev/null +++ b/src/se/impl/thread_handler.hpp @@ -0,0 +1,36 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <assert.h> +#include <thread> + +#include "scheduler_impl.hpp" + +namespace jam::se { + + class ThreadHandler final : public SchedulerBase { + private: + std::thread worker_; + + public: + ThreadHandler() { + worker_ = std::thread( + [](ThreadHandler *__this) { return __this->process(); }, this); + } + + void dispose(bool wait_for_release = true) { + SchedulerBase::dispose(wait_for_release); + if (wait_for_release) { + worker_.join(); + } else { + worker_.detach(); + } + } + }; + +} // namespace jam::se diff --git a/src/se/subscription.cpp b/src/se/subscription.cpp new file mode 100644 index 0000000..3f5f7d1 --- /dev/null +++ b/src/se/subscription.cpp @@ -0,0 +1,30 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "subscription.hpp" + +#include <mutex> + +namespace jam::se { + + std::shared_ptr<Subscription> getSubscription() { + static std::weak_ptr<Subscription> engine; + if (auto ptr = engine.lock()) { + return ptr; + } + + static std::mutex engine_cs; + std::lock_guard<std::mutex> lock(engine_cs); + if (auto ptr = engine.lock()) { + return ptr; + } + + auto ptr = std::make_shared<Subscription>(getDispatcher()); + engine = ptr; + return ptr; + } + +} // namespace jam::se diff --git a/src/se/subscription.hpp b/src/se/subscription.hpp new file mode 100644 index 0000000..4c8a963 --- /dev/null +++ b/src/se/subscription.hpp @@ -0,0 +1,48 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <memory> + +#include "impl/common.hpp" +#include "impl/subscriber_impl.hpp" +#include "impl/subscription_manager.hpp" +#include "subscription_fwd.hpp" + +namespace jam::se { + std::shared_ptr<Dispatcher> getDispatcher(); + std::shared_ptr<Subscription> getSubscription(); + + template <typename... T> + constexpr void notifyEngine(std::tuple<T...> &&data) { + std::apply( + [](auto &...x) { (..., getSubscription()->notify(x.first, x.second)); }, + data); + } + + template <typename ObjectType, typename... EventData> + struct SubscriberCreator { + template <EventTypes key, typename F, typename... Args> + static auto create(SubscriptionEngineHandlers tid, + F &&callback, + Args &&...args) { + auto subscriber = BaseSubscriber<ObjectType, EventData...>::create( + getSubscription()->getEngine<EventTypes, EventData...>(), + std::forward<Args>(args)...); + subscriber->setCallback( + [f{std::forward<F>(callback)}](auto /*set_id*/, + auto &object, + auto event_key, + EventData... args) mutable { + assert(key == event_key); + std::forward<F>(f)(object, std::move(args)...); + }); + subscriber->subscribe(0, key, tid); + return subscriber; + } + }; +} // namespace jam::se diff --git a/src/se/subscription_fwd.hpp b/src/se/subscription_fwd.hpp new file mode 100644 index 0000000..86e638a --- /dev/null +++ b/src/se/subscription_fwd.hpp @@ -0,0 +1,46 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <memory> + +namespace jam { + enum SubscriptionEngineHandlers { + kTest = 0, + //--------------- + kTotalCount + }; + + enum EventTypes { + // TEST + kOnTestOperationComplete + }; + + static constexpr uint32_t kThreadPoolSize = 3u; + + namespace se { + struct IDispatcher; + + template <uint32_t kHandlersCount, uint32_t kPoolSize> + class SubscriptionManager; + + template <typename EventKey, + typename Dispatcher, + typename Receiver, + typename... Arguments> + class SubscriberImpl; + } // namespace se + + using Dispatcher = se::IDispatcher; + using Subscription = + se::SubscriptionManager<SubscriptionEngineHandlers::kTotalCount, + kThreadPoolSize>; + template <typename ObjectType, typename... EventData> + using BaseSubscriber = + se::SubscriberImpl<EventTypes, Dispatcher, ObjectType, EventData...>; + +} // namespace jam diff --git a/src/se/sync_dispatcher.cpp b/src/se/sync_dispatcher.cpp new file mode 100644 index 0000000..7ef5d77 --- /dev/null +++ b/src/se/sync_dispatcher.cpp @@ -0,0 +1,20 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <memory> + +#include "impl/sync_dispatcher_impl.hpp" +#include "subscription.hpp" + +namespace jam::se { + + std::shared_ptr<Dispatcher> getDispatcher() { + return std::make_shared< + SyncDispatcher<SubscriptionEngineHandlers::kTotalCount, + kThreadPoolSize>>(); + } + +} // namespace jam::se diff --git a/src/utils/ctor_limiters.hpp b/src/utils/ctor_limiters.hpp index 2563084..290fe97 100644 --- a/src/utils/ctor_limiters.hpp +++ b/src/utils/ctor_limiters.hpp @@ -9,6 +9,8 @@ #include <cstddef> #include <stdexcept> +#include <fmt/format.h> + namespace jam { class NonCopyable { @@ -42,14 +44,16 @@ namespace jam { }; template <typename T> - class Singleton : public NonCopyable, public NonMovable { + requires std::same_as<T, std::decay_t<T>> + class Singleton : NonCopyable, NonMovable { using BaseType = T; public: Singleton() { if (exists.test_and_set(std::memory_order_acquire)) { throw std::logic_error( - "Attempt to create one more instance of singleton"); + fmt::format("Attempt to create one more instance of singleton '{}'", + typeid(BaseType).name())); } } ~Singleton() { diff --git a/vcpkg-overlay/qtils/portfile.cmake b/vcpkg-overlay/qtils/portfile.cmake index ce7f86a..1f5beb8 100644 --- a/vcpkg-overlay/qtils/portfile.cmake +++ b/vcpkg-overlay/qtils/portfile.cmake @@ -2,8 +2,9 @@ vcpkg_check_linkage(ONLY_STATIC_LIBRARY) vcpkg_from_github( OUT_SOURCE_PATH SOURCE_PATH REPO qdrvm/qtils - REF refs/tags/v0.1.0 - SHA512 301987eefc98b66c42dcf731d73c11c3e9835098fc3d9a1b8e3adef9b73dad6b0198019d416e1809956620377b48e575157d56b278dcdcf65a24ecdfc134605e +# REF refs/tags/v0.1.0 + REF 163287d1e521788ebccc74bfcf8d1199cb25643b + SHA512 bd4d1f319dc2c810a2f6204d93f63c67f4b0139ddd9dedb9e374f8cb964a1ad7eb46f93f667b3b248abf8949b1f577e86c53a98aae04af926373c98c2c7582ee ) vcpkg_cmake_configure(SOURCE_PATH "${SOURCE_PATH}") vcpkg_cmake_install() diff --git a/vcpkg.json b/vcpkg.json index c3dbc24..43afc63 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -12,7 +12,8 @@ "boost-program-options", "boost-asio", "boost-beast", - "prometheus-cpp" + "prometheus-cpp", + "ftxui" ], "features": { "test": { "description": "Test", "dependencies": ["gtest"]}