diff --git a/agents/grpc/proto/reconfigure.proto b/agents/grpc/proto/reconfigure.proto index 52b0eb2e02..35e5d151be 100644 --- a/agents/grpc/proto/reconfigure.proto +++ b/agents/grpc/proto/reconfigure.proto @@ -18,6 +18,8 @@ message ReconfigureBody { optional uint32 tracingModulesBlacklist = 11; optional bool contCpuProfile = 12; optional bool assetsEnabled = 13; + optional uint32 metricsBatchSize = 14; + optional uint32 metricsBufferSize = 15; } message ReconfigureEvent { diff --git a/agents/grpc/src/grpc_agent.cc b/agents/grpc/src/grpc_agent.cc index 2df0c56968..a0ff18c446 100644 --- a/agents/grpc/src/grpc_agent.cc +++ b/agents/grpc/src/grpc_agent.cc @@ -4,12 +4,10 @@ #include "nsolid/nsolid_api.h" #include "nsolid/continuous_profiler.h" #include "nsolid/nsolid_util.h" -#include "../../otlp/src/otlp_common.h" #include "../../src/root_certs.h" #include "../../src/span_collector.h" #include "absl/log/initialize.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" -#include "opentelemetry/sdk/metrics/export/metric_producer.h" #include "opentelemetry/semconv/incubating/process_attributes.h" #include "opentelemetry/semconv/service_attributes.h" #include "opentelemetry/exporters/otlp/otlp_grpc_client.h" @@ -18,8 +16,6 @@ #include "opentelemetry/exporters/otlp/otlp_grpc_exporter_factory.h" #include "opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h" #include "opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter_factory.h" -#include "opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h" -#include "opentelemetry/exporters/otlp/otlp_grpc_metric_exporter_factory.h" #include "opentelemetry/exporters/otlp/otlp_metric_utils.h" using std::chrono::duration_cast; @@ -50,8 +46,6 @@ using opentelemetry::v1::exporter::otlp::OtlpGrpcExporterOptions; using opentelemetry::v1::exporter::otlp::OtlpGrpcLogRecordExporter; using opentelemetry::v1::exporter::otlp::OtlpGrpcLogRecordExporterFactory; using opentelemetry::v1::exporter::otlp::OtlpGrpcLogRecordExporterOptions; -using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporter; -using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporterFactory; using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporterOptions; using opentelemetry::v1::exporter::otlp::OtlpMetricUtils; using nsolid_grpc_async = @@ -263,7 +257,7 @@ void PopulateMetricsEvent(grpcagent::MetricsEvent* metrics_event, ResourceMetrics data; data.resource_ = otlp::GetResource(); - std::vector<MetricData> metrics; + otlp::MetricDataBatch metrics; // As this is the cached data we're sending, we pass the same value for prev_stor.
otlp::fill_proc_metrics(metrics, proc_metrics, proc_metrics, false); @@ -271,8 +265,15 @@ otlp::fill_env_metrics(metrics, env_metrics_stor, false); } + auto batched = metrics.DumpMetricsAndReset(); + std::vector<MetricData> metric_data; + for (const auto& bm : batched) { + metric_data.push_back(otlp::BatchedMetricToMetricData(bm)); + } + data.scope_metric_data_ = - std::vector<ScopeMetrics>{{otlp::GetScope(), metrics}}; + std::vector<ScopeMetrics>{{ otlp::GetScope(), + std::move(metric_data) }}; OtlpMetricUtils::PopulateResourceMetrics( data, metrics_event->mutable_body()->mutable_resource_metrics()->Add()); } @@ -433,6 +434,7 @@ GrpcAgent::GrpcAgent(): hooks_init_(false), ready_(false), exiting_(false), trace_flags_(0), + metrics_paused_(false), proc_metrics_(), proc_prev_stor_(), config_(json::object()), @@ -941,20 +943,18 @@ void GrpcAgent::env_deletion_cb_(SharedEnvInst envinst, return; } - ResourceMetrics data; - data.resource_ = otlp::GetResource(); - std::vector<MetricData> metrics; - ThreadMetricsStor stor; while (agent->thr_metrics_msg_q_.dequeue(stor)) { - otlp::fill_env_metrics(metrics, stor, false); + otlp::fill_env_metrics(agent->thr_metrics_batch_, stor, false); agent->thr_metrics_cache_.insert_or_assign(stor.thread_id, std::move(stor)); } - data.scope_metric_data_ = - std::vector<ScopeMetrics>{{otlp::GetScope(), metrics}}; - auto result = agent->metrics_exporter_->Export(data); - Debug("# ThreadMetrics Exported. Result: %d\n", static_cast<int>(result)); + if (agent->thr_metrics_batch_.ShouldFlush()) { + auto metric_data = agent->thr_metrics_batch_.DumpMetricsAndReset(); + if (!metric_data.empty()) { + agent->metrics_exporter_->enqueue(std::move(metric_data)); + } + } }
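The dequeue-then-flush sequence above is the producer-side pattern every metrics source in this patch follows: accumulate into a MetricDataBatch and only hand a snapshot to the exporter once the batch reports it is full. A minimal sketch of that pattern, assuming only the MetricDataBatch and GrpcMetricsExporter interfaces introduced in this diff (drain_one_source is a hypothetical helper, not part of the patch):

// Sketch: accumulate, test the threshold, then hand off asynchronously.
void drain_one_source(otlp::MetricDataBatch& batch,
                      GrpcMetricsExporter& exporter,
                      const ThreadMetrics::MetricsStor& stor) {
  otlp::fill_env_metrics(batch, stor, false);        // accumulate data points
  if (batch.ShouldFlush()) {                         // threshold reached?
    auto metric_data = batch.DumpMetricsAndReset();  // drain and reset
    if (!metric_data.empty()) {
      exporter.enqueue(std::move(metric_data));      // async; no blocking Export()
    }
  }
}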
/*static*/void GrpcAgent::metrics_timer_cb_(nsuv::ns_timer*, @@ -964,11 +964,29 @@ void GrpcAgent::env_deletion_cb_(SharedEnvInst envinst, return; } - agent->got_proc_metrics(); - for (auto& item : agent->env_metrics_map_) { + agent->on_metrics_timer(); +} + +void GrpcAgent::on_metrics_timer() { + if (metrics_paused_) { + auto metric_data = proc_metrics_batch_.DumpMetricsAndReset(); + if (!metric_data.empty()) { + metrics_exporter_->enqueue(std::move(metric_data)); + } + + metric_data = thr_metrics_batch_.DumpMetricsAndReset(); + if (!metric_data.empty()) { + metrics_exporter_->enqueue(std::move(metric_data)); + } + + metrics_exporter_->flush(); + } + + got_proc_metrics(); + for (auto& item : env_metrics_map_) { // Retrieve metrics from the Metrics API. Ignore any return error since // there's nothing to be done. - item.second.metrics_->Update(thr_metrics_cb_, agent_wp); + item.second.metrics_->Update(thr_metrics_cb_, weak_from_this()); } } @@ -1117,8 +1135,18 @@ int GrpcAgent::config(const json& config) { options.credentials = opts.credentials; } - metrics_exporter_ = - std::make_unique<OtlpGrpcMetricExporter>(options, client); + // Get metrics buffer size from config, default to 100 + size_t buffer_size = 100; + auto it = config_.find("metricsBufferSize"); + if (it != config_.end()) { + buffer_size = it->get<size_t>(); + } + + metrics_exporter_ = std::make_unique<GrpcMetricsExporter>(&loop_, + options, + client, + buffer_size); + metrics_exporter_->init(); } { OtlpGrpcLogRecordExporterOptions options; @@ -1178,6 +1206,28 @@ int GrpcAgent::config(const json& config) { update_tracer(trace_flags_); } + if (utils::find_any_fields_in_diff(diff, { "/metricsBatchSize" })) { + auto it = config_.find("metricsBatchSize"); + if (it != config_.end()) { + size_t batch_size = it->get<size_t>(); + if (batch_size > 0) { + proc_metrics_batch_.Resize(batch_size); + thr_metrics_batch_.Resize(batch_size); + } + } + } + + if (utils::find_any_fields_in_diff(diff, { "/metricsBufferSize" })) { + auto it = config_.find("metricsBufferSize"); + if (it != config_.end()) { + size_t buffer_size = it->get<size_t>(); + if (buffer_size > 0 && metrics_exporter_) { + // Resize the metrics buffer instead of recreating the exporter + metrics_exporter_->resize_buffer(buffer_size); + } + } + } + // If metrics timer is not active or if the diff contains metrics fields, // recalculate the metrics status. (stop/start/what period) if (!metrics_timer_.is_active() || @@ -1185,15 +1235,21 @@ uint64_t period = 0; auto it = config_.find("pauseMetrics"); if (it != config_.end()) { - bool pause = *it; - if (!pause) { + metrics_paused_ = *it; + if (!metrics_paused_) { it = config_.find("interval"); if (it != config_.end()) { period = *it; } + } else { + period = 5000; } } + if (period == 0) { + period = 5000; + } + ret = setup_metrics_timer(period); } @@ -1308,6 +1364,7 @@ void GrpcAgent::do_stop() { } log_exporter_.reset(); + otlp_grpc_client_.reset(); metrics_exporter_.reset(); trace_exporter_.reset(); ready_ = false; @@ -1389,15 +1446,15 @@ void GrpcAgent::got_logs() { void GrpcAgent::got_proc_metrics() { ASSERT_EQ(0, proc_metrics_.Update()); ProcessMetrics::MetricsStor stor = proc_metrics_.Get(); - std::vector<MetricData> metrics; - otlp::fill_proc_metrics(metrics, stor, proc_prev_stor_, false); - ResourceMetrics data; - data.resource_ = otlp::GetResource(); - data.scope_metric_data_ = - std::vector<ScopeMetrics>{{otlp::GetScope(), metrics}}; - auto result = metrics_exporter_->Export(data); - Debug("# ProcessMetrics Exported.
Result: %d\n", static_cast(result)); + otlp::fill_proc_metrics(proc_metrics_batch_, stor, proc_prev_stor_, false); proc_prev_stor_ = stor; + + if (proc_metrics_batch_.ShouldFlush()) { + auto metric_data = proc_metrics_batch_.DumpMetricsAndReset(); + if (!metric_data.empty()) { + metrics_exporter_->enqueue(std::move(metric_data)); + } + } } void GrpcAgent::got_profile(const ProfileCollector::ProfileQStor& stor) { @@ -1769,6 +1826,13 @@ void GrpcAgent::reconfigure(const grpcagent::CommandRequest& request) { out["assetsEnabled"] = body.assetsenabled(); } + if (body.has_metricsbatchsize()) { + out["metricsBatchSize"] = body.metricsbatchsize(); + } + if (body.has_metricsbuffersize()) { + out["metricsBufferSize"] = body.metricsbuffersize(); + } + DebugJSON("Reconfigure out: \n%s\n", out); UpdateConfig(out.dump()); @@ -1807,7 +1871,9 @@ void GrpcAgent::send_blocked_loop_event(BlockedLoopStor&& stor) { auto context = GrpcClient::MakeClientContext(agent_id_, saas()); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportBlockedLoop, std::move(context), std::move(arena), std::move(*event), @@ -1815,7 +1881,6 @@ void GrpcAgent::send_blocked_loop_event(BlockedLoopStor&& stor) { std::unique_ptr &&, const grpcagent::BlockedLoopEvent& event, grpcagent::EventResponse*) { - return true; }); } @@ -1849,7 +1914,9 @@ void GrpcAgent::send_exit() { uv_mutex_init(&lock); uv_cond_init(&cond); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportExit, std::move(context), std::move(arena), std::move(*exit_event), @@ -1861,7 +1928,6 @@ void GrpcAgent::send_exit() { signaled = true; uv_cond_signal(&cond); uv_mutex_unlock(&lock); - return true; }); // wait for the exit event to be sent @@ -1894,7 +1960,9 @@ void GrpcAgent::send_info_event(const char* req_id) { auto context = GrpcClient::MakeClientContext(agent_id_, saas()); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportInfo, std::move(context), std::move(arena), std::move(*info_event), @@ -1902,7 +1970,6 @@ void GrpcAgent::send_info_event(const char* req_id) { std::unique_ptr&&, const grpcagent::InfoEvent& info_event, grpcagent::EventResponse*) { - return true; }); } @@ -1922,7 +1989,9 @@ void GrpcAgent::send_metrics_event(const char* req_id) { auto context = GrpcClient::MakeClientContext(agent_id_, saas()); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportMetrics, std::move(context), std::move(arena), std::move(*metrics_event), @@ -1930,7 +1999,6 @@ void GrpcAgent::send_metrics_event(const char* req_id) { std::unique_ptr&&, const grpcagent::MetricsEvent& metrics_event, grpcagent::EventResponse*) { - return true; }); } @@ -1945,7 +2013,9 @@ void GrpcAgent::send_packages_event(const char* req_id) { auto context = GrpcClient::MakeClientContext(agent_id_, saas()); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportPackages, std::move(context), std::move(arena), std::move(*packages_event), @@ -1953,7 +2023,6 @@ void GrpcAgent::send_packages_event(const char* req_id) { std::unique_ptr&&, const grpcagent::PackagesEvent& info_event, grpcagent::EventResponse*) { - return true; }); } @@ -1969,7 +2038,9 @@ void GrpcAgent::send_reconfigure_event(const char* req_id) { auto context = 
GrpcClient::MakeClientContext(agent_id_, saas()); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportReconfigure, std::move(context), std::move(arena), std::move(*reconfigure_event), @@ -1977,7 +2048,6 @@ void GrpcAgent::send_reconfigure_event(const char* req_id) { std::unique_ptr&&, const grpcagent::ReconfigureEvent& info_event, grpcagent::EventResponse*) { - return true; }); } @@ -2020,7 +2090,9 @@ void GrpcAgent::send_source_code_event(const grpcagent::CommandRequest& req) { auto context = GrpcClient::MakeClientContext(agent_id_, saas()); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportSourceCode, std::move(context), std::move(arena), std::move(*source_code_event), @@ -2028,7 +2100,6 @@ void GrpcAgent::send_source_code_event(const grpcagent::CommandRequest& req) { std::unique_ptr&&, const grpcagent::SourceCodeEvent& info_event, grpcagent::EventResponse*) { - return true; }); } @@ -2043,7 +2114,9 @@ void GrpcAgent::send_startup_times_event(const char* req_id) { auto context = GrpcClient::MakeClientContext(agent_id_, saas()); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportStartupTimes, std::move(context), std::move(arena), std::move(*st_event), @@ -2051,8 +2124,6 @@ void GrpcAgent::send_startup_times_event(const char* req_id) { std::unique_ptr&&, const grpcagent::StartupTimesEvent& info_event, grpcagent::EventResponse*) { - Debug("StartupTimesEvent: %s\n", status.error_message().c_str()); - return true; }); } @@ -2068,7 +2139,9 @@ void GrpcAgent::send_unblocked_loop_event(BlockedLoopStor&& stor) { auto context = GrpcClient::MakeClientContext(agent_id_, saas()); - GrpcClient::DelegateAsyncExport( + GrpcClient::DelegateAsyncExport( nsolid_service_stub_.get(), &nsolid_grpc_async::ExportUnblockedLoop, std::move(context), std::move(arena), std::move(*event), @@ -2076,7 +2149,6 @@ void GrpcAgent::send_unblocked_loop_event(BlockedLoopStor&& stor) { std::unique_ptr &&, const grpcagent::UnblockedLoopEvent& event, grpcagent::EventResponse*) { - return true; }); } diff --git a/agents/grpc/src/grpc_agent.h b/agents/grpc/src/grpc_agent.h index 690f8e162c..99d6a05e46 100644 --- a/agents/grpc/src/grpc_agent.h +++ b/agents/grpc/src/grpc_agent.h @@ -9,11 +9,14 @@ #include "./proto/nsolid_service.grpc.pb.h" #include "opentelemetry/version.h" #include "opentelemetry/sdk/trace/recordable.h" +#include "opentelemetry/exporters/otlp/otlp_grpc_client.h" +#include "../../otlp/src/otlp_common.h" #include "../../src/profile_collector.h" #include "asset_stream.h" #include "command_stream.h" #include "grpc_client.h" #include "grpc_errors.h" +#include "grpc_metrics_exporter.h" // Class pre-declaration OPENTELEMETRY_BEGIN_NAMESPACE @@ -147,6 +150,8 @@ class GrpcAgent: public std::enable_shared_from_this, bool testing; }; + static constexpr uint32_t kMaxMetricsExportRetries = 5; + GrpcAgent(); ~GrpcAgent(); @@ -186,6 +191,8 @@ class GrpcAgent: public std::enable_shared_from_this, static void metrics_timer_cb_(nsuv::ns_timer*, WeakGrpcAgent); + static void metrics_retry_cb_(nsuv::ns_async*, WeakGrpcAgent); + static void profile_msg_cb_(nsuv::ns_async*, WeakGrpcAgent); static void shutdown_cb_(nsuv::ns_async*, WeakGrpcAgent); @@ -254,6 +261,8 @@ class GrpcAgent: public std::enable_shared_from_this, void handle_command_request(CommandRequestStor&& req); + void on_metrics_timer(); + void 
parse_saas_token(const std::string& token); bool pending_profiles() const; @@ -310,6 +319,11 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>, // Blocked Loop std::shared_ptr<TSQueue<BlockedLoopStor>> blocked_loop_queue_; + std::shared_ptr<opentelemetry::v1::exporter::otlp::OtlpGrpcClient> + otlp_grpc_client_; + opentelemetry::v1::exporter::otlp::OtlpGrpcClientOptions + otlp_grpc_client_options_; + // For the Tracing API uint32_t trace_flags_; std::shared_ptr<SpanCollector> span_collector_; @@ -319,16 +333,17 @@ recordables_; // For the Metrics API - uint64_t metrics_interval_; + bool metrics_paused_; ProcessMetrics proc_metrics_; ProcessMetrics::MetricsStor proc_prev_stor_; + otlp::MetricDataBatch proc_metrics_batch_; std::map env_metrics_map_; nsuv::ns_async metrics_msg_; TSQueue<ThreadMetricsStor> thr_metrics_msg_q_; nsuv::ns_timer metrics_timer_; - std::unique_ptr<OtlpGrpcMetricExporter> - metrics_exporter_; std::map<uint64_t, ThreadMetricsStor> thr_metrics_cache_; + otlp::MetricDataBatch thr_metrics_batch_; + std::shared_ptr<GrpcMetricsExporter> metrics_exporter_; // For the Configuration API nsuv::ns_async config_msg_; diff --git a/agents/grpc/src/grpc_client.h b/agents/grpc/src/grpc_client.h index ed26d7b299..8355d2cc77 100644 --- a/agents/grpc/src/grpc_client.h +++ b/agents/grpc/src/grpc_client.h @@ -29,20 +29,20 @@ namespace grpc { // Template class for managing async call data for DelegateAsyncExport // Moved from grpc_client.cc so it is visible to all template instantiations -template <typename EventType> +template <typename EventType, typename ResponseType> class GrpcAsyncCallData { public: std::unique_ptr<Arena> arena; ::grpc::Status grpc_status; std::unique_ptr<::grpc::ClientContext> grpc_context; std::function<void(::grpc::Status, std::unique_ptr<Arena>&&, const EventType&, - grpcagent::EventResponse*)> result_callback; + ResponseType*)> result_callback; EventType* event = nullptr; - grpcagent::EventResponse* event_response = nullptr; + ResponseType* event_response = nullptr; uint64_t start; GrpcAsyncCallData() = default;
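With the second template parameter, the same call-data type now serves both the NSolid event RPCs and the OTLP metrics service. A sketch of the two pairings this diff relies on; the aliases are illustrative, not part of the patch:

// Hypothetical aliases. NSolid events keep the old response type:
using InfoCallData =
    GrpcAsyncCallData<grpcagent::InfoEvent, grpcagent::EventResponse>;
// The metrics exporter pairs OTLP request/response types instead:
using MetricsCallData = GrpcAsyncCallData<
    opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest,
    opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse>;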
@@ -86,39 +86,40 @@ class GrpcClient { /** * Generic DelegateAsyncExport for any event type. * Usage example: - * GrpcClient::DelegateAsyncExport<EventType>( + * GrpcClient::DelegateAsyncExport<Stub, EventType, ResponseType>( * stub, - * &grpcagent::NSolidService::StubInterface::async_interface::ExportEventType, + * &Stub::async_interface::ExportEventType, * std::move(context), * std::move(arena), * std::move(event), * std::move(callback)); */ - template <typename EventT> - static int DelegateAsyncExport( - grpcagent::NSolidService::StubInterface* stub, - void(grpcagent::NSolidService::StubInterface::async_interface::*exportFunc)( + template <typename Stub, typename EventT, typename ResponseT> + static ::grpc::Status DelegateAsyncExport( + Stub* stub, + void(Stub::async_interface::*exportFunc)( ::grpc::ClientContext*, const EventT*, - ::grpcagent::EventResponse*, + ResponseT*, std::function<void(::grpc::Status)>), std::unique_ptr<::grpc::ClientContext>&& context, std::unique_ptr<Arena>&& arena, EventT&& event, - std::function<void(::grpc::Status, std::unique_ptr<Arena>&&, - const EventT&, - grpcagent::EventResponse*)>&& result_callback) { + std::function<void(::grpc::Status, std::unique_ptr<Arena>&&, + const EventT&, + ResponseT*)>&& result_callback) { ASSERT_NOT_NULL(stub); - auto call_data = std::make_shared<GrpcAsyncCallData<EventT>>(); + auto call_data = std::make_shared<GrpcAsyncCallData<EventT, ResponseT>>(); call_data->arena.swap(arena); call_data->result_callback.swap(result_callback); call_data->event = Arena::Create<EventT>(call_data->arena.get(), std::move(event)); call_data->event_response = - Arena::Create<grpcagent::EventResponse>(call_data->arena.get()); + Arena::Create<ResponseT>(call_data->arena.get()); if (call_data->event == nullptr || call_data->event_response == nullptr) { - return -1; + return ::grpc::Status(::grpc::StatusCode::INTERNAL, + "Failed to create event"); } if (per_process::enabled_debug_list.enabled( @@ -127,7 +128,6 @@ class GrpcClient { } call_data->grpc_context.swap(context); - // Call the correct async export method on the stub (stub->async()->*exportFunc)(call_data->grpc_context.get(), call_data->event, call_data->event_response, @@ -153,7 +153,7 @@ class GrpcClient { call_data->event_response); }); - return 0; + return ::grpc::Status::OK; } };
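The new exporter in the next file serializes exports: one batch is in flight at a time, a bounded ring buffer absorbs bursts, and gRPC completions are marshalled back onto the agent's uv loop before the next batch is sent. A usage sketch mirroring the wiring in GrpcAgent::config() (the surrounding scope and values are assumed):

// Sketch: constructing and driving the exporter from the agent loop thread.
auto exporter = std::make_shared<GrpcMetricsExporter>(
    &loop_, options, client, /*buffer_size=*/100);
exporter->init();                     // binds the completion queue to the loop
exporter->enqueue(std::move(batch));  // starts an export if none is in flight
exporter->flush();                    // re-kicks export_current() on demand
exporter->resize_buffer(256);         // applied live on reconfigure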
diff --git a/agents/grpc/src/grpc_metrics_exporter.cc b/agents/grpc/src/grpc_metrics_exporter.cc new file mode 100644 index 0000000000..0e89d4905a --- /dev/null +++ b/agents/grpc/src/grpc_metrics_exporter.cc @@ -0,0 +1,131 @@ +#include "grpc_metrics_exporter.h" +#include "grpc_client.h" + +#include "../../otlp/src/otlp_common.h" +#include "opentelemetry/exporters/otlp/otlp_grpc_client.h" +#include "opentelemetry/exporters/otlp/otlp_metric_utils.h" + +using + opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; +using + opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse; +using opentelemetry::proto::collector::metrics::v1::MetricsService; +using opentelemetry::sdk::common::ExportResult; +using opentelemetry::sdk::metrics::ResourceMetrics; +using opentelemetry::v1::exporter::otlp::OtlpGrpcClient; +using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporterOptions; +using opentelemetry::v1::exporter::otlp::OtlpMetricUtils; + +namespace node { +namespace nsolid { +namespace grpc { + +using SharedGrpcMetricsExporter = std::shared_ptr<GrpcMetricsExporter>; +using WeakGrpcMetricsExporter = std::weak_ptr<GrpcMetricsExporter>; + +GrpcMetricsExporter::GrpcMetricsExporter( + uv_loop_t* loop, + OtlpGrpcMetricExporterOptions options, + std::shared_ptr<OtlpGrpcClient> client, + size_t buffer_size): + loop_(loop), + options_(options), + client_(client), + metrics_service_stub_(client_->MakeMetricsServiceStub()), + metrics_q_(buffer_size), + in_flight_(false) { +} + +void GrpcMetricsExporter::init() { + metrics_completion_q_ = AsyncTSQueue<::grpc::Status>::create( + loop_, + +[](::grpc::Status status, WeakGrpcMetricsExporter exporter_wp) { + SharedGrpcMetricsExporter exporter = exporter_wp.lock(); + if (exporter == nullptr) { + return; + } + + exporter->on_metrics_export_complete(status); + }, + weak_from_this()); } + +GrpcMetricsExporter::~GrpcMetricsExporter() { +} + +void GrpcMetricsExporter::enqueue( + std::vector<BatchedMetricData>&& metrics) { + metrics_q_.push(std::move(metrics)); + if (!in_flight_) { + export_current(); + } +} + +void GrpcMetricsExporter::export_current() { + if (metrics_q_.empty()) { + return; + } + + auto* stub = metrics_service_stub_.get(); + if (stub == nullptr) { + return; + } + + auto& data = metrics_q_.front(); + + google::protobuf::ArenaOptions arena_options; + arena_options.initial_block_size = 1024; + arena_options.max_block_size = 65536; + auto arena = std::make_unique<google::protobuf::Arena>(arena_options); + + auto* request = + google::protobuf::Arena::Create<ExportMetricsServiceRequest>(arena.get()); + + otlp::PopulateRequest(data, otlp::GetResource(), otlp::GetScope(), request); + + auto context = OtlpGrpcClient::MakeClientContext(options_); + ::grpc::Status immediate = + GrpcClient::DelegateAsyncExport<MetricsServiceStub, + ExportMetricsServiceRequest, + ExportMetricsServiceResponse>( + stub, + &MetricsServiceStub::async_interface::Export, + std::move(context), + std::move(arena), + std::move(*request), + [weak = weak_from_this()](::grpc::Status status, + std::unique_ptr<google::protobuf::Arena>&&, + const ExportMetricsServiceRequest&, + ExportMetricsServiceResponse*) { + auto exporter = weak.lock(); + if (exporter != nullptr) { + exporter->metrics_completion_q_->enqueue(status); + } + }); + + if (!immediate.ok()) { + metrics_completion_q_->enqueue(immediate); + } else { + in_flight_ = true; + } +} + +void GrpcMetricsExporter::flush() { + export_current(); +} + +void GrpcMetricsExporter::resize_buffer(size_t new_buffer_size) { + metrics_q_.resize(new_buffer_size); +} + +void GrpcMetricsExporter::on_metrics_export_complete(::grpc::Status status) { + in_flight_ = false; + if (status.ok()) { + metrics_q_.pop(); + export_current(); + } +} + +} // namespace grpc +} // namespace nsolid +} // namespace node diff --git a/agents/grpc/src/grpc_metrics_exporter.h b/agents/grpc/src/grpc_metrics_exporter.h new file mode 100644 index 0000000000..310cda2533 --- /dev/null +++ b/agents/grpc/src/grpc_metrics_exporter.h @@ -0,0 +1,77 @@ +#ifndef AGENTS_GRPC_SRC_GRPC_METRICS_EXPORTER_H_ +#define AGENTS_GRPC_SRC_GRPC_METRICS_EXPORTER_H_ + +#include +#include +#include + +#include "../../otlp/src/batched_metric_data.h" +#include "nsolid/async_ts_queue.h" +#include "nsolid/nsolid_util.h" +#include "opentelemetry/exporters/otlp/otlp_grpc_client.h" +#include "opentelemetry/exporters/otlp/otlp_grpc_metric_exporter_options.h" +#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" +#include "opentelemetry/sdk/common/exporter_utils.h" +#include "opentelemetry/sdk/metrics/export/metric_producer.h" + +// Class pre-declaration OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter { namespace otlp { class OtlpGrpcClient; } } namespace sdk { namespace trace { } } OPENTELEMETRY_END_NAMESPACE namespace node { namespace nsolid { namespace grpc { +class GrpcMetricsExporter: + public std::enable_shared_from_this<GrpcMetricsExporter> { + public: + explicit GrpcMetricsExporter( + uv_loop_t* loop, + opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporterOptions options, + std::shared_ptr<opentelemetry::v1::exporter::otlp::OtlpGrpcClient> client, + size_t buffer_size); + ~GrpcMetricsExporter(); + + GrpcMetricsExporter(const GrpcMetricsExporter&) = delete; + GrpcMetricsExporter& operator=(const GrpcMetricsExporter&) = delete; + + void init(); + + void enqueue(std::vector<BatchedMetricData>&& metrics); + + void flush(); + + void resize_buffer(size_t
new_buffer_size); + + private: + using MetricsServiceStub = + opentelemetry::proto::collector::metrics::v1::MetricsService::StubInterface; + + void export_current(); + + void on_metrics_export_complete(::grpc::Status status); + + uv_loop_t* loop_; + opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporterOptions options_; + std::shared_ptr client_; + std::unique_ptr metrics_service_stub_; + utils::RingBuffer> metrics_q_; + bool in_flight_; + std::shared_ptr> metrics_completion_q_; +}; + +} // namespace grpc +} // namespace nsolid +} // namespace node + +#endif // AGENTS_GRPC_SRC_GRPC_METRICS_EXPORTER_H_ + diff --git a/agents/grpc/src/proto/reconfigure.pb.cc b/agents/grpc/src/proto/reconfigure.pb.cc index d115cf0694..84709382e7 100644 --- a/agents/grpc/src/proto/reconfigure.pb.cc +++ b/agents/grpc/src/proto/reconfigure.pb.cc @@ -47,7 +47,9 @@ inline constexpr ReconfigureBody::Impl_::Impl_( tracingenabled_{false}, tracingmodulesblacklist_{0u}, contcpuprofile_{false}, - assetsenabled_{false} {} + assetsenabled_{false}, + metricsbatchsize_{0u}, + metricsbuffersize_{0u} {} template PROTOBUF_CONSTEXPR ReconfigureBody::ReconfigureBody(::_pbi::ConstantInitialized) @@ -104,7 +106,7 @@ const ::uint32_t protodesc_cold) = { 0x081, // bitmap PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_._has_bits_), - 16, // hasbit index offset + 18, // hasbit index offset PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_.blockedloopthreshold_), PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_.interval_), PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_.pausemetrics_), @@ -118,6 +120,8 @@ const ::uint32_t PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_.tracingmodulesblacklist_), PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_.contcpuprofile_), PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_.assetsenabled_), + PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_.metricsbatchsize_), + PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureBody, _impl_.metricsbuffersize_), 4, 5, 6, @@ -131,6 +135,8 @@ const ::uint32_t 10, 11, 12, + 13, + 14, 0x081, // bitmap PROTOBUF_FIELD_OFFSET(::grpcagent::ReconfigureEvent, _impl_._has_bits_), 5, // hasbit index offset @@ -143,7 +149,7 @@ const ::uint32_t static const ::_pbi::MigrationSchema schemas[] ABSL_ATTRIBUTE_SECTION_VARIABLE(protodesc_cold) = { {0, sizeof(::grpcagent::ReconfigureBody)}, - {29, sizeof(::grpcagent::ReconfigureEvent)}, + {33, sizeof(::grpcagent::ReconfigureEvent)}, }; static const ::_pb::Message* PROTOBUF_NONNULL const file_default_instances[] = { &::grpcagent::_ReconfigureBody_default_instance_._instance, @@ -152,7 +158,7 @@ static const ::_pb::Message* PROTOBUF_NONNULL const file_default_instances[] = { const char descriptor_table_protodef_reconfigure_2eproto[] ABSL_ATTRIBUTE_SECTION_VARIABLE( protodesc_cold) = { "\n\021reconfigure.proto\022\tgrpcagent\032\014common.p" - "roto\"\323\004\n\017ReconfigureBody\022!\n\024blockedLoopT" + "roto\"\275\005\n\017ReconfigureBody\022!\n\024blockedLoopT" "hreshold\030\001 \001(\004H\000\210\001\001\022\025\n\010interval\030\002 \001(\004H\001\210" "\001\001\022\031\n\014pauseMetrics\030\003 \001(\010H\002\210\001\001\022\034\n\017promise" "Tracking\030\004 \001(\010H\003\210\001\001\022\034\n\017redactSnapshots\030\005" @@ -161,15 +167,18 @@ const char descriptor_table_protodef_reconfigure_2eproto[] ABSL_ATTRIBUTE_SECTIO "\001\001\022\014\n\004tags\030\t \003(\t\022\033\n\016tracingEnabled\030\n \001(\010" 
"H\010\210\001\001\022$\n\027tracingModulesBlacklist\030\013 \001(\rH\t" "\210\001\001\022\033\n\016contCpuProfile\030\014 \001(\010H\n\210\001\001\022\032\n\rasse" - "tsEnabled\030\r \001(\010H\013\210\001\001B\027\n\025_blockedLoopThre" - "sholdB\013\n\t_intervalB\017\n\r_pauseMetricsB\022\n\020_" - "promiseTrackingB\022\n\020_redactSnapshotsB\t\n\007_" - "statsdB\017\n\r_statsdBucketB\r\n\013_statsdTagsB\021" - "\n\017_tracingEnabledB\032\n\030_tracingModulesBlac" - "klistB\021\n\017_contCpuProfileB\020\n\016_assetsEnabl" - "ed\"g\n\020ReconfigureEvent\022)\n\006common\030\001 \001(\0132\031" - ".grpcagent.CommonResponse\022(\n\004body\030\002 \001(\0132" - "\032.grpcagent.ReconfigureBodyb\006proto3" + "tsEnabled\030\r \001(\010H\013\210\001\001\022\035\n\020metricsBatchSize" + "\030\016 \001(\rH\014\210\001\001\022\036\n\021metricsBufferSize\030\017 \001(\rH\r" + "\210\001\001B\027\n\025_blockedLoopThresholdB\013\n\t_interva" + "lB\017\n\r_pauseMetricsB\022\n\020_promiseTrackingB\022" + "\n\020_redactSnapshotsB\t\n\007_statsdB\017\n\r_statsd" + "BucketB\r\n\013_statsdTagsB\021\n\017_tracingEnabled" + "B\032\n\030_tracingModulesBlacklistB\021\n\017_contCpu" + "ProfileB\020\n\016_assetsEnabledB\023\n\021_metricsBat" + "chSizeB\024\n\022_metricsBufferSize\"g\n\020Reconfig" + "ureEvent\022)\n\006common\030\001 \001(\0132\031.grpcagent.Com" + "monResponse\022(\n\004body\030\002 \001(\0132\032.grpcagent.Re" + "configureBodyb\006proto3" }; static const ::_pbi::DescriptorTable* PROTOBUF_NONNULL const descriptor_table_reconfigure_2eproto_deps[1] = { @@ -179,7 +188,7 @@ static ::absl::once_flag descriptor_table_reconfigure_2eproto_once; PROTOBUF_CONSTINIT const ::_pbi::DescriptorTable descriptor_table_reconfigure_2eproto = { false, false, - 755, + 861, descriptor_table_protodef_reconfigure_2eproto, "reconfigure.proto", &descriptor_table_reconfigure_2eproto_once, @@ -240,9 +249,9 @@ ReconfigureBody::ReconfigureBody( offsetof(Impl_, blockedloopthreshold_), reinterpret_cast(&from._impl_) + offsetof(Impl_, blockedloopthreshold_), - offsetof(Impl_, assetsenabled_) - + offsetof(Impl_, metricsbuffersize_) - offsetof(Impl_, blockedloopthreshold_) + - sizeof(Impl_::assetsenabled_)); + sizeof(Impl_::metricsbuffersize_)); // @@protoc_insertion_point(copy_constructor:grpcagent.ReconfigureBody) } @@ -260,9 +269,9 @@ inline void ReconfigureBody::SharedCtor(::_pb::Arena* PROTOBUF_NULLABLE arena) { ::memset(reinterpret_cast(&_impl_) + offsetof(Impl_, blockedloopthreshold_), 0, - offsetof(Impl_, assetsenabled_) - + offsetof(Impl_, metricsbuffersize_) - offsetof(Impl_, blockedloopthreshold_) + - sizeof(Impl_::assetsenabled_)); + sizeof(Impl_::metricsbuffersize_)); } ReconfigureBody::~ReconfigureBody() { // @@protoc_insertion_point(destructor:grpcagent.ReconfigureBody) @@ -336,16 +345,16 @@ ReconfigureBody::GetClassData() const { return ReconfigureBody_class_data_.base(); } PROTOBUF_CONSTINIT PROTOBUF_ATTRIBUTE_INIT_PRIORITY1 -const ::_pbi::TcParseTable<4, 13, 0, 74, 2> +const ::_pbi::TcParseTable<4, 15, 0, 74, 2> ReconfigureBody::_table_ = { { PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_._has_bits_), 0, // no _extensions_ - 13, 120, // max_field_number, fast_idx_mask + 15, 120, // max_field_number, fast_idx_mask offsetof(decltype(_table_), field_lookup_table), - 4294959104, // skipmap + 4294934528, // skipmap offsetof(decltype(_table_), field_entries), - 13, // num_field_entries + 15, // num_field_entries 0, // num_aux_entries offsetof(decltype(_table_), field_names), // no aux_entries 
ReconfigureBody_class_data_.base(), @@ -408,8 +417,14 @@ ReconfigureBody::_table_ = { {::_pbi::TcParser::SingularVarintNoZag1(), {104, 12, 0, PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.assetsenabled_)}}, - {::_pbi::TcParser::MiniParse, {}}, - {::_pbi::TcParser::MiniParse, {}}, + // optional uint32 metricsBatchSize = 14; + {::_pbi::TcParser::SingularVarintNoZag1<::uint32_t, offsetof(ReconfigureBody, _impl_.metricsbatchsize_), 13>(), + {112, 13, 0, + PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.metricsbatchsize_)}}, + // optional uint32 metricsBufferSize = 15; + {::_pbi::TcParser::SingularVarintNoZag1<::uint32_t, offsetof(ReconfigureBody, _impl_.metricsbuffersize_), 14>(), + {120, 14, 0, + PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.metricsbuffersize_)}}, }}, {{ 65535, 65535 }}, {{ @@ -439,6 +454,10 @@ ReconfigureBody::_table_ = { {PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.contcpuprofile_), _Internal::kHasBitsOffset + 11, 0, (0 | ::_fl::kFcOptional | ::_fl::kBool)}, // optional bool assetsEnabled = 13; {PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.assetsenabled_), _Internal::kHasBitsOffset + 12, 0, (0 | ::_fl::kFcOptional | ::_fl::kBool)}, + // optional uint32 metricsBatchSize = 14; + {PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.metricsbatchsize_), _Internal::kHasBitsOffset + 13, 0, (0 | ::_fl::kFcOptional | ::_fl::kUInt32)}, + // optional uint32 metricsBufferSize = 15; + {PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.metricsbuffersize_), _Internal::kHasBitsOffset + 14, 0, (0 | ::_fl::kFcOptional | ::_fl::kUInt32)}, }}, // no aux_entries {{ @@ -477,10 +496,10 @@ PROTOBUF_NOINLINE void ReconfigureBody::Clear() { reinterpret_cast(&_impl_.promisetracking_) - reinterpret_cast(&_impl_.blockedloopthreshold_)) + sizeof(_impl_.promisetracking_)); } - if (BatchCheckHasBit(cached_has_bits, 0x00001f00U)) { + if (BatchCheckHasBit(cached_has_bits, 0x00007f00U)) { ::memset(&_impl_.redactsnapshots_, 0, static_cast<::size_t>( - reinterpret_cast(&_impl_.assetsenabled_) - - reinterpret_cast(&_impl_.redactsnapshots_)) + sizeof(_impl_.assetsenabled_)); + reinterpret_cast(&_impl_.metricsbuffersize_) - + reinterpret_cast(&_impl_.redactsnapshots_)) + sizeof(_impl_.metricsbuffersize_)); } _impl_._has_bits_.Clear(); _internal_metadata_.Clear<::google::protobuf::UnknownFieldSet>(); @@ -602,6 +621,20 @@ ::uint8_t* PROTOBUF_NONNULL ReconfigureBody::_InternalSerialize( 13, this_._internal_assetsenabled(), target); } + // optional uint32 metricsBatchSize = 14; + if (CheckHasBit(cached_has_bits, 0x00002000U)) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteUInt32ToArray( + 14, this_._internal_metricsbatchsize(), target); + } + + // optional uint32 metricsBufferSize = 15; + if (CheckHasBit(cached_has_bits, 0x00004000U)) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteUInt32ToArray( + 15, this_._internal_metricsbuffersize(), target); + } + if (ABSL_PREDICT_FALSE(this_._internal_metadata_.have_unknown_fields())) { target = ::_pbi::WireFormat::InternalSerializeUnknownFieldsToArray( @@ -664,12 +697,22 @@ ::size_t ReconfigureBody::ByteSizeLong() const { this_._internal_interval()); } } - { + if (BatchCheckHasBit(cached_has_bits, 0x00006400U)) { // optional uint32 tracingModulesBlacklist = 11; if (CheckHasBit(cached_has_bits, 0x00000400U)) { total_size += ::_pbi::WireFormatLite::UInt32SizePlusOne( this_._internal_tracingmodulesblacklist()); } + // optional uint32 metricsBatchSize = 14; + if (CheckHasBit(cached_has_bits, 0x00002000U)) { + total_size 
+= ::_pbi::WireFormatLite::UInt32SizePlusOne( + this_._internal_metricsbatchsize()); + } + // optional uint32 metricsBufferSize = 15; + if (CheckHasBit(cached_has_bits, 0x00004000U)) { + total_size += ::_pbi::WireFormatLite::UInt32SizePlusOne( + this_._internal_metricsbuffersize()); + } } return this_.MaybeComputeUnknownFieldsSize(total_size, &this_._impl_._cached_size_); @@ -718,7 +761,7 @@ void ReconfigureBody::MergeImpl(::google::protobuf::MessageLite& to_msg, _this->_impl_.promisetracking_ = from._impl_.promisetracking_; } } - if (BatchCheckHasBit(cached_has_bits, 0x00001f00U)) { + if (BatchCheckHasBit(cached_has_bits, 0x00007f00U)) { if (CheckHasBit(cached_has_bits, 0x00000100U)) { _this->_impl_.redactsnapshots_ = from._impl_.redactsnapshots_; } @@ -734,6 +777,12 @@ void ReconfigureBody::MergeImpl(::google::protobuf::MessageLite& to_msg, if (CheckHasBit(cached_has_bits, 0x00001000U)) { _this->_impl_.assetsenabled_ = from._impl_.assetsenabled_; } + if (CheckHasBit(cached_has_bits, 0x00002000U)) { + _this->_impl_.metricsbatchsize_ = from._impl_.metricsbatchsize_; + } + if (CheckHasBit(cached_has_bits, 0x00004000U)) { + _this->_impl_.metricsbuffersize_ = from._impl_.metricsbuffersize_; + } } _this->_impl_._has_bits_[0] |= cached_has_bits; _this->_internal_metadata_.MergeFrom<::google::protobuf::UnknownFieldSet>( @@ -759,8 +808,8 @@ void ReconfigureBody::InternalSwap(ReconfigureBody* PROTOBUF_RESTRICT PROTOBUF_N ::_pbi::ArenaStringPtr::InternalSwap(&_impl_.statsdbucket_, &other->_impl_.statsdbucket_, arena); ::_pbi::ArenaStringPtr::InternalSwap(&_impl_.statsdtags_, &other->_impl_.statsdtags_, arena); ::google::protobuf::internal::memswap< - PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.assetsenabled_) - + sizeof(ReconfigureBody::_impl_.assetsenabled_) + PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.metricsbuffersize_) + + sizeof(ReconfigureBody::_impl_.metricsbuffersize_) - PROTOBUF_FIELD_OFFSET(ReconfigureBody, _impl_.blockedloopthreshold_)>( reinterpret_cast(&_impl_.blockedloopthreshold_), reinterpret_cast(&other->_impl_.blockedloopthreshold_)); diff --git a/agents/grpc/src/proto/reconfigure.pb.h b/agents/grpc/src/proto/reconfigure.pb.h index 95294467cd..1b5db62b9b 100644 --- a/agents/grpc/src/proto/reconfigure.pb.h +++ b/agents/grpc/src/proto/reconfigure.pb.h @@ -230,6 +230,8 @@ class ReconfigureBody final : public ::google::protobuf::Message kTracingModulesBlacklistFieldNumber = 11, kContCpuProfileFieldNumber = 12, kAssetsEnabledFieldNumber = 13, + kMetricsBatchSizeFieldNumber = 14, + kMetricsBufferSizeFieldNumber = 15, }; // repeated string tags = 9; int tags_size() const; @@ -399,12 +401,34 @@ class ReconfigureBody final : public ::google::protobuf::Message bool _internal_assetsenabled() const; void _internal_set_assetsenabled(bool value); + public: + // optional uint32 metricsBatchSize = 14; + bool has_metricsbatchsize() const; + void clear_metricsbatchsize() ; + ::uint32_t metricsbatchsize() const; + void set_metricsbatchsize(::uint32_t value); + + private: + ::uint32_t _internal_metricsbatchsize() const; + void _internal_set_metricsbatchsize(::uint32_t value); + + public: + // optional uint32 metricsBufferSize = 15; + bool has_metricsbuffersize() const; + void clear_metricsbuffersize() ; + ::uint32_t metricsbuffersize() const; + void set_metricsbuffersize(::uint32_t value); + + private: + ::uint32_t _internal_metricsbuffersize() const; + void _internal_set_metricsbuffersize(::uint32_t value); + public: // @@protoc_insertion_point(class_scope:grpcagent.ReconfigureBody) private: 
class _Internal; friend class ::google::protobuf::internal::TcParser; - static const ::google::protobuf::internal::TcParseTable<4, 13, + static const ::google::protobuf::internal::TcParseTable<4, 15, 0, 74, 2> _table_; @@ -439,6 +463,8 @@ class ReconfigureBody final : public ::google::protobuf::Message ::uint32_t tracingmodulesblacklist_; bool contcpuprofile_; bool assetsenabled_; + ::uint32_t metricsbatchsize_; + ::uint32_t metricsbuffersize_; PROTOBUF_TSAN_DECLARE_MEMBER }; union { Impl_ _impl_; }; @@ -1215,6 +1241,64 @@ inline void ReconfigureBody::_internal_set_assetsenabled(bool value) { _impl_.assetsenabled_ = value; } +// optional uint32 metricsBatchSize = 14; +inline bool ReconfigureBody::has_metricsbatchsize() const { + bool value = CheckHasBit(_impl_._has_bits_[0], 0x00002000U); + return value; +} +inline void ReconfigureBody::clear_metricsbatchsize() { + ::google::protobuf::internal::TSanWrite(&_impl_); + _impl_.metricsbatchsize_ = 0u; + ClearHasBit(_impl_._has_bits_[0], + 0x00002000U); +} +inline ::uint32_t ReconfigureBody::metricsbatchsize() const { + // @@protoc_insertion_point(field_get:grpcagent.ReconfigureBody.metricsBatchSize) + return _internal_metricsbatchsize(); +} +inline void ReconfigureBody::set_metricsbatchsize(::uint32_t value) { + _internal_set_metricsbatchsize(value); + SetHasBit(_impl_._has_bits_[0], 0x00002000U); + // @@protoc_insertion_point(field_set:grpcagent.ReconfigureBody.metricsBatchSize) +} +inline ::uint32_t ReconfigureBody::_internal_metricsbatchsize() const { + ::google::protobuf::internal::TSanRead(&_impl_); + return _impl_.metricsbatchsize_; +} +inline void ReconfigureBody::_internal_set_metricsbatchsize(::uint32_t value) { + ::google::protobuf::internal::TSanWrite(&_impl_); + _impl_.metricsbatchsize_ = value; +} + +// optional uint32 metricsBufferSize = 15; +inline bool ReconfigureBody::has_metricsbuffersize() const { + bool value = CheckHasBit(_impl_._has_bits_[0], 0x00004000U); + return value; +} +inline void ReconfigureBody::clear_metricsbuffersize() { + ::google::protobuf::internal::TSanWrite(&_impl_); + _impl_.metricsbuffersize_ = 0u; + ClearHasBit(_impl_._has_bits_[0], + 0x00004000U); +} +inline ::uint32_t ReconfigureBody::metricsbuffersize() const { + // @@protoc_insertion_point(field_get:grpcagent.ReconfigureBody.metricsBufferSize) + return _internal_metricsbuffersize(); +} +inline void ReconfigureBody::set_metricsbuffersize(::uint32_t value) { + _internal_set_metricsbuffersize(value); + SetHasBit(_impl_._has_bits_[0], 0x00004000U); + // @@protoc_insertion_point(field_set:grpcagent.ReconfigureBody.metricsBufferSize) +} +inline ::uint32_t ReconfigureBody::_internal_metricsbuffersize() const { + ::google::protobuf::internal::TSanRead(&_impl_); + return _impl_.metricsbuffersize_; +} +inline void ReconfigureBody::_internal_set_metricsbuffersize(::uint32_t value) { + ::google::protobuf::internal::TSanWrite(&_impl_); + _impl_.metricsbuffersize_ = value; +} + // ------------------------------------------------------------------- // ReconfigureEvent diff --git a/agents/otlp/src/batched_metric_data.h b/agents/otlp/src/batched_metric_data.h new file mode 100644 index 0000000000..4612a855c5 --- /dev/null +++ b/agents/otlp/src/batched_metric_data.h @@ -0,0 +1,51 @@ +#ifndef AGENTS_OTLP_SRC_BATCHED_METRIC_DATA_H_ +#define AGENTS_OTLP_SRC_BATCHED_METRIC_DATA_H_ + +#include + +#include "opentelemetry/nostd/variant.h" +#include "opentelemetry/sdk/common/attribute_utils.h" +#include "opentelemetry/sdk/metrics/data/point_data.h" +#include 
"opentelemetry/sdk/metrics/instruments.h" +#include "opentelemetry/version.h" + +namespace node { +namespace nsolid { +namespace otlp { + +using opentelemetry::sdk::metrics::SumPointData; +using opentelemetry::sdk::metrics::HistogramPointData; +using opentelemetry::sdk::metrics::Base2ExponentialHistogramPointData; +using opentelemetry::sdk::metrics::LastValuePointData; +using opentelemetry::sdk::metrics::DropPointData; +using opentelemetry::sdk::metrics::SummaryPointData; +using opentelemetry::sdk::metrics::AggregationTemporality; +using opentelemetry::sdk::metrics::InstrumentDescriptor; +using PointAttributes = opentelemetry::sdk::common::OrderedAttributeMap; +using PointType = + opentelemetry::nostd::variant; + +struct TimedPointDataAttributes { + opentelemetry::common::SystemTimestamp start_ts; + opentelemetry::common::SystemTimestamp end_ts; + PointAttributes attributes; + PointType point_data; +}; + +class BatchedMetricData { + public: + InstrumentDescriptor instrument_descriptor; + AggregationTemporality aggregation_temporality; + std::vector point_data_attr_; +}; + +} // namespace otlp +} // namespace nsolid +} // namespace node + +#endif // AGENTS_OTLP_SRC_BATCHED_METRIC_DATA_H_ diff --git a/agents/otlp/src/otlp_common.cc b/agents/otlp/src/otlp_common.cc index dd07505aa4..6b9980a655 100644 --- a/agents/otlp/src/otlp_common.cc +++ b/agents/otlp/src/otlp_common.cc @@ -1,13 +1,18 @@ #include "otlp_common.h" // NOLINTNEXTLINE(build/c++11) +#include +// NOLINTNEXTLINE(build/c++11) #include +#include #include #include "asserts-cpp/asserts.h" #include "env-inl.h" +#include "nsolid/nsolid_util.h" #include "nlohmann/json.hpp" #include "opentelemetry/semconv/incubating/process_attributes.h" -#include "opentelemetry/semconv/incubating/service_attributes.h" +#include "opentelemetry/exporters/otlp/otlp_populate_attribute_utils.h" #include "opentelemetry/semconv/incubating/thread_attributes.h" +#include "opentelemetry/semconv/incubating/service_attributes.h" #include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h" #include "opentelemetry/sdk/logs/recordable.h" #include "opentelemetry/sdk/trace/recordable.h" @@ -23,34 +28,30 @@ using std::chrono::microseconds; using std::chrono::milliseconds; using std::chrono::nanoseconds; +using google::protobuf::RepeatedPtrField; using opentelemetry::common::SystemTimestamp; +using opentelemetry::proto::common::v1::KeyValue; +using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; using opentelemetry::sdk::instrumentationscope::InstrumentationScope; using LogsRecordable = opentelemetry::sdk::logs::Recordable; using opentelemetry::sdk::common::OwnedAttributeType; using opentelemetry::sdk::metrics::AggregationTemporality; -using opentelemetry::sdk::metrics::MetricData; using opentelemetry::sdk::metrics::InstrumentDescriptor; using opentelemetry::sdk::metrics::InstrumentType; using opentelemetry::sdk::metrics::InstrumentValueType; +using opentelemetry::sdk::metrics::LastValuePointData; using opentelemetry::sdk::metrics::PointAttributes; using opentelemetry::sdk::metrics::PointDataAttributes; +using opentelemetry::sdk::metrics::SummaryPointData; using opentelemetry::sdk::metrics::SumPointData; using opentelemetry::sdk::metrics::ValueType; using opentelemetry::sdk::resource::Resource; using opentelemetry::sdk::resource::ResourceAttributes; using opentelemetry::sdk::trace::Recordable; -using opentelemetry::trace::SpanContext; -using opentelemetry::trace::SpanId; -using opentelemetry::trace::SpanKind; -using 
diff --git a/agents/otlp/src/otlp_common.cc b/agents/otlp/src/otlp_common.cc index dd07505aa4..6b9980a655 100644 --- a/agents/otlp/src/otlp_common.cc +++ b/agents/otlp/src/otlp_common.cc @@ -1,13 +1,18 @@ #include "otlp_common.h" // NOLINTNEXTLINE(build/c++11) +#include +// NOLINTNEXTLINE(build/c++11) #include +#include #include #include "asserts-cpp/asserts.h" #include "env-inl.h" +#include "nsolid/nsolid_util.h" #include "nlohmann/json.hpp" #include "opentelemetry/semconv/incubating/process_attributes.h" -#include "opentelemetry/semconv/incubating/service_attributes.h" +#include "opentelemetry/exporters/otlp/otlp_populate_attribute_utils.h" #include "opentelemetry/semconv/incubating/thread_attributes.h" +#include "opentelemetry/semconv/incubating/service_attributes.h" #include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h" #include "opentelemetry/sdk/logs/recordable.h" #include "opentelemetry/sdk/trace/recordable.h" @@ -23,34 +28,30 @@ using std::chrono::microseconds; using std::chrono::milliseconds; using std::chrono::nanoseconds; +using google::protobuf::RepeatedPtrField; using opentelemetry::common::SystemTimestamp; +using opentelemetry::proto::common::v1::KeyValue; +using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; using opentelemetry::sdk::instrumentationscope::InstrumentationScope; using LogsRecordable = opentelemetry::sdk::logs::Recordable; using opentelemetry::sdk::common::OwnedAttributeType; using opentelemetry::sdk::metrics::AggregationTemporality; -using opentelemetry::sdk::metrics::MetricData; using opentelemetry::sdk::metrics::InstrumentDescriptor; using opentelemetry::sdk::metrics::InstrumentType; using opentelemetry::sdk::metrics::InstrumentValueType; +using opentelemetry::sdk::metrics::LastValuePointData; using opentelemetry::sdk::metrics::PointAttributes; using opentelemetry::sdk::metrics::PointDataAttributes; +using opentelemetry::sdk::metrics::SummaryPointData; using opentelemetry::sdk::metrics::SumPointData; using opentelemetry::sdk::metrics::ValueType; using opentelemetry::sdk::resource::Resource; using opentelemetry::sdk::resource::ResourceAttributes; using opentelemetry::sdk::trace::Recordable; -using opentelemetry::trace::SpanContext; -using opentelemetry::trace::SpanId; -using opentelemetry::trace::SpanKind; -using opentelemetry::trace::TraceFlags; -using opentelemetry::trace::TraceId; -using opentelemetry::trace::propagation::detail::HexToBinary; -using opentelemetry::semconv::process::kProcessOwner; -using opentelemetry::semconv::service::kServiceName; -using opentelemetry::semconv::service::kServiceInstanceId; -using opentelemetry::semconv::service::kServiceVersion; -using opentelemetry::semconv::thread::kThreadId; -using opentelemetry::semconv::thread::kThreadName; +using OtlpPopulateAttributeUtils = + opentelemetry::exporter::otlp::OtlpPopulateAttributeUtils; + +namespace proto = opentelemetry::proto; namespace node { namespace nsolid { @@ -67,73 +68,140 @@ static std::vector<std::string> discarded_metrics = { "thread_id", "timestamp" }; -static std::unique_ptr<Resource> resource_g = - std::make_unique<Resource>(Resource::GetEmpty()); +static auto resource_g = std::make_unique<Resource>(Resource::GetEmpty()); static bool isResourceInitialized_g = false; +// Helper to serialize attributes +void SerializeAttributes(const PointAttributes& attrs, + RepeatedPtrField<KeyValue>* proto_attrs) { + for (const auto& attr : attrs) { + OtlpPopulateAttributeUtils::PopulateAttribute( + proto_attrs->Add(), attr.first, attr.second, false); + } +} + +opentelemetry::sdk::metrics::MetricData BatchedMetricToMetricData( + const BatchedMetricData& bm) { + opentelemetry::sdk::metrics::MetricData md; + md.instrument_descriptor = bm.instrument_descriptor; + md.aggregation_temporality = bm.aggregation_temporality; + if (!bm.point_data_attr_.empty()) { + md.start_ts = bm.point_data_attr_[0].start_ts; + md.end_ts = bm.point_data_attr_[0].end_ts; + for (const auto& p : bm.point_data_attr_) { + md.point_data_attr_.push_back({ p.attributes, p.point_data }); + } + } + return md; +} + +// Helper to convert vector<BatchedMetricData> to vector<MetricData> +std::vector<opentelemetry::sdk::metrics::MetricData> + ConvertBatchedToMetricData(const std::vector<BatchedMetricData>& batched) { + std::vector<opentelemetry::sdk::metrics::MetricData> metric_data; + metric_data.reserve(batched.size()); + for (const auto& bm : batched) { + metric_data.push_back(BatchedMetricToMetricData(bm)); + } + return metric_data; +} + // NOLINTNEXTLINE(runtime/references) -static void add_counter(std::vector<MetricData>& metrics, +static void add_counter(MetricDataBatch& metrics_batch, const time_point& start, const time_point& end, const char* name, const char* unit, - InstrumentValueType type, + InstrumentValueType value_type, ValueType value, PointAttributes attrs = {}) { SumPointData sum_point_data; sum_point_data.value_ = value; - MetricData metric_data{ - InstrumentDescriptor{ name, "", unit, InstrumentType::kCounter, type}, - AggregationTemporality::kCumulative, - SystemTimestamp{ start }, - SystemTimestamp{ end }, - std::vector<PointDataAttributes>{{ attrs, sum_point_data }} - }; - metrics.push_back(metric_data); + PointDataAttributes point_data_attributes { attrs, sum_point_data }; + metrics_batch.AddDataPoint(start, + end, + name, + unit, + InstrumentType::kCounter, + value_type, + AggregationTemporality::kCumulative, + std::move(point_data_attributes)); }
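Every helper now funnels through MetricDataBatch::AddDataPoint, which groups points by instrument name instead of emitting one metric record per call. A hedged example of the effect (timestamps and values hypothetical):

// Sketch: two cycles' worth of the same counter end up in one entry.
std::chrono::system_clock::time_point start, end1, end2;  // collection times
otlp::MetricDataBatch batch;
batch.Resize(10);  // flush threshold, e.g. from metricsBatchSize
add_counter(batch, start, end1, "loop_count", "1",
            InstrumentValueType::kInt, static_cast<int64_t>(1));
add_counter(batch, start, end2, "loop_count", "1",
            InstrumentValueType::kInt, static_cast<int64_t>(2));
// batch now holds a single "loop_count" metric with two timed points.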
// NOLINTNEXTLINE(runtime/references) -static void add_gauge(std::vector<MetricData>& metrics, +static void add_gauge(MetricDataBatch& metrics_batch, const time_point& start, const time_point& end, const char* name, const char* unit, - InstrumentValueType type, + InstrumentValueType value_type, ValueType value, PointAttributes attrs = {}) { - opentelemetry::sdk::metrics::LastValuePointData lv_point_data; + LastValuePointData lv_point_data; lv_point_data.value_ = value; - MetricData metric_data{ - InstrumentDescriptor{ - name, "", unit, InstrumentType::kObservableGauge, type }, - AggregationTemporality::kCumulative, - SystemTimestamp{ start }, - SystemTimestamp{ end }, - std::vector<PointDataAttributes>{{ attrs, lv_point_data }} - }; - metrics.push_back(metric_data); + PointDataAttributes point_data_attributes { attrs, lv_point_data }; + metrics_batch.AddDataPoint(start, + end, + name, + unit, + InstrumentType::kGauge, + value_type, + AggregationTemporality::kCumulative, + std::move(point_data_attributes)); } // NOLINTNEXTLINE(runtime/references) -static void add_summary(std::vector<MetricData>& metrics, +static void add_summary(MetricDataBatch& metrics_batch, const time_point& start, const time_point& end, const char* name, const char* unit, - InstrumentValueType type, + InstrumentValueType value_type, std::unordered_map<double, ValueType>&& values, PointAttributes attrs = {}) { - opentelemetry::sdk::metrics::SummaryPointData summary_point_data{}; + SummaryPointData summary_point_data{}; summary_point_data.quantile_values_ = std::move(values); - MetricData metric_data{ - InstrumentDescriptor{ - name, "", unit, InstrumentType::kSummary, type }, - AggregationTemporality::kUnspecified, + PointDataAttributes point_data_attributes { attrs, summary_point_data }; + metrics_batch.AddDataPoint(start, + end, + name, + unit, + InstrumentType::kSummary, + value_type, + AggregationTemporality::kUnspecified, + std::move(point_data_attributes)); +} + +MetricDataBatch::MetricDataBatch(std::size_t limit): max_points_(limit) { +} + +void MetricDataBatch::AddDataPoint(const time_point& start, + const time_point& end, + const char* name, + const char* unit, + InstrumentType type, + InstrumentValueType value_type, + AggregationTemporality temporality, + PointDataAttributes&& pdata_attrs) { + TimedPointDataAttributes timed_point_data_attrs { SystemTimestamp{ start }, SystemTimestamp{ end }, - std::vector<PointDataAttributes>{{ attrs, summary_point_data }} + std::move(pdata_attrs.attributes), + std::move(pdata_attrs.point_data) }; - metrics.push_back(metric_data); + auto it = metric_indices_.find(name); + if (it == metric_indices_.end()) { + BatchedMetricData metric_data { + InstrumentDescriptor { name, "", unit, type, value_type }, + temporality, + std::vector<TimedPointDataAttributes>{std::move(timed_point_data_attrs)} + }; + metrics_.push_back(metric_data); + TrackMetricIndex(name, metrics_.size() - 1); + } else { + metrics_[it->second].point_data_attr_. + push_back(std::move(timed_point_data_attrs)); + } } InstrumentationScope* GetScope() { @@ -149,6 +217,9 @@ Resource* GetResource() { ASSERT(!config.is_discarded()); auto it = config.find("app"); ASSERT(it != config.end()); + using opentelemetry::semconv::service::kServiceName; + using opentelemetry::semconv::service::kServiceInstanceId; + using opentelemetry::semconv::service::kServiceVersion; ResourceAttributes attrs({ {kServiceName, it->get<std::string>()}, {kServiceInstanceId, nsolid::GetAgentId()} @@ -172,6 +243,7 @@ Resource* UpdateResource(ResourceAttributes&& attrs) { // value "unknown_service". (See Resource::Create() method in the SDK).
auto resource = GetResource(); auto attributes = resource->GetAttributes(); + using opentelemetry::semconv::service::kServiceName; if (attributes.find(kServiceName) != attributes.end() && attrs.find(kServiceName) == attrs.end()) { attrs.SetAttribute(kServiceName, @@ -184,7 +256,7 @@ Resource* UpdateResource(ResourceAttributes&& attrs) { } // NOLINTNEXTLINE(runtime/references) -void fill_proc_metrics(std::vector& metrics, +void fill_proc_metrics(MetricDataBatch& metrics_batch, const ProcessMetrics::MetricsStor& stor, const ProcessMetrics::MetricsStor& prev_stor, bool use_snake_case) { @@ -216,7 +288,7 @@ void fill_proc_metrics(std::vector& metrics, switch (MetricsType::MType) { \ case MetricsType::ECounter: \ { \ - add_counter(metrics, \ + add_counter(metrics_batch, \ process_start, \ end, \ use_snake_case ? #CName : #JSName, \ @@ -227,7 +299,7 @@ void fill_proc_metrics(std::vector& metrics, break; \ case MetricsType::EGauge: \ { \ - add_gauge(metrics, \ + add_gauge(metrics_batch, \ process_start, \ end, \ use_snake_case ? #CName : #JSName, \ @@ -245,9 +317,12 @@ NSOLID_PROCESS_METRICS_UINT64(V) NSOLID_PROCESS_METRICS_DOUBLE(V) #undef V + metrics_batch.IncrementPoints(); + // Update Resource if needed: // Check if 'user' or 'title' are different from the previous metrics. if (prev_stor.user != stor.user || prev_stor.title != stor.title) { + using opentelemetry::semconv::process::kProcessOwner; ResourceAttributes attrs = { { kProcessOwner, stor.user }, { "process.title", stor.title }, @@ -258,7 +333,7 @@ NSOLID_PROCESS_METRICS_DOUBLE(V) } // NOLINTNEXTLINE(runtime/references) -void fill_env_metrics(std::vector& metrics, +void fill_env_metrics(MetricDataBatch& metrics_batch, const ThreadMetrics::MetricsStor& stor, bool use_snake_case) { time_point end{ @@ -268,6 +343,8 @@ void fill_env_metrics(std::vector& metrics, InstrumentValueType type; ValueType value; + using opentelemetry::semconv::thread::kThreadId; + using opentelemetry::semconv::thread::kThreadName; PointAttributes attrs = { { kThreadId, static_cast(stor.thread_id) }, { kThreadName, stor.thread_name }, @@ -294,7 +371,7 @@ void fill_env_metrics(std::vector& metrics, switch (MetricsType::MType) { \ case MetricsType::ECounter: \ { \ - add_counter(metrics, \ + add_counter(metrics_batch, \ process_start, \ end, \ use_snake_case ? #CName : #JSName, \ @@ -306,7 +383,7 @@ void fill_env_metrics(std::vector& metrics, break; \ case MetricsType::EGauge: \ { \ - add_gauge(metrics, \ + add_gauge(metrics_batch, \ process_start, \ end, \ use_snake_case ? #CName : #JSName, \ @@ -324,7 +401,7 @@ NSOLID_ENV_METRICS_NUMBERS(V) #undef V // Add the summary metrics separately. - add_summary(metrics, + add_summary(metrics_batch, process_start, end, use_snake_case ? "gc_dur_us" : "gcDurUs", @@ -333,7 +410,7 @@ NSOLID_ENV_METRICS_NUMBERS(V) {{ 0.5, stor.gc_dur_us_median }, { 0.99, stor.gc_dur_us99_ptile }}, attrs); - add_summary(metrics, + add_summary(metrics_batch, process_start, end, "dns", @@ -341,7 +418,7 @@ NSOLID_ENV_METRICS_NUMBERS(V) InstrumentValueType::kDouble, {{ 0.5, stor.dns_median }, { 0.99, stor.dns99_ptile }}, attrs); - add_summary(metrics, + add_summary(metrics_batch, process_start, end, use_snake_case ? "http_client" : "httpClient", @@ -350,7 +427,7 @@ NSOLID_ENV_METRICS_NUMBERS(V) {{ 0.5, stor.http_client99_ptile }, { 0.99, stor.http_client_median }}, attrs); - add_summary(metrics, + add_summary(metrics_batch, process_start, end, use_snake_case ? 
"http_server" : "httpServer", @@ -359,6 +436,8 @@ NSOLID_ENV_METRICS_NUMBERS(V) {{ 0.5, stor.http_server_median }, { 0.99, stor.http_server99_ptile }}, attrs); + + metrics_batch.IncrementPoints(); } void fill_log_recordable(LogsRecordable* recordable, @@ -383,6 +462,7 @@ void fill_recordable(Recordable* recordable, const Tracer::SpanStor& s) { recordable->SetDuration( nanoseconds(static_cast((s.end - s.start) * 1e6))); + using opentelemetry::trace::propagation::detail::HexToBinary; uint8_t span_buf[kSpanIdSize / 2]; HexToBinary(s.span_id, span_buf, sizeof(span_buf)); @@ -392,6 +472,11 @@ void fill_recordable(Recordable* recordable, const Tracer::SpanStor& s) { uint8_t trace_buf[kTraceIdSize / 2]; HexToBinary(s.trace_id, trace_buf, sizeof(trace_buf)); + using opentelemetry::trace::SpanContext; + using opentelemetry::trace::SpanId; + using opentelemetry::trace::SpanKind; + using opentelemetry::trace::TraceFlags; + using opentelemetry::trace::TraceId; SpanContext ctx(TraceId(trace_buf), SpanId(span_buf), TraceFlags(0), false); SpanId parent_id(parent_buf); @@ -511,6 +596,93 @@ void fill_recordable(Recordable* recordable, const Tracer::SpanStor& s) { recordable->SetResource(*GetResource()); } +void PopulateRequest(const std::vector& metrics, + const Resource* resource, + const InstrumentationScope* scope, + ExportMetricsServiceRequest* request) { + if (!request) return; + + auto* resource_metrics = request->add_resource_metrics(); + + // Populate resource + if (resource) { + OtlpPopulateAttributeUtils::PopulateAttribute( + resource_metrics->mutable_resource(), *resource); + resource_metrics->set_schema_url(resource->GetSchemaURL()); + } + + // Scope + auto* scope_metrics = resource_metrics->add_scope_metrics(); + if (scope) { + auto* proto_scope = scope_metrics->mutable_scope(); + proto_scope->set_name(scope->GetName()); + proto_scope->set_version(scope->GetVersion()); + OtlpPopulateAttributeUtils::PopulateAttribute(proto_scope, *scope); + scope_metrics->set_schema_url(scope->GetSchemaURL()); + } + + // Metrics + for (const auto& batched_metric : metrics) { + auto* proto_metric = scope_metrics->add_metrics(); + proto_metric->set_name(batched_metric.instrument_descriptor.name_); + proto_metric->set_unit(batched_metric.instrument_descriptor.unit_); + proto_metric->set_description(""); + auto type = batched_metric.instrument_descriptor.type_; + auto value_type = batched_metric.instrument_descriptor.value_type_; + // Type + if (type == InstrumentType::kCounter) { + auto* sum = proto_metric->mutable_sum(); + sum->set_aggregation_temporality( + static_cast( + batched_metric.aggregation_temporality)); + sum->set_is_monotonic(true); + for (const auto& point : batched_metric.point_data_attr_) { + auto* dp = sum->add_data_points(); + dp->set_start_time_unix_nano(point.start_ts.time_since_epoch().count()); + dp->set_time_unix_nano(point.end_ts.time_since_epoch().count()); + // Attributes + SerializeAttributes(point.attributes, dp->mutable_attributes()); + // Value + auto sum_data = std::get(point.point_data); + if (value_type == InstrumentValueType::kInt) { + dp->set_as_int(std::get(sum_data.value_)); + } else { + dp->set_as_double(std::get(sum_data.value_)); + } + } + } else if (type == InstrumentType::kGauge) { + auto* gauge = proto_metric->mutable_gauge(); + for (const auto& point : batched_metric.point_data_attr_) { + auto* dp = gauge->add_data_points(); + dp->set_time_unix_nano(point.end_ts.time_since_epoch().count()); + SerializeAttributes(point.attributes, dp->mutable_attributes()); + auto 
+void PopulateRequest(const std::vector<BatchedMetricData>& metrics,
+                     const Resource* resource,
+                     const InstrumentationScope* scope,
+                     ExportMetricsServiceRequest* request) {
+  if (!request) return;
+
+  auto* resource_metrics = request->add_resource_metrics();
+
+  // Populate resource
+  if (resource) {
+    OtlpPopulateAttributeUtils::PopulateAttribute(
+        resource_metrics->mutable_resource(), *resource);
+    resource_metrics->set_schema_url(resource->GetSchemaURL());
+  }
+
+  // Scope
+  auto* scope_metrics = resource_metrics->add_scope_metrics();
+  if (scope) {
+    auto* proto_scope = scope_metrics->mutable_scope();
+    proto_scope->set_name(scope->GetName());
+    proto_scope->set_version(scope->GetVersion());
+    OtlpPopulateAttributeUtils::PopulateAttribute(proto_scope, *scope);
+    scope_metrics->set_schema_url(scope->GetSchemaURL());
+  }
+
+  // Metrics
+  for (const auto& batched_metric : metrics) {
+    auto* proto_metric = scope_metrics->add_metrics();
+    proto_metric->set_name(batched_metric.instrument_descriptor.name_);
+    proto_metric->set_unit(batched_metric.instrument_descriptor.unit_);
+    proto_metric->set_description("");
+    auto type = batched_metric.instrument_descriptor.type_;
+    auto value_type = batched_metric.instrument_descriptor.value_type_;
+    // Type
+    if (type == InstrumentType::kCounter) {
+      auto* sum = proto_metric->mutable_sum();
+      sum->set_aggregation_temporality(
+          static_cast<opentelemetry::proto::metrics::v1::AggregationTemporality>(
+              batched_metric.aggregation_temporality));
+      sum->set_is_monotonic(true);
+      for (const auto& point : batched_metric.point_data_attr_) {
+        auto* dp = sum->add_data_points();
+        dp->set_start_time_unix_nano(point.start_ts.time_since_epoch().count());
+        dp->set_time_unix_nano(point.end_ts.time_since_epoch().count());
+        // Attributes
+        SerializeAttributes(point.attributes, dp->mutable_attributes());
+        // Value
+        auto sum_data = std::get<SumPointData>(point.point_data);
+        if (value_type == InstrumentValueType::kInt) {
+          dp->set_as_int(std::get<int64_t>(sum_data.value_));
+        } else {
+          dp->set_as_double(std::get<double>(sum_data.value_));
+        }
+      }
+    } else if (type == InstrumentType::kGauge) {
+      auto* gauge = proto_metric->mutable_gauge();
+      for (const auto& point : batched_metric.point_data_attr_) {
+        auto* dp = gauge->add_data_points();
+        dp->set_time_unix_nano(point.end_ts.time_since_epoch().count());
+        SerializeAttributes(point.attributes, dp->mutable_attributes());
+        auto gauge_data = std::get<LastValuePointData>(point.point_data);
+        if (value_type == InstrumentValueType::kInt) {
+          dp->set_as_int(std::get<int64_t>(gauge_data.value_));
+        } else {
+          dp->set_as_double(std::get<double>(gauge_data.value_));
+        }
+      }
+    } else if (type == InstrumentType::kSummary) {
+      auto* summary = proto_metric->mutable_summary();
+      for (const auto& point : batched_metric.point_data_attr_) {
+        auto* dp = summary->add_data_points();
+        dp->set_start_time_unix_nano(point.start_ts.time_since_epoch().count());
+        dp->set_time_unix_nano(point.end_ts.time_since_epoch().count());
+        SerializeAttributes(point.attributes, dp->mutable_attributes());
+        auto summary_data = std::get<SummaryPointData>(point.point_data);
+        dp->set_sum(std::get<double>(summary_data.quantile_values_.at(0.5)));
+        dp->set_count(1);
+        for (const auto& q : summary_data.quantile_values_) {
+          auto* quantile = dp->add_quantile_values();
+          quantile->set_quantile(q.first);
+          quantile->set_value(std::get<double>(q.second));
+        }
+      }
+    }
+  }
+}
+
}  // namespace otlp
}  // namespace nsolid
}  // namespace node
diff --git a/agents/otlp/src/otlp_common.h b/agents/otlp/src/otlp_common.h
index 8c8bd8cdc6..43f7ab0042 100644
--- a/agents/otlp/src/otlp_common.h
+++ b/agents/otlp/src/otlp_common.h
@@ -1,9 +1,16 @@
#ifndef AGENTS_OTLP_SRC_OTLP_COMMON_H_
#define AGENTS_OTLP_SRC_OTLP_COMMON_H_

+#include <chrono>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
#include "nsolid.h"
+#include "batched_metric_data.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/resource/resource.h"
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"

// Class pre-declaration
OPENTELEMETRY_BEGIN_NAMESPACE
@@ -39,6 +46,63 @@ namespace node {
namespace nsolid {
namespace otlp {

+class MetricDataBatch {
+  using MetricVector = std::vector<BatchedMetricData>;
+  using MetricIndexMap = std::unordered_map<std::string, std::size_t>;
+
+ public:
+  explicit MetricDataBatch(std::size_t limit = 1);
+
+  ~MetricDataBatch() = default;
+
+  void AddDataPoint(const std::chrono::system_clock::time_point& start,
+                    const std::chrono::system_clock::time_point& end,
+                    const char* name,
+                    const char* unit,
+                    opentelemetry::sdk::metrics::InstrumentType type,
+                    opentelemetry::sdk::metrics::InstrumentValueType value_type,
+                    opentelemetry::sdk::metrics::AggregationTemporality temp,
+                    opentelemetry::sdk::metrics::PointDataAttributes&& data);
+
+  MetricVector DumpMetricsAndReset() {
+    MetricVector metrics = std::move(metrics_);
+    Reset();
+    return metrics;
+  }
+
+  void IncrementPoints(std::size_t count = 1) {
+    total_points_ += count;
+  }
+
+  bool ShouldFlush() const {
+    return max_points_ > 0 && total_points_ >= max_points_;
+  }
+
+  void Reset() {
+    metrics_.clear();
+    metric_indices_.clear();
+    total_points_ = 0;
+  }
+
+  void Resize(std::size_t limit) {
+    max_points_ = limit;
+  }
+
+ private:
+  MetricDataBatch(const MetricDataBatch&) = delete;
+  MetricDataBatch& operator=(const MetricDataBatch&) = delete;
+
+  void TrackMetricIndex(std::string key, std::size_t index) {
+    metric_indices_[std::move(key)] = index;
+  }
+
+  MetricVector metrics_;
+  MetricIndexMap metric_indices_;
+
+  std::size_t total_points_ = 0;
+  std::size_t max_points_ = 0;
+};
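How this class is meant to be driven can be read off its public surface. A minimal usage sketch, assuming only the members declared above; the interval loop and the exporter hand-off are illustrative, not code from this patch:

// Illustrative sketch, not part of the patch.
using node::nsolid::otlp::MetricDataBatch;

void OnMetricsInterval(MetricDataBatch& batch) {
  // fill_proc_metrics()/fill_env_metrics() append points via AddDataPoint()
  // and bump the interval counter with IncrementPoints(), so afterwards the
  // caller only checks the threshold:
  if (batch.ShouldFlush()) {  // total_points_ reached max_points_ (limit > 0)
    auto batched = batch.DumpMetricsAndReset();  // std::vector<BatchedMetricData>
    // hand `batched` to an exporter, e.g. through PopulateRequest() above
  }
}

Resize() is presumably what the metricsBatchSize reconfiguration path calls; with the default limit of 1, every interval flushes immediately, which matches the default_batch_size expectation in the tests below.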
+
OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*
GetScope();
@@ -47,12 +111,14 @@ OPENTELEMETRY_NAMESPACE::sdk::resource::Resource* GetResource();
OPENTELEMETRY_NAMESPACE::sdk::resource::Resource* UpdateResource(
    OPENTELEMETRY_NAMESPACE::sdk::resource::ResourceAttributes&&);

-void fill_proc_metrics(std::vector<OPENTELEMETRY_NAMESPACE::sdk::metrics::MetricData>&,
+// NOLINTNEXTLINE(runtime/references)
+void fill_proc_metrics(MetricDataBatch& metrics_batch,
                       const ProcessMetrics::MetricsStor& stor,
                       const ProcessMetrics::MetricsStor& prev_stor,
                       bool use_snake_case = true);

-void fill_env_metrics(std::vector<OPENTELEMETRY_NAMESPACE::sdk::metrics::MetricData>&,
+// NOLINTNEXTLINE(runtime/references)
+void fill_env_metrics(MetricDataBatch& metrics_batch,
                      const ThreadMetrics::MetricsStor& stor,
                      bool use_snake_case = true);

@@ -62,6 +128,17 @@ void fill_log_recordable(OPENTELEMETRY_NAMESPACE::sdk::logs::Recordable*,
void fill_recordable(OPENTELEMETRY_NAMESPACE::sdk::trace::Recordable*,
                     const Tracer::SpanStor&);

+void PopulateRequest(
+    const std::vector<BatchedMetricData>& metrics,
+    const opentelemetry::sdk::resource::Resource* resource,
+    const opentelemetry::sdk::instrumentationscope::InstrumentationScope*,
+    opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest*);
+
+opentelemetry::sdk::metrics::MetricData
+    BatchedMetricToMetricData(const BatchedMetricData& bm);
+
+std::vector<opentelemetry::sdk::metrics::MetricData>
+    ConvertBatchedToMetricData(const std::vector<BatchedMetricData>& batched);
}  // namespace otlp
}  // namespace nsolid
diff --git a/agents/otlp/src/otlp_metrics.cc b/agents/otlp/src/otlp_metrics.cc
index fbcc50021b..36b62a3366 100644
--- a/agents/otlp/src/otlp_metrics.cc
+++ b/agents/otlp/src/otlp_metrics.cc
@@ -106,11 +106,15 @@ OTLPMetrics::~OTLPMetrics() {
/*virtual*/
void OTLPMetrics::got_proc_metrics(const ProcessMetricsStor& stor,
                                   const ProcessMetricsStor& prev_stor) {
-  std::vector<MetricData> metrics;
+  MetricDataBatch metrics;
  fill_proc_metrics(metrics, stor, prev_stor);
+  auto batched = metrics.DumpMetricsAndReset();
+  auto metric_data = ConvertBatchedToMetricData(batched);
  ResourceMetrics data;
  data.resource_ = GetResource();
-  data.scope_metric_data_ = std::vector<ScopeMetrics>{{scope_, metrics}};
+  data.scope_metric_data_ =
+      std::vector<ScopeMetrics>{{ otlp::GetScope(),
+                                  std::move(metric_data) }};
  auto result = otlp_metric_exporter_->Export(data);
  Debug("# ProcessMetrics Exported. Result: %d\n", static_cast<int>(result));
}
@@ -120,14 +124,17 @@ void OTLPMetrics::got_thr_metrics(
    const std::vector<ThrMetricsStor>& thr_metrics) {
  ResourceMetrics data;
  data.resource_ = GetResource();
-  std::vector<MetricData> metrics;
+  MetricDataBatch metrics;
  for (const auto& tm : thr_metrics) {
    fill_env_metrics(metrics, tm.stor);
  }
+  auto batched = metrics.DumpMetricsAndReset();
+  auto metric_data = ConvertBatchedToMetricData(batched);
+
  data.scope_metric_data_ =
-      std::vector<ScopeMetrics>{{scope_, metrics}};
+      std::vector<ScopeMetrics>{{scope_, std::move(metric_data)}};
  auto result = otlp_metric_exporter_->Export(data);
  Debug("# ThreadMetrics Exported. Result: %d\n", static_cast<int>(result));
}
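The gRPC-side consumer of these helpers, agents/grpc/src/grpc_metrics_exporter.{h,cc}, is added to node.gyp below but its source is not part of this excerpt. A hedged sketch of the buffering/retry path it presumably implements, based on the buffer-size and retry tests further down and on the resizable RingBuffer also added below; PopulateRequest(), GetResource(), GetScope() and the RingBuffer API are real, the class layout and Send() are assumptions:

// Hedged sketch; not the actual grpc_metrics_exporter.cc.
using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;

class GrpcMetricsExporter {
 public:
  void enqueue(std::vector<node::nsolid::otlp::BatchedMetricData>&& batch) {
    // The RingBuffer silently drops the oldest batch once the configured
    // metricsBufferSize is exceeded, bounding memory while the server is down.
    buffer_.push(std::move(batch));
    flush();
  }

  void flush() {
    while (!buffer_.empty()) {
      ExportMetricsServiceRequest request;
      node::nsolid::otlp::PopulateRequest(buffer_.front(),
                                          node::nsolid::otlp::GetResource(),
                                          node::nsolid::otlp::GetScope(),
                                          &request);
      if (!Send(request))  // assumed gRPC send; fails while the server is gone
        break;             // keep remaining batches buffered for the next try
      buffer_.pop();
    }
  }

 private:
  bool Send(const ExportMetricsServiceRequest& request);  // assumed
  // 100 mirrors DEFAULT_METRICS_BUFFER_SIZE in lib/nsolid.js below; resized
  // when metricsBufferSize is reconfigured.
  RingBuffer<std::vector<node::nsolid::otlp::BatchedMetricData>> buffer_{100};
};

This would explain the retry test below: metrics keep accumulating in the ring buffer while the server is killed, and are drained once it is restarted on the same port.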
diff --git a/lib/nsolid.js b/lib/nsolid.js
index 08f3e50d29..b116f1ac97 100644
--- a/lib/nsolid.js
+++ b/lib/nsolid.js
@@ -6,6 +6,8 @@ const {
  DateNow,
  JSONParse,
  JSONStringify,
+  Number,
+  NumberIsFinite,
  NumberParseInt,
  ObjectAssign,
  ObjectDefineProperty,
@@ -58,6 +60,8 @@ const {
const DEFAULT_HOSTNAME = getHostname();
const DEFAULT_APPNAME = 'untitled application';
const DEFAULT_INTERVAL = 5000;
+const DEFAULT_METRICS_BATCH_SIZE = 1;
+const DEFAULT_METRICS_BUFFER_SIZE = 100;
const DEFAULT_BLOCKED_LOOP_THRESHOLD = 200;
const DEFAULT_PUBKEY = '^kvy
+      0)
+    return value;
+  }
+  return fallback;
+}
+
function genPackageList() {
  let main_path;
  let last_path;
diff --git a/node.gyp b/node.gyp
index c2946f41e6..c42231d30a 100644
--- a/node.gyp
+++ b/node.gyp
@@ -446,6 +446,8 @@
      'agents/grpc/src/grpc_client.cc',
      'agents/grpc/src/grpc_client.h',
      'agents/grpc/src/grpc_errors.h',
+      'agents/grpc/src/grpc_metrics_exporter.cc',
+      'agents/grpc/src/grpc_metrics_exporter.h',
      'agents/grpc/src/proto/nsolid_service.grpc.pb.cc',
      'agents/grpc/src/proto/nsolid_service.pb.cc',
      'agents/grpc/src/proto/asset.grpc.pb.cc',
diff --git a/src/nsolid/nsolid_util.h b/src/nsolid/nsolid_util.h
index 460a762aa6..c930b11778 100644
--- a/src/nsolid/nsolid_util.h
+++ b/src/nsolid/nsolid_util.h
@@ -16,6 +16,7 @@
#include "uv.h"
#include "nlohmann/json.hpp"
+#include "util.h"

using string_vector = std::vector<std::string>;
using json = nlohmann::json;
@@ -202,6 +203,7 @@ class RingBuffer {
      size_(0),
      head_(0),
      tail_(0) {
+    CHECK_GT(s, 0);
    buffer_ = new T[s];
  }
@@ -237,7 +239,7 @@ class RingBuffer {
  }

  void push(T&& value) {
-    buffer_[tail_] = value;
+    buffer_[tail_] = std::move(value);
    tail_ = (tail_ + 1) % capacity_;

    if (size_ == capacity_)
@@ -246,9 +248,47 @@ class RingBuffer {
    size_++;
  }

+  void resize(size_t new_capacity) {
+    if (new_capacity == capacity_) {
+      return;
+    }
+
+    CHECK_GT(new_capacity, 0);
+
+    T* new_buffer = new T[new_capacity];
+
+    // Copy existing elements to the new buffer, keeping the newest elements
+    // when resizing down.
+    size_t elements_to_copy = std::min(size_, new_capacity);
+    // If we're truncating, start from the element that will become the new
+    // head; otherwise start from the current head.
+    size_t start_idx = head_;
+    if (size_ > new_capacity) {
+      start_idx = (head_ + size_ - elements_to_copy) % capacity_;
+    }
+    for (size_t i = 0; i < elements_to_copy; ++i) {
+      size_t src_idx = (start_idx + i) % capacity_;
+      new_buffer[i] = std::move(buffer_[src_idx]);
+    }
+
+    delete[] buffer_;
+    buffer_ = new_buffer;
+    capacity_ = new_capacity;
+    size_ = elements_to_copy;
+    head_ = 0;
+    tail_ = size_ % new_capacity;
+  }
+
 private:
  T* buffer_;
-  const size_t capacity_;
+  size_t capacity_;
  size_t size_;
  size_t head_;
  size_t tail_;
diff --git a/test/agents/test-grpc-metrics-batch.mjs b/test/agents/test-grpc-metrics-batch.mjs
new file mode 100644
index 0000000000..4380fb8070
--- /dev/null
+++ b/test/agents/test-grpc-metrics-batch.mjs
@@ -0,0 +1,312 @@
+// Flags: --expose-internals
+import { mustCall, mustSucceed } from '../common/index.mjs';
+import assert from 'node:assert';
+import {
+  checkOTLPMetricsData,
+  GRPCServer,
+  TestClient,
+  hasThreadAttributes,
+} from '../common/nsolid-grpc-agent/index.js';
+
+
+// Simple test for static batch sizes (no reconfiguration)
+async function 
runSimpleTest({ + getEnv, + expectedBatchSize = null, + pauseMetricsAt = null, +}) { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + console.log('GRPC server started', port); + const env = getEnv(port); + const opts = { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env, + }; + const child = new TestClient([], opts); + const agentId = await child.id(); + const config = await child.config({ app: 'my_app_name', interval: 200 }); + + let processMetricsReceived = false; + let threadMetricsReceived = false; + let pauseTimeout = null; + let exportCount = 0; + let pauseHappened = false; + + // Listen for OTLP metrics export (not the custom N|Solid metrics RPC) + grpcServer.on('metrics', mustCall(async ({ request }) => { + exportCount++; + console.log(`Received OTLP metrics export #${exportCount}`); + + const metrics = await child.metrics(); + + // The data should contain the OTLP ExportMetricsServiceRequest + assert.ok(request, 'No metrics data received'); + assert.ok(request.resourceMetrics, 'Missing resourceMetrics in OTLP export'); + + // Check if this is process or thread metrics based on attributes + const scopeMetrics = request.resourceMetrics[0].scopeMetrics[0]; + const firstMetric = scopeMetrics.metrics[0]; + const isThreadMetrics = hasThreadAttributes(firstMetric); + + // Only treat exports as flushed if pause has actually happened + // The expectFlushedBatch parameter indicates this test expects flush behavior, + // but actual flushing only occurs after pauseHappened becomes true + const isFlushedExport = pauseHappened; + console.log(`Processing ${isThreadMetrics ? 'thread' : 'process'} metrics #${exportCount} (expectedBatchSize=${expectedBatchSize}, flushed=${isFlushedExport})`); + + checkOTLPMetricsData(request.resourceMetrics, + agentId, + config, + metrics, + expectedBatchSize, + isThreadMetrics, + isFlushedExport, + null); + + if (isThreadMetrics) { + threadMetricsReceived = true; + } else { + processMetricsReceived = true; + } + + // For pause tests, wait for 4 exports (2 initial + 2 flushed after pause) + const expectedExports = pauseMetricsAt !== null ? 4 : 2; + + // Wait for both process and thread metrics before completing + if (processMetricsReceived && threadMetricsReceived && exportCount >= expectedExports) { + if (pauseTimeout) { + clearTimeout(pauseTimeout); + } + console.log('All expected metrics exports received - test complete!'); + await child.shutdown(0); + grpcServer.close(); + resolve(); + } + }, pauseMetricsAt !== null ? 
4 : 2)); + + // If testing pause functionality, pause metrics after specified time + if (pauseMetricsAt !== null) { + pauseTimeout = setTimeout(async () => { + console.log(`Pausing metrics after ${pauseMetricsAt}ms`); + await grpcServer.reconfigure(agentId, { pauseMetrics: true }); + console.log('Metrics paused - waiting for flush...'); + pauseHappened = true; // Mark that pause has occurred + + // Give some time for the flush to happen, then complete + setTimeout(async () => { + if (!processMetricsReceived || !threadMetricsReceived) { + console.log('No flush received within timeout - completing test'); + await child.shutdown(0); + grpcServer.close(); + resolve(); + } + }, 6000); + }, pauseMetricsAt); + } + })); + }); +} + +// Complex test for dynamic batch size reconfiguration with pause/unpause +async function runTest({ + getEnv, + initialBatchSize = null, + newBatchSize = null, + reconfigureAt = null, + expectFlushRange = null, +}) { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + grpcServer.start(mustSucceed(async (port) => { + console.log('GRPC server started', port); + const env = getEnv(port); + const opts = { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env, + }; + const child = new TestClient([], opts); + const agentId = await child.id(); + const config = await child.config({ app: 'my_app_name', interval: 200 }); + + let processMetricsReceived = false; + let threadMetricsReceived = false; + + // Export index across all OTLP metrics exports (process + thread) + let exportIndex = 0; + + // Track if we've seen the first export after reconfigure + let firstExportAfterReconfigure = false; + + // Listen for OTLP metrics export (not the custom N|Solid metrics RPC) + grpcServer.on('metrics', mustCall(async ({ request }) => { + exportIndex++; + console.log(`Received OTLP metrics export #${exportIndex}`); + + const metrics = await child.metrics(); + + // The data should contain the OTLP ExportMetricsServiceRequest + assert.ok(request, 'No metrics data received'); + assert.ok(request.resourceMetrics, 'Missing resourceMetrics in OTLP export'); + + // Check if this is process or thread metrics based on attributes + const scopeMetrics = request.resourceMetrics[0].scopeMetrics[0]; + const firstMetric = scopeMetrics.metrics[0]; + const isThreadMetrics = hasThreadAttributes(firstMetric); + + // Determine expected batch size based on phase + let phaseExpectedBatchSize; + let phaseExpectFlushed; + + if (exportIndex <= 2) { + // Phase 1: pre-reconfigure with initial batch size (partial) + phaseExpectedBatchSize = initialBatchSize; + phaseExpectFlushed = true; + } else if (exportIndex <= 4) { + // Phase 2: post-reconfigure with new batch size (partial) + phaseExpectedBatchSize = newBatchSize; + phaseExpectFlushed = true; + if (exportIndex === 3 && !firstExportAfterReconfigure) { + firstExportAfterReconfigure = true; + console.log('First export after reconfigure received, will pause shortly...'); + } + } else if (exportIndex <= 6) { + // Phase 3: after pause/unpause flush (flushed batch) + phaseExpectedBatchSize = newBatchSize; + phaseExpectFlushed = true; + if (expectFlushRange) { + console.log(`Flush phase, expecting datapoints in range [${expectFlushRange[0]}, ${expectFlushRange[1]}]`); + } + } else { + // Phase 4: first exports after flush (partial) + phaseExpectedBatchSize = newBatchSize; + phaseExpectFlushed = true; + } + + console.log(`Processing ${isThreadMetrics ? 
'thread' : 'process'} metrics export #${exportIndex} (expectedBatchSize=${phaseExpectedBatchSize}, flushed=${phaseExpectFlushed})`); + + checkOTLPMetricsData(request.resourceMetrics, + agentId, + config, + metrics, + phaseExpectedBatchSize, + isThreadMetrics, + phaseExpectFlushed, + expectFlushRange); + + if (isThreadMetrics) { + threadMetricsReceived = true; + } else { + processMetricsReceived = true; + } + + // Wait for both process and thread metrics before completing + if (processMetricsReceived && threadMetricsReceived && exportIndex >= 8) { + console.log('All expected metrics exports received - test complete!'); + await child.shutdown(0); + grpcServer.close(); + resolve(); + } + }, 8)); + + // Reconfigure at specified time + setTimeout(async () => { + console.log(`Reconfiguring metricsBatchSize from ${initialBatchSize} to ${newBatchSize} after ${reconfigureAt}ms`); + await grpcServer.reconfigure(agentId, { metricsBatchSize: newBatchSize }); + console.log(`Metrics batch size reconfigured to ${newBatchSize}`); + }, reconfigureAt); + + // Pause after first export post-reconfigure + const pauseCheckInterval = setInterval(async () => { + if (firstExportAfterReconfigure) { + clearInterval(pauseCheckInterval); + console.log('Pausing metrics after first export post-reconfigure'); + await grpcServer.reconfigure(agentId, { pauseMetrics: true }); + console.log('Metrics paused'); + + // Unpause after brief delay + setTimeout(async () => { + console.log('Unpausing metrics'); + await grpcServer.reconfigure(agentId, { pauseMetrics: false }); + console.log('Metrics unpaused'); + }, 1000); + } + }, 100); + })); + }); +} + +const testConfigs = [ + { + name: 'default_batch_size', + expectedBatchSize: 1, + getEnv: (port) => ({ + NSOLID_GRPC_INSECURE: 1, + NSOLID_GRPC: `localhost:${port}`, + NSOLID_INTERVAL: 100, + }), + }, + // Test various static batch sizes + ...[1, 3, 5].map((batchSize) => ({ + name: `batch_size_${batchSize}`, + expectedBatchSize: batchSize, + getEnv: (port) => ({ + NSOLID_GRPC_INSECURE: 1, + NSOLID_GRPC: `localhost:${port}`, + NSOLID_INTERVAL: 200, + NSOLID_METRICS_BATCH_SIZE: batchSize, + }), + })), + { + name: 'pause_and_flush', + expectedBatchSize: 1, + pauseMetricsAt: 1200, // Pause after 1.2 seconds (should get ~6 datapoints at 200ms interval) + getEnv: (port) => ({ + NSOLID_GRPC_INSECURE: 1, + NSOLID_GRPC: `localhost:${port}`, + NSOLID_INTERVAL: 200, // 200ms interval = 5 datapoints per second + }), + }, + { + name: 'batchsize_increase_with_pause', + initialBatchSize: 2, + newBatchSize: 3, + reconfigureAt: 600, + expectFlushRange: [1, 5], + getEnv: (port) => { + return { + NSOLID_GRPC_INSECURE: 1, + NSOLID_GRPC: `localhost:${port}`, + NSOLID_INTERVAL: 300, + NSOLID_METRICS_BATCH_SIZE: 2, + }; + }, + }, + { + name: 'batchsize_decrease_with_pause', + initialBatchSize: 3, + newBatchSize: 2, + reconfigureAt: 400, + expectFlushRange: [1, 5], + getEnv: (port) => { + return { + NSOLID_GRPC_INSECURE: 1, + NSOLID_GRPC: `localhost:${port}`, + NSOLID_INTERVAL: 300, + NSOLID_METRICS_BATCH_SIZE: 3, + }; + }, + }, +]; + +for (const testConfig of testConfigs) { + console.log(`Running test: ${testConfig.name}`); + // Use complex test for reconfiguration scenarios, simple test otherwise + if (testConfig.reconfigureAt !== undefined) { + await runTest(testConfig); + } else { + await runSimpleTest(testConfig); + } + console.log(`Test ${testConfig.name} completed!`); +} diff --git a/test/agents/test-grpc-metrics-buffer-size.mjs b/test/agents/test-grpc-metrics-buffer-size.mjs new file mode 100644 
index 0000000000..ab143b5b0b --- /dev/null +++ b/test/agents/test-grpc-metrics-buffer-size.mjs @@ -0,0 +1,100 @@ +// Flags: --expose-internals +import { mustSucceed } from '../common/index.mjs'; +import assert from 'node:assert'; +import { + checkOTLPMetricsData, + GRPCServer, + TestClient, + hasThreadAttributes, +} from '../common/nsolid-grpc-agent/index.js'; + +// Test for NSOLID_METRICS_BUFFER_SIZE functionality +async function runBufferSizeTest({ getEnv, bufferSize }) { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + const interval = 200; // 200ms interval + let metricsReceived = 0; + const targetMetrics = 5; // Just test basic functionality + + grpcServer.start(mustSucceed(async (port) => { + console.log(`GRPC server started ${port} with buffer size: ${bufferSize}`); + const env = getEnv(port); + const opts = { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env: { ...process.env, ...env }, + }; + const child = new TestClient([], opts); + const agentId = await child.id(); + const config = await child.config({ app: 'buffer_test_app', interval }); + const metrics = await child.metrics(); + + grpcServer.on('metrics', async ({ request }) => { + metricsReceived++; + console.log(`Received OTLP metrics export #${metricsReceived}`); + + // Check if this is thread metrics or process metrics + const scopeMetrics = request.resourceMetrics[0].scopeMetrics[0]; + const firstMetric = scopeMetrics.metrics[0]; + const isThreadMetrics = hasThreadAttributes(firstMetric); + + checkOTLPMetricsData(request.resourceMetrics, agentId, config, metrics, 1, isThreadMetrics); + + if (metricsReceived >= targetMetrics) { + console.log(`Buffer size test complete! Received ${metricsReceived} metrics with buffer size ${bufferSize}`); + + clearTimeout(timeoutId); + await child.shutdown(0); + if (grpcServer) { + grpcServer.close(); + } + + resolve({ + bufferSize, + metricsReceived, + }); + } + }); + + // Timeout fallback + const timeoutId = setTimeout(async () => { + console.log('Timeout reached - test failed!'); + await child.shutdown(0); + if (grpcServer) { + grpcServer.close(); + } + + assert.fail(`Test timeout: Expected at least ${targetMetrics} metrics exports with buffer size ${bufferSize}, but got ${metricsReceived}`); + }, 5000); + })); + }); +} + +// Test different buffer sizes +async function testGrpcMetricsBufferSize() { + console.log('Running GrpcMetricsExporter buffer size tests...'); + + const testCases = [ + { bufferSize: 10, name: 'Small buffer (10)' }, + { bufferSize: 50, name: 'Medium buffer (50)' }, + { bufferSize: 200, name: 'Large buffer (200)' }, + ]; + + for (const testCase of testCases) { + console.log(`\n--- Testing ${testCase.name} ---`); + + const getEnv = (port) => ({ + NSOLID_GRPC_INSECURE: 1, + NSOLID_GRPC: `localhost:${port}`, + NSOLID_INTERVAL: 200, + NSOLID_METRICS_BUFFER_SIZE: testCase.bufferSize, + }); + + const results = await runBufferSizeTest({ getEnv, bufferSize: testCase.bufferSize }); + console.log(`✓ ${testCase.name} test passed!`, results); + } + + console.log('\nAll buffer size tests completed successfully!'); +} + +// Run the test +testGrpcMetricsBufferSize(); diff --git a/test/agents/test-grpc-metrics-retry.mjs b/test/agents/test-grpc-metrics-retry.mjs new file mode 100644 index 0000000000..76d85206fc --- /dev/null +++ b/test/agents/test-grpc-metrics-retry.mjs @@ -0,0 +1,115 @@ +// Flags: --expose-internals +import { mustSucceed } from '../common/index.mjs'; +import assert from 'node:assert'; +import { + checkOTLPMetricsData, + GRPCServer, + 
TestClient, + hasThreadAttributes, +} from '../common/nsolid-grpc-agent/index.js'; + + +// Test for GrpcMetricsExporter retry functionality +async function runRetryTest({ getEnv }) { + return new Promise((resolve) => { + const grpcServer = new GRPCServer(); + const interval = 200; // 200ms interval + let metricsReceived = 0; + let serverKilled = false; + let serverRestarted = false; + + grpcServer.start(mustSucceed(async (port) => { + console.log(`GRPC server started on port ${port}`); + + const env = getEnv(port); + const opts = { + stdio: ['inherit', 'inherit', 'inherit', 'ipc'], + env, + }; + const child = new TestClient([], opts); + const agentId = await child.id(); + const config = await child.config({ app: 'my_app_name', interval }); + const metrics = await child.metrics(); + + grpcServer.on('metrics', async ({ request }) => { + metricsReceived++; + console.log(`Received OTLP metrics export #${metricsReceived}`); + + // Check if this is thread metrics or process metrics + const scopeMetrics = request.resourceMetrics[0].scopeMetrics[0]; + const firstMetric = scopeMetrics.metrics[0]; + const isThreadMetrics = hasThreadAttributes(firstMetric); + + checkOTLPMetricsData(request.resourceMetrics, agentId, config, metrics, 1, isThreadMetrics); + + // When first metrics arrive, kill the server + if (!serverKilled && metricsReceived >= 2) { + console.log('First metrics received, killing server...'); + serverKilled = true; + grpcServer.close(); + + // Wait for 5x interval, then restart server + setTimeout(() => { + console.log(`Restarting server on same port ${port}...`); + grpcServer.start(mustSucceed(() => { + serverRestarted = true; + console.log(`Server restarted on port ${port}`); + }), port); + }, 5 * interval); // Wait 5x interval + } + + // If server is restarted and we have enough metrics, complete test + if (serverRestarted && metricsReceived >= 8) { // ~2x3 + initial instead of 2x5 + initial to be more lenient + console.log('Test complete! 
Received expected retry metrics'); + console.log(`Total metrics received: ${metricsReceived}`); + + assert.ok(serverKilled, 'Server should have been killed'); + assert.ok(serverRestarted, 'Server should have been restarted'); + + clearTimeout(timeoutId); + await child.shutdown(0); + if (grpcServer) { + grpcServer.close(); + } + + resolve({ + serverKilled, + serverRestarted, + metricsReceived, + }); + } + }); + + // Timeout fallback + const timeoutId = setTimeout(async () => { + console.log('Timeout reached - test failed!'); + await child.shutdown(0); + if (grpcServer) { + grpcServer.close(); + } + + // Test should fail if timeout is reached + assert.fail(`Test timeout: Expected at least 8 metrics exports and server restart, but got ${metricsReceived} metrics, serverKilled: ${serverKilled}, serverRestarted: ${serverRestarted}`); + }, 5000); // 5 seconds = 25 × 200ms interval (reasonable buffer) + })); + }); +} + +// Test GrpcMetricsExporter retry functionality +async function testGrpcMetricsRetry() { + console.log('Running GrpcMetricsExporter retry test...'); + + const getEnv = (port) => ({ + // NODE_DEBUG_NATIVE: 'nsolid_grpc_agent', + NSOLID_GRPC_INSECURE: 1, + NSOLID_GRPC: `localhost:${port}`, + NSOLID_INTERVAL: 200, + }); + + const results = await runRetryTest({ getEnv }); + console.log('GrpcMetricsExporter retry test completed!', results); + console.log('---'); +} + +// Run the test +testGrpcMetricsRetry(); diff --git a/test/agents/test-grpc-metrics.mjs b/test/agents/test-grpc-metrics.mjs index 565d8d2103..164472ab10 100644 --- a/test/agents/test-grpc-metrics.mjs +++ b/test/agents/test-grpc-metrics.mjs @@ -2,210 +2,11 @@ import { mustCall, mustSucceed } from '../common/index.mjs'; import assert from 'node:assert'; import { - checkResource, + checkOTLPMetricsData, GRPCServer, TestClient, } from '../common/nsolid-grpc-agent/index.js'; -import validators from 'internal/validators'; -const { - validateArray, - validateNumber, -} = validators; - -const expectedProcMetrics = [ - [ 'uptime', 's', 'asInt', 'sum' ], - [ 'systemUptime', 's', 'asInt', 'sum' ], - [ 'freeMem', 'byte', 'asInt', 'gauge' ], - [ 'blockInputOpCount', '', 'asInt', 'sum' ], - [ 'blockOutputOpCount', '', 'asInt', 'sum' ], - [ 'ctxSwitchInvoluntaryCount', '', 'asInt', 'sum' ], - [ 'ctxSwitchVoluntaryCount', '', 'asInt', 'sum' ], - [ 'ipcReceivedCount', '', 'asInt', 'sum' ], - [ 'ipcSentCount', '', 'asInt', 'sum' ], - [ 'pageFaultHardCount', '', 'asInt', 'sum' ], - [ 'pageFaultSoftCount', '', 'asInt', 'sum' ], - [ 'signalCount', '', 'asInt', 'sum' ], - [ 'swapCount', '', 'asInt', 'sum' ], - [ 'rss', 'byte', 'asInt', 'gauge' ], - [ 'load1m', '', 'asDouble', 'gauge' ], - [ 'load5m', '', 'asDouble', 'gauge' ], - [ 'load15m', '', 'asDouble', 'gauge' ], - [ 'cpuUserPercent', '', 'asDouble', 'gauge' ], - [ 'cpuSystemPercent', '', 'asDouble', 'gauge' ], - [ 'cpuPercent', '', 'asDouble', 'gauge' ], -]; - -const expectedThreadMetrics = [ - ['activeHandles', '', 'asInt', 'gauge'], - ['activeRequests', '', 'asInt', 'gauge'], - ['heapTotal', 'byte', 'asInt', 'gauge'], - ['totalHeapSizeExecutable', 'byte', 'asInt', 'gauge'], - ['totalPhysicalSize', 'byte', 'asInt', 'gauge'], - ['totalAvailableSize', 'byte', 'asInt', 'gauge'], - ['heapUsed', 'byte', 'asInt', 'gauge'], - ['heapSizeLimit', 'byte', 'asInt', 'gauge'], - ['mallocedMemory', 'byte', 'asInt', 'gauge'], - ['externalMem', 'byte', 'asInt', 'gauge'], - ['peakMallocedMemory', 'byte', 'asInt', 'gauge'], - ['numberOfNativeContexts', '', 'asInt', 'gauge'], - ['numberOfDetachedContexts', 
'', 'asInt', 'gauge'], - ['gcCount', '', 'asInt', 'sum'], - ['gcForcedCount', '', 'asInt', 'sum'], - ['gcFullCount', '', 'asInt', 'sum'], - ['gcMajorCount', '', 'asInt', 'sum'], - ['dnsCount', '', 'asInt', 'sum'], - ['httpClientAbortCount', '', 'asInt', 'sum'], - ['httpClientCount', '', 'asInt', 'sum'], - ['httpServerAbortCount', '', 'asInt', 'sum'], - ['httpServerCount', '', 'asInt', 'sum'], - ['loopIdleTime', 'ms', 'asInt', 'gauge'], - ['loopIterations', '', 'asInt', 'sum'], - ['loopIterWithEvents', '', 'asInt', 'sum'], - ['eventsProcessed', '', 'asInt', 'sum'], - ['eventsWaiting', '', 'asInt', 'gauge'], - ['providerDelay', 'ms', 'asInt', 'gauge'], - ['processingDelay', 'ms', 'asInt', 'gauge'], - ['loopTotalCount', '', 'asInt', 'sum'], - ['pipeServerCreatedCount', '', 'asInt', 'sum'], - ['pipeServerDestroyedCount', '', 'asInt', 'sum'], - ['pipeSocketCreatedCount', '', 'asInt', 'sum'], - ['pipeSocketDestroyedCount', '', 'asInt', 'sum'], - ['tcpServerCreatedCount', '', 'asInt', 'sum'], - ['tcpServerDestroyedCount', '', 'asInt', 'sum'], - ['tcpSocketCreatedCount', '', 'asInt', 'sum'], - ['tcpSocketDestroyedCount', '', 'asInt', 'sum'], - ['udpSocketCreatedCount', '', 'asInt', 'sum'], - ['udpSocketDestroyedCount', '', 'asInt', 'sum'], - ['promiseCreatedCount', '', 'asInt', 'sum'], - ['promiseResolvedCount', '', 'asInt', 'sum'], - ['fsHandlesOpenedCount', '', 'asInt', 'sum'], - ['fsHandlesClosedCount', '', 'asInt', 'sum'], - ['loopUtilization', '', 'asDouble', 'gauge'], - ['res5s', '', 'asDouble', 'gauge'], - ['res1m', '', 'asDouble', 'gauge'], - ['res5m', '', 'asDouble', 'gauge'], - ['res15m', '', 'asDouble', 'gauge'], - ['loopAvgTasks', '', 'asDouble', 'gauge'], - ['loopEstimatedLag', 'ms', 'asDouble', 'gauge'], - ['loopIdlePercent', '', 'asDouble', 'gauge'], - ['gcDurUs', 'us', 'asDouble', 'summary'], - ['dns', 'ms', 'asDouble', 'summary'], - ['httpClient', 'ms', 'asDouble', 'summary'], - ['httpServer', 'ms', 'asDouble', 'summary'], -]; - -function checkScopeMetrics(scopeMetrics) { - validateArray(scopeMetrics, 'scopeMetrics'); - assert.strictEqual(scopeMetrics.length, 1); - assert.strictEqual(scopeMetrics[0].scope.name, 'nsolid'); - assert.strictEqual(scopeMetrics[0].scope.version, - `${process.version}+nsv${process.versions.nsolid}`); - - const metrics = scopeMetrics[0].metrics; - // Check that expectedProcMetrics and expectedThreadMetrics are present - // This is an example of a proc metric: - // { - // metadata: [], - // name: 'freeMem', - // description: '', - // unit: 'byte', - // gauge: { - // dataPoints: [ - // { - // exemplars: [], - // attributes: [], - // startTimeUnixNano: '1729258760942892000', - // timeUnixNano: '1729258761188000000', - // flags: 0, - // asInt: '13867581440', - // value: 'asInt' - // } - // ] - // }, - // data: 'gauge' - // }, - // This is an example of a thread metric: - // { - // metadata: [], - // name: 'totalPhysicalSize', - // description: '', - // unit: 'byte', - // gauge: { - // dataPoints: [ - // { - // exemplars: [], - // attributes: [ - // { - // key: 'thread.id', - // value: { intValue: '0', value: 'intValue' } - // }, - // { - // key: 'thread.name', - // value: { stringValue: '', value: 'stringValue' } - // } - // ], - // startTimeUnixNano: '1729258760942892000', - // timeUnixNano: '1729258761189000000', - // flags: 0, - // asInt: '6811648', - // value: 'asInt' - // } - // ] - // }, - // data: 'gauge' - // }, - // Lets' start with checking that all expectedProcMetrics are present - for (const expectedMetric of expectedProcMetrics) { - const metric 
= metrics.find((m) => m.name === expectedMetric[0]); - assert.ok(metric, `Expected metric ${expectedMetric[0]} not found`); - assert.strictEqual(metric.unit, expectedMetric[1]); - assert.strictEqual(metric.data, expectedMetric[3]); - const dataPoints = metric[expectedMetric[3]].dataPoints; - assert.strictEqual(dataPoints.length, 1); - const dataPoint = dataPoints[0]; - assert.strictEqual(dataPoint.attributes.length, 0); - assert.strictEqual(dataPoint.value, expectedMetric[2]); - } - - // Now check that all expectedThreadMetrics are present - for (const expectedMetric of expectedThreadMetrics) { - const [ name, unit, type, aggregation ] = expectedMetric; - const metric = metrics.find((m) => m.name === name); - assert.ok(metric, `Expected metric ${name} not found`); - assert.strictEqual(metric.unit, unit); - assert.strictEqual(metric.data, aggregation); - const dataPoints = metric[aggregation].dataPoints; - assert.strictEqual(dataPoints.length, 1); - const dataPoint = dataPoints[0]; - assert.strictEqual(dataPoint.attributes.length, 2); - const threadIdAttr = dataPoint.attributes.find((a) => a.key === 'thread.id'); - assert.ok(threadIdAttr); - assert.strictEqual(threadIdAttr.value.intValue, '0'); - const threadNameAttr = dataPoint.attributes.find((a) => a.key === 'thread.name'); - assert.ok(threadNameAttr); - assert.strictEqual(threadNameAttr.value.stringValue, ''); - if (metric.data === 'summary') { - validateArray(dataPoint.quantileValues, `${name}.quantileValues`); - assert.strictEqual(dataPoint.quantileValues.length, 2); - assert.strictEqual(dataPoint.quantileValues[0].quantile, 0.99); - assert.strictEqual(dataPoint.quantileValues[1].quantile, 0.5); - if (dataPoint.quantileValues[0].value) { - validateNumber(dataPoint.quantileValues[0].value, `${name}.quantileValues[0].value`); - } - if (dataPoint.quantileValues[1].value) { - validateNumber(dataPoint.quantileValues[1].value, `${name}.quantileValues[1].value`); - } - } else { - assert.strictEqual(dataPoint.value, expectedMetric[2]); - if (type === 'asInt') { - validateNumber(parseInt(dataPoint[type], 10), `${name}.${type}`); - } else { // asDouble - validateNumber(dataPoint[type], `${name}.${type}`); - } - } - } -} function checkMetricsData(msg, metadata, requestId, agentId, nsolidConfig, nsolidMetrics) { const metrics = msg; @@ -219,10 +20,7 @@ function checkMetricsData(msg, metadata, requestId, agentId, nsolidConfig, nsoli // also the body fields const resourceMetrics = metrics.body.resourceMetrics; - validateArray(resourceMetrics, 'resourceMetrics'); - assert.strictEqual(resourceMetrics.length, 1); - checkResource(resourceMetrics[0].resource, agentId, nsolidConfig, nsolidMetrics); - checkScopeMetrics(resourceMetrics[0].scopeMetrics); + checkOTLPMetricsData(resourceMetrics, agentId, nsolidConfig, nsolidMetrics, 1, false); } async function runTest({ getEnv }) { diff --git a/test/agents/test-otlp-grpc-metrics.mjs b/test/agents/test-otlp-grpc-metrics.mjs index 698226dc8d..62515040b8 100644 --- a/test/agents/test-otlp-grpc-metrics.mjs +++ b/test/agents/test-otlp-grpc-metrics.mjs @@ -476,6 +476,9 @@ if (process.argv[2] === 'child') { function checkMetricsData(context) { const { metrics, expected } = context; + if (expected.length === 0) { + return; + } const indicesToRemove = []; for (let i = 0; i < metrics.length; ++i) { const metric = metrics[i]; @@ -486,25 +489,47 @@ if (process.argv[2] === 'child') { assert.strictEqual(metric.unit, unit); } - assert.strictEqual(metric[aggregation].dataPoints.length, 1); - const dataPoint = 
metric[aggregation].dataPoints[0]; + const dataPoints = metric[aggregation].dataPoints; + validateArray(dataPoints, `${name}.dataPoints`); + assert.ok(dataPoints.length > 0); + let dataPoint; // eslint-disable-next-line eqeqeq if (context.threadId != undefined) { - const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`); - if (attrIndex > -1) { - indicesToRemove.push(i); - const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name'); - assert(nameIndex > -1); - if (context.threadId === 0) { // main-thread - assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread'); - } else { // worker-thread - assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread'); + const dataPointIndex = dataPoints.findIndex((dp) => { + if (!dp.attributes) { + return false; } + return dp.attributes.some((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`); + }); + if (dataPointIndex === -1) { + // With batching, not all threads may have metrics in every batch + continue; + } + dataPoint = dataPoints[dataPointIndex]; + const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name'); + assert(nameIndex > -1); + if (context.threadId === 0) { // main-thread + assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread'); + } else { // worker-thread + assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread'); + } + console.log(`Processing thread ${context.threadId}, attributes:`, dataPoint.attributes.map((a) => `${a.key}=${a.value.intValue || a.value.stringValue}`).join(', ')); + dataPoints.splice(dataPointIndex, 1); + if (dataPoints.length === 0) { + indicesToRemove.push(i); } } else { + assert.strictEqual(dataPoints.length, 1); + dataPoint = dataPoints[0]; indicesToRemove.push(i); } + // eslint-disable-next-line eqeqeq + if (context.threadId != undefined) { + const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`); + assert(attrIndex > -1); + } + const startTime = BigInt(dataPoint.startTimeUnixNano); assert.ok(startTime); const time = BigInt(dataPoint.timeUnixNano); @@ -538,6 +563,25 @@ if (process.argv[2] === 'child') { for (let i = indicesToRemove.length - 1; i >= 0; --i) { metrics.splice(indicesToRemove[i], 1); } + + // Log thread_ids present in this batch + const threadIdsInBatch = new Set(); + metrics.forEach((metric) => { + const aggregation = metric.data; + if (metric[aggregation]?.dataPoints) { + metric[aggregation].dataPoints.forEach((dp) => { + if (dp.attributes) { + const threadIdAttr = dp.attributes.find((a) => a.key === 'thread.id'); + if (threadIdAttr) { + threadIdsInBatch.add(threadIdAttr.value.intValue); + } + } + }); + } + }); + if (threadIdsInBatch.size > 0) { + console.log(`Batch contains metrics for threads: [${Array.from(threadIdsInBatch).join(', ')}]`); + } } function checkMetrics(metrics, context) { @@ -561,6 +605,7 @@ if (process.argv[2] === 'child') { context.procMetricsDone = true; } else if (context.state === State.ThreadMetrics) { context.threadList.shift(); + context.threadId = null; } context.state = State.None; } @@ -585,12 +630,48 @@ if (process.argv[2] === 'child') { assert.ok(hasThreadMetrics || hasProcMetrics, 'No thread or process metrics found'); if (hasThreadMetrics) { - assert.ok(context.threadList.length > 0, 'No more threads available'); - context.state = State.ThreadMetrics; + 
// Find which thread is in the batch + const threadIdsInBatch = new Set(); + context.metrics.forEach((metric) => { + if (metric[metric.data]?.dataPoints) { + metric[metric.data].dataPoints.forEach((dp) => { + if (dp.attributes) { + const threadIdAttr = dp.attributes.find((a) => a.key === 'thread.id'); + if (threadIdAttr) { + threadIdsInBatch.add(parseInt(threadIdAttr.value.intValue, 10)); + } + } + }); + } + }); + // Find the thread that is expected and in the batch + let foundThread = null; + console.log('threadIdsInBatch:', Array.from(threadIdsInBatch)); + console.log('context.threadList:', context.threadList); + for (const tid of threadIdsInBatch) { + console.log('checking tid:', tid, 'includes:', context.threadList.includes(tid)); + if (context.threadList.includes(tid)) { + foundThread = tid; + console.log('foundThread set to:', foundThread); + break; + } + } + console.log('final foundThread:', foundThread); + if (foundThread === null) { + // Skip this batch if no expected thread found + console.log('Skipping batch'); + context.state = State.None; + return; + } + context.threadId = foundThread; + context.threadList = context.threadList.filter((t) => t !== foundThread); context.expected = [...expectedThreadMetrics]; - } else { + } else if (hasProcMetrics) { context.state = State.ProcMetrics; + context.threadId = null; context.expected = [...expectedProcMetrics]; + } else { + context.state = State.None; } } @@ -599,7 +680,7 @@ if (process.argv[2] === 'child') { metrics: [], expected: [], threadId: null, - threadList: [ threadId ], + threadList: [], procMetricsDone: false, }; @@ -610,6 +691,7 @@ if (process.argv[2] === 'child') { } async function runTest(getEnv) { + context.threadList.push(threadId); return new Promise((resolve, reject) => { let childExited = false; const otlpServer = new OTLPGRPCServer(); diff --git a/test/agents/test-otlp-metrics.mjs b/test/agents/test-otlp-metrics.mjs index 10245e6a03..6c480eaf46 100644 --- a/test/agents/test-otlp-metrics.mjs +++ b/test/agents/test-otlp-metrics.mjs @@ -488,22 +488,37 @@ if (process.argv[2] === 'child') { assert.strictEqual(metric.unit, unit); } - assert.strictEqual(metric[aggregation].dataPoints.length, 1); - const dataPoint = metric[aggregation].dataPoints[0]; + const dataPoints = metric[aggregation].dataPoints; + validateArray(dataPoints, `${name}.dataPoints`); + assert.ok(dataPoints.length > 0, `Expected datapoints for ${name}`); + let dataPoint; // eslint-disable-next-line eqeqeq if (context.threadId != undefined) { - const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`); - if (attrIndex > -1) { - indicesToRemove.push(i); - const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name'); - assert(nameIndex > -1); - if (context.threadId === 0) { // main-thread - assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread'); - } else { // worker-thread - assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread'); + const dataPointIndex = dataPoints.findIndex((dp) => { + if (!dp.attributes) { + return false; } + return dp.attributes.some((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`); + }); + if (dataPointIndex === -1) { + // With batching, not all threads may have metrics in every batch + continue; + } + dataPoint = dataPoints[dataPointIndex]; + const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name'); + assert(nameIndex > -1); + if 
(context.threadId === 0) { // main-thread + assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread'); + } else { // worker-thread + assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread'); + } + dataPoints.splice(dataPointIndex, 1); + if (dataPoints.length === 0) { + indicesToRemove.push(i); + } } else { + assert.strictEqual(dataPoints.length, 1); + dataPoint = dataPoints[0]; indicesToRemove.push(i); } @@ -540,6 +555,25 @@ if (process.argv[2] === 'child') { for (let i = indicesToRemove.length - 1; i >= 0; --i) { metrics.splice(indicesToRemove[i], 1); } + + // Log thread_ids present in this batch + const threadIdsInBatch = new Set(); + metrics.forEach((metric) => { + const aggregation = metric.data; + if (metric[aggregation]?.dataPoints) { + metric[aggregation].dataPoints.forEach((dp) => { + if (dp.attributes) { + const threadIdAttr = dp.attributes.find((a) => a.key === 'thread.id'); + if (threadIdAttr) { + threadIdsInBatch.add(threadIdAttr.value.intValue); + } + } + }); + } + }); + if (threadIdsInBatch.size > 0) { + console.log(`Batch contains metrics for threads: [${Array.from(threadIdsInBatch).join(', ')}]`); + } } function checkMetrics(metrics, context) {
diff --git a/test/cctest/test_nsolid_ring_buffer.cc b/test/cctest/test_nsolid_ring_buffer.cc
index a289c9a49b..583699fa47 100644
--- a/test/cctest/test_nsolid_ring_buffer.cc
+++ b/test/cctest/test_nsolid_ring_buffer.cc
@@ -37,3 +37,152 @@ TEST(RingBufferTest, Basic) {
  buffer.push(5);
  EXPECT_EQ(buffer.front(), 3);
}
+
+TEST(RingBufferTest, ResizeToSameCapacity) {
+  // Create a buffer of size 3.
+  RingBuffer<int> buffer(3);
+
+  // Add some elements.
+  buffer.push(1);
+  buffer.push(2);
+  buffer.push(3);
+
+  // Resize to same capacity - should be a no-op.
+  buffer.resize(3);
+
+  // Buffer should be unchanged.
+  EXPECT_FALSE(buffer.empty());
+  EXPECT_EQ(buffer.front(), 1);
+
+  // Pop and verify all elements are still there.
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 2);
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 3);
+  buffer.pop();
+  EXPECT_TRUE(buffer.empty());
+}
+
+TEST(RingBufferTest, ResizeLargerEmpty) {
+  // Create a buffer of size 3.
+  RingBuffer<int> buffer(3);
+
+  // Resize empty buffer to larger capacity.
+  buffer.resize(6);
+
+  // Buffer should still be empty.
+  EXPECT_TRUE(buffer.empty());
+
+  // Should be able to push more elements than original capacity.
+  buffer.push(1);
+  buffer.push(2);
+  buffer.push(3);
+  buffer.push(4);
+  buffer.push(5);
+  buffer.push(6);
+
+  // Verify all elements are there.
+  EXPECT_EQ(buffer.front(), 1);
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 2);
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 3);
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 4);
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 5);
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 6);
+  buffer.pop();
+  EXPECT_TRUE(buffer.empty());
+}
+
+TEST(RingBufferTest, ResizeLargerWithElements) {
+  // Create a buffer of size 3.
+  RingBuffer<int> buffer(3);
+
+  // Add some elements.
+  buffer.push(1);
+  buffer.push(2);
+  buffer.push(3);
+
+  // Resize to larger capacity.
+  buffer.resize(5);
+
+  // All elements should be preserved in order.
+  EXPECT_FALSE(buffer.empty());
+  EXPECT_EQ(buffer.front(), 1);
+
+  // Pop and verify original elements are still there.
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 2);
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 3);
+  buffer.pop();
+  EXPECT_TRUE(buffer.empty());
+
+  // Should now be able to push more elements.
+  buffer.push(4);
+  buffer.push(5);
+  EXPECT_EQ(buffer.front(), 4);
+}
+
+TEST(RingBufferTest, ResizeSmallerWithElements) {
+  // Create a buffer of size 5.
+  RingBuffer<int> buffer(5);
+
+  // Add some elements.
+  buffer.push(1);
+  buffer.push(2);
+  buffer.push(3);
+  buffer.push(4);
+  buffer.push(5);
+
+  // Resize to smaller capacity.
+  buffer.resize(3);
+
+  // Should keep the newest 3 elements (3, 4, 5).
+  EXPECT_FALSE(buffer.empty());
+  EXPECT_EQ(buffer.front(), 3);
+
+  // Pop and verify the correct elements are kept.
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 4);
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 5);
+  buffer.pop();
+  EXPECT_TRUE(buffer.empty());
+}
+
+TEST(RingBufferTest, ResizeSmallerWithWrapAround) {
+  // Create a buffer of size 3.
+  RingBuffer<int> buffer(3);
+
+  // Fill and wrap around the buffer.
+  buffer.push(1);
+  buffer.push(2);
+  buffer.push(3);
+  buffer.push(4);  // This should overwrite 1
+  buffer.push(5);  // This should overwrite 2
+
+  // Current buffer should contain [3, 4, 5] with 3 at front.
+  EXPECT_EQ(buffer.front(), 3);
+
+  // Resize to smaller capacity.
+  buffer.resize(2);
+
+  // Should keep the newest 2 elements (4, 5).
+  EXPECT_FALSE(buffer.empty());
+  EXPECT_EQ(buffer.front(), 4);
+
+  // Pop and verify the correct elements are kept.
+  buffer.pop();
+  EXPECT_EQ(buffer.front(), 5);
+  buffer.pop();
+  EXPECT_TRUE(buffer.empty());
+}
+
+TEST(RingBufferTest, ResizeToZeroCrashes) {
+  RingBuffer<int> buffer(3);
+  EXPECT_DEATH(buffer.resize(0), "");
+}
diff --git a/test/common/nsolid-grpc-agent/index.js b/test/common/nsolid-grpc-agent/index.js index 4185416e48..cad42b32c0 100644 --- a/test/common/nsolid-grpc-agent/index.js +++ b/test/common/nsolid-grpc-agent/index.js @@ -1,71 +1,17 @@ 'use strict'; const common = require('../'); -const assert = require('node:assert'); const { EventEmitter } = require('node:events'); const { fork } = require('node:child_process'); const { randomUUID } = require('node:crypto'); const path = require('node:path'); const { - validateArray, - validateObject, - validateString, -} = require('internal/validators'); - -function checkExitData(data, metadata, agentId, expectedData) { - console.dir(data, { depth: null }); - validateString(data.common.requestId, 'common.requestId'); - assert.strictEqual(data.common.command, 'exit'); - // From here check at least that all the fields are present - validateObject(data.common.recorded, 'recorded'); - const recSeconds = BigInt(data.common.recorded.seconds); - assert.ok(recSeconds); - const recNanoSecs = BigInt(data.common.recorded.nanoseconds); - assert.ok(recNanoSecs); - validateObject(data.body, 'body'); - // also the body fields - assert.strictEqual(data.body.code, expectedData.code); - assert.strictEqual(data.body.profile, expectedData.profile); - if (expectedData.error === null) { - assert.strictEqual(data.body.error, null); - } else { - assert.ok(data.body.error); - assert.strictEqual(data.body.error.message, expectedData.error.message); - validateString(data.body.error.stack, 'error.stack'); - } - - validateArray(metadata['user-agent'], 'metadata.user-agent'); - validateString(metadata['user-agent'][0], 'metadata.user-agent[0]'); - assert.strictEqual(metadata['nsolid-agent-id'][0], agentId); -} - -function checkResource(resource, agentId, config, metrics) { - validateArray(resource.attributes, 'attributes'); - - const expectedAttributes = { - 'telemetry.sdk.version': process.versions.opentelemetry, - 'telemetry.sdk.language': 'cpp', - 'telemetry.sdk.name': 'opentelemetry', - 'service.instance.id': agentId, 
- 'service.name': config.app, - 'service.version': config.appVersion, - }; - - if (metrics) { - expectedAttributes['process.title'] = metrics.title; - expectedAttributes['process.owner'] = metrics.user; - } - - assert.strictEqual(resource.attributes.length, Object.keys(expectedAttributes).length); - - resource.attributes.forEach((attribute) => { - assert.strictEqual(attribute.value.stringValue, expectedAttributes[attribute.key]); - delete expectedAttributes[attribute.key]; - }); - - assert.strictEqual(Object.keys(expectedAttributes).length, 0); -} + checkExitData, + checkResource, + checkOTLPMetricsData, + hasThreadAttributes, +} = require('./validators.js'); class GRPCServer extends EventEmitter { @@ -77,12 +23,17 @@ class GRPCServer extends EventEmitter { this.#opts = opts || {}; } - start(cb) { - const args = []; + start(cb, port = null) { + const args = [ '--port' ]; + if (port) { + args.push(port.toString()); + } else { + args.push('0'); + } + if (this.#opts.tls) { args.push('--tls'); } - const opts = { stdio: ['inherit', 'inherit', 'inherit', 'ipc'], }; @@ -660,6 +611,8 @@ class TestClient { module.exports = { checkExitData, checkResource, + checkOTLPMetricsData, + hasThreadAttributes, GRPCServer, TestClient, }; diff --git a/test/common/nsolid-grpc-agent/server.mjs b/test/common/nsolid-grpc-agent/server.mjs index 3bb26cb341..9aae78715d 100644 --- a/test/common/nsolid-grpc-agent/server.mjs +++ b/test/common/nsolid-grpc-agent/server.mjs @@ -8,6 +8,9 @@ import protoLoader from '@grpc/proto-loader'; import fixtures from '../fixtures.js'; const options = { + port: { + type: 'string', + }, tls: { type: 'boolean', default: false, @@ -66,7 +69,7 @@ async function injectDelay(serviceName, callback) { } // Create a local server to receive data from -async function startServer(cb) { +async function startServer(cb, port = 0) { const server = new grpc.Server(); const opts = { keepCase: false, @@ -319,9 +322,13 @@ async function startServer(cb) { } return new Promise((resolve, reject) => { - server.bindAsync('localhost:0', credentials, (err, port) => { + server.bindAsync(`localhost:${port}`, credentials, (err, actualPort) => { + if (err) { + return reject(err); + } + server.start(); - resolve({ server, port }); + resolve({ server, port: actualPort }); }); }); } @@ -329,7 +336,7 @@ async function startServer(cb) { const { server, port } = await startServer((err, type, data) => { assert.ifError(err); process.send({ type, data }); -}); +}, args.values.port); process.send({ type: 'port', port }); process.on('message', (message) => { diff --git a/test/common/nsolid-grpc-agent/validators.js b/test/common/nsolid-grpc-agent/validators.js new file mode 100644 index 0000000000..25afe14098 --- /dev/null +++ b/test/common/nsolid-grpc-agent/validators.js @@ -0,0 +1,278 @@ +'use strict'; + +const assert = require('node:assert'); +const { + validateArray, + validateNumber, + validateObject, + validateString, +} = require('internal/validators'); + +// Expected process metrics (same across all tests) +const expectedProcMetrics = [ + [ 'uptime', 's', 'asInt', 'sum' ], + [ 'systemUptime', 's', 'asInt', 'sum' ], + [ 'freeMem', 'byte', 'asInt', 'gauge' ], + [ 'blockInputOpCount', '', 'asInt', 'sum' ], + [ 'blockOutputOpCount', '', 'asInt', 'sum' ], + [ 'ctxSwitchInvoluntaryCount', '', 'asInt', 'sum' ], + [ 'ctxSwitchVoluntaryCount', '', 'asInt', 'sum' ], + [ 'ipcReceivedCount', '', 'asInt', 'sum' ], + [ 'ipcSentCount', '', 'asInt', 'sum' ], + [ 'pageFaultHardCount', '', 'asInt', 'sum' ], + [ 'pageFaultSoftCount', '', 
'asInt', 'sum' ], + [ 'signalCount', '', 'asInt', 'sum' ], + [ 'swapCount', '', 'asInt', 'sum' ], + [ 'rss', 'byte', 'asInt', 'gauge' ], + [ 'load1m', '', 'asDouble', 'gauge' ], + [ 'load5m', '', 'asDouble', 'gauge' ], + [ 'load15m', '', 'asDouble', 'gauge' ], + [ 'cpuUserPercent', '', 'asDouble', 'gauge' ], + [ 'cpuSystemPercent', '', 'asDouble', 'gauge' ], + [ 'cpuPercent', '', 'asDouble', 'gauge' ], +]; + +// Expected thread metrics (same across all tests) +const expectedThreadMetrics = [ + ['activeHandles', '', 'asInt', 'gauge'], + ['activeRequests', '', 'asInt', 'gauge'], + ['heapTotal', 'byte', 'asInt', 'gauge'], + ['totalHeapSizeExecutable', 'byte', 'asInt', 'gauge'], + ['totalPhysicalSize', 'byte', 'asInt', 'gauge'], + ['totalAvailableSize', 'byte', 'asInt', 'gauge'], + ['heapUsed', 'byte', 'asInt', 'gauge'], + ['heapSizeLimit', 'byte', 'asInt', 'gauge'], + ['mallocedMemory', 'byte', 'asInt', 'gauge'], + ['externalMem', 'byte', 'asInt', 'gauge'], + ['peakMallocedMemory', 'byte', 'asInt', 'gauge'], + ['numberOfNativeContexts', '', 'asInt', 'gauge'], + ['numberOfDetachedContexts', '', 'asInt', 'gauge'], + ['gcCount', '', 'asInt', 'sum'], + ['gcForcedCount', '', 'asInt', 'sum'], + ['gcFullCount', '', 'asInt', 'sum'], + ['gcMajorCount', '', 'asInt', 'sum'], + ['dnsCount', '', 'asInt', 'sum'], + ['httpClientAbortCount', '', 'asInt', 'sum'], + ['httpClientCount', '', 'asInt', 'sum'], + ['httpServerAbortCount', '', 'asInt', 'sum'], + ['httpServerCount', '', 'asInt', 'sum'], + ['loopIdleTime', 'ms', 'asInt', 'gauge'], + ['loopIterations', '', 'asInt', 'sum'], + ['loopIterWithEvents', '', 'asInt', 'sum'], + ['eventsProcessed', '', 'asInt', 'sum'], + ['eventsWaiting', '', 'asInt', 'gauge'], + ['providerDelay', 'ms', 'asInt', 'gauge'], + ['processingDelay', 'ms', 'asInt', 'gauge'], + ['loopTotalCount', '', 'asInt', 'sum'], + ['pipeServerCreatedCount', '', 'asInt', 'sum'], + ['pipeServerDestroyedCount', '', 'asInt', 'sum'], + ['pipeSocketCreatedCount', '', 'asInt', 'sum'], + ['pipeSocketDestroyedCount', '', 'asInt', 'sum'], + ['tcpServerCreatedCount', '', 'asInt', 'sum'], + ['tcpServerDestroyedCount', '', 'asInt', 'sum'], + ['tcpSocketCreatedCount', '', 'asInt', 'sum'], + ['tcpSocketDestroyedCount', '', 'asInt', 'sum'], + ['udpSocketCreatedCount', '', 'asInt', 'sum'], + ['udpSocketDestroyedCount', '', 'asInt', 'sum'], + ['promiseCreatedCount', '', 'asInt', 'sum'], + ['promiseResolvedCount', '', 'asInt', 'sum'], + ['fsHandlesOpenedCount', '', 'asInt', 'sum'], + ['fsHandlesClosedCount', '', 'asInt', 'sum'], + ['loopUtilization', '', 'asDouble', 'gauge'], + ['res5s', '', 'asDouble', 'gauge'], + ['res1m', '', 'asDouble', 'gauge'], + ['res5m', '', 'asDouble', 'gauge'], + ['res15m', '', 'asDouble', 'gauge'], + ['loopAvgTasks', '', 'asDouble', 'gauge'], + ['loopEstimatedLag', 'ms', 'asDouble', 'gauge'], + ['loopIdlePercent', '', 'asDouble', 'gauge'], + ['gcDurUs', 'us', 'asDouble', 'summary'], + ['dns', 'ms', 'asDouble', 'summary'], + ['httpClient', 'ms', 'asDouble', 'summary'], + ['httpServer', 'ms', 'asDouble', 'summary'], +]; + +function hasThreadAttributes(metric) { + return metric.gauge?.dataPoints?.[0]?.attributes?.some((attr) => attr.key === 'thread.id') || + metric.sum?.dataPoints?.[0]?.attributes?.some((attr) => attr.key === 'thread.id') || + metric.summary?.dataPoints?.[0]?.attributes?.some((attr) => attr.key === 'thread.id'); +} + +function checkResource(resource, agentId, config, metrics) { + validateArray(resource.attributes, 'attributes'); + + const expectedAttributes = { + 
'telemetry.sdk.version': process.versions.opentelemetry, + 'telemetry.sdk.language': 'cpp', + 'telemetry.sdk.name': 'opentelemetry', + 'service.instance.id': agentId, + 'service.name': config.app, + 'service.version': config.appVersion, + }; + + if (metrics) { + expectedAttributes['process.title'] = metrics.title; + expectedAttributes['process.owner'] = metrics.user; + } + + assert.strictEqual(resource.attributes.length, Object.keys(expectedAttributes).length); + + resource.attributes.forEach((attribute) => { + assert.strictEqual(attribute.value.stringValue, expectedAttributes[attribute.key]); + delete expectedAttributes[attribute.key]; + }); + + assert.strictEqual(Object.keys(expectedAttributes).length, 0); +} + +function checkExitData(data, metadata, agentId, expectedData) { + console.dir(data, { depth: null }); + validateString(data.common.requestId, 'common.requestId'); + assert.strictEqual(data.common.command, 'exit'); + // From here check at least that all the fields are present + validateObject(data.common.recorded, 'recorded'); + const recSeconds = BigInt(data.common.recorded.seconds); + assert.ok(recSeconds); + const recNanoSecs = BigInt(data.common.recorded.nanoseconds); + assert.ok(recNanoSecs); + validateObject(data.body, 'body'); + // also the body fields + assert.strictEqual(data.body.code, expectedData.code); + assert.strictEqual(data.body.profile, expectedData.profile); + if (expectedData.error === null) { + assert.strictEqual(data.body.error, null); + } else { + assert.ok(data.body.error); + assert.strictEqual(data.body.error.message, expectedData.error.message); + validateString(data.body.error.stack, 'error.stack'); + } + + validateArray(metadata['user-agent'], 'metadata.user-agent'); + validateString(metadata['user-agent'][0], 'metadata.user-agent[0]'); + assert.strictEqual(metadata['nsolid-agent-id'][0], agentId); +} + +function checkOTLPMetricsData(resourceMetrics, + agentId, + nsolidConfig, + nsolidMetrics, + expectedBatchSize, + isThreadMetrics, + expectFlushedBatch = false, + expectFlushRange = null) { + validateArray(resourceMetrics, 'resourceMetrics'); + assert.strictEqual(resourceMetrics.length, 1); + checkResource(resourceMetrics[0].resource, agentId, nsolidConfig, nsolidMetrics); + + const scopeMetrics = resourceMetrics[0].scopeMetrics; + validateArray(scopeMetrics, 'scopeMetrics'); + assert.strictEqual(scopeMetrics.length, 1); + assert.strictEqual(scopeMetrics[0].scope.name, 'nsolid'); + assert.strictEqual(scopeMetrics[0].scope.version, + `${process.version}+nsv${process.versions.nsolid}`); + + const metrics = scopeMetrics[0].metrics; + + if (isThreadMetrics) { + console.log(`Checking thread metrics (${metrics.length} metrics)`); + // Check thread metrics + for (const expectedMetric of expectedThreadMetrics) { + const [ name, unit, type, aggregation ] = expectedMetric; + const metric = metrics.find((m) => m.name === name); + if (!metric) { + console.log(`Warning: Thread metric ${name} not found in this export`); + continue; + } + assert.strictEqual(metric.unit, unit); + assert.strictEqual(metric.data, aggregation); + const dataPoints = metric[aggregation].dataPoints; + validateArray(dataPoints, `${name}.dataPoints`); + assert.ok(dataPoints.length > 0, `Expected at least one datapoint for ${name}`); + + console.log(`Thread metric ${name}: ${dataPoints.length} datapoints (expected: ${expectFlushedBatch ? 
+
+      // Thread metrics should have exactly expectedBatchSize datapoints, or
+      // fall within a range if flushed.
+      if (expectFlushedBatch) {
+        // Fall back to a loose 3-7 window when no explicit range is given.
+        const min = expectFlushRange ? expectFlushRange[0] : 3;
+        const max = expectFlushRange ? expectFlushRange[1] : 7;
+        assert.ok(dataPoints.length >= min && dataPoints.length <= max,
+                  `Thread metric ${name} should have ${min}-${max} datapoints when flushed, but got ${dataPoints.length}`);
+      } else {
+        assert.strictEqual(dataPoints.length, expectedBatchSize,
+                           `Thread metric ${name} should have exactly ${expectedBatchSize} datapoints, but got ${dataPoints.length}`);
+      }
+
+      const threadIds = new Set();
+      const timestamps = new Set();
+      for (const dataPoint of dataPoints) {
+        validateArray(dataPoint.attributes, `${name}.attributes`);
+        const threadIdAttr = dataPoint.attributes.find((a) => a.key === 'thread.id');
+        assert.ok(threadIdAttr, `thread.id attribute missing for ${name}`);
+        threadIds.add(threadIdAttr.value.intValue);
+        const threadNameAttr = dataPoint.attributes.find((a) => a.key === 'thread.name');
+        assert.ok(threadNameAttr, `thread.name attribute missing for ${name}`);
+        timestamps.add(dataPoint.timeUnixNano);
+        if (metric.data === 'summary') {
+          validateArray(dataPoint.quantileValues, `${name}.quantileValues`);
+          assert.strictEqual(dataPoint.quantileValues.length, 2);
+          assert.strictEqual(dataPoint.quantileValues[0].quantile, 0.99);
+          assert.strictEqual(dataPoint.quantileValues[1].quantile, 0.5);
+          if (dataPoint.quantileValues[0].value) {
+            validateNumber(dataPoint.quantileValues[0].value, `${name}.quantileValues[0].value`);
+          }
+          if (dataPoint.quantileValues[1].value) {
+            validateNumber(dataPoint.quantileValues[1].value, `${name}.quantileValues[1].value`);
+          }
+        } else {
+          // `dataPoint.value` is the oneof discriminator naming the variant
+          // that is set ('asInt' or 'asDouble').
+          assert.strictEqual(dataPoint.value, type);
+          if (type === 'asInt') {
+            validateNumber(parseInt(dataPoint[type], 10), `${name}.${type}`);
+          } else { // asDouble
+            validateNumber(dataPoint[type], `${name}.${type}`);
+          }
+        }
+      }
+      assert.ok(threadIds.size > 0, `No thread IDs recorded for ${name}`);
+      assert.strictEqual(timestamps.size, dataPoints.length, `Timestamps should be unique for thread metric ${name}`);
+    }
+  } else {
+    console.log(`Checking process metrics (${metrics.length} metrics)`);
+    // Check process metrics
+    for (const expectedMetric of expectedProcMetrics) {
+      const [ name, unit, type, aggregation ] = expectedMetric;
+      const metric = metrics.find((m) => m.name === name);
+      assert.ok(metric, `Expected process metric ${name} not found`);
+      assert.strictEqual(metric.unit, unit);
+      assert.strictEqual(metric.data, aggregation);
+      const dataPoints = metric[aggregation].dataPoints;
+      validateArray(dataPoints, `${name}.dataPoints`);
+
+      console.log(`Process metric ${name}: ${dataPoints.length} datapoints (expected: ${expectFlushedBatch ? 'flushed range' : expectedBatchSize})`);
+
+      // Process metrics should have exactly expectedBatchSize datapoints, or
+      // fall within a range if flushed.
+      if (expectFlushedBatch) {
+        // Fall back to a loose 3-7 window when no explicit range is given.
+        const min = expectFlushRange ? expectFlushRange[0] : 3;
+        const max = expectFlushRange ? expectFlushRange[1] : 7;
+        assert.ok(dataPoints.length >= min && dataPoints.length <= max,
+                  `Process metric ${name} should have ${min}-${max} datapoints when flushed, but got ${dataPoints.length}`);
+      } else {
+        assert.strictEqual(dataPoints.length, expectedBatchSize,
+                           `Process metric ${name} should have exactly ${expectedBatchSize} datapoints, but got ${dataPoints.length}`);
+      }
+
+      // Ensure timestamps are unique for each datapoint in the metric
+      const timestamps = new Set(dataPoints.map((dp) => dp.timeUnixNano));
+      assert.strictEqual(timestamps.size, dataPoints.length, `Timestamps should be unique for process metric ${name}`);
+
+      const dataPoint = dataPoints[0];
+      assert.strictEqual(dataPoint.attributes.length, 0);
+      assert.strictEqual(dataPoint.value, type);
+    }
+  }
+}
+
+module.exports = {
+  checkExitData,
+  checkResource,
+  checkOTLPMetricsData,
+  hasThreadAttributes,
+};
diff --git a/test/fixtures/test-nsolid-config-metrics-env-script.js b/test/fixtures/test-nsolid-config-metrics-env-script.js
new file mode 100644
index 0000000000..83fbaf65b9
--- /dev/null
+++ b/test/fixtures/test-nsolid-config-metrics-env-script.js
@@ -0,0 +1,16 @@
+'use strict';
+
+// This script is used by test-nsolid-config-metrics-env.js to test
+// environment variable configuration of the metrics batch/buffer size.
+
+require('../common');
+const nsolid = require('nsolid');
+
+// Start N|Solid with default configuration so initializeConfig() runs.
+nsolid.start();
+
+// Output the resulting configuration as JSON.
+console.log(JSON.stringify({
+  metricsBatchSize: nsolid.config.metricsBatchSize,
+  metricsBufferSize: nsolid.config.metricsBufferSize,
+}));
diff --git a/test/parallel/test-nsolid-config-metrics-env.js b/test/parallel/test-nsolid-config-metrics-env.js
new file mode 100644
index 0000000000..f8e4b1cb42
--- /dev/null
+++ b/test/parallel/test-nsolid-config-metrics-env.js
@@ -0,0 +1,115 @@
+'use strict';
+
+// This test verifies that metricsBatchSize and metricsBufferSize are correctly
+// validated when set via environment variables (the initializeConfig path).
+// Invalid values (Infinity, negative, zero, non-numeric) must fall back to
+// the default.
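+//
+// For orientation only, the validation exercised here presumably reduces to
+// a predicate along these lines. This is a hypothetical sketch (the helper
+// name is made up; the real check lives in the runtime's config
+// initialization, not in this test), kept in a comment so it cannot drift
+// from the actual implementation:
+//
+//   function sizeFromEnv(raw, fallback) {
+//     const n = Number(raw);  // env values are always strings
+//     return (Number.isFinite(n) && n > 0) ? n : fallback;
+//   }
+//
+// Under that predicate '5' and '2.5' pass, while '0', '-1', 'Infinity',
+// 'NaN', '', ' ' and 'abc' all coerce to a non-positive, non-finite or NaN
+// value and fall back to the default.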
+
+require('../common');
+const assert = require('assert');
+const { spawnSync } = require('child_process');
+const path = require('path');
+
+const DEFAULT_METRICS_BATCH_SIZE = 1;
+const DEFAULT_METRICS_BUFFER_SIZE = 100;
+
+// Helper to run a small script with specific environment variables
+function runWithEnv(envVars) {
+  const script = path.join(
+    __dirname,
+    '../fixtures/test-nsolid-config-metrics-env-script.js'
+  );
+
+  // Drop any NSOLID_METRICS_* values leaking in from the outer environment
+  // so the default-values case below is deterministic.
+  const env = { ...process.env };
+  delete env.NSOLID_METRICS_BATCH_SIZE;
+  delete env.NSOLID_METRICS_BUFFER_SIZE;
+  Object.assign(env, envVars);
+
+  const result = spawnSync(process.execPath, [script], {
+    env,
+    encoding: 'utf8',
+  });
+
+  if (result.status !== 0) {
+    console.error(result.stderr);
+    throw new Error(`Script execution failed with status ${result.status}`);
+  }
+
+  return JSON.parse(result.stdout.trim());
+}
+
+// Test default values (no env vars set)
+{
+  const config = runWithEnv({});
+  assert.strictEqual(config.metricsBatchSize, DEFAULT_METRICS_BATCH_SIZE);
+  assert.strictEqual(config.metricsBufferSize, DEFAULT_METRICS_BUFFER_SIZE);
+}
+
+// Test valid numeric string values
+{
+  const config = runWithEnv({
+    NSOLID_METRICS_BATCH_SIZE: '5',
+    NSOLID_METRICS_BUFFER_SIZE: '200',
+  });
+  assert.strictEqual(config.metricsBatchSize, 5);
+  assert.strictEqual(config.metricsBufferSize, 200);
+}
+
+// Test valid float string values (accepted because they are positive and
+// finite)
+{
+  const config = runWithEnv({
+    NSOLID_METRICS_BATCH_SIZE: '2.5',
+    NSOLID_METRICS_BUFFER_SIZE: '50.5',
+  });
+  assert.strictEqual(config.metricsBatchSize, 2.5);
+  assert.strictEqual(config.metricsBufferSize, 50.5);
+}
+
+// Test that invalid env values all fall back to the default
+const invalidEnvValues = [
+  '0',
+  '-1',
+  '-100',
+  'Infinity',
+  '-Infinity',
+  'NaN',
+  '',
+  ' ',
+  'abc',
+  'true',
+  'false',
+];
+
+for (const value of invalidEnvValues) {
+  const config = runWithEnv({
+    NSOLID_METRICS_BATCH_SIZE: value,
+    NSOLID_METRICS_BUFFER_SIZE: value,
+  });
+  assert.strictEqual(
+    config.metricsBatchSize,
+    DEFAULT_METRICS_BATCH_SIZE,
+    `metricsBatchSize should be default for env value: "${value}"`
+  );
+  assert.strictEqual(
+    config.metricsBufferSize,
+    DEFAULT_METRICS_BUFFER_SIZE,
+    `metricsBufferSize should be default for env value: "${value}"`
+  );
+}
diff --git a/test/parallel/test-nsolid-start-metrics-buffer.js b/test/parallel/test-nsolid-start-metrics-buffer.js
new file mode 100644
index 0000000000..dd0dba3ad1
--- /dev/null
+++ b/test/parallel/test-nsolid-start-metrics-buffer.js
@@ -0,0 +1,64 @@
+'use strict';
+
+require('../common');
+const assert = require('assert');
+const nsolid = require('nsolid');
+
+const defaultBatchSize = nsolid.config.metricsBatchSize;
+const defaultBufferSize = nsolid.config.metricsBufferSize;
+
+nsolid.start({
+  metricsBatchSize: defaultBatchSize + 1,
+  metricsBufferSize: defaultBufferSize + 50
+});
+assert.strictEqual(nsolid.config.metricsBatchSize, defaultBatchSize + 1);
+assert.strictEqual(nsolid.config.metricsBufferSize, defaultBufferSize + 50);
+
+nsolid.start({
+  metricsBatchSize: `${defaultBatchSize + 2}`,
+  metricsBufferSize: `${defaultBufferSize + 100}`
+});
+assert.strictEqual(nsolid.config.metricsBatchSize, defaultBatchSize + 2);
+assert.strictEqual(nsolid.config.metricsBufferSize, defaultBufferSize + 100);
+
+// Invalid inputs should be ignored
+const invalidValues = [
+  0, -1, NaN, Infinity, -Infinity,
+  undefined, null, true, false, {}, [], () => {},
+  '0', '-1', 'NaN', 'Infinity', '-Infinity', '', ' ',
+];
+for (const value of invalidValues) {
+  nsolid.start({
+    metricsBatchSize: value,
+    metricsBufferSize: value
+  });
+  assert.strictEqual(nsolid.config.metricsBatchSize,
+                     defaultBatchSize + 2,
+                     `metricsBatchSize changed unexpectedly for value: ${String(value)}`);
+  assert.strictEqual(nsolid.config.metricsBufferSize,
+                     defaultBufferSize + 100,
+                     `metricsBufferSize changed unexpectedly for value: ${String(value)}`);
+}
+
+// Valid values should restore the built-in defaults.
+nsolid.start({
+  metricsBatchSize: defaultBatchSize,
+  metricsBufferSize: defaultBufferSize
+});
+assert.strictEqual(nsolid.config.metricsBatchSize, defaultBatchSize);
+assert.strictEqual(nsolid.config.metricsBufferSize, defaultBufferSize);
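+
+// For reference, the behavior asserted above is consistent with a coercion
+// of roughly this shape (a hypothetical sketch; the helper name is
+// illustrative and the real logic lives in the nsolid runtime, not here):
+//
+//   function coerceMetricsOption(value, previous) {
+//     if (typeof value !== 'number' && typeof value !== 'string')
+//       return previous;
+//     const n = Number(value);
+//     return (Number.isFinite(n) && n > 0) ? n : previous;
+//   }
+//
+// Note that an invalid value keeps the *previous* setting rather than
+// resetting to the built-in default, which is why the loop above asserts
+// against defaultBatchSize + 2 and defaultBufferSize + 100.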