diff --git a/.gitignore b/.gitignore index 4933a2d..70315af 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,7 @@ .idea /build +/build-ex /cmake-build-* /.build diff --git a/CMakeLists.txt b/CMakeLists.txt index a84f22b..7bc2e53 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,7 @@ set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) set(CMAKE_POSITION_INDEPENDENT_CODE ON) +set(CMAKE_EXPERIMENTAL_CXX_MODULE_CMAKE_API ON) project(cpp-jam VERSION 0.0.1 @@ -40,6 +41,12 @@ find_package(Boost.DI CONFIG REQUIRED) find_package(qtils CONFIG REQUIRED) find_package(prometheus-cpp CONFIG REQUIRED) +if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + add_compile_options(-fmodules-ts) +elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang|AppleClang") + add_compile_options(-fmodules) +endif() + add_library(headers INTERFACE) target_include_directories(headers INTERFACE $ diff --git a/example/config.yaml b/example/config.yaml index 1992f97..499064a 100644 --- a/example/config.yaml +++ b/example/config.yaml @@ -1,5 +1,7 @@ general: name: NameFromConfig + base_path: /tmp/jam_node + modules_dir: modules metrics: enabled: true diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 08f0356..87a038b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -24,3 +24,8 @@ add_subdirectory(metrics) # Clocks and time subsystem add_subdirectory(clock) +# Subscription Engine subsystem +add_subdirectory(se) + +# Modules subsystem +add_subdirectory(modules) diff --git a/src/app/CMakeLists.txt b/src/app/CMakeLists.txt index 61fd884..3fc38cd 100644 --- a/src/app/CMakeLists.txt +++ b/src/app/CMakeLists.txt @@ -28,7 +28,9 @@ add_library(build_version add_library(app_configuration SHARED configuration.cpp) target_link_libraries(app_configuration - Boost::boost) + Boost::boost + fmt::fmt +) add_library(app_configurator SHARED configurator.cpp) target_link_libraries(app_configurator diff --git a/src/app/configuration.cpp b/src/app/configuration.cpp index d2f1aa1..c00786c 100644 --- a/src/app/configuration.cpp +++ b/src/app/configuration.cpp @@ -18,6 +18,14 @@ namespace jam::app { return name_; } + std::filesystem::path Configuration::basePath() const { + return base_path_; + } + + std::filesystem::path Configuration::modulesDir() const { + return modules_dir_; + } + std::optional Configuration::metricsEndpoint() const { return metrics_endpoint_; diff --git a/src/app/configuration.hpp b/src/app/configuration.hpp index 380dff5..12c3ad5 100644 --- a/src/app/configuration.hpp +++ b/src/app/configuration.hpp @@ -7,6 +7,7 @@ #include +#include #include #include @@ -22,6 +23,9 @@ namespace jam::app { [[nodiscard]] std::string nodeVersion() const; [[nodiscard]] std::string nodeName() const; + [[nodiscard]] std::filesystem::path basePath() const; + [[nodiscard]] std::filesystem::path modulesDir() const; + [[nodiscard]] std::optional metricsEndpoint() const; private: @@ -29,6 +33,8 @@ namespace jam::app { std::string version_; std::string name_; + std::filesystem::path base_path_; + std::filesystem::path modules_dir_; Endpoint metrics_endpoint_; std::optional metrics_enabled_; diff --git a/src/app/configurator.cpp b/src/app/configurator.cpp index 3963bdc..97655a2 100644 --- a/src/app/configurator.cpp +++ b/src/app/configurator.cpp @@ -3,20 +3,20 @@ * SPDX-License-Identifier: Apache-2.0 */ -#include "app/configuration.hpp" +#include "app/configurator.hpp" #include #include #include +#include #include #include #include +#include #include "app/build_version.hpp" -#include "app/configurator.hpp" - -#include +#include "app/configuration.hpp" using Endpoint = boost::asio::ip::tcp::endpoint; @@ -27,6 +27,8 @@ OUTCOME_CPP_DEFINE_CATEGORY(jam::app, Configurator::Error, e) { return "CLI Arguments parse failed"; case E::ConfigFileParseFailed: return "Config file parse failed"; + case E::InvalidValue: + return "Result config has invalid values"; } BOOST_UNREACHABLE_RETURN("Unknown log::Error"); } @@ -84,23 +86,25 @@ namespace jam::app { po::options_description general_options("General options", 120, 100); general_options.add_options() - ("help,h", "show this help message") - ("version,v", "show version information") - ("name,n", po::value(), "set name of node") - ("config,c", po::value(), "optional, filepath to load configuration from. Overrides default config values") + ("help,h", "Show this help message") + ("version,v", "Show version information") + ("base_path", po::value(), "Set base path. All relative paths will be resolved based on this path.") + ("config,c", po::value(), "Optional. Filepath to load configuration from. Overrides default configuration values.") + ("name,n", po::value(), "Set name of node") ("log,l", po::value>(), "Sets a custom logging filter.\n" - "Syntax is `=`, e.g. -llibp2p=off.\n" - "Log levels (most to least verbose) are trace, debug, verbose, info, warn, error, critical, off.\n" - "By default, all targets log `info`.\n" - "The global log level can be set with -l.") + "Syntax: =, e.g., -llibp2p=off.\n" + "Log levels: trace, debug, verbose, info, warn, error, critical, off.\n" + "Default: all targets log at `info`.\n" + "Global log level can be set with: -l.") + ("modules_dir", po::value(), "Set path to modules directory.") ; po::options_description metrics_options("Metric options"); metrics_options.add_options() - ("prometheus-disable", "set to disable OpenMetrics") - ("prometheus-host", po::value(), "address for OpenMetrics over HTTP") - ("prometheus-port", po::value(), "port for OpenMetrics over HTTP") + ("prometheus-disable", "Set to disable OpenMetrics") + ("prometheus-host", po::value(), "Set address for OpenMetrics over HTTP") + ("prometheus-port", po::value(), "Set port for OpenMetrics over HTTP") ; // clang-format on @@ -110,7 +114,7 @@ namespace jam::app { .add(metrics_options); } - outcome::result Configurator::step1() { + outcome::result Configurator::step1() { // read min cli-args and config namespace po = boost::program_options; namespace fs = std::filesystem; @@ -120,7 +124,7 @@ namespace jam::app { po::variables_map vm; - // first-run parse to read only general options and to lookup for "help", + // first-run parse to read-only general options and to lookup for "help", // "config" and "version". all the rest options are ignored try { po::parsed_options parsed = po::command_line_parser(argc_, argv_) @@ -192,7 +196,8 @@ namespace jam::app { } outcome::result> Configurator::calculateConfig( - std::shared_ptr logger) { + qtils::StrictSharedPtr logger) { + logger_ = std::move(logger); OUTCOME_TRY(initGeneralConfig()); OUTCOME_TRY(initOpenMetricsConfig()); @@ -200,6 +205,7 @@ namespace jam::app { } outcome::result Configurator::initGeneralConfig() { + // Init by config-file if (config_file_.has_value()) { auto section = (*config_file_)["general"]; if (section.IsDefined()) { @@ -214,13 +220,49 @@ namespace jam::app { file_has_error_ = true; } } + auto base_path = section["base_path"]; + if (base_path.IsDefined()) { + if (base_path.IsScalar()) { + auto value = base_path.as(); + config_->base_path_ = value; + } else { + file_errors_ << "E: Value 'general.base_path' must be scalar\n"; + file_has_error_ = true; + } + } + auto modules_dir = section["modules_dir"]; + if (modules_dir.IsDefined()) { + if (modules_dir.IsScalar()) { + auto value = modules_dir.as(); + config_->modules_dir_ = value; + } else { + file_errors_ << "E: Value 'general.modules_dir' must be scalar\n"; + file_has_error_ = true; + } + } } else { - file_errors_ << "E: Section 'general' defined, but is not scalar\n"; + file_errors_ << "E: Section 'general' defined, but is not map\n"; file_has_error_ = true; } } } + if (file_has_error_) { + std::string path; + find_argument( + cli_values_map_, "config", [&](const std::string &value) { + path = value; + }); + SL_ERROR(logger_, "Config file `{}` has some problems:", path); + std::istringstream iss(file_errors_.str()); + std::string line; + while (std::getline(iss, line)) { + SL_ERROR(logger_, " {}", std::string_view(line).substr(3)); + } + return Error::ConfigFileParseFailed; + } + + // Adjust by CLI arguments bool fail; fail = false; @@ -228,10 +270,47 @@ namespace jam::app { cli_values_map_, "name", [&](const std::string &value) { config_->name_ = value; }); + find_argument( + cli_values_map_, "base_path", [&](const std::string &value) { + config_->base_path_ = value; + }); + find_argument( + cli_values_map_, "modules_dir", [&](const std::string &value) { + config_->modules_dir_ = value; + }); if (fail) { return Error::CliArgsParseFailed; } + // Check values + if (not config_->base_path_.is_absolute()) { + SL_ERROR(logger_, + "The 'base_path' must be defined as absolute: {}", + config_->base_path_.c_str()); + return Error::InvalidValue; + } + if (not is_directory(config_->base_path_)) { + SL_ERROR(logger_, + "The 'base_path' does not exist or is not a directory: {}", + config_->base_path_.c_str()); + return Error::InvalidValue; + } + current_path(config_->base_path_); + + auto make_absolute = [&](const std::filesystem::path &path) { + return weakly_canonical(config_->base_path_.is_absolute() + ? path + : (config_->base_path_ / path)); + }; + + config_->modules_dir_ = make_absolute(config_->modules_dir_); + if (not is_directory(config_->modules_dir_)) { + SL_ERROR(logger_, + "The 'modules_dir' does not exist or is not a directory: {}", + config_->modules_dir_.c_str()); + return Error::InvalidValue; + } + return outcome::success(); } diff --git a/src/app/configurator.hpp b/src/app/configurator.hpp index 8fd1a8e..dbed656 100644 --- a/src/app/configurator.hpp +++ b/src/app/configurator.hpp @@ -6,8 +6,10 @@ #pragma once #include +#include #include #include +#include #include #include "injector/dont_inject.hpp" @@ -27,6 +29,7 @@ namespace jam::app { enum class Error : uint8_t { CliArgsParseFailed, ConfigFileParseFailed, + InvalidValue, }; DONT_INJECT(Configurator); @@ -49,7 +52,7 @@ namespace jam::app { outcome::result getLoggingConfig(); outcome::result> calculateConfig( - std::shared_ptr logger); + qtils::StrictSharedPtr logger); private: outcome::result initGeneralConfig(); @@ -60,6 +63,7 @@ namespace jam::app { const char **env_; std::shared_ptr config_; + std::shared_ptr logger_; std::optional config_file_; bool file_has_warn_ = false; diff --git a/src/app/impl/application_impl.cpp b/src/app/impl/application_impl.cpp index 2f29cb1..b5a8181 100644 --- a/src/app/impl/application_impl.cpp +++ b/src/app/impl/application_impl.cpp @@ -20,12 +20,12 @@ namespace jam::app { ApplicationImpl::ApplicationImpl( - std::shared_ptr logsys, - std::shared_ptr config, - std::shared_ptr state_manager, - std::shared_ptr watchdog, - std::shared_ptr metrics_exposer, - std::shared_ptr system_clock) + qtils::StrictSharedPtr logsys, + qtils::StrictSharedPtr config, + qtils::StrictSharedPtr state_manager, + qtils::StrictSharedPtr watchdog, + qtils::StrictSharedPtr metrics_exposer, + qtils::StrictSharedPtr system_clock) : logger_(logsys->getLogger("Application", "application")), app_config_(std::move(config)), state_manager_(std::move(state_manager)), @@ -34,15 +34,13 @@ namespace jam::app { system_clock_(std::move(system_clock)), metrics_registry_(metrics::createRegistry()) { // Metric for exposing name and version of node - constexpr auto buildInfoMetricName = "jam_build_info"; - metrics_registry_->registerGaugeFamily( - buildInfoMetricName, - "A metric with a constant '1' value labeled by name, version"); - auto metric_build_info = metrics_registry_->registerGaugeMetric( - buildInfoMetricName, - {{"name", app_config_->nodeName()}, - {"version", app_config_->nodeVersion()}}); - metric_build_info->set(1); + metrics::GaugeHelper( + "jam_build_info", + "A metric with a constant '1' value labeled by name, version", + std::map{ + {"name", app_config_->nodeName()}, + {"version", app_config_->nodeVersion()}}) + ->set(1); } void ApplicationImpl::run() { diff --git a/src/app/impl/application_impl.hpp b/src/app/impl/application_impl.hpp index 43a4d38..105a1df 100644 --- a/src/app/impl/application_impl.hpp +++ b/src/app/impl/application_impl.hpp @@ -10,6 +10,8 @@ #include +#include + #include namespace jam { @@ -43,22 +45,22 @@ namespace jam::app { class ApplicationImpl final : public Application { public: - ApplicationImpl(std::shared_ptr logsys, - std::shared_ptr config, - std::shared_ptr state_manager, - std::shared_ptr watchdog, - std::shared_ptr metrics_exposer, - std::shared_ptr system_clock); + ApplicationImpl(qtils::StrictSharedPtr logsys, + qtils::StrictSharedPtr config, + qtils::StrictSharedPtr state_manager, + qtils::StrictSharedPtr watchdog, + qtils::StrictSharedPtr metrics_exposer, + qtils::StrictSharedPtr system_clock); void run() override; private: - std::shared_ptr logger_; - std::shared_ptr app_config_; - std::shared_ptr state_manager_; - std::shared_ptr watchdog_; - std::shared_ptr metrics_exposer_; - std::shared_ptr system_clock_; + qtils::StrictSharedPtr logger_; + qtils::StrictSharedPtr app_config_; + qtils::StrictSharedPtr state_manager_; + qtils::StrictSharedPtr watchdog_; + qtils::StrictSharedPtr metrics_exposer_; + qtils::StrictSharedPtr system_clock_; // Metrics std::unique_ptr metrics_registry_; diff --git a/src/app/impl/state_manager_impl.cpp b/src/app/impl/state_manager_impl.cpp index f5e2b73..0d43e14 100644 --- a/src/app/impl/state_manager_impl.cpp +++ b/src/app/impl/state_manager_impl.cpp @@ -92,7 +92,7 @@ namespace jam::app { } StateManagerImpl::StateManagerImpl( - std::shared_ptr logging_system) + qtils::StrictSharedPtr logging_system) : logger_(logging_system->getLogger("StateManager", "application")), logging_system_(std::move(logging_system)) { shuttingDownSignalsEnable(); diff --git a/src/app/impl/state_manager_impl.hpp b/src/app/impl/state_manager_impl.hpp index 13f5f26..50cbf54 100644 --- a/src/app/impl/state_manager_impl.hpp +++ b/src/app/impl/state_manager_impl.hpp @@ -9,10 +9,11 @@ #include "app/state_manager.hpp" #include -#include #include #include +#include + #include "utils/ctor_limiters.hpp" namespace soralog { @@ -29,7 +30,7 @@ namespace jam::app { public StateManager, public std::enable_shared_from_this { public: - StateManagerImpl(std::shared_ptr logging_system); + StateManagerImpl(qtils::StrictSharedPtr logging_system); ~StateManagerImpl() override; @@ -66,8 +67,8 @@ namespace jam::app { void shutdownRequestWaiting(); - std::shared_ptr logger_; - std::shared_ptr logging_system_; + qtils::StrictSharedPtr logger_; + qtils::StrictSharedPtr logging_system_; std::atomic state_ = State::Init; diff --git a/src/executable/CMakeLists.txt b/src/executable/CMakeLists.txt index 392671f..0a0234c 100644 --- a/src/executable/CMakeLists.txt +++ b/src/executable/CMakeLists.txt @@ -35,6 +35,7 @@ else () add_executable(jam_node jam_node.cpp) target_link_libraries(jam_node ${LIBRARIES}) endif () +add_dependencies(jam_node all_modules) #if (BACKWARD) # add_backward(jam_node) diff --git a/src/executable/jam_node.cpp b/src/executable/jam_node.cpp index af58cdf..aab5296 100644 --- a/src/executable/jam_node.cpp +++ b/src/executable/jam_node.cpp @@ -4,23 +4,114 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include +#include #include #include -#include - #include "app/application.hpp" #include "app/configuration.hpp" #include "app/configurator.hpp" #include "injector/node_injector.hpp" +#include "loaders/impl/example_loader.hpp" #include "log/logger.hpp" +#include "modules/module_loader.hpp" +#include "se/subscription.hpp" using std::string_view_literals::operator""sv; // NOLINTBEGIN(cppcoreguidelines-pro-bounds-pointer-arithmetic) +template +struct Channel { + struct _Receiver; + struct _Sender; + + struct _Receiver { + using Other = _Sender; + }; + struct _Sender { + using Other = _Receiver; + }; + + template + struct Endpoint { + static_assert(std::is_same_v || std::is_same_v, + "Incorrect type"); + static constexpr bool IsReceiver = std::is_same_v; + static constexpr bool IsSender = std::is_same_v; + + void register_opp(Endpoint& opp) { + opp_ = &opp; + } + + void unregister_opp(Endpoint& opp) { + assert(opp_ == &opp); + opp_ = nullptr; + } + + ~Endpoint() requires(IsSender) { + if (opp_) { + opp_->context_.event_.set(); + opp_->unregister_opp(*this); + opp_ = nullptr; + } + } + + ~Endpoint() requires(IsReceiver) { + if (opp_) { + opp_->unregister_opp(*this); + opp_ = nullptr; + } + } + + void set(T&& t) requires(IsSender) { + opp_->context_.data_ = std::move(t); + opp_->context_.event_.set(); + } + + void set(T& t) requires(IsSender) { + opp_->context_.data_ = t; + opp_->context_.event_.set(); + } + + std::optional wait() requires(IsReceiver) { + context_.event_.wait(); + return std::move(context_.data_); + } + + private: + friend struct Endpoint; + + struct ExecutionContext { + jam::se::utils::WaitForSingleObject event_; + std::optional data_; + }; + + Endpoint* opp_ = nullptr; + std::conditional_t, + ExecutionContext, + std::monostate> + context_; + }; + + using Receiver = Endpoint<_Receiver>; + using Sender = Endpoint<_Sender>; +}; + +void tttt() { + Channel::Receiver r; + Channel::Sender s; + + r.register_opp(s); + s.register_opp(r); + + int q = 10; + s.set(q); +} + namespace { void wrong_usage() { std::cerr << "Wrong usage.\n" @@ -32,16 +123,52 @@ namespace { using jam::injector::NodeInjector; using jam::log::LoggingSystem; + int run_node(std::shared_ptr logsys, std::shared_ptr appcfg) { auto injector = std::make_unique(logsys, appcfg); + qtils::FinalAction dispose_se_on_exit( + [se_manager{injector->getSE()}] { se_manager->dispose(); }); + + // Load modules + { + auto logger = logsys->getLogger("Modules", "jam"); + const std::string path(appcfg->modulesDir()); + + jam::modules::ModuleLoader module_loader(path); + auto modules = module_loader.get_modules(); + if (modules.has_error()) { + SL_CRITICAL(logger, "Failed to load modules from path: {}", path); + return EXIT_FAILURE; + } + + std::deque> loaders; + for (const auto &module : modules.value()) { + if ("ExampleLoader" == module->get_loader_id()) { + auto loader = std::make_shared( + *injector, logsys, module); + if (auto info = loader->module_info()) { + SL_INFO(logger, "> Module: {} [{}]", *info, module->get_path()); + loaders.emplace_back(loader); + loader->start(); + } + } if ("PVM_Module_Rust" == module->get_loader_id()) { + // auto loader = std::make_shared( + // *injector, logsys, module); + // if (auto info = loader->module_info()) { + // SL_INFO(logger, "> Module: {} [{}]", *info, module->get_path()); + // loaders.emplace_back(loader); + // loader->start(); + // } + } + } + } auto logger = logsys->getLogger("Main", jam::log::defaultGroupName); - auto app = injector->injectApplication(); - SL_INFO(logger, "Node started. Version: {} ", appcfg->nodeVersion()); + tttt(); app->run(); SL_INFO(logger, "Node stopped"); diff --git a/src/injector/CMakeLists.txt b/src/injector/CMakeLists.txt index 7030c72..3b6979b 100644 --- a/src/injector/CMakeLists.txt +++ b/src/injector/CMakeLists.txt @@ -15,4 +15,6 @@ target_link_libraries(node_injector application metrics clock + se_async + modules ) diff --git a/src/injector/node_injector.cpp b/src/injector/node_injector.cpp index 48ef7d5..36b281c 100644 --- a/src/injector/node_injector.cpp +++ b/src/injector/node_injector.cpp @@ -29,7 +29,7 @@ namespace { template auto useConfig(C c) { - return boost::di::bind >().to( + return boost::di::bind>().to( std::move(c))[boost::di::override]; } @@ -50,6 +50,9 @@ namespace { di::bind.to(logsys), di::bind.to(), di::bind.to(), + di::bind.to([](const auto &injector) { + return jam::se::getDispatcher(); + }), di::bind.to([](const auto &injector) { return metrics::Exposer::Configuration{ {boost::asio::ip::address_v4::from_string("127.0.0.1"), 7777} @@ -96,6 +99,12 @@ namespace jam::injector { std::shared_ptr NodeInjector::injectApplication() { return pimpl_->injector_ - .template create >(); + .template create>(); } + + std::shared_ptr NodeInjector::getSE() { + return pimpl_->injector_ + .template create>(); + } + } // namespace jam::injector diff --git a/src/injector/node_injector.hpp b/src/injector/node_injector.hpp index 6be0473..294ccda 100644 --- a/src/injector/node_injector.hpp +++ b/src/injector/node_injector.hpp @@ -7,6 +7,7 @@ #pragma once #include +#include "se/subscription.hpp" namespace jam::log { class LoggingSystem; @@ -29,6 +30,7 @@ namespace jam::injector { std::shared_ptr configuration); std::shared_ptr injectApplication(); + std::shared_ptr getSE(); protected: std::shared_ptr pimpl_; diff --git a/src/loaders/README b/src/loaders/README new file mode 100644 index 0000000..31be259 --- /dev/null +++ b/src/loaders/README @@ -0,0 +1 @@ +# loaders are locating here \ No newline at end of file diff --git a/src/loaders/impl/example_loader.hpp b/src/loaders/impl/example_loader.hpp new file mode 100644 index 0000000..f3e4d3a --- /dev/null +++ b/src/loaders/impl/example_loader.hpp @@ -0,0 +1,65 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include + +#include "loaders/loader.hpp" +#include "log/logger.hpp" +#include "modules/example/example.hpp" +#include "se/subscription.hpp" + +namespace jam::loaders { + + class ExampleLoader final + : public std::enable_shared_from_this, + public Loader, + public modules::ExampleModuleLoader { + struct __T{}; + std::shared_ptr logsys_; + + using InitCompleteSubscriber = BaseSubscriber<__T>; + std::shared_ptr on_init_complete_; + + public: + ExampleLoader(injector::NodeInjector &injector, + std::shared_ptr logsys, + std::shared_ptr module) + : Loader(injector, std::move(module)), logsys_(std::move(logsys)) {} + + ExampleLoader(const ExampleLoader &) = delete; + ExampleLoader &operator=(const ExampleLoader &) = delete; + + ~ExampleLoader() = default; + + void start() { + auto function = module_->getFunctionFromLibrary< + std::weak_ptr, + std::shared_ptr, + std::shared_ptr>("query_module_instance"); + + auto se_manager = injector_.getSE(); + + if (function) { + auto module_internal = (*function)(shared_from_this(), logsys_); + on_init_complete_ = se::SubscriberCreator<__T>::template create< + EventTypes::kOnTestOperationComplete>( + *se_manager, + SubscriptionEngineHandlers::kTest, [module_internal](auto &) { + if (auto m = module_internal.lock()) { + m->on_loaded_success(); + } + }); + + se_manager->notify( + jam::EventTypes::kOnTestOperationComplete); + } + } + }; +} // namespace jam::loaders diff --git a/src/loaders/loader.hpp b/src/loaders/loader.hpp new file mode 100644 index 0000000..451ddaa --- /dev/null +++ b/src/loaders/loader.hpp @@ -0,0 +1,39 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "injector/node_injector.hpp" +#include "modules/module.hpp" + +namespace jam::loaders { + + class Loader { + public: + Loader(const Loader &) = delete; + Loader &operator=(const Loader &) = delete; + + virtual ~Loader() = default; + virtual void start() = 0; + + std::optional module_info() { + auto result = + module_->getFunctionFromLibrary("module_info"); + if (result) { + return (*result)(); + } + return std::nullopt; + } + + Loader(injector::NodeInjector &injector, + std::shared_ptr module) + : injector_(injector), module_(std::move(module)) {} + + protected: + injector::NodeInjector &injector_; + std::shared_ptr module_; + }; +} // namespace jam::loaders diff --git a/src/loaders/request_response.hpp b/src/loaders/request_response.hpp new file mode 100644 index 0000000..61cfbe8 --- /dev/null +++ b/src/loaders/request_response.hpp @@ -0,0 +1,87 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "injector/node_injector.hpp" +#include "se/impl/common.hpp" +#include +#include +#include "modules/module.hpp" + +namespace jam::loaders { + + template + struct ResponseContext { + enum class Error : uint8_t { + REQUEST_RESPONSE_DELETED = 1, + }; + + using Response = outcome::result; + virtual ~ResponseContext() = default; + virtual void on_response(Response response) = 0; + }; + + template + struct RequestResponse final : public ResponseContext, se::utils::NoCopy { + using typename ResponseContext::Response; + + RequestResponse(FRespCb &&on_response_cb, Args&&... args) + : on_response_cb_(std::move(on_response_cb)), args_(std::forward(args)...) {} + + ~RequestResponse() override { + if (on_response_cb_) { + (*on_response_cb_)(args(), outcome::failure(std::error_code( + static_cast(ResponseContext::Error::REQUEST_RESPONSE_DELETED)))); + } + } + + void on_response(Response response) override { + if (on_response_cb_) { + (*on_response_cb_)(args(), std::move(response)); + on_response_cb_ = std::nullopt; + } + } + + std::tuple &args() { + return args_; + } + + const std::tuple &args() const { + return args_; + } + + private: + std::optional on_response_cb_; + std::tuple args_; + }; + + template + inline auto make_request(FRespCb &&on_response_cb, Args&&... args) { + return RequestResponse, std::decay_t...>( + std::forward(on_response_cb), + std::forward(args)... + ); + } + + /* Example: + auto r2 = jam::loaders::make_request( + [](const std::tuple& args, outcome::result resp) { + if(resp) { + std::cout << "Успех: " << resp.value() << std::endl; + } else { + std::cout << "Ошибка при обработке " + << std::get<0>(args) << " (код: " + << std::get<1>(args) << ")" << std::endl; + } + }, + "запрос данных", 42 + ); + + r2.on_response(outcome::result("результат")); + */ + +} // namespace jam::loaders diff --git a/src/log/logger.hpp b/src/log/logger.hpp index 5cce5fd..c35952e 100644 --- a/src/log/logger.hpp +++ b/src/log/logger.hpp @@ -9,19 +9,22 @@ #include #include -#include #include +#include +#include #include #include #include #include -#include "utils/ctor_limiters.hpp" #include "injector/dont_inject.hpp" +#include "utils/ctor_limiters.hpp" namespace jam::log { using soralog::Level; + using Logger = qtils::StrictSharedPtr; + enum class Error : uint8_t { WRONG_LEVEL = 1, WRONG_GROUP, WRONG_LOGGER }; outcome::result str2lvl(std::string_view str); @@ -58,23 +61,22 @@ namespace jam::log { return logging_system_->getLogger(logger_name, group_name, level); } - [[nodiscard]] - bool setLevelOfGroup(const std::string &group_name, Level level) const { + [[nodiscard]] bool setLevelOfGroup(const std::string &group_name, + Level level) const { return logging_system_->setLevelOfGroup(group_name, level); } - [[nodiscard]] - bool resetLevelOfGroup(const std::string &group_name) const { + [[nodiscard]] bool resetLevelOfGroup(const std::string &group_name) const { return logging_system_->resetLevelOfGroup(group_name); } - [[nodiscard]] - bool setLevelOfLogger(const std::string &logger_name, Level level) const { + [[nodiscard]] bool setLevelOfLogger(const std::string &logger_name, + Level level) const { return logging_system_->setLevelOfLogger(logger_name, level); } - [[nodiscard]] - bool resetLevelOfLogger(const std::string &logger_name) const { + [[nodiscard]] bool resetLevelOfLogger( + const std::string &logger_name) const { return logging_system_->resetLevelOfLogger(logger_name); } diff --git a/src/metrics/histogram_timer.hpp b/src/metrics/histogram_timer.hpp index e907969..446cc78 100644 --- a/src/metrics/histogram_timer.hpp +++ b/src/metrics/histogram_timer.hpp @@ -27,6 +27,12 @@ namespace jam::metrics { registry_->registerGaugeFamily(name, help); metric_ = registry_->registerGaugeMetric(name); } + GaugeHelper(const std::string &name, + const std::string &help, + const std::map &labels) { + registry_->registerGaugeFamily(name, help); + metric_ = registry_->registerGaugeMetric(name, labels); + } auto *operator->() const { return metric_; diff --git a/src/modules/CMakeLists.txt b/src/modules/CMakeLists.txt new file mode 100644 index 0000000..5e5f336 --- /dev/null +++ b/src/modules/CMakeLists.txt @@ -0,0 +1,93 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +if(NOT TARGET all_modules) + add_custom_target(all_modules) +endif() + +function(add_jam_module NAME) + set(MODULE_NAME ${NAME}) + + set(MODULE "${MODULE_NAME}_module") + + # Parse named arguments + cmake_parse_arguments( + # Prefix for parsed argument variables + MODULE + # List of flags (boolean arguments without values) + "" + # List of named arguments with a single value + "" + # List of named arguments with multiple values + "SOURCE;INCLUDE_DIRS;LIBRARIES;DEFINITIONS" + # Input arguments + ${ARGN} + ) + + if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/module.cpp) + message(FATAL_ERROR "Not found `module.cpp` file (main file of module)") + endif () + if (NOT EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${MODULE_NAME}.hpp" OR NOT EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${MODULE_NAME}.cpp") + message(FATAL_ERROR "Not found `${MODULE_NAME}.hpp` nor `${MODULE_NAME}.cpp` file (class of module)") + endif () + + # Create a shared module library + add_library(${MODULE} MODULE # or SHARED + module.cpp + ${MODULE_NAME}.cpp + ${MODULE_SOURCE} + ) + + # Set exported symbols visibility + set_target_properties(${MODULE} PROPERTIES + CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN ON + ) + + # Set include directories + if (MODULE_INCLUDE_DIRS) + target_include_directories(${MODULE} PRIVATE + ${MODULE_INCLUDE_DIRS} + ) + endif () + + # Set definitions specified for module + if (MODULE_DEFINITIONS) + target_compile_definitions(${MODULE} PRIVATE + ${MODULE_DEFINITIONS} + ) + endif () + + # Link with libs + if (MODULE_LIBRARIES) + target_link_libraries(${MODULE} + ${MODULE_LIBRARIES} + ) + endif () + + # Set C++ standard + target_compile_features(${MODULE} PRIVATE + cxx_std_20 + ) + + add_dependencies(all_modules ${MODULE}) + +endfunction() + +# -------------- Core-part of module subsystem -------------- + +add_library(modules + module_loader.cpp +) + +target_link_libraries(modules + qtils::qtils +) + +# -------------- Modules -------------- + +# Example module +add_subdirectory(example) \ No newline at end of file diff --git a/src/modules/README b/src/modules/README new file mode 100644 index 0000000..a7c5cfe --- /dev/null +++ b/src/modules/README @@ -0,0 +1 @@ +# modules are locating here \ No newline at end of file diff --git a/src/modules/example/CMakeLists.txt b/src/modules/example/CMakeLists.txt new file mode 100644 index 0000000..7ff734f --- /dev/null +++ b/src/modules/example/CMakeLists.txt @@ -0,0 +1,17 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_jam_module(example + SOURCE + example.cpp + INCLUDE_DIRS + ${CMAKE_SOURCE_DIR}/src + DEFINITIONS + SOME_FLAG=1 + LIBRARIES + qtils::qtils + soralog::soralog +) \ No newline at end of file diff --git a/src/modules/example/example.cpp b/src/modules/example/example.cpp new file mode 100644 index 0000000..5eb1c50 --- /dev/null +++ b/src/modules/example/example.cpp @@ -0,0 +1,18 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "modules/example/example.hpp" + +namespace jam::modules { + // ExampleModule::ExampleModule( + // qtils::StrictSharedPtr loader, + // qtils::StrictSharedPtr logging_system) + // : loader_(loader), + // logger_(logging_system->getLogger("ExampleModule", "example_module")) + + // {} + +} // namespace jam::modules diff --git a/src/modules/example/example.hpp b/src/modules/example/example.hpp new file mode 100644 index 0000000..ad5fe69 --- /dev/null +++ b/src/modules/example/example.hpp @@ -0,0 +1,41 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +namespace jam::modules { + class ExampleModule; + struct ExampleModuleLoader { + virtual ~ExampleModuleLoader() = default; + }; + + struct ExampleModule { + virtual ~ExampleModule() = default; + virtual void on_loaded_success() = 0; + }; +} // namespace jam::modules + +// class BlockTree; + +namespace jam::modules { + + // class ExampleModule : public Singleton { + // public: + // static std::shared_ptr instance; + // CREATE_SHARED_METHOD(ExampleModule); + + // ExampleModule(qtils::StrictSharedPtr loader, + // qtils::StrictSharedPtr logging_system); + + // qtils::StrictSharedPtr loader_; + // log::Logger logger_; + // }; + +} // namespace jam::modules diff --git a/src/modules/example/module.cpp b/src/modules/example/module.cpp new file mode 100644 index 0000000..7120918 --- /dev/null +++ b/src/modules/example/module.cpp @@ -0,0 +1,49 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#define MODULE_C_API extern "C" __attribute__((visibility("default"))) +#define MODULE_API __attribute__((visibility("default"))) + +MODULE_C_API const char *loader_id() { + return "ExampleLoader"; +} + +MODULE_C_API const char *module_info() { + return "ExampleModule v1.0"; +} + +class ExampleModuleImpl final : public jam::modules::ExampleModule { + std::shared_ptr loader_; + std::shared_ptr logger_; + + public: + ExampleModuleImpl(std::shared_ptr loader, + std::shared_ptr logger) + : loader_(std::move(loader)), logger_(std::move(logger)) {} + + void on_loaded_success() override { + auto l = logger_->getLogger("ExampleModule", "jam"); + SL_INFO(l, "Loaded success"); + } +}; +static std::shared_ptr exmpl_mod; + + +MODULE_C_API std::weak_ptr query_module_instance( + std::shared_ptr loader, + std::shared_ptr logger) { + if (!exmpl_mod) { + exmpl_mod = std::make_shared(std::move(loader), + std::move(logger)); + } + return exmpl_mod; +} + +MODULE_C_API void release_module_instance() { + exmpl_mod.reset(); +} diff --git a/src/modules/module.hpp b/src/modules/module.hpp new file mode 100644 index 0000000..7bff35d --- /dev/null +++ b/src/modules/module.hpp @@ -0,0 +1,67 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace jam::modules { + + class Module final : public std::enable_shared_from_this { + public: + Module(Module &&) = default; + + // Static method for Module object creation + static std::shared_ptr create( + const std::string &path, + std::unique_ptr handle, + const std::string &loader_id) { + return std::shared_ptr( + new Module(path, std::move(handle), loader_id)); + } + + // Getter for library path + const std::string &get_path() const { + return path_; + } + + // Getter for loader Id + const std::string &get_loader_id() const { + return loader_id_; + } + + // Get function address from library + template + std::optional getFunctionFromLibrary( + const char *funcName) { + void *funcAddr = dlsym(handle_.get(), funcName); + if (!funcAddr) { + return std::nullopt; + } + return reinterpret_cast(funcAddr); + } + + private: + Module(const std::string &path, + std::unique_ptr handle, + const std::string &loader_id) + : path_(path), handle_(std::move(handle)), loader_id_(loader_id) {} + + std::string path_; // Library path + std::unique_ptr handle_; // Library handle + std::string loader_id_; // Loader ID + + Module(const Module &) = delete; + Module &operator=(const Module &) = delete; + Module &operator=(Module &&) = delete; + }; + +} // namespace jam::modules diff --git a/src/modules/module_loader.cpp b/src/modules/module_loader.cpp new file mode 100644 index 0000000..7cdf14d --- /dev/null +++ b/src/modules/module_loader.cpp @@ -0,0 +1,80 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "modules/module_loader.hpp" + +#define COMPONENT_NAME "ModuleLoader" + +OUTCOME_CPP_DEFINE_CATEGORY(jam::modules, ModuleLoader::Error, e) { + using E = jam::modules::ModuleLoader::Error; + switch (e) { + case E::PathIsNotADir: + return COMPONENT_NAME ": path is not a directory"; + case E::OpenLibraryFailed: + return COMPONENT_NAME ": open library failed"; + case E::NoLoaderIdExport: + return COMPONENT_NAME ": library doesn't provide loader_id function"; + case E::UnexpectedLoaderId: + return COMPONENT_NAME ": unexpected loader id"; + } + return COMPONENT_NAME ": unknown error"; +} + +namespace jam::modules { + + Result ModuleLoader::recursive_search( + const fs::path &dir_path, std::deque> &modules) { + if (!fs::exists(dir_path) || !fs::is_directory(dir_path)) { + return Error::PathIsNotADir; + } + + for (const auto &entry : fs::directory_iterator(dir_path)) { + const auto &entry_path = entry.path(); + const auto &entry_name = entry.path().filename().string(); + + if (entry_name[0] == '.' || entry_name[0] == '_') { + continue; + } + + if (fs::is_directory(entry)) { + OUTCOME_TRY(recursive_search(entry_path, modules)); + } else if (fs::is_regular_file(entry) + && entry_path.extension() == ".so") { + OUTCOME_TRY(load_module(entry_path.string(), modules)); + } + } + return outcome::success(); + } + + Result ModuleLoader::load_module( + const std::string &module_path, + std::deque> &modules) { + std::unique_ptr handle( + dlopen(module_path.c_str(), RTLD_LAZY), dlclose); + if (!handle) { + return Error::OpenLibraryFailed; + } + + typedef const char *(*LoaderIdFunc)(); + LoaderIdFunc loader_id_func = + (LoaderIdFunc)dlsym(handle.get(), "loader_id"); + + if (!loader_id_func) { + return Error::NoLoaderIdExport; + } + + const char *loader_id = loader_id_func(); + if (!loader_id) { + return Error::UnexpectedLoaderId; + } + + auto module = Module::create(module_path, std::move(handle), loader_id); + modules.push_back(module); + return outcome::success(); + } + + +} // namespace jam::modules diff --git a/src/modules/module_loader.hpp b/src/modules/module_loader.hpp new file mode 100644 index 0000000..daae0d8 --- /dev/null +++ b/src/modules/module_loader.hpp @@ -0,0 +1,56 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "modules/module.hpp" + +namespace fs = std::filesystem; +template +using Result = outcome::result; + +namespace jam::modules { + + class ModuleLoader { + public: + enum class Error : uint8_t { + PathIsNotADir, + OpenLibraryFailed, + NoLoaderIdExport, + UnexpectedLoaderId, + }; + + explicit ModuleLoader(const std::string &dir_path) : dir_path_(dir_path) {} + + Result>> get_modules() { + std::deque> modules; + OUTCOME_TRY(recursive_search(fs::path(dir_path_), modules)); + return modules; + } + + private: + std::string dir_path_; + + Result recursive_search(const fs::path &dir_path, + std::deque> &modules); + Result load_module(const std::string &module_path, + std::deque> &modules); + }; + +} // namespace jam::modules + +OUTCOME_HPP_DECLARE_ERROR(jam::modules, ModuleLoader::Error); diff --git a/src/se/CMakeLists.txt b/src/se/CMakeLists.txt new file mode 100644 index 0000000..fe524f6 --- /dev/null +++ b/src/se/CMakeLists.txt @@ -0,0 +1,21 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_library(se_async + async_dispatcher.cpp + subscription.cpp + ) + +target_link_libraries(se_async + ) + +add_library(se_sync + sync_dispatcher.cpp + subscription.cpp + ) + +target_link_libraries(se_sync + ) diff --git a/src/se/async_dispatcher.cpp b/src/se/async_dispatcher.cpp new file mode 100644 index 0000000..a742632 --- /dev/null +++ b/src/se/async_dispatcher.cpp @@ -0,0 +1,20 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include "impl/async_dispatcher_impl.hpp" +#include "subscription.hpp" + +namespace jam::se { + + std::shared_ptr getDispatcher() { + return std::make_shared< + AsyncDispatcher>(); + } + +} // namespace jam::se diff --git a/src/se/impl/async_dispatcher_impl.hpp b/src/se/impl/async_dispatcher_impl.hpp new file mode 100644 index 0000000..8169cca --- /dev/null +++ b/src/se/impl/async_dispatcher_impl.hpp @@ -0,0 +1,160 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "common.hpp" +#include "dispatcher.hpp" +#include "thread_handler.hpp" + +namespace jam::se { + + template + class AsyncDispatcher final : public IDispatcher, + utils::NoCopy, + utils::NoMove { + public: + static constexpr uint32_t kHandlersCount = kCount; + static constexpr uint32_t kPoolThreadsCount = kPoolSize; + + private: + using Parent = IDispatcher; + + struct SchedulerContext { + /// Scheduler to execute tasks + std::shared_ptr handler; + }; + + SchedulerContext handlers_[kHandlersCount]; + SchedulerContext pool_[kPoolThreadsCount]; + + std::atomic_int64_t temporary_handlers_tasks_counter_; + std::atomic is_disposed_; + + struct BoundContexts { + typename Parent::Tid next_tid_offset = 0u; + std::unordered_map contexts; + }; + utils::ReadWriteObject bound_; + + void uploadToHandler(const typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task, + typename Parent::Predicate &&pred) { + assert(tid != kExecuteInPool || !pred); + if (is_disposed_.load()) { + return; + } + + if (tid < kHandlersCount) { + pred ? handlers_[tid].handler->repeat( + timeout, std::move(task), std::move(pred)) + : handlers_[tid].handler->addDelayed(timeout, std::move(task)); + return; + } + + if (auto context = + bound_.sharedAccess([tid](const BoundContexts &bound) + -> std::optional { + if (auto it = bound.contexts.find(tid); + it != bound.contexts.end()) { + return it->second; + } + return std::nullopt; + })) { + pred ? context->handler->repeat( + timeout, std::move(task), std::move(pred)) + : context->handler->addDelayed(timeout, std::move(task)); + return; + } + + std::optional opt_task = std::move(task); + for (auto &handler : pool_) { + if (opt_task = + handler.handler->uploadIfFree(timeout, std::move(*opt_task)); + !opt_task) { + return; + } + } + + auto h = std::make_shared(); + ++temporary_handlers_tasks_counter_; + h->addDelayed(timeout, [this, h, task{std::move(*opt_task)}]() mutable { + if (!is_disposed_.load()) { + task(); + } + --temporary_handlers_tasks_counter_; + h->dispose(false); + }); + } + + public: + AsyncDispatcher() { + temporary_handlers_tasks_counter_.store(0); + is_disposed_ = false; + for (auto &h : handlers_) { + h.handler = std::make_shared(); + } + for (auto &h : pool_) { + h.handler = std::make_shared(); + } + } + + void dispose() override { + is_disposed_ = true; + for (auto &h : handlers_) { + h.handler->dispose(); + } + for (auto &h : pool_) { + h.handler->dispose(); + } + + while (temporary_handlers_tasks_counter_.load() != 0) { + std::this_thread::sleep_for(std::chrono::microseconds(0ull)); + } + } + + void add(typename Parent::Tid tid, typename Parent::Task &&task) override { + uploadToHandler( + tid, std::chrono::microseconds(0ull), std::move(task), nullptr); + } + + void addDelayed(typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task) override { + uploadToHandler(tid, timeout, std::move(task), nullptr); + } + + void repeat(typename Parent::Tid tid, + std::chrono::microseconds timeout, + typename Parent::Task &&task, + typename Parent::Predicate &&pred) override { + uploadToHandler(tid, timeout, std::move(task), std::move(pred)); + } + + std::optional bind(std::shared_ptr scheduler) override { + if (!scheduler) { + return std::nullopt; + } + + return bound_.exclusiveAccess( + [scheduler(std::move(scheduler))](BoundContexts &bound) { + const auto execution_tid = kHandlersCount + bound.next_tid_offset; + assert(bound.contexts.find(execution_tid) == bound.contexts.end()); + bound.contexts[execution_tid] = SchedulerContext{scheduler}; + ++bound.next_tid_offset; + return execution_tid; + }); + } + + bool unbind(Tid tid) override { + return bound_.exclusiveAccess([tid](BoundContexts &bound) { + return bound.contexts.erase(tid) == 1; + }); + } + }; + +} // namespace jam::se diff --git a/src/se/impl/common.hpp b/src/se/impl/common.hpp new file mode 100644 index 0000000..28f4387 --- /dev/null +++ b/src/se/impl/common.hpp @@ -0,0 +1,109 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include + +namespace jam::se::utils { + + template + inline std::shared_ptr reinterpret_pointer_cast( + const std::shared_ptr &ptr) noexcept { + return std::shared_ptr(ptr, reinterpret_cast(ptr.get())); + } + + template + inline std::weak_ptr make_weak(const std::shared_ptr &ptr) noexcept { + return ptr; + } + + struct NoCopy { + NoCopy(const NoCopy &) = delete; + NoCopy &operator=(const NoCopy &) = delete; + NoCopy() = default; + }; + + struct NoMove { + NoMove(NoMove &&) = delete; + NoMove &operator=(NoMove &&) = delete; + NoMove() = default; + }; + + template + struct SafeObject { + using Type = T; + + template + SafeObject(Args &&...args) : t_(std::forward(args)...) {} + + template + inline auto exclusiveAccess(F &&f) { + std::unique_lock lock(cs_); + return std::forward(f)(t_); + } + + template + inline auto try_exclusiveAccess(F &&f) { + std::unique_lock lock(cs_, std::try_to_lock); + if (lock.owns_lock()) { + return std::make_optional(std::forward(f)(t_)); + } + return std::nullopt; + } + + template + inline auto sharedAccess(F &&f) const { + std::shared_lock lock(cs_); + return std::forward(f)(t_); + } + + private: + T t_; + mutable M cs_; + }; + + template + using ReadWriteObject = SafeObject; + + class WaitForSingleObject final : NoMove, NoCopy { + std::condition_variable wait_cv_; + std::mutex wait_m_; + bool flag_; + + public: + WaitForSingleObject() : flag_{true} {} + + bool wait(std::chrono::microseconds wait_timeout) { + std::unique_lock _lock(wait_m_); + return wait_cv_.wait_for(_lock, wait_timeout, [&]() { + auto prev = !flag_; + flag_ = true; + return prev; + }); + } + + void wait() { + std::unique_lock _lock(wait_m_); + wait_cv_.wait(_lock, [&]() { + auto prev = !flag_; + flag_ = true; + return prev; + }); + } + + void set() { + { + std::unique_lock _lock(wait_m_); + flag_ = false; + } + wait_cv_.notify_one(); + } + }; +} // namespace jam::se::utils diff --git a/src/se/impl/compile-time_murmur2.hpp b/src/se/impl/compile-time_murmur2.hpp new file mode 100644 index 0000000..4aa1beb --- /dev/null +++ b/src/se/impl/compile-time_murmur2.hpp @@ -0,0 +1,102 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace jam::se::utils { + + class Hasher { + static constexpr /* h */ uint32_t __init__(uint32_t len) { + return 0 ^ len; + } + + template + static constexpr uint32_t __load__(__T &data, uint32_t offset) { + return data[offset + 0] | (data[offset + 1] << 8) + | (data[offset + 2] << 16) | (data[offset + 3] << 24); + } + + static constexpr uint32_t __mul__(uint32_t val1, uint32_t val2) { + return val1 * val2; + } + + static constexpr uint32_t __sl__(uint32_t value, uint32_t count) { + return (value << count); + } + + static constexpr uint32_t __sr__(uint32_t value, uint32_t count) { + return (value >> count); + } + + static constexpr uint32_t __xor__(uint32_t h, uint32_t k) { + return h ^ k; + } + + static constexpr uint32_t __xor_with_sr__(uint32_t k, uint32_t r) { + return __xor__(k, __sr__(k, r)); + } + + template + static constexpr /* h */ uint32_t __proc__(__Type &data, + uint32_t len, + uint32_t offset, + uint32_t h, + uint32_t m, + uint32_t r) { + return len >= 4 + ? __proc__( + data, + len - 4, + offset + 4, + __xor__(__mul__(h, m), + __mul__(__xor_with_sr__( + __mul__(__load__(data, offset), m), r), + m)), + m, + r) + : len == 3 ? __proc__(data, + len - 1, + offset, + __xor__(h, __sl__(data[offset + 2], 16)), + m, + r) + : len == 2 ? __proc__(data, + len - 1, + offset, + __xor__(h, __sl__(data[offset + 1], 8)), + m, + r) + : len == 1 + ? __proc__( + data, len - 1, offset, __xor__(h, data[offset]) * m, m, r) + : __xor__(__mul__(__xor_with_sr__(h, 13), m), + __sr__(__mul__(__xor_with_sr__(h, 13), m), 15)); + } + + public: + template + static constexpr uint32_t murmur2(__Type &data, uint32_t len) { + return __proc__(data, len, 0, __init__(len), 0x5bd1e995, 24); + } + }; + +} // namespace jam::se::utils + +#ifndef CT_MURMUR2 +#define CT_MURMUR2(x) \ + ::jam::se::utils::Hasher::murmur2(x, (sizeof(x) / sizeof(x[0])) - 1) +#endif // CT_MURMUR2 + +static_assert(CT_MURMUR2("Called the One Ring, or the Ruling Ring.") + == 1333588607); +static_assert( + CT_MURMUR2("Fashioned by Sauron a decade after the making of the Elven " + "rings in the fires of Mount Doom in Mordor and which") + == 1319897327); +static_assert(CT_MURMUR2("could only be destroyed in that same fire.") + == 702138758); diff --git a/src/se/impl/dispatcher.hpp b/src/se/impl/dispatcher.hpp new file mode 100644 index 0000000..5dda15c --- /dev/null +++ b/src/se/impl/dispatcher.hpp @@ -0,0 +1,39 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +#include "scheduler.hpp" + +namespace jam::se { + + struct IDispatcher { + using Tid = uint32_t; + using Task = IScheduler::Task; + using Predicate = IScheduler::Predicate; + + static constexpr Tid kExecuteInPool = std::numeric_limits::max(); + + virtual ~IDispatcher() = default; + + virtual std::optional bind(std::shared_ptr scheduler) = 0; + virtual bool unbind(Tid tid) = 0; + + virtual void dispose() = 0; + virtual void add(Tid tid, Task &&task) = 0; + virtual void addDelayed(Tid tid, + std::chrono::microseconds timeout, + Task &&task) = 0; + virtual void repeat(Tid tid, + std::chrono::microseconds timeout, + Task &&task, + Predicate &&pred) = 0; + }; + +} // namespace jam::se diff --git a/src/se/impl/scheduler.hpp b/src/se/impl/scheduler.hpp new file mode 100644 index 0000000..cdc141a --- /dev/null +++ b/src/se/impl/scheduler.hpp @@ -0,0 +1,42 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +namespace jam::se { + + class IScheduler { + public: + using Task = std::function; + using Predicate = std::function; + virtual ~IScheduler() {} + + /// Stops sheduler work and tasks execution + virtual void dispose(bool wait_for_release = true) = 0; + + /// Checks if current scheduler executes task + virtual bool isBusy() const = 0; + + /// If scheduller is not busy it takes task for execution. Otherwise it + /// returns it back. + virtual std::optional uploadIfFree(std::chrono::microseconds timeout, + Task &&task) = 0; + + /// Adds delayed task to execution queue + virtual void addDelayed(std::chrono::microseconds timeout, Task &&t) = 0; + + /// Adds task that will be periodicaly called with timeout period after + /// timeout, until predicate return true + virtual void repeat(std::chrono::microseconds timeout, + Task &&t, + Predicate &&pred) = 0; + }; + +} // namespace jam::se diff --git a/src/se/impl/scheduler_impl.hpp b/src/se/impl/scheduler_impl.hpp new file mode 100644 index 0000000..d2c9841 --- /dev/null +++ b/src/se/impl/scheduler_impl.hpp @@ -0,0 +1,185 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.hpp" +#include "scheduler.hpp" + +namespace jam::se { + + class SchedulerBase : public IScheduler, utils::NoCopy, utils::NoMove { + private: + using Time = std::chrono::high_resolution_clock; + using Timepoint = std::chrono::time_point