Skip to content

Commit 9451f0e

Browse files
authored
[SDK] Optimize PeriodicExportingMetricReader Thread Usage (#3383)
1 parent ed91e71 commit 9451f0e

File tree

2 files changed

+26
-62
lines changed

2 files changed

+26
-62
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ Increment the:
3333
* [API] Add Enabled method to Tracer
3434
[#3357](https://github.com/open-telemetry/opentelemetry-cpp/pull/3357)
3535

36+
* [SDK] Optimize PeriodicExportingMetricReader thread usage
37+
[#3383](https://github.com/open-telemetry/opentelemetry-cpp/pull/3383)
38+
3639
## [1.20 2025-04-01]
3740

3841
* [BUILD] Update opentelemetry-proto version

sdk/src/metrics/export/periodic_exporting_metric_reader.cc

+23-62
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include <ostream>
1111
#include <ratio>
1212
#include <thread>
13-
#include <type_traits>
1413
#include <utility>
1514

1615
#include "opentelemetry/common/timestamp.h"
@@ -24,13 +23,6 @@
2423
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
2524
#include "opentelemetry/version.h"
2625

27-
#if defined(_MSC_VER)
28-
# pragma warning(suppress : 5204)
29-
# include <future>
30-
#else
31-
# include <future>
32-
#endif
33-
3426
#if OPENTELEMETRY_HAVE_EXCEPTIONS
3527
# include <exception>
3628
#endif
@@ -98,11 +90,9 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
9890
worker_thread_instrumentation_->OnStart();
9991
}
10092
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
101-
10293
do
10394
{
10495
auto start = std::chrono::steady_clock::now();
105-
10696
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
10797
if (worker_thread_instrumentation_ != nullptr)
10898
{
@@ -134,7 +124,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
134124
worker_thread_instrumentation_->BeforeWait();
135125
}
136126
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
137-
138127
std::unique_lock<std::mutex> lk(cv_m_);
139128
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
140129
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
@@ -151,7 +140,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
151140
worker_thread_instrumentation_->AfterWait();
152141
}
153142
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
154-
155143
} while (IsShutdown() != true);
156144

157145
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
@@ -164,61 +152,39 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
164152

165153
bool PeriodicExportingMetricReader::CollectAndExportOnce()
166154
{
167-
std::atomic<bool> cancel_export_for_timeout{false};
168-
169155
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
170-
std::unique_ptr<std::thread> task_thread;
171-
172156
#if OPENTELEMETRY_HAVE_EXCEPTIONS
173157
try
174158
{
175159
#endif
176-
std::promise<void> sender;
177-
auto receiver = sender.get_future();
178-
179-
task_thread.reset(
180-
new std::thread([this, &cancel_export_for_timeout, sender = std::move(sender)] {
181160
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
182-
if (collect_thread_instrumentation_ != nullptr)
183-
{
184-
collect_thread_instrumentation_->OnStart();
185-
collect_thread_instrumentation_->BeforeLoad();
186-
}
161+
if (collect_thread_instrumentation_ != nullptr)
162+
{
163+
collect_thread_instrumentation_->OnStart();
164+
collect_thread_instrumentation_->BeforeLoad();
165+
}
187166
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
188-
189-
this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
190-
if (cancel_export_for_timeout.load(std::memory_order_acquire))
191-
{
192-
OTEL_INTERNAL_LOG_ERROR(
193-
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
194-
<< this->export_timeout_millis_.count() << " ms, and timed out");
195-
return false;
196-
}
197-
this->exporter_->Export(metric_data);
198-
return true;
199-
});
200-
201-
const_cast<std::promise<void> &>(sender).set_value();
167+
auto start = std::chrono::steady_clock::now();
168+
this->Collect([this, &start](ResourceMetrics &metric_data) {
169+
auto end = std::chrono::steady_clock::now();
170+
if ((end - start) > this->export_timeout_millis_)
171+
{
172+
OTEL_INTERNAL_LOG_ERROR(
173+
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
174+
<< this->export_timeout_millis_.count() << " ms, and timed out");
175+
return false;
176+
}
177+
this->exporter_->Export(metric_data);
178+
return true;
179+
});
202180

203181
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
204-
if (collect_thread_instrumentation_ != nullptr)
205-
{
206-
collect_thread_instrumentation_->AfterLoad();
207-
collect_thread_instrumentation_->OnEnd();
208-
}
209-
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
210-
}));
211-
212-
std::future_status status;
213-
do
182+
if (collect_thread_instrumentation_ != nullptr)
214183
{
215-
status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_));
216-
if (status == std::future_status::timeout)
217-
{
218-
cancel_export_for_timeout.store(true, std::memory_order_release);
219-
break;
220-
}
221-
} while (status != std::future_status::ready);
184+
collect_thread_instrumentation_->AfterLoad();
185+
collect_thread_instrumentation_->OnEnd();
186+
}
187+
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
222188
#if OPENTELEMETRY_HAVE_EXCEPTIONS
223189
}
224190
catch (std::exception &e)
@@ -235,11 +201,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
235201
}
236202
#endif
237203

238-
if (task_thread && task_thread->joinable())
239-
{
240-
task_thread->join();
241-
}
242-
243204
std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
244205
while (notify_force_flush > notified_sequence)
245206
{

0 commit comments

Comments
 (0)