diff --git a/.github/workflows/columnar.yml b/.github/workflows/columnar.yml index add45c096..9d178112f 100644 --- a/.github/workflows/columnar.yml +++ b/.github/workflows/columnar.yml @@ -19,7 +19,7 @@ jobs: - name: Install build environment run: | sudo apt-get update -y - sudo apt-get install -y libssl-dev cmake gcc g++ curl gdb + sudo apt-get install -y libssl-dev cmake gcc g++ curl gdb libcurl4-openssl-dev libprotobuf-dev libgrpc-dev gdb - uses: actions/checkout@v4 with: submodules: recursive diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index af726e716..20a22f469 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -38,7 +38,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update -y - sudo apt-get install -y libssl-dev cmake curl wget gnupg2 + sudo apt-get install -y libssl-dev cmake curl wget gnupg2 libcurl4-openssl-dev libprotobuf-dev libgrpc-dev gdb wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add - sudo bash -c "echo 'deb https://apt.llvm.org/noble/ llvm-toolchain-noble main' >> /etc/apt/sources.list" sudo apt-get update -y @@ -65,7 +65,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update -y - sudo apt-get install -y libssl-dev cmake curl wget gnupg2 clang clang-tools cppcheck + sudo apt-get install -y libssl-dev cmake curl wget gnupg2 clang clang-tools cppcheck libcurl4-openssl-dev libprotobuf-dev libgrpc-dev gdb - name: Run cppcheck run: ./bin/check-cppcheck @@ -78,6 +78,6 @@ jobs: - name: Install dependencies run: | sudo apt-get update -y - sudo apt-get install -y libssl-dev cmake curl wget gnupg2 clang clang-tools clang-tidy + sudo apt-get install -y libssl-dev cmake curl wget gnupg2 clang clang-tools clang-tidy libcurl4-openssl-dev libprotobuf-dev libgrpc-dev gdb - name: Run cppcheck run: ./bin/check-clang-tidy diff --git a/.github/workflows/sanitizers.yml b/.github/workflows/sanitizers.yml index 9036f566c..9909e3df7 100644 --- a/.github/workflows/sanitizers.yml +++ b/.github/workflows/sanitizers.yml @@ -40,7 +40,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update -y - sudo apt-get install -y libssl-dev cmake curl wget gnupg2 gdb clang clang-tools valgrind + sudo apt-get install -y libssl-dev cmake curl wget gnupg2 libcurl4-openssl-dev libprotobuf-dev libgrpc-dev gdb clang clang-tools valgrind - uses: actions/checkout@v4 with: submodules: recursive diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9b0fb651a..f34e78700 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -41,7 +41,7 @@ jobs: - name: Install build environment run: | sudo apt-get update -y - sudo apt-get install -y libssl-dev cmake gcc g++ curl gdb + sudo apt-get install -y libssl-dev cmake gcc g++ curl libcurl4-openssl-dev libprotobuf-dev libgrpc-dev gdb - uses: actions/checkout@v4 with: submodules: recursive diff --git a/CMakeLists.txt b/CMakeLists.txt index 381da70ec..812562f11 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -493,6 +493,8 @@ if(WIN32 OR NOT COUCHBASE_CXX_CLIENT_BUILD_SHARED) set(COUCHBASE_CXX_CLIENT_VARIANT couchbase_cxx_client_static) endif() +add_subdirectory(observability) + foreach(TARGET ${couchbase_cxx_client_LIBRARIES}) include(cmake/DetectStandardFilesystem.cmake) couchbase_cxx_check_filesystem( @@ -536,6 +538,10 @@ foreach(TARGET ${couchbase_cxx_client_LIBRARIES}) couchbase_backtrace hdr_histogram_static) + target_link_libraries( + ${TARGET} + PUBLIC couchbase_observability) + if(WIN32) target_link_libraries(${TARGET} PRIVATE iphlpapi) endif() diff --git a/cmake/ThirdPartyDependencies.cmake b/cmake/ThirdPartyDependencies.cmake index af373ce02..bd8272952 100644 --- a/cmake/ThirdPartyDependencies.cmake +++ b/cmake/ThirdPartyDependencies.cmake @@ -40,6 +40,32 @@ if(NOT TARGET spdlog::spdlog) "SPDLOG_FMT_EXTERNAL OFF") endif() +if(NOT TARGET opentelemetry_api) + # https://github.com/open-telemetry/opentelemetry-cpp/releases + cpmaddpackage( + NAME + opentelemetry_api + VERSION + 1.20.0 + GITHUB_REPOSITORY + "open-telemetry/opentelemetry-cpp" + EXCLUDE_FROM_ALL ON + OPTIONS + "OPENTELEMETRY_INSTALL OFF" + "WITH_ABI_VERSION_1 OFF" + "WITH_ABI_VERSION_2 ON" + "WITH_STL ON" + "WITH_ABSEIL OFF" + "WITH_OTLP_HTTP ON" + "WITH_OTLP_GRPC OFF" + "WITH_BENCHMARK OFF" + "BUILD_TESTING OFF" + "BUILD_SHARED_LIBS OFF" + "CMAKE_C_VISIBILITY_PRESET hidden" + "CMAKE_CXX_VISIBILITY_PRESET hidden" + "CMAKE_POSITION_INDEPENDENT_CODE ON") +endif() + if(NOT TARGET Microsoft.GSL::GSL) # https://github.com/microsoft/GSL/releases cpmaddpackage( diff --git a/core/bucket.cxx b/core/bucket.cxx index 35dfdc690..d9c92acb3 100644 --- a/core/bucket.cxx +++ b/core/bucket.cxx @@ -24,7 +24,6 @@ #include "core/error_context/key_value_error_map_info.hxx" #include "core/error_context/key_value_status_code.hxx" #include "core/io/mcbp_message.hxx" -#include "core/logger/logger.hxx" #include "core/mcbp/codec.hxx" #include "core/metrics/meter_wrapper.hxx" #include "core/protocol/client_opcode.hxx" @@ -44,6 +43,8 @@ #include "protocol/cmd_get_cluster_config.hxx" #include "retry_orchestrator.hxx" +#include "observability/logger.hxx" + #include #include #include @@ -240,13 +241,20 @@ class bucket_impl auto handle_error = [is_retry, req](std::error_code ec) { // We only want to log an error on retries if the error isn't cancelled. if (!is_retry || (is_retry && ec != errc::common::request_canceled)) { - CB_LOG_ERROR("reschedule failed, failing request ({})", ec.message()); + CB_LOG_ERROR("reschedule failed, failing request", + opentelemetry::common::MakeAttributes({ + { "ec", ec.message() }, + })); } req->try_callback({}, ec); }; - CB_LOG_DEBUG("request being re-queued. opaque={}, opcode={}", req->opaque_, req->command_); + CB_LOG_DEBUG("request being re-queued", + opentelemetry::common::MakeAttributes({ + { "opaque", req->opaque_ }, + { "opcode", fmt::format("{}", req->command_) }, + })); auto session = route_request(req); if (!session || !session->has_config()) { @@ -267,7 +275,10 @@ class bucket_impl req->opaque_ = session->next_opaque(); auto data = codec_.encode_packet(*req); if (!data) { - CB_LOG_DEBUG("unable to encode packet. ec={}", data.error().message()); + CB_LOG_DEBUG("unable to encode packet", + opentelemetry::common::MakeAttributes({ + { "ec", data.error().message() }, + })); handle_error(data.error()); return data.error(); } @@ -392,33 +403,36 @@ class bucket_impl if (auto found_kv_node_index = ptr->first; found_kv_node_index != kv_node_index) { if (auto current = sessions_.find(kv_node_index); current == sessions_.end()) { - CB_LOG_WARNING( - R"({} KV node index mismatch: config rev={} states that address="{}:{}" should be at idx={}, )" - R"(but it is at idx={} ("{}"). Moving session to idx={}.)", - log_prefix_, - config_->rev_str(), - hostname, - port, - kv_node_index, - found_kv_node_index, - ptr->second.id(), - kv_node_index); + CB_LOG_WARNING("KV node index mismatch: config {rev} states that {address} should be " + "at index {excpected_index}, but it is at {actual_index} " + "({node_id}). Moving session to index {target_index}.", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "rev", config_->rev_str() }, + { "address", fmt::format("{}:{}", hostname, port) }, + { "expected_index", kv_node_index }, + { "actual_index", found_kv_node_index }, + { "node_id", ptr->second.id() }, + { "target_index", kv_node_index }, + })); sessions_.insert_or_assign(kv_node_index, std::move(ptr->second)); sessions_.erase(ptr); } else { - CB_LOG_WARNING( - R"({} KV node index mismatch: config rev={} states that address="{}:{}" should be at idx={}, )" - R"(but it is at idx={} ("{}"). Slot with idx={} is holds session with address="{}" ("{}"), swapping them.)", - log_prefix_, - config_->rev_str(), - hostname, - port, - kv_node_index, - found_kv_node_index, - ptr->second.id(), - kv_node_index, - current->second.bootstrap_address(), - current->second.id()); + CB_LOG_WARNING("KV node index mismatch: config {rev} states that {address} should be " + "at index {expected_index}, but it is at {actual_index} ({node_id}). " + "Slot with index {target_index} is holds session with {target_address} " + "({target_node_id}), swapping them.)", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "rev", config_->rev_str() }, + { "address", fmt::format("{}:{}", hostname, port) }, + { "expected_index", kv_node_index }, + { "actual_index", found_kv_node_index }, + { "node_id", ptr->second.id() }, + { "target_index", kv_node_index }, + { "target_address", current->second.bootstrap_address() }, + { "target_node_id", current->second.id() }, + })); std::swap(current->second, ptr->second); } } @@ -439,13 +453,14 @@ class bucket_impl known_features_) : io::mcbp_session( client_id_, node.node_uuid, ctx_, origin, state_listener_, name_, known_features_); - CB_LOG_DEBUG(R"({} rev={}, restart idx={}, session="{}", address="{}:{}")", - log_prefix_, - config_->rev_str(), - node.index, - session.id(), - hostname, - port); + CB_LOG_DEBUG("config {rev}, restart session {session_id}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "rev", config_->rev_str() }, + { "index", node.index }, + { "session_id", session.id() }, + { "address", fmt::format("{}:{}", hostname, port) }, + })); session.bootstrap( [self = shared_from_this(), session](std::error_code err, topology::configuration cfg) mutable { @@ -471,12 +486,16 @@ class bucket_impl const std::scoped_lock lock(sessions_mutex_); for (auto ptr = sessions_.cbegin(); ptr != sessions_.cend();) { if (ptr->second.id() == id) { - CB_LOG_DEBUG(R"({} removed session id="{}", address="{}", bootstrap_address="{}:{}")", - log_prefix_, - ptr->second.id(), - ptr->second.remote_address(), - ptr->second.bootstrap_hostname(), - ptr->second.bootstrap_port()); + CB_LOG_DEBUG( + "removed session {session_id}, address={address}, bootstrap_address={bootstrap_address}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "session_id", ptr->second.id() }, + { "address", ptr->second.remote_address() }, + { "bootstrap_address", + fmt::format( + "{}:{}", ptr->second.bootstrap_hostname(), ptr->second.bootstrap_port()) }, + })); ptr = sessions_.erase(ptr); found = true; } else { @@ -504,10 +523,12 @@ class bucket_impl new_session.bootstrap([self = shared_from_this(), new_session, h = std::move(handler)]( std::error_code ec, topology::configuration cfg) mutable { if (ec) { - CB_LOG_WARNING(R"({} failed to bootstrap session ec={}, bucket="{}")", - new_session.log_prefix(), - ec.message(), - self->name_); + CB_LOG_WARNING("failed to bootstrap session", + opentelemetry::common::MakeAttributes({ + { "log_prefix", new_session.log_prefix() }, + { "ec", ec.message() }, + { "bucket", self->name_ }, + })); self->remove_session(new_session.id()); } else { const std::size_t this_index = new_session.index(); @@ -576,8 +597,11 @@ class bucket_impl std::swap(deferred_commands_, commands); } if (!commands.empty()) { - CB_LOG_TRACE( - R"({} draining deferred operation queue, size={})", log_prefix_, commands.size()); + CB_LOG_TRACE("draining deferred operation queue", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "log_prefix", commands.size() }, + })); } while (!commands.empty()) { commands.front()(ec); @@ -594,9 +618,12 @@ class bucket_impl { const std::scoped_lock lock(sessions_mutex_); if (sessions_.empty()) { - CB_LOG_WARNING(R"({} unable to find connected session (sessions_ is empty), retry in {})", - log_prefix_, - heartbeat_interval_); + CB_LOG_WARNING( + "unable to find connected session (sessions_ is empty), retry in {backoff_duration}}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "backoff_duration", fmt::format("{}", heartbeat_interval_) }, + })); return; } @@ -616,9 +643,12 @@ class bucket_impl req.opaque(session->next_opaque()); session->write_and_flush(req.data()); } else { - CB_LOG_WARNING(R"({} unable to find connected session with GCCCP support, retry in {})", - log_prefix_, - heartbeat_interval_); + CB_LOG_WARNING( + "unable to find connected session with GCCCP support, retry in {backoff_interval}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "backoff_duration", fmt::format("{}", heartbeat_interval_) }, + })); } } @@ -712,36 +742,52 @@ class bucket_impl if (config.vbmap && config.vbmap->empty()) { if (!config_) { CB_LOG_WARNING( - "{} will not initialize configuration rev={} because config has an empty partition map", - log_prefix_, - config.rev_str()); + "will not initialize configuration {rev} because config has an empty partition map", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "rev", config.rev_str() }, + })); } else { - CB_LOG_WARNING("{} will not update the configuration old={} -> new={}, because new " + CB_LOG_WARNING("will not update the configuration {old_rev} -> {new_rev}, because new " "config has an empty partition map", - log_prefix_, - config_->rev_str(), - config.rev_str()); + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "old_rev", config_->rev_str() }, + { "new_rev", config.rev_str() }, + })); } // this is to make sure we can get a correct config soon poll_config(errc::network::configuration_not_available); return; } if (!config_) { - CB_LOG_DEBUG("{} initialize configuration rev={}", log_prefix_, config.rev_str()); + CB_LOG_DEBUG("initialize configuration {rev}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "rev", config.rev_str() }, + })); } else if (config.force) { - CB_LOG_DEBUG("{} forced to accept configuration rev={}", log_prefix_, config.rev_str()); + CB_LOG_DEBUG("forced to accept configuration {rev}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "rev", config.rev_str() }, + })); } else if (!config.vbmap) { - CB_LOG_DEBUG("{} will not update the configuration old={} -> new={}, because new config " + CB_LOG_DEBUG("will not update the configuration {old_rev} -> {new_rev}, because new config " "does not have partition map", - log_prefix_, - config_->rev_str(), - config.rev_str()); + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "old_rev", config_->rev_str() }, + { "new_rev", config.rev_str() }, + })); return; } else if (*config_ < config) { - CB_LOG_DEBUG("{} will update the configuration old={} -> new={}", - log_prefix_, - config_->rev_str(), - config.rev_str()); + CB_LOG_DEBUG("will update the configuration {old_rev} -> {new_rev}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "old_rev", config_->rev_str() }, + { "new_rev", config.rev_str() }, + })); } else { return; } @@ -791,14 +837,17 @@ class bucket_impl for (auto it = sessions_.begin(); it != sessions_.end(); ++it) { if (it->second.bootstrap_hostname() == hostname && it->second.bootstrap_port_number() == port) { - CB_LOG_DEBUG(R"({} rev={}, preserve session="{}", address="{}:{}", index={}->{})", - log_prefix_, - config.rev_str(), - it->second.id(), - it->second.bootstrap_hostname(), - it->second.bootstrap_port(), - it->first, - next_index); + CB_LOG_DEBUG( + R"(config {rev}, preserve session {session_id} {old_index} -> {new_index})", + opentelemetry::common::MakeAttributes( + { { "log_prefix", log_prefix_ }, + { "rev", config.rev_str() }, + { "session_id", it->second.id() }, + { "address", + fmt::format( + "{}:{}", it->second.bootstrap_hostname(), it->second.bootstrap_port()) }, + { "old_index", it->first }, + { "new_index", next_index } })); new_sessions.insert_or_assign(next_index, std::move(it->second)); reused_session = true; ++next_index; @@ -824,25 +873,28 @@ class bucket_impl known_features_) : io::mcbp_session( client_id_, node.node_uuid, ctx_, origin, state_listener_, name_, known_features_); - CB_LOG_DEBUG(R"({} rev={}, add session="{}", address="{}:{}", index={})", - log_prefix_, - config.rev_str(), - session.id(), - hostname, - port, - node.index); + CB_LOG_DEBUG("config {rev}, add session {session_id}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "rev", config.rev_str() }, + { "session_id", session.id() }, + { "address", fmt::format("{}:{}", hostname, port) }, + { "index", node.index }, + })); session.bootstrap( [self = shared_from_this(), session, idx = next_index]( std::error_code err, topology::configuration cfg) mutable { if (err) { CB_LOG_WARNING( - R"({} failed to bootstrap session="{}", address="{}:{}", index={}, ec={})", - session.log_prefix(), - session.id(), - session.bootstrap_hostname(), - session.bootstrap_port(), - idx, - err.message()); + "failed to bootstrap session {session_id}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session.log_prefix() }, + { "session_id", session.id() }, + { "address", + fmt::format("{}:{}", session.bootstrap_hostname(), session.bootstrap_port()) }, + { "index", idx }, + { "ec", err.message() }, + })); return self->remove_session(session.id()); } self->update_config(std::move(cfg)); @@ -859,13 +911,16 @@ class bucket_impl std::swap(sessions_, new_sessions); for (auto it = new_sessions.begin(); it != new_sessions.end(); ++it) { - CB_LOG_DEBUG(R"({} rev={}, drop session="{}", address="{}:{}", index={})", - log_prefix_, - config.rev_str(), - it->second.id(), - it->second.bootstrap_hostname(), - it->second.bootstrap_port(), - it->first); + CB_LOG_DEBUG( + "config {rev}, drop session {session_id}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "rev", config.rev_str() }, + { "session_id", it->second.id() }, + { "address", + fmt::format("{}:{}", it->second.bootstrap_hostname(), it->second.bootstrap_port()) }, + { "index", it->first }, + })); asio::post(asio::bind_executor(ctx_, [session = std::move(it->second)]() mutable { return session.stop(retry_reason::do_not_retry); })); diff --git a/core/io/mcbp_command.hxx b/core/io/mcbp_command.hxx index 494b33509..d68e2ad44 100644 --- a/core/io/mcbp_command.hxx +++ b/core/io/mcbp_command.hxx @@ -85,11 +85,13 @@ struct mcbp_command : public std::enable_shared_from_thisupdate_counter(app_telemetry_counter::kv_r_timedout); auto time_left = deadline.expiry() - std::chrono::steady_clock::now(); - CB_LOG_TRACE(R"([{}] timeout operation id="{}", {}, key="{}", partition={}, time_left={})", - session_ ? session_->log_prefix() : manager_->log_prefix(), - id_, - encoded_request_type::body_type::opcode, - request.id, - request.partition, - time_left); + CB_LOG_TRACE("timeout operation", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_ ? session_->log_prefix() : manager_->log_prefix() }, + { "operation_id", id_ }, + { "opcode", fmt::format("{}", encoded_request_type::body_type::opcode) }, + { "partition", request.partition }, + { "time_left", fmt::format("{}", time_left) }, + })); } else if (ec == errc::common::request_canceled) { telemetry_recorder->update_counter(app_telemetry_counter::kv_r_canceled); } @@ -208,11 +211,13 @@ struct mcbp_command : public std::enable_shared_from_thislog_prefix(), - request.id, - std::chrono::duration_cast(time_left).count(), - id_); + CB_LOG_DEBUG("unknown collection response", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix() }, + { "document_id", fmt::format("{}", request.id) }, + { "time_left", fmt::format("{}", time_left) }, + { "operation_id", id_ }, + })); request.retries.add_reason(retry_reason::key_value_collection_outdated); if (time_left < backoff) { return invoke_handler(make_error_code(request.retries.idempotent() @@ -241,12 +246,13 @@ struct mcbp_command : public std::enable_shared_from_thislog_prefix(), - request.id, - timeout_.count(), - id_); + CB_LOG_DEBUG("no cache entry for collection, resolve collection id", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix() }, + { "document_id", fmt::format("{}", request.id) }, + { "timeout", fmt::format("{}", timeout_) }, + { "operation_id", id_ }, + })); return request_collection_id(); } } else { @@ -347,13 +353,13 @@ struct mcbp_command : public std::enable_shared_from_thishandle_unknown_collection(); } if (status == key_value_status_code::config_only) { - CB_LOG_DEBUG("{} server returned status 0x{:02x} ({}) meaning that the node does not " - "serve data operations, " - "requesting new " - "configuration and retrying", - self->session_->log_prefix(), - msg.header.status(), - status); + CB_LOG_DEBUG("server returned status {status} meaning that the node does not serve data " + "operations, requesting new configuration and retrying", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->session_->log_prefix() }, + { "status", fmt::format("0x{:02x}", msg.header.status()) }, + { "status_text", fmt::format("{}", status) }, + })); self->manager_->fetch_config(); return io::retry_orchestrator::maybe_retry( self->manager_, self, retry_reason::service_response_code_indicated, ec); diff --git a/core/io/mcbp_session.cxx b/core/io/mcbp_session.cxx index f9122f763..52f4926f4 100644 --- a/core/io/mcbp_session.cxx +++ b/core/io/mcbp_session.cxx @@ -26,7 +26,6 @@ #include "core/diagnostics.hxx" #include "core/impl/bootstrap_error.hxx" #include "core/impl/bootstrap_state_listener.hxx" -#include "core/logger/logger.hxx" #include "core/mcbp/codec.hxx" #include "core/mcbp/queue_request.hxx" #include "core/meta/version.hxx" @@ -56,11 +55,14 @@ #include "retry_orchestrator.hxx" #include "streams.hxx" +#include "observability/logger.hxx" + #include #include #include #include +#include #include #include @@ -290,10 +292,13 @@ class mcbp_session_impl auto user_agent = meta::user_agent_for_mcbp( session_->client_id_, session_->id_, session_->origin_.options().user_agent_extra, 250); hello_req.body().user_agent(user_agent); - CB_LOG_DEBUG("{} user_agent={}, requested_features=[{}]", - session_->log_prefix_, - user_agent, - utils::join_strings_fmt(hello_req.body().features(), ", ")); + CB_LOG_DEBUG( + "starting MCBP handshake", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "user_agent", user_agent }, + { "requested_features", utils::join_strings_fmt(hello_req.body().features(), ", ") }, + })); session_->write(hello_req.data()); if (!session_->origin_.credentials().uses_certificate()) { @@ -345,7 +350,7 @@ class mcbp_session_impl return; } Expects(protocol::is_valid_magic(msg.header.magic)); - switch (auto magic = static_cast(msg.header.magic)) { + switch (static_cast(msg.header.magic)) { case protocol::magic::client_response: case protocol::magic::alt_client_response: Expects(protocol::is_valid_client_opcode(msg.header.opcode)); @@ -364,7 +369,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_DEBUG("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_DEBUG( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::common::rate_limited); } case key_value_status_code::scope_size_limit_exceeded: { @@ -378,7 +390,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_DEBUG("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_DEBUG( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::common::quota_limited); } default: @@ -389,12 +408,17 @@ class mcbp_session_impl protocol::client_response resp(std::move(msg)); if (resp.status() == key_value_status_code::success) { session_->supported_features_ = resp.body().supported_features(); - CB_LOG_DEBUG("{} supported_features=[{}]", - session_->log_prefix_, - utils::join_strings_fmt(session_->supported_features_, ", ")); + CB_LOG_DEBUG("handshake successful", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "supported_features", + utils::join_strings_fmt(session_->supported_features_, ", ") }, + })); if (session_->origin_.credentials().uses_certificate()) { - CB_LOG_DEBUG("{} skip SASL authentication, because TLS certificate was specified", - session_->log_prefix_); + CB_LOG_DEBUG("skip SASL authentication, because TLS certificate was specified", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + })); return auth_success(); } } else { @@ -406,7 +430,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_WARNING("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_WARNING( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::network::handshake_failure); } } break; @@ -423,7 +454,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_WARNING("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_WARNING( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::common::authentication_failure); } } break; @@ -453,7 +491,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_ERROR("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_ERROR( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::common::authentication_failure); } } else { @@ -467,7 +512,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_WARNING("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_WARNING( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::common::authentication_failure); } } break; @@ -487,7 +539,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_ERROR("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_ERROR( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::common::authentication_failure); } case protocol::client_opcode::get_error_map: { @@ -504,16 +563,25 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_WARNING("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_WARNING( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::network::protocol_error); } } break; case protocol::client_opcode::select_bucket: { protocol::client_response resp(std::move(msg)); if (resp.status() == key_value_status_code::success) { - CB_LOG_DEBUG("{} selected bucket: {}", - session_->log_prefix_, - session_->bucket_name_.value_or("")); + CB_LOG_DEBUG("selected bucket", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bucket_name", session_->bucket_name_.value_or("") }, + })); session_->bucket_selected_ = true; } else if (resp.status() == key_value_status_code::not_found) { auto error_msg = @@ -526,7 +594,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_DEBUG("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_DEBUG( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::network::configuration_not_available); } else if (resp.status() == key_value_status_code::no_access) { auto error_msg = @@ -536,7 +611,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_DEBUG("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_DEBUG( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); session_->bucket_selected_ = false; return complete(errc::common::bucket_not_found); } else { @@ -549,7 +631,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_WARNING("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_WARNING( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::common::bucket_not_found); } } break; @@ -560,13 +649,15 @@ class mcbp_session_impl std::move(msg), info); if (session_->origin_.options().dump_configuration) { if (const auto& text = resp.body().config_text(); text.has_value()) { - CB_LOG_TRACE("{} configuration from get_cluster_config request (bootstrap, " - "size={}, endpoint=\"{}:{}\"), {}", - session_->log_prefix_, - text.value().size(), - info.endpoint_address, - info.endpoint_port, - text.value()); + CB_LOG_TRACE( + "configuration from get_cluster_config request (bootstrap)", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "size", text.value().size() }, + { "endpoint", + fmt::format("{}:{}", info.endpoint_address, info.endpoint_port) }, + { "text", text.value() }, + })); } } if (resp.status() == key_value_status_code::success) { @@ -577,8 +668,10 @@ class mcbp_session_impl // and we cannot use a config w/ an empty vbucket map). if (const auto vbmap = resp.body().config().vbmap; vbmap.has_value() && vbmap->empty()) { - CB_LOG_WARNING("{} received a configuration with an empty vbucket map, retrying", - session_->log_prefix_); + CB_LOG_WARNING("received a configuration with an empty vbucket map, retrying", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + })); return complete(errc::network::configuration_not_available); } session_->update_configuration(resp.body().config()); @@ -594,15 +687,24 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_DEBUG("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_DEBUG( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::network::configuration_not_available); } else if (resp.status() == key_value_status_code::no_bucket && !session_->bucket_name_) { // bucket-less session, but the server wants bucket session_->supports_gcccp_ = false; - CB_LOG_WARNING("{} this server does not support GCCCP, open bucket before making " - "any cluster-level command", - session_->log_prefix_); + CB_LOG_WARNING("this server does not support GCCCP, open bucket before making any " + "cluster-level command", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + })); session_->update_configuration( topology::make_blank_configuration(session_->connection_endpoints_.remote_address, session_->connection_endpoints_.remote.port(), @@ -618,7 +720,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_WARNING("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_WARNING( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::network::protocol_error); } } break; @@ -628,7 +737,14 @@ class mcbp_session_impl std::move(error_msg), session_->bootstrap_hostname(), session_->bootstrap_port() }; - CB_LOG_WARNING("{} {}", session_->log_prefix_, last_bootstrap_error_.error_message); + CB_LOG_WARNING( + last_bootstrap_error_.error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "bootstrap_address", + fmt::format( + "{}:{}", session_->bootstrap_hostname(), session_->bootstrap_port()) }, + })); return complete(errc::network::protocol_error); } break; @@ -642,13 +758,15 @@ class mcbp_session_impl std::move(msg), info); if (session_->origin_.options().dump_configuration) { if (const auto& text = req.body().config_text(); text.has_value()) { - CB_LOG_TRACE("{} configuration from cluster_map_change_notification request " - "(size={}, endpoint=\"{}:{}\"), {}", - session_->log_prefix_, - text.value().size(), - info.endpoint_address, - info.endpoint_port, - text.value()); + CB_LOG_TRACE( + "configuration from cluster_map_change_notification request", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "size", text.value().size() }, + { "endpoint", + fmt::format("{}:{}", info.endpoint_address, info.endpoint_port) }, + { "text", text.value() }, + })); } } std::optional config = req.body().config(); @@ -663,24 +781,26 @@ class mcbp_session_impl } } break; default: - CB_LOG_WARNING("{} unexpected server request: opcode={:x}, opaque={}{:a}{:a}", - session_->log_prefix_, - msg.header.opcode, - utils::byte_swap(msg.header.opaque), - spdlog::to_hex(msg.header_data()), - spdlog::to_hex(msg.body)); + CB_LOG_WARNING("unexpected server request", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "opcode", fmt::format("0x{:02x}", msg.header.opcode) }, + { "opaque", utils::byte_swap(msg.header.opaque) }, + { "header", fmt::format("{}", mcbp_header_view(msg.header_data())) }, + })); } break; case protocol::magic::client_request: case protocol::magic::alt_client_request: case protocol::magic::server_response: - CB_LOG_WARNING("{} unexpected magic: {} (opcode={:x}, opaque={}){:a}{:a}", - session_->log_prefix_, - magic, - msg.header.opcode, - utils::byte_swap(msg.header.opaque), - spdlog::to_hex(msg.header_data()), - spdlog::to_hex(msg.body)); + CB_LOG_WARNING("unexpected magic", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "magic", fmt::format("0x{:02x}", msg.header.magic) }, + { "opcode", fmt::format("0x{:02x}", msg.header.opcode) }, + { "opaque", fmt::format("{}", utils::byte_swap(msg.header.opaque)) }, + { "header", fmt::format("{}", mcbp_header_view(msg.header_data())) }, + })); break; } } @@ -722,7 +842,7 @@ class mcbp_session_impl return; } Expects(protocol::is_valid_magic(msg.header.magic)); - switch (auto magic = static_cast(msg.header.magic)) { + switch (static_cast(msg.header.magic)) { case protocol::magic::client_response: case protocol::magic::alt_client_response: Expects(protocol::is_valid_client_opcode(msg.header.opcode)); @@ -734,13 +854,15 @@ class mcbp_session_impl std::move(msg), info); if (session_->origin_.options().dump_configuration) { if (const auto& text = resp.body().config_text(); text.has_value()) { - CB_LOG_TRACE("{} configuration from get_cluster_config response (size={}, " - "endpoint=\"{}:{}\"), {}", - session_->log_prefix_, - text.value().size(), - info.endpoint_address, - info.endpoint_port, - text.value()); + CB_LOG_TRACE( + "configuration from get_cluster_config response", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "size", session_->log_prefix_ }, + { "endpoint", + fmt::format("{}:{}", info.endpoint_address, info.endpoint_port) }, + { "text", text.value() }, + })); } } if (resp.status() == key_value_status_code::success) { @@ -748,10 +870,12 @@ class mcbp_session_impl session_->update_configuration(resp.body().config()); } } else { - CB_LOG_WARNING("{} unexpected message status: {} (opaque={})", - session_->log_prefix_, - resp.error_message(), - resp.opaque()); + CB_LOG_WARNING("unexpected message status", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "error", resp.error_message() }, + { "opaque", resp.opaque() }, + })); } } break; case protocol::client_opcode::noop: @@ -785,26 +909,31 @@ class mcbp_session_impl std::uint32_t opaque = utils::byte_swap(msg.header.opaque); if (session_->handle_request(opcode, status, opaque, std::move(msg))) { - CB_LOG_TRACE("{} MCBP invoked operation handler: opcode={}, opaque={}, status={}", - session_->log_prefix_, - opcode, - opaque, - protocol::status_to_string(status)); + CB_LOG_TRACE("MCBP invoked operation handler", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "opcode", fmt::format("{}", opcode) }, + { "opaque", opaque }, + { "status", protocol::status_to_string(status) }, + })); } else { - CB_LOG_DEBUG("{} unexpected orphan response: opcode={}, opaque={}, status={}", - session_->log_prefix_, - opcode, - opaque, - protocol::status_to_string(status)); + CB_LOG_DEBUG("unexpected orphan response", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "opcode", fmt::format("{}", opcode) }, + { "opaque", opaque }, + { "status", protocol::status_to_string(status) }, + })); } } break; default: - CB_LOG_WARNING("{} unexpected client response: opcode={}, opaque={}{:a}{:a})", - session_->log_prefix_, - opcode, - msg.header.opaque, - spdlog::to_hex(msg.header_data()), - spdlog::to_hex(msg.body)); + CB_LOG_WARNING("unexpected client response", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "opcode", fmt::format("{}", opcode) }, + { "opaque", utils::byte_swap(msg.header.opaque) }, + { "header", fmt::format("{}", mcbp_header_view(msg.header_data())) }, + })); } break; case protocol::magic::server_request: @@ -817,13 +946,15 @@ class mcbp_session_impl std::move(msg), info); if (session_->origin_.options().dump_configuration) { if (const auto& text = req.body().config_text(); text.has_value()) { - CB_LOG_TRACE("{} configuration from cluster_map_change_notification request " - "(size={}, endpoint=\"{}:{}\"), {}", - session_->log_prefix_, - text.value().size(), - info.endpoint_address, - info.endpoint_port, - text.value()); + CB_LOG_TRACE( + "configuration from cluster_map_change_notification request", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "size", text.value().size() }, + { "endpoint", + fmt::format("{}:{}", info.endpoint_address, info.endpoint_port) }, + { "text", text.value() }, + })); } } std::optional config = req.body().config(); @@ -838,24 +969,26 @@ class mcbp_session_impl } } break; default: - CB_LOG_WARNING("{} unexpected server request: opcode={:x}, opaque={}{:a}{:a}", - session_->log_prefix_, - msg.header.opcode, - msg.header.opaque, - spdlog::to_hex(msg.header_data()), - spdlog::to_hex(msg.body)); + CB_LOG_WARNING("unexpected server request", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "opcode", fmt::format("0x{:02x}", msg.header.opcode) }, + { "opaque", utils::byte_swap(msg.header.opaque) }, + { "header", fmt::format("{}", mcbp_header_view(msg.header_data())) }, + })); } break; case protocol::magic::client_request: case protocol::magic::alt_client_request: case protocol::magic::server_response: - CB_LOG_WARNING("{} unexpected magic: {} (opcode={:x}, opaque={}){:a}{:a}", - session_->log_prefix_, - magic, - msg.header.opcode, - msg.header.opaque, - spdlog::to_hex(msg.header_data()), - spdlog::to_hex(msg.body)); + CB_LOG_WARNING("unexpected magic", + opentelemetry::common::MakeAttributes({ + { "log_prefix", session_->log_prefix_ }, + { "magic", fmt::format("0x{:02x}", msg.header.magic) }, + { "opcode", fmt::format("0x{:02x}", msg.header.opcode) }, + { "opaque", fmt::format("{}", utils::byte_swap(msg.header.opaque)) }, + { "header", fmt::format("{}", mcbp_header_view(msg.header_data())) }, + })); break; } } @@ -925,7 +1058,10 @@ class mcbp_session_impl ~mcbp_session_impl() override { - CB_LOG_DEBUG("{} destroy MCBP connection", log_prefix_); + CB_LOG_DEBUG("destroy MCBP connection", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + })); stop(retry_reason::do_not_retry); } @@ -1053,9 +1189,11 @@ class mcbp_session_impl self->bootstrap_port_ }); } auto backoff = std::chrono::milliseconds(500); - CB_LOG_DEBUG("{} unable to connect in time, waiting for {}ms before retry", - self->log_prefix_, - backoff.count()); + CB_LOG_DEBUG("unable to connect in time, waiting for {backoff_interval} before retry", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "backoff_interval", fmt::format("{}", backoff) }, + })); self->retry_backoff_.expires_after(backoff); self->retry_backoff_.async_wait([self](std::error_code ec) mutable { if (ec == asio::error::operation_aborted || self->stopped_) { @@ -1068,7 +1206,10 @@ class mcbp_session_impl if (!ec) { ec = errc::common::unambiguous_timeout; } - CB_LOG_WARNING("{} unable to bootstrap in time", self->log_prefix_); + CB_LOG_WARNING("unable to bootstrap in time", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + })); if (auto h = std::move(self->bootstrap_callback_); h) { h(ec, {}); } @@ -1078,7 +1219,10 @@ class mcbp_session_impl if (!ec) { ec = errc::common::unambiguous_timeout; } - CB_LOG_WARNING("{} unable to bootstrap in time", self->log_prefix_); + CB_LOG_WARNING("unable to bootstrap in time", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + })); if (auto h = std::move(self->bootstrap_callback_); h) { h(ec, {}); } @@ -1100,12 +1244,14 @@ class mcbp_session_impl state_ = diag::endpoint_state::connecting; if (stream_->is_open()) { return stream_->close([self = shared_from_this(), old_id = stream_->id()](std::error_code) { - CB_LOG_DEBUG(R"({} reopened socket connection "{}" -> "{}", host="{}", port={})", - self->log_prefix_, - old_id, - self->stream_->id(), - self->bootstrap_hostname_, - self->bootstrap_port_); + CB_LOG_DEBUG("reopened socket connection {old_stream_id} -> {new_stream_id}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "old_stream_id", old_id }, + { "new_stream_id", self->stream_->id() }, + { "bootstrap_address", + fmt::format("{}:{}", self->bootstrap_hostname_, self->bootstrap_port_) }, + })); return self->initiate_bootstrap(); }); } @@ -1124,9 +1270,12 @@ class mcbp_session_impl } #endif auto backoff = std::chrono::milliseconds(500); - CB_LOG_DEBUG("{} reached the end of list of bootstrap nodes, waiting for {}ms before restart", - log_prefix_, - backoff.count()); + CB_LOG_DEBUG( + "reached the end of list of bootstrap nodes, waiting for {backoff_interval} before restart", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "backoff_interval", fmt::format("{}", backoff) }, + })); retry_backoff_.expires_after(backoff); retry_backoff_.async_wait([self = shared_from_this()](std::error_code ec) mutable { if (ec == asio::error::operation_aborted || self->stopped_) { @@ -1147,7 +1296,10 @@ class mcbp_session_impl stream_->log_prefix(), bucket_name_.value_or("-"), bootstrap_address_); - CB_LOG_DEBUG("{} attempt to establish MCBP connection", log_prefix_); + CB_LOG_DEBUG("attempt to establish MCBP connection", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + })); async_resolve(origin_.options().use_ip_protocol, resolver_, @@ -1193,18 +1345,24 @@ class mcbp_session_impl if (reason == retry_reason::socket_closed_while_in_flight && !bootstrapped_) { return stream_->close([self = shared_from_this(), old_id = stream_->id()](std::error_code) { CB_LOG_DEBUG( - R"({} reopened socket connection due to IO error, "{}" -> "{}", host="{}", port={})", - self->log_prefix_, - old_id, - self->stream_->id(), - self->bootstrap_hostname_, - self->bootstrap_port_); + "reopened socket connection due to IO error, {old_stream_id} -> {new_stream_id}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "old_stream_id", old_id }, + { "new_stream_id", self->stream_->id() }, + { "bootstrap_address", + fmt::format("{}:{}", self->bootstrap_hostname_, self->bootstrap_port_) }, + })); return self->initiate_bootstrap(); }); } state_ = diag::endpoint_state::disconnecting; - CB_LOG_DEBUG("{} stop MCBP connection, reason={}", log_prefix_, reason); + CB_LOG_DEBUG("stop MCBP connection", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "reason", fmt::format("{}", reason) }, + })); stopped_ = true; bootstrap_deadline_.cancel(); connection_deadline_.cancel(); @@ -1229,10 +1387,12 @@ class mcbp_session_impl const std::scoped_lock lock(command_handlers_mutex_); for (auto& [opaque, handler] : command_handlers_) { if (handler) { - CB_LOG_DEBUG("{} MCBP cancel operation during session close, opaque={}, ec={}", - log_prefix_, - opaque, - ec.message()); + CB_LOG_DEBUG("MCBP cancel operation during session close", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opaque", opaque }, + { "ec", ec.message() }, + })); auto fun = std::move(handler); fun(ec, reason, {}, {}); } @@ -1245,10 +1405,12 @@ class mcbp_session_impl for (auto& [opaque, operation] : operations) { auto& [request, handler] = operation; if (handler) { - CB_LOG_DEBUG("{} MCBP cancel operation during session close, opaque={}, ec={}", - log_prefix_, - opaque, - ec.message()); + CB_LOG_DEBUG("MCBP cancel operation during session close", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opaque", opaque }, + { "ec", ec.message() }, + })); handler->handle_response(std::move(request), {}, reason, {}, {}); } } @@ -1271,7 +1433,11 @@ class mcbp_session_impl if (stopped_) { return; } - CB_LOG_TRACE("{} MCBP send {}", log_prefix_, mcbp_header_view(buf)); + CB_LOG_TRACE("MCBP send", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "header", fmt::format("{}", mcbp_header_view(buf)) }, + })); const std::scoped_lock lock(output_buffer_mutex_); output_buffer_.emplace_back(std::move(buf)); } @@ -1369,14 +1535,22 @@ class mcbp_session_impl auto opaque = request->opaque_; auto data = codec_.encode_packet(*request); if (!data) { - CB_LOG_DEBUG("unable to encode packet. opaque={}, ec={}", opaque, data.error().message()); + CB_LOG_DEBUG("unable to encode packet", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opaque", opaque }, + { "ec", data.error().message() }, + })); request->try_callback({}, data.error()); return; } if (stopped_) { - CB_LOG_WARNING("cancel operation while trying to write to closed mcbp session, opaque={}", - opaque); + CB_LOG_WARNING("cancel operation while trying to write to closed mcbp session", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opaque", opaque }, + })); handler->handle_response(request, errc::common::request_canceled, retry_reason::socket_closed_while_in_flight, @@ -1388,9 +1562,11 @@ class mcbp_session_impl if (bootstrapped_ && stream_->is_open()) { write_and_flush(std::move(data.value())); } else { - CB_LOG_DEBUG("{} the stream is not ready yet, put the message into pending buffer, opaque={}", - log_prefix_, - opaque); + CB_LOG_DEBUG("the stream is not ready yet, put the message into pending buffer", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opaque", opaque }, + })); const std::scoped_lock lock(pending_buffer_mutex_); if (bootstrapped_ && stream_->is_open()) { write_and_flush(std::move(data.value())); @@ -1405,9 +1581,11 @@ class mcbp_session_impl command_handler&& handler) { if (stopped_) { - CB_LOG_WARNING("{} MCBP cancel operation, while trying to write to closed session, opaque={}", - log_prefix_, - opaque); + CB_LOG_WARNING("MCBP cancel operation, while trying to write to closed session", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opaque", opaque }, + })); handler(errc::common::request_canceled, retry_reason::socket_closed_while_in_flight, {}, {}); return; } @@ -1418,9 +1596,11 @@ class mcbp_session_impl if (bootstrapped_ && stream_->is_open()) { write_and_flush(std::move(data)); } else { - CB_LOG_DEBUG("{} the stream is not ready yet, put the message into pending buffer, opaque={}", - log_prefix_, - opaque); + CB_LOG_DEBUG("the stream is not ready yet, put the message into pending buffer", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opaque", opaque }, + })); const std::scoped_lock lock(pending_buffer_mutex_); if (bootstrapped_ && stream_->is_open()) { write_and_flush(std::move(data)); @@ -1437,11 +1617,12 @@ class mcbp_session_impl } command_handlers_mutex_.lock(); if (auto handler = command_handlers_.find(opaque); handler != command_handlers_.end()) { - CB_LOG_DEBUG("{} MCBP cancel operation, opaque={}, ec={} ({})", - log_prefix_, - opaque, - ec.value(), - ec.message()); + CB_LOG_DEBUG("MCBP cancel operation", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opaque", opaque }, + { "ec", ec.message() }, + })); if (handler->second) { auto fun = std::move(handler->second); command_handlers_.erase(handler); @@ -1558,28 +1739,36 @@ class mcbp_session_impl // should already have a config w/ a non-empty vbucket map (bootstrap will not complete // successfully unless we have a config w/ a non-empty vbucket map). if (config.vbmap && config.vbmap->empty()) { - CB_LOG_DEBUG("{} received a configuration with an empty vbucket map, ignoring", log_prefix_); + CB_LOG_DEBUG("received a configuration with an empty vbucket map, ignoring", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + })); return; } if (config_) { if (config_->vbmap && config.vbmap && config_->vbmap->size() != config.vbmap->size()) { - CB_LOG_DEBUG("{} received a configuration with a different number of vbuckets, ignoring", - log_prefix_); + CB_LOG_DEBUG("received a configuration with a different number of vbuckets, ignoring", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + })); return; } if (config == config_) { - CB_LOG_TRACE( - "{} received a configuration with identical revision (new={}, old={}), ignoring", - log_prefix_, - config.rev_str(), - config_->rev_str()); + CB_LOG_TRACE("received a configuration with identical revision, ignoring", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "old_rev", config.rev_str() }, + { "new_rev", config_->rev_str() }, + })); return; } if (config < config_) { - CB_LOG_DEBUG("{} received a configuration with older revision (new={}, old={}), ignoring", - log_prefix_, - config.rev_str(), - config_->rev_str()); + CB_LOG_DEBUG("received a configuration with older revision, ignoring", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "old_rev", config.rev_str() }, + { "new_rev", config_->rev_str() }, + })); return; } } @@ -1638,21 +1827,23 @@ class mcbp_session_impl msg.body.size() - static_cast(offset) }; if (origin_.options().dump_configuration) { CB_LOG_TRACE( - "{} configuration from not_my_vbucket response (size={}, endpoint=\"{}:{}\"), {}", - log_prefix_, - config_text.size(), - bootstrap_hostname_, - bootstrap_port_number_, - config_text); + "configuration from not_my_vbucket response", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "size", config_text.size() }, + { "endpoint", fmt::format("{}:{}", bootstrap_hostname_, bootstrap_port_number_) }, + { "text", config_text }, + })); } auto config = protocol::parse_config(config_text, bootstrap_hostname_, bootstrap_port_number_); - CB_LOG_DEBUG( - "{} received not_my_vbucket status for {}, opaque={} with config rev={} in the payload", - log_prefix_, - protocol::client_opcode(msg.header.opcode), - utils::byte_swap(msg.header.opaque), - config.rev_str()); + CB_LOG_DEBUG("received not_my_vbucket status for {opcode}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "opcode", fmt::format("{}", protocol::client_opcode(msg.header.opcode)) }, + { "opaque", utils::byte_swap(msg.header.opaque) }, + { "rev", config.rev_str() }, + })); update_configuration(std::move(config)); } } @@ -1693,16 +1884,18 @@ class mcbp_session_impl return initiate_bootstrap(); } if (retry_bootstrap_on_bucket_not_found_ && ec == errc::common::bucket_not_found) { - CB_LOG_DEBUG(R"({} server returned {} ({}), it must be transient condition, retrying)", - log_prefix_, - ec.value(), - ec.message()); + CB_LOG_DEBUG("server returned bucket_not_found, it must be transient condition, retrying", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + })); return initiate_bootstrap(); } if (!origin_.exhausted() && ec == errc::common::authentication_failure) { - CB_LOG_DEBUG( - R"({} server returned authentication_failure, but the bootstrap list is not exhausted yet. It must be transient condition, retrying)", - log_prefix_); + CB_LOG_DEBUG("server returned authentication_failure, but the bootstrap list is not " + "exhausted yet. It must be transient condition, retrying", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + })); return initiate_bootstrap(); } @@ -1756,16 +1949,22 @@ class mcbp_session_impl connection_deadline_.cancel(); last_active_ = std::chrono::steady_clock::now(); if (ec) { - CB_LOG_ERROR("{} error on resolve: {} ({})", log_prefix_, ec.value(), ec.message()); + CB_LOG_ERROR("error on resolve", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "ec", ec.message() }, + })); last_bootstrap_error_ = { ec, ec.message(), bootstrap_hostname_, bootstrap_port_ }; return initiate_bootstrap(); } endpoints_ = endpoints; - CB_LOG_TRACE("{} resolved \"{}:{}\" to {} endpoint(s)", - log_prefix_, - bootstrap_hostname_, - bootstrap_port_, - endpoints_.size()); + CB_LOG_TRACE( + "resolved {bootstrap_address} to {number_of_endpoints} endpoint(s)", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "bootstrap_address", fmt::format("{}:{}", bootstrap_hostname_, bootstrap_port_) }, + { "number_of_endpoints", endpoints_.size() }, + })); do_connect(endpoints_.begin()); connection_deadline_.expires_after(origin_.options().resolve_timeout); connection_deadline_.async_wait([self = shared_from_this()](const auto timer_ec) { @@ -1785,25 +1984,27 @@ class mcbp_session_impl if (it != endpoints_.end()) { auto hostname = it->endpoint().address().to_string(); auto port = it->endpoint().port(); - CB_LOG_DEBUG("{} connecting to {}:{} (\"{}:{}\"), timeout={}ms", - log_prefix_, - hostname, - port, - bootstrap_hostname_, - bootstrap_port_, - origin_.options().connect_timeout.count()); + CB_LOG_DEBUG( + "connecting to {address} ({bootstrap_address})", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "address", fmt::format("{}:{}", hostname, port) }, + { "bootstrap_address", fmt::format("{}:{}", bootstrap_hostname_, bootstrap_port_) }, + { "timeout", fmt::format("{}", origin_.options().connect_timeout) }, + })); connection_deadline_.expires_after(origin_.options().connect_timeout); connection_deadline_.async_wait( [self = shared_from_this(), hostname, port](const auto timer_ec) { if (timer_ec == asio::error::operation_aborted || self->stopped_) { return; } - CB_LOG_DEBUG("{} unable to connect to {}:{} (\"{}:{}\") in time, reconnecting", - self->log_prefix_, - hostname, - port, - self->bootstrap_hostname_, - self->bootstrap_port_); + CB_LOG_DEBUG("unable to connect to {address} ({bootstrap_address}) in time, reconnecting", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "address", fmt::format("{}:{}", hostname, port) }, + { "bootstrap_address", + fmt::format("{}:{}", self->bootstrap_hostname_, self->bootstrap_port_) }, + })); self->initiate_bootstrap(); }); stream_->async_connect(it->endpoint(), [capture0 = shared_from_this(), it](auto&& PH1) { @@ -1817,7 +2018,12 @@ class mcbp_session_impl last_bootstrap_error_ = { errc::network::no_endpoints_left, std::move(error_msg), bootstrap_hostname_, bootstrap_port_ }; - CB_LOG_ERROR("{} {}", log_prefix_, last_bootstrap_error_.value().error_message); + CB_LOG_ERROR( + last_bootstrap_error_->error_message, + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "bootstrap_address", fmt::format("{}:{}", bootstrap_hostname_, bootstrap_port_) }, + })); if (state_listener_) { state_listener_->report_bootstrap_error( fmt::format("{}:{}", bootstrap_hostname_, bootstrap_port_), @@ -1844,25 +2050,34 @@ class mcbp_session_impl ? ERR_error_string(static_cast(ec.value()), nullptr) : ec.message(); #endif - CB_LOG_WARNING("{} unable to connect to {}:{}: {} ({}){}. is_open={}", - log_prefix_, - it->endpoint().address().to_string(), - it->endpoint().port(), - ec.value(), - error_message, - (ec == asio::error::connection_refused) - ? ", check server ports and cluster encryption setting" - : "", - stream_->is_open()); + CB_LOG_WARNING( + "unable to connect to {endpoint}: {ec} ({ec_text}) {note}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "endpoint", + fmt::format("{}:{}", it->endpoint().address().to_string(), it->endpoint().port()) }, + { "ec", ec.value() }, + { "ec_text", error_message }, + { "note", + (ec == asio::error::connection_refused) + ? "check server ports and cluster encryption setting" + : "" }, + { "is_open", stream_->is_open() }, + })); if (stream_->is_open()) { stream_->close([self = shared_from_this(), next_address = ++it](std::error_code ec) { if (ec) { CB_LOG_WARNING( - "{} unable to close socket, but continue connecting attempt to {}:{}: {}", - self->log_prefix_, - next_address->endpoint().address().to_string(), - next_address->endpoint().port(), - ec.value()); + "unable to close socket, but continue connecting attempt to {next_address}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "ec", ec.value() }, + { "ec_text", ec.message() }, + { "next_address", + fmt::format("{}:{}", + next_address->endpoint().address().to_string(), + next_address->endpoint().port()) }, + })); } self->do_connect(next_address); }); @@ -1872,10 +2087,14 @@ class mcbp_session_impl } else { stream_->set_options(); connection_endpoints_ = { it->endpoint(), stream_->local_endpoint() }; - CB_LOG_DEBUG("{} connected to {}:{}", - log_prefix_, - connection_endpoints_.remote_address, - connection_endpoints_.remote.port()); + CB_LOG_DEBUG( + "connected to {address}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", log_prefix_ }, + { "address", + fmt::format( + "{}:{}", connection_endpoints_.remote_address, connection_endpoints_.remote.port()) }, + })); log_prefix_ = fmt::format("[{}/{}/{}/{}] <{}/{}:{}>", client_id_, id_, @@ -1896,12 +2115,17 @@ class mcbp_session_impl if (timer_ec == asio::error::operation_aborted || self->stopped_) { return; } - CB_LOG_DEBUG("{} unable to boostrap single node at {}:{} (\"{}:{}\") in time, reconnecting", - self->log_prefix_, - self->connection_endpoints_.remote_address, - self->connection_endpoints_.remote.port(), - self->bootstrap_hostname_, - self->bootstrap_port_); + CB_LOG_DEBUG( + "unable to boostrap single node at {address} ({bootstrap_address}) in time, reconnecting", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "address", + fmt::format("{}:{}", + self->connection_endpoints_.remote_address, + self->connection_endpoints_.remote.port()) }, + { "bootstrap_address", + fmt::format("{}:{}", self->bootstrap_hostname_, self->bootstrap_port_) }, + })); return self->initiate_bootstrap(); }); } @@ -1919,40 +2143,45 @@ class mcbp_session_impl std::size_t bytes_transferred) { if (ec == asio::error::operation_aborted || self->stopped_) { self->reading_ = false; - CB_LOG_PROTOCOL("[MCBP, IN] host=\"{}\", port={}, rc={}, bytes_received={}", - self->connection_endpoints_.remote_address, - self->connection_endpoints_.remote.port(), - ec ? ec.message() : "ok", - bytes_transferred); + // CB_LOG_PROTOCOL("[MCBP, IN] host=\"{}\", port={}, rc={}, + // bytes_received={}", + // self->connection_endpoints_.remote_address, + // self->connection_endpoints_.remote.port(), + // ec ? ec.message() : "ok", + // bytes_transferred); return; } - CB_LOG_PROTOCOL("[MCBP, IN] host=\"{}\", port={}, rc={}, bytes_received={}{:a}", - self->connection_endpoints_.remote_address, - self->connection_endpoints_.remote.port(), - ec ? ec.message() : "ok", - bytes_transferred, - spdlog::to_hex(self->input_buffer_.data(), - self->input_buffer_.data() + - static_cast(bytes_transferred))); + // CB_LOG_PROTOCOL("[MCBP, IN] host=\"{}\", port={}, rc={}, + // bytes_received={}{:a}", + // self->connection_endpoints_.remote_address, + // self->connection_endpoints_.remote.port(), + // ec ? ec.message() : "ok", + // bytes_transferred, + // spdlog::to_hex(self->input_buffer_.data(), + // self->input_buffer_.data() + + // static_cast(bytes_transferred))); self->last_active_ = std::chrono::steady_clock::now(); if (ec) { self->reading_ = false; if (stream_id != self->stream_->id()) { - CB_LOG_ERROR( - R"({} ignore IO error while reading from the socket: {} ({}), old_id="{}", new_id="{}")", - self->log_prefix_, - ec.value(), - ec.message(), - stream_id, - self->stream_->id()); + CB_LOG_ERROR("ignore IO error while reading from the socket", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "ec", ec.value() }, + { "ec_text", ec.message() }, + { "old_stream_id", stream_id }, + { "new_stream_id", self->stream_->id() }, + })); return; } - CB_LOG_ERROR(R"({} IO error while reading from the socket("{}"): {} ({}))", - self->log_prefix_, - self->stream_->id(), - ec.value(), - ec.message()); + CB_LOG_ERROR("IO error while reading from the socket", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "stream_id", self->stream_->id() }, + { "ec", ec.value() }, + { "ec_text", ec.message() }, + })); return self->stop(retry_reason::socket_closed_while_in_flight); } self->parser_.feed(self->input_buffer_.data(), @@ -1966,8 +2195,11 @@ class mcbp_session_impl if (self->stopped_) { return; } - CB_LOG_TRACE( - "{} MCBP recv {}", self->log_prefix_, mcbp_header_view(msg.header_data())); + CB_LOG_TRACE("MCBP recv", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "header", fmt::format("{}", mcbp_header_view(msg.header_data())) }, + })); if (self->bootstrapped_) { self->handler_->handle(std::move(msg)); } else if (self->bootstrap_handler_) { @@ -2003,31 +2235,34 @@ class mcbp_session_impl std::vector buffers; buffers.reserve(writing_buffer_.size()); for (auto& buf : writing_buffer_) { - CB_LOG_PROTOCOL("[MCBP, OUT] host=\"{}\", port={}, buffer_size={}{:a}", - connection_endpoints_.remote_address, - connection_endpoints_.remote.port(), - buf.size(), - spdlog::to_hex(buf)); + // CB_LOG_PROTOCOL("[MCBP, OUT] host=\"{}\", port={}, buffer_size={}{:a}", + // connection_endpoints_.remote_address, + // connection_endpoints_.remote.port(), + // buf.size(), + // spdlog::to_hex(buf)); buffers.emplace_back(asio::buffer(buf)); } stream_->async_write( - buffers, [self = shared_from_this()](std::error_code ec, std::size_t bytes_transferred) { - CB_LOG_PROTOCOL("[MCBP, OUT] host=\"{}\", port={}, rc={}, bytes_sent={}", - self->connection_endpoints_.remote_address, - self->connection_endpoints_.remote.port(), - ec ? ec.message() : "ok", - bytes_transferred); + buffers, + [self = shared_from_this()](std::error_code ec, std::size_t /* bytes_transferred */) { + // CB_LOG_PROTOCOL("[MCBP, OUT] host=\"{}\", port={}, rc={}, bytes_sent={}", + // self->connection_endpoints_.remote_address, + // self->connection_endpoints_.remote.port(), + // ec ? ec.message() : "ok", + // bytes_transferred); if (ec == asio::error::operation_aborted || self->stopped_) { return; } self->last_active_ = std::chrono::steady_clock::now(); if (ec) { - CB_LOG_ERROR(R"({} IO error while writing to the socket("{}"): {} ({}))", - self->log_prefix_, - self->stream_->id(), - ec.value(), - ec.message()); + CB_LOG_ERROR("IO error while writing to the socket", + opentelemetry::common::MakeAttributes({ + { "log_prefix", self->log_prefix_ }, + { "stream_id", self->stream_->id() }, + { "ec", ec.value() }, + { "ec_text", ec.message() }, + })); return self->stop(retry_reason::socket_closed_while_in_flight); } { diff --git a/core/io/retry_orchestrator.hxx b/core/io/retry_orchestrator.hxx index 7c3a312e2..2f8d1e8cf 100644 --- a/core/io/retry_orchestrator.hxx +++ b/core/io/retry_orchestrator.hxx @@ -17,8 +17,8 @@ #pragma once -#include "core/logger/logger.hxx" #include "core/protocol/client_opcode_fmt.hxx" +#include "observability/logger.hxx" #include #include @@ -33,8 +33,8 @@ namespace priv { template auto -cap_duration(std::chrono::milliseconds uncapped, - std::shared_ptr command) -> std::chrono::milliseconds +cap_duration(std::chrono::milliseconds uncapped, std::shared_ptr command) + -> std::chrono::milliseconds { auto theoretical_deadline = std::chrono::steady_clock::now() + uncapped; auto absolute_deadline = command->deadline.expiry(); @@ -59,15 +59,18 @@ retry_with_duration(std::shared_ptr manager, { command->request.retries.record_retry_attempt(reason); CB_LOG_TRACE( - R"({} retrying operation {} (duration={}ms, id="{}", vbucket_id={}, reason={}, attempts={}, last_dispatched_to="{}"))", - manager->log_prefix(), - decltype(command->request)::encoded_request_type::body_type::opcode, - duration.count(), - command->id_, - command->request.partition, - reason, - command->request.retries.retry_attempts(), - command->session_ ? command->session_->remote_address() : ""); + "retrying operation {opcode}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", manager->log_prefix() }, + { "opcode", + fmt::format("{}", decltype(command->request)::encoded_request_type::body_type::opcode) }, + { "duration", fmt::format("{}", duration) }, + { "id", command->id_ }, + { "vbucket_id", command->request.partition }, + { "reason", fmt::format("{}", reason) }, + { "attempts", command->request.retries.retry_attempts() }, + { "last_dispatched_to", command->session_ ? command->session_->remote_address() : "" }, + })); manager->schedule_for_retry(command, duration); } @@ -95,14 +98,17 @@ maybe_retry(std::shared_ptr manager, manager, command, reason, priv::cap_duration(action.duration(), command)); } - CB_LOG_TRACE(R"({} not retrying operation {} (id="{}", reason={}, attempts={}, ec={} ({})))", - manager->log_prefix(), - decltype(command->request)::encoded_request_type::body_type::opcode, - command->id_, - reason, - command->request.retries.retry_attempts(), - ec.value(), - ec.message()); + CB_LOG_TRACE( + "not retrying operation {}", + opentelemetry::common::MakeAttributes({ + { "log_prefix", manager->log_prefix() }, + { "opcode", + fmt::format("{}", decltype(command->request)::encoded_request_type::body_type::opcode) }, + { "id", command->id_ }, + { "reason", fmt::format("{}", reason) }, + { "attempts", command->request.retries.retry_attempts() }, + { "ec", ec.message() }, + })); return command->invoke_handler(ec); } diff --git a/core/management/rbac_json.hxx b/core/management/rbac_json.hxx index 0515d0411..4f61918f4 100644 --- a/core/management/rbac_json.hxx +++ b/core/management/rbac_json.hxx @@ -17,9 +17,10 @@ #pragma once +#include "opentelemetry/common/key_value_iterable_view.h" #include "rbac.hxx" -#include "core/logger/logger.hxx" +#include "observability/logger.hxx" #include @@ -37,7 +38,10 @@ struct traits { } else if (domain == "external") { result.domain = couchbase::core::management::rbac::auth_domain::external; } else { - CB_LOG_ERROR(R"("unexpected domain for user with metadata: "{}")", domain); + CB_LOG_ERROR("unexpected domain for user with metadata", + opentelemetry::common::MakeAttributes({ + { "domain", domain }, + })); } result.username = v.at("id").get_string(); if (const auto* display_name = v.find("name"); diff --git a/core/meta/version.cxx b/core/meta/version.cxx index 69ebfe20a..33cddf1e1 100644 --- a/core/meta/version.cxx +++ b/core/meta/version.cxx @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -168,6 +169,7 @@ sdk_build_info() -> std::map #if defined(__GLIBC__) info["libc"] = fmt::format("glibc {}.{}", __GLIBC__, __GLIBC_MINOR__); #endif + info["opentelemetry_api"] = OPENTELEMETRY_VERSION; return info; } diff --git a/core/operations/document_get_all_replicas.hxx b/core/operations/document_get_all_replicas.hxx index da297ea55..f972f7b54 100644 --- a/core/operations/document_get_all_replicas.hxx +++ b/core/operations/document_get_all_replicas.hxx @@ -20,12 +20,13 @@ #include "core/error_context/key_value.hxx" #include "core/impl/get_replica.hxx" #include "core/impl/replica_utils.hxx" -#include "core/logger/logger.hxx" #include "core/operations/document_get.hxx" #include "core/operations/operation_traits.hxx" #include "core/utils/movable_function.hxx" #include "couchbase/error_codes.hxx" +#include "observability/logger.hxx" + #include #include @@ -77,11 +78,12 @@ struct get_all_replicas_request { auto nodes = impl::effective_nodes(id, config, read_preference, origin.options().server_group); if (nodes.empty()) { - CB_LOG_DEBUG( - "Unable to retrieve replicas for \"{}\", server_group={}, number_of_replicas={}", - id, - origin.options().server_group, - config->num_replicas.value_or(0)); + CB_LOG_DEBUG("Unable to retrieve replicas for {document_id}", + opentelemetry::common::MakeAttributes({ + { "document_id", id }, + { "server_group", origin.options().server_group }, + { "number_of_replicas", config->num_replicas.value_or(0) }, + })); return h(response_type{ make_key_value_error_context(errc::key_value::document_irretrievable, id) }); } diff --git a/core/tracing/noop_tracer.hxx b/core/tracing/noop_tracer.hxx index 1268c62d2..4f3d85f93 100644 --- a/core/tracing/noop_tracer.hxx +++ b/core/tracing/noop_tracer.hxx @@ -39,7 +39,7 @@ class noop_span : public couchbase::tracing::request_span /* do nothing */ } - auto uses_tags() const -> bool override + [[nodiscard]] auto uses_tags() const -> bool override { return false; } diff --git a/core/transactions/atr_cleanup_entry.cxx b/core/transactions/atr_cleanup_entry.cxx index 7bc122487..054a806f0 100644 --- a/core/transactions/atr_cleanup_entry.cxx +++ b/core/transactions/atr_cleanup_entry.cxx @@ -96,7 +96,11 @@ atr_cleanup_entry::atr_cleanup_entry(const std::shared_ptr& ctx void atr_cleanup_entry::clean(transactions_cleanup_attempt* result) { - CB_ATTEMPT_CLEANUP_LOG_TRACE("cleaning {}", *this); + CB_LOG_TRACE("cleaning {cleanup_entry_ptr}", + opentelemetry::common::MakeAttributes({ + { "subsystem", "transactions_cleanup" }, + { "cleanup_entry_ptr", fmt::format("{}", *this) }, + })); // get atr entry if needed const atr_entry entry; if (nullptr == atr_entry_) { @@ -110,10 +114,18 @@ atr_cleanup_entry::clean(transactions_cleanup_attempt* result) atr_entry_ = &(*it); return check_atr_and_cleanup(result); } - CB_ATTEMPT_CLEANUP_LOG_TRACE("could not find attempt {}, nothing to clean", attempt_id_); + CB_LOG_TRACE("could not find attempt {attempt_id}, nothing to clean", + opentelemetry::common::MakeAttributes({ + { "subsystem", "transactions_cleanup" }, + { "attempt_id", attempt_id_ }, + })); return; } - CB_ATTEMPT_CLEANUP_LOG_TRACE("could not find atr {}, nothing to clean", atr_id_); + CB_LOG_TRACE("could not find atr {attempt_id}, nothing to clean", + opentelemetry::common::MakeAttributes({ + { "subsystem", "transactions_cleanup" }, + { "attempt_id", attempt_id_ }, + })); return; } check_atr_and_cleanup(result); @@ -173,8 +185,11 @@ atr_cleanup_entry::cleanup_docs(durability_level dl) remove_txn_links(atr_entry_->removed_ids(), dl); break; default: - CB_ATTEMPT_CLEANUP_LOG_TRACE("attempt in {}, nothing to do in cleanup_docs", - attempt_state_name(atr_entry_->state())); + CB_LOG_TRACE("attempt in {state}, nothing to do in cleanup_docs", + opentelemetry::common::MakeAttributes({ + { "subsystem", "transactions_cleanup" }, + { "state", attempt_state_name(atr_entry_->state()) }, + })); } } diff --git a/couchbase/error_context.hxx b/couchbase/error_context.hxx index 7e1ed4265..99b1dbcc4 100644 --- a/couchbase/error_context.hxx +++ b/couchbase/error_context.hxx @@ -39,6 +39,9 @@ public: [[nodiscard]] auto to_json( error_context_json_format format = error_context_json_format::compact) const -> std::string; + [[nodiscard]] auto to_attributes( + error_context_json_format format = error_context_json_format::compact) const -> std::string; + [[nodiscard]] auto impl() const -> std::shared_ptr; private: diff --git a/couchbase/metrics/otel_meter.hxx b/couchbase/metrics/otel_meter.hxx deleted file mode 100644 index 122492fcc..000000000 --- a/couchbase/metrics/otel_meter.hxx +++ /dev/null @@ -1,139 +0,0 @@ -/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -/* - * Copyright 2021 Couchbase, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "opentelemetry/sdk/metrics/meter.h" -#include - -#include -#include -#include -#include - -using couchbase::metrics::meter; -using couchbase::metrics::value_recorder; - -namespace nostd = opentelemetry::nostd; -namespace metrics_api = opentelemetry::metrics; -namespace metrics_sdk = opentelemetry::sdk::metrics; - -namespace couchbase::metrics -{ - -class otel_sync_histogram -{ -public: - otel_sync_histogram(nostd::shared_ptr> histogram_counter) - : histogram_counter_(histogram_counter) - { - } - - void record(std::uint64_t value, - const opentelemetry::common::KeyValueIterable& tags, - opentelemetry::context::Context& ctx) - { - histogram_counter_->Record(value, tags, ctx); - } - -private: - nostd::shared_ptr> histogram_counter_; - std::mutex mutex_; -}; - -class otel_value_recorder : public couchbase::metrics::value_recorder -{ -public: - explicit otel_value_recorder( - nostd::shared_ptr> histogram_counter, - const std::map& tags) - : histogram_counter_(histogram_counter) - , tags_(tags) - { - } - void record_value(std::int64_t value) override - { - value = std::max(value, 0); - auto uvalue = static_cast(value); - histogram_counter_->Record( - uvalue, opentelemetry::common::KeyValueIterableView{ tags_ }, context_); - } - - const std::map tags() - { - return tags_; - } - - nostd::shared_ptr> histogram_counter() - { - return histogram_counter_; - } - -private: - nostd::shared_ptr> histogram_counter_; - const std::map tags_; - opentelemetry::context::Context context_{}; - std::mutex mutex_; -}; - -class otel_meter : public couchbase::metrics::meter -{ -public: - explicit otel_meter(nostd::shared_ptr meter) - : meter_(meter) - { - } - - auto get_value_recorder(const std::string& name, const std::map& tags) - -> std::shared_ptr override - { - // first look up the histogram, in case we already have it... - std::scoped_lock lock(mutex_); - auto it = recorders_.equal_range(name); - if (it.first == it.second) { - // this name isn't associated with any histogram, so make one and return it. - // Note we'd like to make one with more buckets than default, given the range of - // response times we'd like to display (queries vs kv for instance), but otel - // api doesn't seem to allow this. - return recorders_ - .insert({ name, - std::make_shared( - meter_->CreateUInt64Histogram(name, "", "us"), tags) }) - ->second; - } - // so it is already, lets see if we already have one with those tags, or need - // to make a new one (using the histogram we already have). - for (auto itr = it.first; itr != it.second; itr++) { - if (tags == itr->second->tags()) { - return itr->second; - } - } - // if you are here, we need to add one with these tags and the histogram associated with the - // name. - return recorders_ - .insert( - { name, - std::make_shared(it.first->second->histogram_counter(), tags) }) - ->second; - } - -private: - nostd::shared_ptr meter_; - std::mutex mutex_; - std::multimap> recorders_; -}; -} // namespace couchbase::metrics diff --git a/couchbase/tracing/otel_tracer.hxx b/couchbase/tracing/otel_tracer.hxx deleted file mode 100644 index 16c3828db..000000000 --- a/couchbase/tracing/otel_tracer.hxx +++ /dev/null @@ -1,86 +0,0 @@ -/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -/* - * Copyright 2021 Couchbase, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include - -namespace nostd = opentelemetry::nostd; -namespace couchbase::tracing -{ -class otel_request_span : public couchbase::tracing::request_span -{ -public: - explicit otel_request_span(nostd::shared_ptr span) - : span_(std::move(span)) - { - } - void add_tag(const std::string& name, const std::string& value) override - { - span_->SetAttribute(name, value); - } - void add_tag(const std::string& name, uint64_t value) override - { - span_->SetAttribute(name, value); - } - void end() override - { - span_->End(); - } - nostd::shared_ptr wrapped_span() - { - return span_; - } - -private: - nostd::shared_ptr span_; -}; - -class otel_request_tracer : public couchbase::tracing::request_tracer -{ -public: - otel_request_tracer(nostd::shared_ptr tracer) - : tracer_(std::move(tracer)) - { - } - - auto start_span(std::string name, std::shared_ptr parent = {}) - -> std::shared_ptr override - { - auto wrapped_parent = std::dynamic_pointer_cast(parent); - if (wrapped_parent) { - opentelemetry::trace::StartSpanOptions opts; - opts.parent = wrapped_parent->wrapped_span()->GetContext(); - return std::make_shared(tracer_->StartSpan(name, opts)); - } - return std::make_shared(tracer_->StartSpan(name)); - } - - auto wrap_span(nostd::shared_ptr span) - -> std::shared_ptr - { - return std::make_shared(span); - } - -private: - nostd::shared_ptr tracer_; -}; -} // namespace couchbase::tracing \ No newline at end of file diff --git a/couchbase/tracing/request_tracer.hxx b/couchbase/tracing/request_tracer.hxx index 157299422..8cd08d688 100644 --- a/couchbase/tracing/request_tracer.hxx +++ b/couchbase/tracing/request_tracer.hxx @@ -51,7 +51,7 @@ public: /* do nothing */ } - virtual auto start_span(std::string name, std::shared_ptr parent = {}) + virtual auto start_span(std::string name, std::shared_ptr parent) -> std::shared_ptr = 0; }; diff --git a/examples/async_game_server.cxx b/examples/async_game_server.cxx index 110ae0c55..97c5d3aff 100644 --- a/examples/async_game_server.cxx +++ b/examples/async_game_server.cxx @@ -23,7 +23,6 @@ #include -#include #include #include #include @@ -31,8 +30,8 @@ using namespace couchbase::transactions; -std::string -make_uuid() +auto +make_uuid() -> std::string { static std::random_device dev; static std::mt19937 rng(dev()); @@ -138,31 +137,31 @@ class GameServer { } - [[nodiscard]] int calculate_level_for_experience(int experience) const + [[nodiscard]] auto calculate_level_for_experience(int experience) const -> int { return experience / 100; } - std::future> player_hits_monster( - int damage, - const couchbase::collection& collection, - const std::string& player_id, - const std::string& monster_id, - std::atomic& exists) + auto player_hits_monster(int damage, + const couchbase::collection& collection, + const std::string& player_id, + const std::string& monster_id, + std::atomic& exists) + -> std::future> { auto barrier = std::make_shared>>(); auto f = barrier->get_future(); transactions_->run( [this, damage, collection, player_id, monster_id, &exists]( - std::shared_ptr ctx) -> couchbase::error { + const std::shared_ptr& ctx) -> couchbase::error { ctx->get( collection, monster_id, - [ctx, this, collection, monster_id, player_id, &exists, damage = std::move(damage)]( - auto e, auto monster) { + [ctx, this, collection, monster_id, player_id, &exists, damage](const auto& e, + const auto& monster) { if (e.ec() == couchbase::errc::transaction_op::document_not_found) { - std::cout << "monster no longer exists" << std::endl; + std::cout << "monster no longer exists\n"; exists = false; return; } @@ -172,22 +171,23 @@ class GameServer std::cout << "Monster " << monster_id << " had " << monster_hitpoints << " hitpoints, took " << damage << " damage, now has " - << monster_new_hitpoints << " hitpoints" << std::endl; + << monster_new_hitpoints << " hitpoints" << "\n"; if (monster_new_hitpoints <= 0) { // Monster is killed. The remove is just for demoing, and a more realistic examples // would set a "dead" flag or similar. - ctx->remove(monster, [](auto e) { + ctx->remove(monster, [](const auto& e) { if (e.ec()) { - std::cout << "error removing monster: " << e.ec().message() << std::endl; + std::cout << "error removing monster: " << e.ec().message() << "\n"; } }); // also, in parallel, get/update player ctx->get( collection, player_id, - [ctx, player_id, monster_id, monster_body, this](auto e, auto player) { + [ctx, player_id, monster_id, monster_body, this](const auto& e, + const auto& player) { if (e.ec()) { - std::cout << "error getting player: " << e.ec().message() << std::endl; + std::cout << "error getting player: " << e.ec().message() << "\n"; return; } const Player& player_body = player.template content_as(); @@ -200,45 +200,49 @@ class GameServer std::cout << "Monster " << monster_id << " was killed. Player " << player_id << " gains " << experience_for_killing_monster - << " experience, now has level " << player_new_level << std::endl; + << " experience, now has level " << player_new_level << "\n"; Player player_new_body = player_body; player_new_body.experience = player_new_experience; player_new_body.level = player_new_level; - ctx->replace(player, player_new_body, [](auto e, auto) { + ctx->replace(player, player_new_body, [](const auto& e, const auto& /* res */) { if (e.ec()) { - std::cout << "Error updating player :" << e.ec().message() << std::endl; + std::cout << "Error updating player :" << e.ec().message() << "\n"; } }); }); } else { - std::cout << "Monster " << monster_id << " is damaged but alive" << std::endl; + std::cout << "Monster " << monster_id << " is damaged but alive" << "\n"; Monster monster_new_body = monster_body; monster_new_body.hitpoints = monster_new_hitpoints; - ctx->replace(monster, monster_new_body, [monster_new_body](auto e, auto res) { - if (e.ec()) { - std::cout << "Error updating monster :" << e.ec().message() << std::endl; - } else { - auto body = couchbase::codec::tao_json_serializer::serialize(monster_new_body); - std::cout << "Monster body updated to :" - << std::string(reinterpret_cast(&body.front()), body.size()) - << std::endl; - } - }); + ctx->replace(monster, + monster_new_body, + [monster_new_body](const auto& e, const auto& /* res */) { + if (e.ec()) { + std::cout << "Error updating monster :" << e.ec().message() << "\n"; + } else { + auto body = + couchbase::codec::tao_json_serializer::serialize(monster_new_body); + std::cout + << "Monster body updated to :" + << std::string(reinterpret_cast(&body.front()), body.size()) + << "\n"; + } + }); } }); return {}; }, [barrier](auto err, auto res) { - barrier->set_value({ err, res }); + barrier->set_value({ std::move(err), std::move(res) }); }); return f; } }; -int -main() +auto +main() -> int { couchbase::logger::initialize_console_logger(); couchbase::logger::set_level(couchbase::logger::log_level::trace); @@ -248,8 +252,10 @@ main() std::string bucket_name = "default"; std::uniform_int_distribution hit_distribution(1, 6); - std::mt19937 random_number_engine; // pseudorandom number generator - auto rand = std::bind(hit_distribution, random_number_engine); + std::default_random_engine random_number_engine{ std::random_device{}() }; + auto rand = [&hit_distribution, &random_number_engine] { + return hit_distribution(random_number_engine); + }; auto options = couchbase::cluster_options("Administrator", "password"); options.transactions().cleanup_config().cleanup_window(std::chrono::seconds(60)); @@ -259,11 +265,11 @@ main() auto [connect_err, cluster] = couchbase::cluster::connect("couchbase://localhost", options).get(); if (connect_err) { - std::cout << "Error opening cluster: " << fmt::format("{}", connect_err) << std::endl; + std::cout << "Error opening cluster: " << fmt::format("{}", connect_err) << "\n"; return -1; } - auto collection = cluster.bucket("default").default_collection(); + auto collection = cluster.bucket(bucket_name).default_collection(); std::string player_id{ "player_data" }; Player player_data{ 14248, 23832, "player", 141, true, "Jane", make_uuid() }; @@ -276,7 +282,7 @@ main() auto [err, resp] = collection.upsert(player_id, player_data, {}).get(); if (!err) { std::cout << "Upserted sample player document: " << player_id - << "with cas:" << resp.cas().value() << std::endl; + << "with cas:" << resp.cas().value() << "\n"; } } // upsert a monster document @@ -284,7 +290,7 @@ main() auto [err, resp] = collection.upsert(monster_id, monster_data, {}).get(); if (!err) { std::cout << "Upserted sample monster document: " << monster_id - << "with cas:" << resp.cas().value() << std::endl; + << "with cas:" << resp.cas().value() << "\n"; } } @@ -297,21 +303,21 @@ main() while (monster_exists.load()) { try { std::cout << "[thread " << std::this_thread::get_id() - << "]Monster exists -- lets hit it!" << std::endl; + << "]Monster exists -- lets hit it!" << "\n"; auto [err, res] = game_server .player_hits_monster(rand() % 80, collection, player_id, monster_id, monster_exists) .get(); if (!err.ec()) { - std::cout << "[thread " << std::this_thread::get_id() << "] success" << std::endl; + std::cout << "[thread " << std::this_thread::get_id() << "] success" << "\n"; } else { std::cout << "[thread " << std::this_thread::get_id() << "] " << err.ec().message() - << std::endl; + << "\n"; } } catch (const std::exception& e) { std::cout << "[thread " << std::this_thread::get_id() << "] got exception " << e.what() - << std::endl; + << "\n"; } } }); diff --git a/examples/bulk_base_api.cxx b/examples/bulk_base_api.cxx index a9602ac89..866c03b33 100644 --- a/examples/bulk_base_api.cxx +++ b/examples/bulk_base_api.cxx @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -40,6 +41,7 @@ struct program_arguments { static auto load_from_environment() -> program_arguments { + // NOLINTBEGIN(concurrency-mt-unsafe) program_arguments arguments; if (const auto* val = getenv("CB_CONNECTION_STRING"); val != nullptr && val[0] != '\0') { arguments.connection_string = val; @@ -73,12 +75,13 @@ struct program_arguments { arguments.document_body_size = int_val; } } + // NOLINTEND(concurrency-mt-unsafe) return arguments; } }; -std::string -random_text(std::size_t length) +auto +random_text(std::size_t length) -> std::string { std::string alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; static thread_local std::mt19937_64 gen{ std::random_device()() }; @@ -137,14 +140,13 @@ run_workload_sequential(const couchbase::collection& collection, const program_a } auto exec_end = std::chrono::system_clock::now(); - fmt::print( - "\rExecuted {} upsert operations in {}ms ({}us, {}s), average latency: {}ms\n", - arguments.number_of_operations, - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count() / - arguments.number_of_operations); + fmt::print("\rExecuted {} upsert operations in {} ({}, {}), average latency: {}\n", + arguments.number_of_operations, + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start) / + arguments.number_of_operations); if (errors.empty()) { fmt::print("\tAll operations completed successfully\n"); @@ -169,14 +171,13 @@ run_workload_sequential(const couchbase::collection& collection, const program_a } auto exec_end = std::chrono::system_clock::now(); - fmt::print( - "\rExecuted {} get operations in {}ms ({}us, {}s), average latency: {}ms\n", - arguments.number_of_operations, - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count() / - arguments.number_of_operations); + fmt::print("\rExecuted {} get operations in {} ({}, {}), average latency: {}\n", + arguments.number_of_operations, + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start) / + arguments.number_of_operations); if (errors.empty()) { fmt::print("\tAll operations completed successfully\n"); @@ -250,24 +251,19 @@ run_workload_bulk(const couchbase::collection& collection, const program_argumen } auto completion_end = std::chrono::system_clock::now(); fmt::print( - "\rCompleted {} upsert operations in {}ms ({}us, {}s)\n", + "\rCompleted {} upsert operations in {} ({}, {})\n", arguments.number_of_operations, - std::chrono::duration_cast(completion_end - completion_start) - .count(), - std::chrono::duration_cast(completion_end - completion_start) - .count(), - std::chrono::duration_cast(completion_end - completion_start).count()); + std::chrono::duration_cast(completion_end - completion_start), + std::chrono::duration_cast(completion_end - completion_start), + std::chrono::duration_cast(completion_end - completion_start)); fmt::print( - "Executed {} upsert operations in {}ms ({}us, {}s), average latency: {}ms\n", + "Executed {} upsert operations in {} ({}, {}), average latency: {}\n", arguments.number_of_operations, - std::chrono::duration_cast(completion_end - schedule_start) - .count(), - std::chrono::duration_cast(completion_end - schedule_start) - .count(), - std::chrono::duration_cast(completion_end - schedule_start).count(), - std::chrono::duration_cast(completion_end - schedule_start) - .count() / + std::chrono::duration_cast(completion_end - schedule_start), + std::chrono::duration_cast(completion_end - schedule_start), + std::chrono::duration_cast(completion_end - schedule_start), + std::chrono::duration_cast(completion_end - schedule_start) / arguments.number_of_operations); if (errors.empty()) { @@ -310,24 +306,19 @@ run_workload_bulk(const couchbase::collection& collection, const program_argumen } auto completion_end = std::chrono::system_clock::now(); fmt::print( - "\rCompleted {} get operations in {}ms ({}us, {}s)\n", + "\rCompleted {} get operations in {} ({}, {})\n", arguments.number_of_operations, - std::chrono::duration_cast(completion_end - completion_start) - .count(), - std::chrono::duration_cast(completion_end - completion_start) - .count(), - std::chrono::duration_cast(completion_end - completion_start).count()); + std::chrono::duration_cast(completion_end - completion_start), + std::chrono::duration_cast(completion_end - completion_start), + std::chrono::duration_cast(completion_end - completion_start)); fmt::print( - "Executed {} get operations in {}ms ({}us, {}s), average latency: {}ms\n", + "Executed {} get operations in {} ({}, {}), average latency: {}\n", arguments.number_of_operations, - std::chrono::duration_cast(completion_end - schedule_start) - .count(), - std::chrono::duration_cast(completion_end - schedule_start) - .count(), - std::chrono::duration_cast(completion_end - schedule_start).count(), - std::chrono::duration_cast(completion_end - schedule_start) - .count() / + std::chrono::duration_cast(completion_end - schedule_start), + std::chrono::duration_cast(completion_end - schedule_start), + std::chrono::duration_cast(completion_end - schedule_start), + std::chrono::duration_cast(completion_end - schedule_start) / arguments.number_of_operations); if (errors.empty()) { @@ -342,16 +333,16 @@ run_workload_bulk(const couchbase::collection& collection, const program_argumen auto end = std::chrono::system_clock::now(); - fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n", - std::chrono::duration_cast(end - start).count(), - std::chrono::duration_cast(end - start).count(), - std::chrono::duration_cast(end - start).count(), - std::chrono::duration_cast(end - start).count() / + fmt::print("Total time for bulk execution {} ({}, {})\n", + std::chrono::duration_cast(end - start), + std::chrono::duration_cast(end - start), + std::chrono::duration_cast(end - start), + std::chrono::duration_cast(end - start) / arguments.number_of_operations); } -int -main() +auto +main() -> int { auto arguments = program_arguments::load_from_environment(); diff --git a/examples/bulk_transactional_api.cxx b/examples/bulk_transactional_api.cxx index 09afc2187..dba76f734 100644 --- a/examples/bulk_transactional_api.cxx +++ b/examples/bulk_transactional_api.cxx @@ -44,6 +44,8 @@ struct program_arguments { static auto load_from_environment() -> program_arguments { program_arguments arguments; + + // NOLINTBEGIN(concurrency-mt-unsafe) if (const auto* val = getenv("CB_CONNECTION_STRING"); val != nullptr && val[0] != '\0') { arguments.connection_string = val; } @@ -83,12 +85,13 @@ struct program_arguments { arguments.transaction_timeout = std::chrono::seconds{ int_val }; } } + // NOLINTEND(concurrency-mt-unsafe) return arguments; } }; -std::string -random_text(std::size_t length) +auto +random_text(std::size_t length) -> std::string { std::string alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; static thread_local std::mt19937_64 gen{ std::random_device()() }; @@ -181,9 +184,10 @@ run_workload_sequential(const std::shared_ptr errors; auto exec_start = std::chrono::system_clock::now(); - auto [err, result] = transactions->run( - [&collection, &document_ids, &document, &arguments, &errors]( - std::shared_ptr attempt) -> couchbase::error { + auto [err, result] = + transactions->run([&collection, &document_ids, &document, &arguments, &errors]( + const std::shared_ptr& attempt) + -> couchbase::error { for (std::size_t i = 0; i < arguments.number_of_operations; ++i) { auto [err, res] = attempt->insert(collection, document_ids[i], document); if (err.ec()) { @@ -196,15 +200,14 @@ run_workload_sequential(const std::shared_ptr(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count() / - arguments.number_of_operations); + fmt::print("\rExecuted transaction with {} INSERT operations in {} ({}, {}), average latency: " + "{}ms\n", + arguments.number_of_operations, + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start) / + arguments.number_of_operations); if (err.ec()) { fmt::print("\tTransaction completed with error {}, cause={}\n", err.ec().message(), @@ -230,9 +233,10 @@ run_workload_sequential(const std::shared_ptr errors; auto exec_start = std::chrono::system_clock::now(); - auto [err, result] = transactions->run( - [&collection, &document_ids, &arguments, &errors]( - std::shared_ptr attempt) -> couchbase::error { + auto [err, result] = + transactions->run([&collection, &document_ids, &arguments, &errors]( + const std::shared_ptr& attempt) + -> couchbase::error { for (std::size_t i = 0; i < arguments.number_of_operations; ++i) { auto [err, res] = attempt->get(collection, document_ids[i]); if (err.ec()) { @@ -246,12 +250,12 @@ run_workload_sequential(const std::shared_ptr(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count() / + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start) / arguments.number_of_operations); if (err.ec()) { fmt::print("\tTransaction completed with error {}, cause={}\n", @@ -360,42 +364,43 @@ run_workload_bulk(const std::shared_ptr& auto schedule_start = std::chrono::system_clock::now(); transactions->run( [&collection, &document_ids, &document, &arguments, &errors]( - std::shared_ptr attempt) + const std::shared_ptr& attempt) -> couchbase::error { for (std::size_t i = 0; i < arguments.number_of_operations; ++i) { - attempt->insert(collection, document_ids[i], document, [&errors](auto ctx, auto) { - if (ctx.ec()) { - errors[ctx.ec().message()]++; - } - }); + attempt->insert(collection, + document_ids[i], + document, + [&errors](const auto& ctx, const auto& /* res */) { + if (ctx.ec()) { + errors[ctx.ec().message()]++; + } + }); } return {}; }, [tx_promise](auto err, auto result) { - tx_promise->set_value({ err, result }); + tx_promise->set_value({ std::move(err), std::move(result) }); }); auto schedule_end = std::chrono::system_clock::now(); - fmt::print( - "\rScheduled transaction with {} INSERT operations in {}ms ({}us, {}s)\n", - arguments.number_of_operations, - std::chrono::duration_cast(schedule_end - schedule_start).count(), - std::chrono::duration_cast(schedule_end - schedule_start).count(), - std::chrono::duration_cast(schedule_end - schedule_start).count()); + fmt::print("\rScheduled transaction with {} INSERT operations in {} ({}, {})\n", + arguments.number_of_operations, + std::chrono::duration_cast(schedule_end - schedule_start), + std::chrono::duration_cast(schedule_end - schedule_start), + std::chrono::duration_cast(schedule_end - schedule_start)); auto exec_start = std::chrono::system_clock::now(); auto [err, result] = tx_future.get(); auto exec_end = std::chrono::system_clock::now(); - fmt::print( - "\rExecuted transaction with {} INSERT operations in {}ms ({}us, {}s), average latency: " - "{}ms\n", - arguments.number_of_operations, - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count() / - arguments.number_of_operations); + fmt::print("\rExecuted transaction with {} INSERT operations in {} ({}, {}), average latency: " + "{}ms\n", + arguments.number_of_operations, + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start) / + arguments.number_of_operations); if (err.ec()) { fmt::print("\tTransaction completed with error {}, cause={}\n", err.ec().message(), @@ -427,40 +432,40 @@ run_workload_bulk(const std::shared_ptr& auto schedule_start = std::chrono::system_clock::now(); transactions->run( [&collection, &document_ids, &arguments, &errors]( - std::shared_ptr attempt) + const std::shared_ptr& attempt) -> couchbase::error { for (std::size_t i = 0; i < arguments.number_of_operations; ++i) { - attempt->get(collection, document_ids[i], [&errors](auto ctx, auto) { - if (ctx.ec()) { - errors[ctx.ec().message()]++; - } - }); + attempt->get( + collection, document_ids[i], [&errors](const auto& ctx, const auto& /* res */) { + if (ctx.ec()) { + errors[ctx.ec().message()]++; + } + }); } return {}; }, [tx_promise](auto err, auto result) { - tx_promise->set_value({ err, result }); + tx_promise->set_value({ std::move(err), std::move(result) }); }); auto schedule_end = std::chrono::system_clock::now(); - fmt::print( - "\rScheduled transaction with {} GET operations in {}ms ({}us, {}s)\n", - arguments.number_of_operations, - std::chrono::duration_cast(schedule_end - schedule_start).count(), - std::chrono::duration_cast(schedule_end - schedule_start).count(), - std::chrono::duration_cast(schedule_end - schedule_start).count()); + fmt::print("\rScheduled transaction with {} GET operations in {} ({}, {})\n", + arguments.number_of_operations, + std::chrono::duration_cast(schedule_end - schedule_start), + std::chrono::duration_cast(schedule_end - schedule_start), + std::chrono::duration_cast(schedule_end - schedule_start)); auto exec_start = std::chrono::system_clock::now(); auto [err, result] = tx_future.get(); auto exec_end = std::chrono::system_clock::now(); fmt::print( - "\rExecuted transaction with {} GET operations in {}ms ({}us, {}s), average latency: {}ms\n", + "\rExecuted transaction with {} GET operations in {} ({}, {}), average latency: {}\n", arguments.number_of_operations, - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count() / + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start) / arguments.number_of_operations); if (err.ec()) { fmt::print("\tTransaction completed with error {}, cause={}\n", @@ -488,14 +493,14 @@ run_workload_bulk(const std::shared_ptr& } auto end = std::chrono::system_clock::now(); - fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n", - std::chrono::duration_cast(end - start).count(), - std::chrono::duration_cast(end - start).count(), - std::chrono::duration_cast(end - start).count()); + fmt::print("Total time for bulk execution {} ({}, {})\n", + std::chrono::duration_cast(end - start), + std::chrono::duration_cast(end - start), + std::chrono::duration_cast(end - start)); } -int -main() +auto +main() -> int { auto arguments = program_arguments::load_from_environment(); diff --git a/examples/bulk_transactional_get_replace.cxx b/examples/bulk_transactional_get_replace.cxx index ee99e9ede..3c225429e 100644 --- a/examples/bulk_transactional_get_replace.cxx +++ b/examples/bulk_transactional_get_replace.cxx @@ -46,6 +46,8 @@ struct program_arguments { static auto load_from_environment() -> program_arguments { program_arguments arguments; + + // NOLINTBEGIN(concurrency-mt-unsafe) if (const auto* val = getenv("CB_CONNECTION_STRING"); val != nullptr && val[0] != '\0') { arguments.connection_string = val; } @@ -100,12 +102,14 @@ struct program_arguments { arguments.transaction_timeout = std::chrono::seconds{ int_val }; } } + // NOLINTEND(concurrency-mt-unsafe) + return arguments; } }; -std::string -random_text(std::size_t length) +auto +random_text(std::size_t length) -> std::string { std::string alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; static thread_local std::mt19937_64 gen{ std::random_device()() }; @@ -161,7 +165,7 @@ run_workload(const std::shared_ptr& trans for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) { transactions->run( [&collection, &document_ids, &arguments, &errors]( - std::shared_ptr attempt) + const std::shared_ptr& attempt) -> couchbase::error { std::vector selected_keys; std::sample(document_ids.begin(), @@ -172,12 +176,14 @@ run_workload(const std::shared_ptr& trans for (const auto& id : selected_keys) { attempt->get( - collection, id, [attempt, &collection, id, &arguments, &errors](auto ctx, auto res) { + collection, + id, + [attempt, &collection, id, &arguments, &errors](const auto& ctx, auto res) { if (ctx.ec() == couchbase::errc::transaction_op::document_not_found) { attempt->insert(collection, id, generate_document(arguments.document_body_size), - [&errors](auto ctx, auto) { + [&errors](const auto& ctx, const auto& /* res */) { if (ctx.ec()) { errors[ctx.ec().message()]++; } @@ -185,9 +191,9 @@ run_workload(const std::shared_ptr& trans } else if (ctx.ec()) { errors[ctx.ec().message()]++; } else { - attempt->replace(res, + attempt->replace(std::move(res), generate_document(arguments.document_body_size), - [&errors](auto ctx, auto) { + [&errors](const auto& ctx, const auto& /* res */) { if (ctx.ec()) { errors[ctx.ec().message()]++; } @@ -198,18 +204,18 @@ run_workload(const std::shared_ptr& trans return {}; }, [&promise = results[i]](auto err, auto result) { - promise.set_value({ err, result }); + promise.set_value({ std::move(err), std::move(result) }); }); } auto schedule_end = std::chrono::system_clock::now(); fmt::print( - "\rScheduled {} transactions with {} GET+[INSERT|REPLACE] operations in {}ms ({}us, {}s)\n", + "\rScheduled {} transactions with {} GET+[INSERT|REPLACE] operations in {} ({}, {})\n", arguments.number_of_transactions, arguments.number_of_keys_per_transaction, - std::chrono::duration_cast(schedule_end - schedule_start).count(), - std::chrono::duration_cast(schedule_end - schedule_start).count(), - std::chrono::duration_cast(schedule_end - schedule_start).count()); + std::chrono::duration_cast(schedule_end - schedule_start), + std::chrono::duration_cast(schedule_end - schedule_start), + std::chrono::duration_cast(schedule_end - schedule_start)); std::map transactions_errors; auto exec_start = std::chrono::system_clock::now(); @@ -224,16 +230,15 @@ run_workload(const std::shared_ptr& trans } auto exec_end = std::chrono::system_clock::now(); - fmt::print( - "\rExecuted {} transactions with {} GET+[INSERT|REPLACE] operations in {}ms ({}us, {}s), " - "average latency: {}ms\n", - arguments.number_of_transactions, - arguments.number_of_keys_per_transaction, - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count(), - std::chrono::duration_cast(exec_end - exec_start).count() / - arguments.number_of_keys_per_transaction); + fmt::print("\rExecuted {} transactions with {} GET+[INSERT|REPLACE] operations in {} ({}, {}), " + "average latency: {}\n", + arguments.number_of_transactions, + arguments.number_of_keys_per_transaction, + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start), + std::chrono::duration_cast(exec_end - exec_start) / + arguments.number_of_keys_per_transaction); if (transactions_errors.empty()) { fmt::print("\tAll transactions completed successfully\n"); } else { @@ -260,8 +265,8 @@ run_workload(const std::shared_ptr& trans std::chrono::duration_cast(end - start).count()); } -int -main() +auto +main() -> int { auto arguments = program_arguments::load_from_environment(); diff --git a/observability/CMakeLists.txt b/observability/CMakeLists.txt new file mode 100644 index 000000000..35c794996 --- /dev/null +++ b/observability/CMakeLists.txt @@ -0,0 +1,13 @@ +add_library(couchbase_observability fmt_log_record_exporter.cxx logger.cxx otel_tracer.cxx) +set_target_properties(couchbase_observability PROPERTIES POSITION_INDEPENDENT_CODE ON) +target_link_libraries( + couchbase_observability + PUBLIC project_options + project_warnings + opentelemetry_exporter_ostream_logs + opentelemetry_exporter_otlp_http + opentelemetry_exporter_otlp_http_log + opentelemetry_logs + opentelemetry_trace + spdlog::spdlog) +target_include_directories(couchbase_observability PRIVATE ${PROJECT_BINARY_DIR}/generated ${PROJECT_SOURCE_DIR}) diff --git a/observability/fmt_log_record_exporter.cxx b/observability/fmt_log_record_exporter.cxx new file mode 100644 index 000000000..0ade92ed8 --- /dev/null +++ b/observability/fmt_log_record_exporter.cxx @@ -0,0 +1,325 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2025-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fmt_log_record_exporter.hxx" + +#if defined(__GNUC__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsign-conversion" +#elif defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wdeprecated-builtins" +#endif +#include +#include +#include +#if defined(__GNUC__) +#pragma GCC diagnostic pop +#elif defined(__clang__) +#pragma clang diagnostic pop +#endif + +#include +#include +#include + +#include + +#include +#include + +template<> +struct fmt::formatter + : fmt::formatter { + + template + auto format(const opentelemetry::common::SystemTimestamp& ts, FormatContext& ctx) const + { + return fmt::formatter::format( + static_cast(ts), ctx); + } +}; + +namespace +{ +template +auto +to_hex(const IdType& id) -> std::string +{ + std::string buffer(2 * IdType::kSize, '0'); + id.ToLowerBase16(buffer); + return buffer; +} + +auto +trim_quotes(const std::string& s) -> std::string_view +{ + size_t start = 0; + size_t end = s.size(); + + if (!s.empty() && s.front() == '"') { + ++start; + } + if (end > start && s.back() == '"') { + --end; + } + return { s.data() + start, end - start }; +} + +struct log_ids { + const opentelemetry::trace::TraceId& tid; + const opentelemetry::trace::SpanId& sid; +}; + +struct log_body { + const opentelemetry::common::AttributeValue& fmt_string; + const std::unordered_map& params; +}; +} // namespace + +template<> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) const + { + return ctx.begin(); + } + + template + auto format(const log_ids& ids, FormatContext& ctx) const + { + if (ids.tid.IsValid() && ids.sid.IsValid()) { + return fmt::format_to( + ctx.out(), "\t[tid=\"{}\", sid=\"{}\"]", to_hex(ids.tid), to_hex(ids.sid)); + } + return ctx.out(); + } +}; + +template<> +struct fmt::formatter { + template + constexpr auto parse(ParseContext& ctx) + { + return ctx.begin(); + } + + template + auto format(const opentelemetry::common::AttributeValue& value, FormatContext& ctx) const + { + return std::visit( + [&ctx](const auto& v) { + using T = std::decay_t; + if constexpr (std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v>) { + return fmt::format_to(ctx.out(), "[{}]", fmt::join(v, ", ")); + } + if constexpr (std::is_same_v) { + return fmt::format_to(ctx.out(), "\"{}\"", v); + } + if constexpr (std::is_same_v) { + return fmt::format_to(ctx.out(), "\"{}\"", v); + } + return fmt::format_to(ctx.out(), "{}", v); + }, + value); + } +}; + +template<> +struct fmt::formatter> { + template + constexpr auto parse(ParseContext& ctx) + { + return ctx.begin(); + } + + template + auto format( + const std::unordered_map& attributes, + FormatContext& ctx) const + { + if (attributes.empty()) { + return ctx.out(); + } + + auto out = ctx.out(); + out = fmt::format_to(out, "\t{{"); + for (auto it = attributes.begin(); it != attributes.end(); ++it) { + if (it != attributes.begin()) { + out = fmt::format_to(out, ", "); + } + out = fmt::format_to(out, "\"{}\": {}", it->first, it->second); + } + out = fmt::format_to(out, "}}"); + return out; + } +}; + +template<> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) const + { + return ctx.begin(); + } + + template + auto format(const log_body& body, FormatContext& ctx) const + { + std::string quoted_fmt_string = fmt::format("{}", body.fmt_string); + auto fmt_string = trim_quotes(quoted_fmt_string); + + if (fmt_string.empty()) { + return ctx.out(); + } + + fmt::dynamic_format_arg_store store; + + for (const auto& [name, value] : body.params) { + std::visit( + [&](const auto& v) { + store.push_back(fmt::arg(name.c_str(), v)); + }, + value); + } + + auto result = fmt::vformat(fmt_string, store); + return fmt::format_to(ctx.out(), "\t{}", result); + } +}; + +template<> +struct fmt::formatter { + template + constexpr auto parse(ParseContext& ctx) + { + return ctx.begin(); + } + + template + auto format(const opentelemetry::sdk::common::OwnedAttributeValue& value, + FormatContext& ctx) const + { + return std::visit( + [&ctx](const auto& v) { + using T = std::decay_t; + if constexpr (std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v> || + std::is_same_v>) { + return fmt::format_to(ctx.out(), "[{}]", fmt::join(v, ", ")); + } + if constexpr (std::is_same_v) { + return fmt::format_to(ctx.out(), "\"{}\"", v); + } + if constexpr (std::is_same_v) { + return fmt::format_to(ctx.out(), "\"{}\"", v); + } + return fmt::format_to(ctx.out(), "{}", v); + }, + value); + } +}; + +template<> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) const + { + return ctx.begin(); + } + + template + auto format(const opentelemetry::sdk::instrumentationscope::InstrumentationScope& scope, + FormatContext& ctx) const + { + if (const auto& name = scope.GetName(); !name.empty()) { + auto out = fmt::format_to(ctx.out(), " ["); + + const auto& attrs = scope.GetAttributes(); + if (auto attr = attrs.find("process_id"); attr != attrs.end()) { + out = fmt::format_to(out, "{},", attr->second); + } + if (auto attr = attrs.find("thread_id"); attr != attrs.end()) { + out = fmt::format_to(out, "{},", attr->second); + } + return fmt::format_to(out, "{}]", name); + } + return ctx.out(); + } +}; + +namespace couchbase::observability +{ + +fmt_log_exporter::fmt_log_exporter(FILE* file) + : file_(file) +{ +} + +auto +fmt_log_exporter::MakeRecordable() noexcept -> std::unique_ptr +{ + return std::make_unique(); +} + +auto +fmt_log_exporter::Export( + const opentelemetry::nostd::span>& + records) noexcept -> opentelemetry::sdk::common::ExportResult +{ + for (auto& record : records) { + auto log_record = std::unique_ptr( + dynamic_cast(record.release())); + + if (log_record == nullptr) { + continue; + } + + fmt::println(file_, + "{:%Y-%m-%dT%H:%M:%S%z}{:>7}{}{}{}{}", + log_record->GetObservedTimestamp(), + log_record->GetSeverityText(), + log_record->GetInstrumentationScope(), + log_body{ log_record->GetBody(), log_record->GetAttributes() }, + log_record->GetAttributes(), + log_ids{ log_record->GetTraceId(), log_record->GetSpanId() }); + } + return opentelemetry::sdk::common::ExportResult::kSuccess; +} + +auto +fmt_log_exporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept -> bool +{ + return true; +} + +auto +fmt_log_exporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept -> bool +{ + fflush(file_); + return true; +} +} // namespace couchbase::observability diff --git a/observability/fmt_log_record_exporter.hxx b/observability/fmt_log_record_exporter.hxx new file mode 100644 index 000000000..f382a8a7b --- /dev/null +++ b/observability/fmt_log_record_exporter.hxx @@ -0,0 +1,45 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2025-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +namespace couchbase::observability +{ +class fmt_log_exporter : public opentelemetry::sdk::logs::LogRecordExporter +{ +public: + explicit fmt_log_exporter(FILE* file); + + auto MakeRecordable() noexcept -> std::unique_ptr override; + + auto Export( + const opentelemetry::nostd::span>& + records) noexcept -> opentelemetry::sdk::common::ExportResult override; + + auto Shutdown(std::chrono::microseconds timeout) noexcept -> bool override; + + auto ForceFlush(std::chrono::microseconds timeout) noexcept -> bool override; + +private: + FILE* file_; +}; +} // namespace couchbase::observability diff --git a/observability/logger.cxx b/observability/logger.cxx new file mode 100644 index 000000000..4bad5925d --- /dev/null +++ b/observability/logger.cxx @@ -0,0 +1,180 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "logger.hxx" + +#include "fmt_log_record_exporter.hxx" + +#if defined(__GNUC__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsign-conversion" +#elif defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wdeprecated-builtins" +#endif +#include + +#include +#include +#include +#include + +#include +#include +#include +#if defined(__GNUC__) +#pragma GCC diagnostic pop +#elif defined(__clang__) +#pragma clang diagnostic pop +#endif + +#if defined(_WIN32) +#include +#include +#else +#include +#if defined(__linux__) +#include +#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__linux__) +#include +#else +#include +#endif +#endif + +#include + +namespace couchbase::observability +{ +namespace +{ +inline auto +thread_id() noexcept -> std::uint64_t +{ +#ifdef _WIN32 + return static_cast(::GetCurrentThreadId()); +#elif defined(__linux__) + return static_cast(::syscall(SYS_gettid)); +#elif __APPLE__ + uint64_t tid; +#ifdef MAC_OS_X_VERSION_MAX_ALLOWED + { +#if (MAC_OS_X_VERSION_MAX_ALLOWED < 1060) || defined(__POWERPC__) + tid = pthread_mach_thread_np(pthread_self()); +#elif MAC_OS_X_VERSION_MIN_REQUIRED < 1060 + if (&pthread_threadid_np) { + pthread_threadid_np(nullptr, &tid); + } else { + tid = pthread_mach_thread_np(pthread_self()); + } +#else + pthread_threadid_np(nullptr, &tid); +#endif + } +#else + pthread_threadid_np(nullptr, &tid); +#endif + return static_cast(tid); +#else + return static_cast(std::hash()(std::this_thread::get_id())); +#endif +} + +inline auto +process_id() noexcept -> int +{ +#ifdef _WIN32 + return static_cast(::GetCurrentProcessId()); +#else + return ::getpid(); +#endif +} + +inline auto +thread_name() -> std::string +{ +#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__linux__) + std::string name(100, '\0'); + int res = ::pthread_getname_np(::pthread_self(), name.data(), name.size()); + if (res == 0) { + name.resize(strnlen(name.data(), name.size())); + return name; + } +#elif defined(_WIN32) + HANDLE hThread = GetCurrentThread(); + PWSTR data = nullptr; + HRESULT hr = GetThreadDescription(hThread, &data); + if (SUCCEEDED(hr) && data) { + int len = WideCharToMultiByte(CP_UTF8, 0, data, -1, nullptr, 0, nullptr, nullptr); + std::string name(len - 1, '\0'); + WideCharToMultiByte(CP_UTF8, 0, data, -1, name.data(), len, nullptr, nullptr); + LocalFree(data); + return name; + } +#endif + return {}; +} + +} // namespace + +auto +logger() -> std::shared_ptr +{ + thread_local auto logger = opentelemetry::logs::Provider::GetLoggerProvider()->GetLogger( + "cbc_logger", + "cxxcbc", + couchbase::core::meta::sdk_semver(), + "", + { + { "process_id", process_id() }, + { "thread_id", thread_id() }, + { "thread_name", thread_name() }, + }); + return logger; +} + +void +init_logger(const logger_options& options) +{ + using namespace opentelemetry; + + if (options.use_http_logger) { + exporter::otlp::OtlpHttpLogRecordExporterOptions logger_options; + auto exporter = exporter::otlp::OtlpHttpLogRecordExporterFactory::Create(logger_options); + auto processor = sdk::logs::SimpleLogRecordProcessorFactory::Create(std::move(exporter)); + auto resource = sdk::resource::Resource::Create({ + { "service.name", "cbc" }, + { "service.version", couchbase::core::meta::sdk_semver() }, + }); + std::vector> processors; + processors.emplace_back(std::move(processor)); + auto context = sdk::logs::LoggerContextFactory::Create(std::move(processors), resource); + std::shared_ptr provider = + sdk::logs::LoggerProviderFactory::Create(std::move(context)); + logs::Provider::SetLoggerProvider(provider); + } else { + auto exporter = std::make_unique(stderr); + auto processor = sdk::logs::SimpleLogRecordProcessorFactory::Create(std::move(exporter)); + std::vector> processors; + processors.emplace_back(std::move(processor)); + auto context = sdk::logs::LoggerContextFactory::Create(std::move(processors)); + std::shared_ptr provider = + sdk::logs::LoggerProviderFactory::Create(std::move(context)); + logs::Provider::SetLoggerProvider(provider); + } +} +} // namespace couchbase::observability diff --git a/observability/logger.hxx b/observability/logger.hxx new file mode 100644 index 000000000..06cfa06a1 --- /dev/null +++ b/observability/logger.hxx @@ -0,0 +1,99 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#if defined(__GNUC__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsign-conversion" +#elif defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wdeprecated-builtins" +#endif +#include +#include +#if defined(__GNUC__) +#pragma GCC diagnostic pop +#elif defined(__clang__) +#pragma clang diagnostic pop +#endif + +namespace couchbase::observability +{ +struct logger_options { + bool use_http_logger{ false }; +}; + +void +init_logger(const logger_options& options); + +auto +logger() -> std::shared_ptr; + +} // namespace couchbase::observability + +// NOLINTBEGIN(cppcoreguidelines-macro-usage) +#define CB_LOG_TRACE(...) \ + do { \ + if (const auto& logger = ::couchbase::observability::logger(); \ + logger->Enabled(opentelemetry::logs::Severity::kTrace)) { \ + logger->Trace(__VA_ARGS__); \ + } \ + } while (false) + +#define CB_LOG_DEBUG(...) \ + do { \ + if (const auto& logger = ::couchbase::observability::logger(); \ + logger->Enabled(opentelemetry::logs::Severity::kDebug)) { \ + logger->Debug(__VA_ARGS__); \ + } \ + } while (false) + +#define CB_LOG_INFO(...) \ + do { \ + if (const auto& logger = ::couchbase::observability::logger(); \ + logger->Enabled(opentelemetry::logs::Severity::kInfo)) { \ + logger->Info(__VA_ARGS__); \ + } \ + } while (false) + +#define CB_LOG_WARNING(...) \ + do { \ + if (const auto& logger = ::couchbase::observability::logger(); \ + logger->Enabled(opentelemetry::logs::Severity::kTrace)) { \ + logger->Trace(__VA_ARGS__); \ + } \ + } while (false) + +#define CB_LOG_ERROR(...) \ + do { \ + if (const auto& logger = ::couchbase::observability::logger(); \ + logger->Enabled(opentelemetry::logs::Severity::kError)) { \ + logger->Error(__VA_ARGS__); \ + } \ + } while (false) + +#define CB_LOG_CRITICAL(...) \ + do { \ + if (const auto& logger = ::couchbase::observability::logger(); \ + logger->Enabled(opentelemetry::logs::Severity::kFatal)) { \ + logger->Fatal(__VA_ARGS__); \ + } \ + } while (false) +// NOLINTEND(cppcoreguidelines-macro-usage) diff --git a/observability/otel_meter.cxx b/observability/otel_meter.cxx new file mode 100644 index 000000000..bf8ee0051 --- /dev/null +++ b/observability/otel_meter.cxx @@ -0,0 +1,141 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2021 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "otel_meter.hxx" + +#include + +#include +#include + +#include + +namespace couchbase::observability +{ +namespace +{ +class otel_sync_histogram +{ +public: + explicit otel_sync_histogram( + std::shared_ptr> histogram_counter) + : histogram_counter_(std::move(histogram_counter)) + { + } + + void record(std::uint64_t value, + const opentelemetry::common::KeyValueIterable& tags, + opentelemetry::context::Context& ctx) + { + histogram_counter_->Record(value, tags, ctx); + } + +private: + std::shared_ptr> histogram_counter_; + std::mutex mutex_; +}; + +class otel_value_recorder : public couchbase::metrics::value_recorder +{ +public: + explicit otel_value_recorder( + std::shared_ptr> histogram_counter, + const std::map& tags) + : histogram_counter_(histogram_counter) + , tags_(tags) + { + } + void record_value(std::int64_t value) override + { + value = std::max(value, 0); + auto uvalue = static_cast(value); + histogram_counter_->Record( + uvalue, opentelemetry::common::KeyValueIterableView{ tags_ }, context_); + } + + auto tags() -> const std::map + { + return tags_; + } + + auto histogram_counter() + -> opentelemetry::nostd::shared_ptr> + { + return histogram_counter_; + } + +private: + opentelemetry::nostd::shared_ptr> + histogram_counter_; + const std::map tags_; + opentelemetry::context::Context context_{}; + std::mutex mutex_; +}; +} // namespace + +class otel_meter_impl +{ + friend otel_meter; + +public: + explicit otel_meter_impl(opentelemetry::nostd::shared_ptr meter) + : meter_(meter) + { + } + +private: + opentelemetry::nostd::shared_ptr meter_; + std::mutex mutex_; + std::multimap> recorders_; +}; + +auto +otel_meter::get_value_recorder(const std::string& name, + const std::map& tags) + -> std::shared_ptr +{ + // first look up the histogram, in case we already have it... + std::scoped_lock lock(mutex_); + auto it = recorders_.equal_range(name); + if (it.first == it.second) { + // this name isn't associated with any histogram, so make one and return it. + // Note we'd like to make one with more buckets than default, given the range of + // response times we'd like to display (queries vs kv for instance), but otel + // api doesn't seem to allow this. + return recorders_ + .insert({ name, + std::make_shared(meter_->CreateUInt64Histogram(name, "", "us"), + tags) }) + ->second; + } + // so it is already, lets see if we already have one with those tags, or need + // to make a new one (using the histogram we already have). + for (auto itr = it.first; itr != it.second; itr++) { + if (tags == itr->second->tags()) { + return itr->second; + } + } + // if you are here, we need to add one with these tags and the histogram associated with the + // name. + return recorders_ + .insert( + { name, std::make_shared(it.first->second->histogram_counter(), tags) }) + ->second; +} +} // namespace couchbase::observability diff --git a/observability/otel_meter.hxx b/observability/otel_meter.hxx new file mode 100644 index 000000000..0f85255e8 --- /dev/null +++ b/observability/otel_meter.hxx @@ -0,0 +1,36 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2021 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace couchbase::observability +{ + +class otel_meter_impl; + +class otel_meter : public couchbase::metrics::meter +{ +public: + auto get_value_recorder(const std::string& name, const std::map& tags) + -> std::shared_ptr override; + +private: + std::unique_ptr impl_; +}; +} // namespace couchbase::observability diff --git a/observability/otel_tracer.cxx b/observability/otel_tracer.cxx new file mode 100644 index 000000000..77b243532 --- /dev/null +++ b/observability/otel_tracer.cxx @@ -0,0 +1,136 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2021 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "otel_tracer.hxx" + +#include "core/meta/version.hxx" + +#if defined(__GNUC__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsign-conversion" +#elif defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wdeprecated-builtins" +#endif +#include +#include +#if defined(__GNUC__) +#pragma GCC diagnostic pop +#elif defined(__clang__) +#pragma clang diagnostic pop +#endif + +namespace couchbase::core::tracing +{ + +auto +otel_request_span::wrapped_span() -> std::shared_ptr +{ + return span_; +} + +void +otel_request_span::end() +{ + span_->End(); +} + +void +otel_request_span::add_tag(const std::string& name, std::uint64_t value) +{ + span_->SetAttribute(name, value); +} + +void +otel_request_span::add_tag(const std::string& name, const std::string& value) +{ + span_->SetAttribute(name, value); +} + +otel_request_span::otel_request_span(std::shared_ptr span) + : span_(std::move(span)) +{ +} + +otel_request_span::~otel_request_span() +{ + span_->End(); +} + +auto +otel_request_span::wrap(std::shared_ptr span) + -> std::shared_ptr +{ + return std::make_shared(span); +} + +class otel_request_tracer_impl +{ + friend otel_request_tracer; + +public: + otel_request_tracer_impl() + : tracer_{ + opentelemetry::trace::Provider::GetTracerProvider()->GetTracer("couchbase_cxx_sdk", + meta::sdk_semver()), + } + { + } + + explicit otel_request_tracer_impl(std::shared_ptr tracer) + : tracer_{ std::move(tracer) } + { + } + +private: + std::shared_ptr tracer_; +}; + +otel_request_tracer::otel_request_tracer() + : impl_{ std::make_unique() } +{ +} + +otel_request_tracer::otel_request_tracer(std::shared_ptr tracer) + : impl_{ std::make_unique(std::move(tracer)) } +{ +} + +auto +otel_request_tracer::wrap(std::shared_ptr tracer) + -> std::shared_ptr +{ + return std::make_shared(std::move(tracer)); +} + +otel_request_tracer::~otel_request_tracer() = default; + +auto +otel_request_tracer::start_span(std::string name, + std::shared_ptr parent) + -> std::shared_ptr +{ + auto wrapped_parent = std::dynamic_pointer_cast(parent); + if (wrapped_parent) { + opentelemetry::trace::StartSpanOptions opts; + opts.parent = wrapped_parent->wrapped_span()->GetContext(); + return otel_request_span::wrap(impl_->tracer_->StartSpan(name, opts)); + } + return otel_request_span::wrap(impl_->tracer_->StartSpan(name)); +} + +} // namespace couchbase::core::tracing diff --git a/observability/otel_tracer.hxx b/observability/otel_tracer.hxx new file mode 100644 index 000000000..341275858 --- /dev/null +++ b/observability/otel_tracer.hxx @@ -0,0 +1,79 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2021 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace trace +{ +class Tracer; +class Span; +} // namespace trace +OPENTELEMETRY_END_NAMESPACE + +namespace couchbase::core::tracing +{ + +class otel_request_span : public couchbase::tracing::request_span +{ +public: + static auto wrap(std::shared_ptr span) + -> std::shared_ptr; + + explicit otel_request_span(std::shared_ptr span); + ~otel_request_span() override; + otel_request_span(const otel_request_span&) = delete; + otel_request_span(otel_request_span&&) = delete; + auto operator=(const otel_request_span&) -> otel_request_span& = delete; + auto operator=(otel_request_span&&) -> otel_request_span& = delete; + + void add_tag(const std::string& name, const std::string& value) override; + void add_tag(const std::string& name, std::uint64_t value) override; + void end() override; + auto wrapped_span() -> std::shared_ptr; + +private: + std::shared_ptr span_; +}; + +class otel_request_tracer_impl; + +class otel_request_tracer : public couchbase::tracing::request_tracer +{ +public: + static auto wrap(std::shared_ptr tracer) + -> std::shared_ptr; + + otel_request_tracer(); + explicit otel_request_tracer(std::shared_ptr tracer); + otel_request_tracer(const otel_request_tracer&) = delete; + otel_request_tracer(otel_request_tracer&&) noexcept = default; + auto operator=(const otel_request_tracer&) = delete; + auto operator=(otel_request_tracer&&) -> otel_request_tracer& = default; + ~otel_request_tracer() override; + + auto start_span(std::string name, std::shared_ptr parent) + -> std::shared_ptr override; + +private: + std::unique_ptr impl_; +}; +} // namespace couchbase::core::tracing diff --git a/tools/analytics.cxx b/tools/analytics.cxx index c67e95d36..65361e5e0 100644 --- a/tools/analytics.cxx +++ b/tools/analytics.cxx @@ -20,7 +20,6 @@ #include "core/error_context/analytics_json.hxx" #include "core/impl/internal_error_context.hxx" -#include "core/logger/logger.hxx" #include "core/utils/binary.hxx" #include "core/utils/json.hxx" diff --git a/tools/cbc.cxx b/tools/cbc.cxx index 3a526ebfa..d613a8f43 100644 --- a/tools/cbc.cxx +++ b/tools/cbc.cxx @@ -26,8 +26,8 @@ #include -int -main(int argc, const char** argv) +auto +main(int argc, const char** argv) -> int { CLI::App app{ "Talk to Couchbase Server.", "cbc" }; app.set_version_flag("--version", fmt::format("cbc {}", couchbase::core::meta::sdk_semver())); diff --git a/tools/get.cxx b/tools/get.cxx index 7c5f7aa0c..0e8aa2bd7 100644 --- a/tools/get.cxx +++ b/tools/get.cxx @@ -16,13 +16,29 @@ */ #include "get.hxx" + #include "utils.hxx" -#include +#include + #include #include #include +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include + #include #include #include @@ -30,6 +46,8 @@ #include #include +#include + namespace cbc { namespace @@ -68,56 +86,103 @@ class get_app : public CLI::App json_lines_, "Use JSON Lines format (https://jsonlines.org) to print results."); + add_flag("--use-http-logger", use_http_logger_, "Use HTTP logger instead of ostream."); + add_common_options(this, common_options_); allow_extras(true); } - [[nodiscard]] int execute() const + [[nodiscard]] auto get_otel_tracer() const -> std::shared_ptr + { + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + return provider->GetTracer("cbc", couchbase::core::meta::sdk_semver()); + } + + [[nodiscard]] auto execute() const -> int { apply_logger_options(common_options_.logger); auto cluster_options = build_cluster_options(common_options_); - couchbase::get_options get_options{}; + init_otel_tracer(); + couchbase::observability::init_logger({ + use_http_tracer_, + }); + + couchbase::get_options common_get_options{}; if (with_expiry_) { - get_options.with_expiry(true); + common_get_options.with_expiry(true); } if (!projections_.empty()) { - get_options.project(projections_); + common_get_options.project(projections_); } const auto connection_string = common_options_.connection.connection_string; + auto logger = couchbase::observability::logger(); + auto tracer = get_otel_tracer(); + + cluster_options.tracing().tracer(couchbase::core::tracing::otel_request_tracer::wrap(tracer)); + auto [connect_err, cluster] = couchbase::cluster::connect(connection_string, cluster_options).get(); if (connect_err) { - fail(fmt::format( "Failed to connect to the cluster at \"{}\": {}", connection_string, connect_err)); } - for (const auto& id : ids_) { - auto bucket_name = bucket_name_; - auto scope_name = scope_name_; - auto collection_name = collection_name_; - auto document_id = id; - - if (inlined_keyspace_) { - if (auto keyspace_with_id = extract_inlined_keyspace(id); keyspace_with_id) { - bucket_name = keyspace_with_id->bucket_name; - scope_name = keyspace_with_id->scope_name; - collection_name = keyspace_with_id->collection_name; - document_id = keyspace_with_id->id; + { + auto top_level_span = tracer->StartSpan("cbc.get-batch", + { + { "number_of_documents", ids_.size() }, + }); + for (const auto& id : ids_) { + auto bucket_name = bucket_name_; + auto scope_name = scope_name_; + auto collection_name = collection_name_; + auto document_id = id; + + if (inlined_keyspace_) { + if (auto keyspace_with_id = extract_inlined_keyspace(id); keyspace_with_id) { + bucket_name = keyspace_with_id->bucket_name; + scope_name = keyspace_with_id->scope_name; + collection_name = keyspace_with_id->collection_name; + document_id = keyspace_with_id->id; + } } - } - auto collection = cluster.bucket(bucket_name).scope(scope_name).collection(collection_name); + auto collection = cluster.bucket(bucket_name).scope(scope_name).collection(collection_name); - auto [err, resp] = collection.get(document_id, get_options).get(); - if (json_lines_) { - print_result_json_line(bucket_name, scope_name, collection_name, document_id, err, resp); - } else { - print_result(bucket_name, scope_name, collection_name, document_id, err, resp); + opentelemetry::trace::StartSpanOptions span_options; + span_options.parent = top_level_span->GetContext(); + auto span = tracer->StartSpan("cbc.get", + { + { "cbc.bucket", bucket_name }, + { "cbc.scope", scope_name }, + { "cbc.collection", collection_name }, + }, + span_options); + auto get_options = common_get_options; + get_options.parent_span(couchbase::core::tracing::otel_request_span::wrap(span)); + auto [err, resp] = collection.get(document_id, get_options).get(); + CB_LOG_WARNING(""); + CB_LOG_WARNING("this is the message error"); + CB_LOG_WARNING("this is the message error: {msg}", + opentelemetry::common::MakeAttributes({ + { "msg", err.ec().message() }, + })); + CB_LOG_WARNING("this is the message error: {msg} and some context", + opentelemetry::common::MakeAttributes({ + { "msg", err.ec().message() }, + }), + span->GetContext()); + CB_LOG_ERROR("this is the message error: {}", err.ec().message(), span->GetContext()); + + if (json_lines_) { + print_result_json_line(bucket_name, scope_name, collection_name, document_id, err, resp); + } else { + print_result(bucket_name, scope_name, collection_name, document_id, err, resp); + } } } @@ -220,6 +285,25 @@ class get_app : public CLI::App } } + void init_otel_tracer() const + { + using namespace opentelemetry; + + if (use_http_tracer_) { + exporter::otlp::OtlpHttpExporterOptions opts; + auto exporter = exporter::otlp::OtlpHttpExporterFactory::Create(opts); + auto processor = sdk::trace::SimpleSpanProcessorFactory::Create(std::move(exporter)); + auto resource = sdk::resource::Resource::Create({ + { "service.name", "cbc" }, + { "service.version", couchbase::core::meta::sdk_semver() }, + }); + auto provider = sdk::trace::TracerProviderFactory::Create(std::move(processor), resource); + trace::Provider::SetTracerProvider( + std::shared_ptr{ std::move(provider) }); + } else { + } + } + common_options common_options_{}; std::string bucket_name_{ default_bucket_name }; @@ -233,6 +317,9 @@ class get_app : public CLI::App bool json_lines_{ false }; bool verbose_{ false }; + bool use_http_tracer_{ false }; + bool use_http_logger_{ false }; + std::vector ids_{}; }; } // namespace diff --git a/tools/pillowfight.cxx b/tools/pillowfight.cxx index f6d641bec..55046e536 100644 --- a/tools/pillowfight.cxx +++ b/tools/pillowfight.cxx @@ -143,7 +143,7 @@ class operation_generator operation::cmd_query }; operation_weights weights_; std::vector weights_vector_; - std::discrete_distribution<> distribution_; + std::discrete_distribution distribution_; }; constexpr const char* default_bucket_name{ "default" }; @@ -440,11 +440,13 @@ class pillowfight_app : public CLI::App stats_timer.cancel(); fmt::print("\n\nTotal operations: {}\n", total); - fmt::print( - "Total keys used: {}\n", - std::accumulate(known_keys.begin(), known_keys.end(), 0, [](auto count, const auto& keys) { - return count + keys.size(); - })); + fmt::print("Total keys used: {}\n", + std::accumulate(known_keys.begin(), + known_keys.end(), + static_cast(0), + [](auto count, const auto& keys) { + return count + keys.size(); + })); const auto total_time = finish_time - start_time; fmt::print("Total time: {}s ({}ms)\n", std::chrono::duration_cast(total_time).count(), @@ -549,9 +551,7 @@ class pillowfight_app : public CLI::App for (auto&& [start, future] : futures) { std::visit( - [&stopping, start = start, &known_keys, verbose = verbose_](auto f) mutable { - using T = std::decay_t; - + [&stopping, start = start, verbose = verbose_](auto f) mutable { while (f.wait_for(std::chrono::milliseconds{ 200 }) != std::future_status::ready) { if (!running.test_and_set()) { stopping = true;