diff --git a/CHANGELOG.md b/CHANGELOG.md index f01050441d..64b2213c58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,9 @@ Increment the: * [API] Add Enabled method to Tracer [#3357](https://github.com/open-telemetry/opentelemetry-cpp/pull/3357) +* [SDK] Optimize PeriodicExportingMetricReader thread usage + [#3383](https://github.com/open-telemetry/opentelemetry-cpp/pull/3383) + ## [1.20 2025-04-01] * [BUILD] Update opentelemetry-proto version diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index f9970312e4..0fbde50bc8 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -10,7 +10,6 @@ #include #include #include -#include #include #include "opentelemetry/common/timestamp.h" @@ -24,13 +23,6 @@ #include "opentelemetry/sdk/metrics/push_metric_exporter.h" #include "opentelemetry/version.h" -#if defined(_MSC_VER) -# pragma warning(suppress : 5204) -# include -#else -# include -#endif - #if OPENTELEMETRY_HAVE_EXCEPTIONS # include #endif @@ -98,11 +90,9 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->OnStart(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - do { auto start = std::chrono::steady_clock::now(); - #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW if (worker_thread_instrumentation_ != nullptr) { @@ -134,7 +124,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->BeforeWait(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - std::unique_lock lk(cv_m_); cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) @@ -151,7 +140,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->AfterWait(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - } while (IsShutdown() != true); #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW @@ -164,61 +152,39 @@ void PeriodicExportingMetricReader::DoBackgroundWork() bool PeriodicExportingMetricReader::CollectAndExportOnce() { - std::atomic cancel_export_for_timeout{false}; - std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire); - std::unique_ptr task_thread; - #if OPENTELEMETRY_HAVE_EXCEPTIONS try { #endif - std::promise sender; - auto receiver = sender.get_future(); - - task_thread.reset( - new std::thread([this, &cancel_export_for_timeout, sender = std::move(sender)] { #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW - if (collect_thread_instrumentation_ != nullptr) - { - collect_thread_instrumentation_->OnStart(); - collect_thread_instrumentation_->BeforeLoad(); - } + if (collect_thread_instrumentation_ != nullptr) + { + collect_thread_instrumentation_->OnStart(); + collect_thread_instrumentation_->BeforeLoad(); + } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - - this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { - if (cancel_export_for_timeout.load(std::memory_order_acquire)) - { - OTEL_INTERNAL_LOG_ERROR( - "[Periodic Exporting Metric Reader] Collect took longer configured time: " - << this->export_timeout_millis_.count() << " ms, and timed out"); - return false; - } - this->exporter_->Export(metric_data); - return true; - }); - - const_cast &>(sender).set_value(); + auto start = std::chrono::steady_clock::now(); + this->Collect([this, &start](ResourceMetrics &metric_data) { + auto end = std::chrono::steady_clock::now(); + if ((end - start) > this->export_timeout_millis_) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << this->export_timeout_millis_.count() << " ms, and timed out"); + return false; + } + this->exporter_->Export(metric_data); + return true; + }); #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW - if (collect_thread_instrumentation_ != nullptr) - { - collect_thread_instrumentation_->AfterLoad(); - collect_thread_instrumentation_->OnEnd(); - } -#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - })); - - std::future_status status; - do + if (collect_thread_instrumentation_ != nullptr) { - status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_)); - if (status == std::future_status::timeout) - { - cancel_export_for_timeout.store(true, std::memory_order_release); - break; - } - } while (status != std::future_status::ready); + collect_thread_instrumentation_->AfterLoad(); + collect_thread_instrumentation_->OnEnd(); + } +#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ #if OPENTELEMETRY_HAVE_EXCEPTIONS } catch (std::exception &e) @@ -235,11 +201,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() } #endif - if (task_thread && task_thread->joinable()) - { - task_thread->join(); - } - std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire); while (notify_force_flush > notified_sequence) {