Skip to content

Commit 04b86c5

Browse files
authored
Merge pull request #278 from open-telemetry/main
[SDK] Optimize PeriodicExportingMetricReader Thread Usage (open-telemetry#3383)
2 parents 334b787 + 9451f0e commit 04b86c5

File tree

2 files changed

+26
-62
lines changed

2 files changed

+26
-62
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ Increment the:
3333
* [API] Add Enabled method to Tracer
3434
[#3357](https://github.com/open-telemetry/opentelemetry-cpp/pull/3357)
3535

36+
* [SDK] Optimize PeriodicExportingMetricReader thread usage
37+
[#3383](https://github.com/open-telemetry/opentelemetry-cpp/pull/3383)
38+
3639
## [1.20 2025-04-01]
3740

3841
* [BUILD] Update opentelemetry-proto version

sdk/src/metrics/export/periodic_exporting_metric_reader.cc

+23-62
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include <ostream>
1111
#include <ratio>
1212
#include <thread>
13-
#include <type_traits>
1413
#include <utility>
1514

1615
#include "opentelemetry/common/timestamp.h"
@@ -23,13 +22,6 @@
2322
#include "opentelemetry/sdk/metrics/instruments.h"
2423
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
2524

26-
#if defined(_MSC_VER)
27-
# pragma warning(suppress : 5204)
28-
# include <future>
29-
#else
30-
# include <future>
31-
#endif
32-
3325
#if OPENTELEMETRY_HAVE_EXCEPTIONS
3426
# include <exception>
3527
#endif
@@ -97,11 +89,9 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
9789
worker_thread_instrumentation_->OnStart();
9890
}
9991
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
100-
10192
do
10293
{
10394
auto start = std::chrono::steady_clock::now();
104-
10595
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
10696
if (worker_thread_instrumentation_ != nullptr)
10797
{
@@ -133,7 +123,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
133123
worker_thread_instrumentation_->BeforeWait();
134124
}
135125
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
136-
137126
std::unique_lock<std::mutex> lk(cv_m_);
138127
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
139128
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
@@ -150,7 +139,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
150139
worker_thread_instrumentation_->AfterWait();
151140
}
152141
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
153-
154142
} while (IsShutdown() != true);
155143

156144
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
@@ -163,61 +151,39 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
163151

164152
bool PeriodicExportingMetricReader::CollectAndExportOnce()
165153
{
166-
std::atomic<bool> cancel_export_for_timeout{false};
167-
168154
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
169-
std::unique_ptr<std::thread> task_thread;
170-
171155
#if OPENTELEMETRY_HAVE_EXCEPTIONS
172156
try
173157
{
174158
#endif
175-
std::promise<void> sender;
176-
auto receiver = sender.get_future();
177-
178-
task_thread.reset(
179-
new std::thread([this, &cancel_export_for_timeout, sender = std::move(sender)] {
180159
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
181-
if (collect_thread_instrumentation_ != nullptr)
182-
{
183-
collect_thread_instrumentation_->OnStart();
184-
collect_thread_instrumentation_->BeforeLoad();
185-
}
160+
if (collect_thread_instrumentation_ != nullptr)
161+
{
162+
collect_thread_instrumentation_->OnStart();
163+
collect_thread_instrumentation_->BeforeLoad();
164+
}
186165
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
187-
188-
this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
189-
if (cancel_export_for_timeout.load(std::memory_order_acquire))
190-
{
191-
OTEL_INTERNAL_LOG_ERROR(
192-
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
193-
<< this->export_timeout_millis_.count() << " ms, and timed out");
194-
return false;
195-
}
196-
this->exporter_->Export(metric_data);
197-
return true;
198-
});
199-
200-
const_cast<std::promise<void> &>(sender).set_value();
166+
auto start = std::chrono::steady_clock::now();
167+
this->Collect([this, &start](ResourceMetrics &metric_data) {
168+
auto end = std::chrono::steady_clock::now();
169+
if ((end - start) > this->export_timeout_millis_)
170+
{
171+
OTEL_INTERNAL_LOG_ERROR(
172+
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
173+
<< this->export_timeout_millis_.count() << " ms, and timed out");
174+
return false;
175+
}
176+
this->exporter_->Export(metric_data);
177+
return true;
178+
});
201179

202180
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
203-
if (collect_thread_instrumentation_ != nullptr)
204-
{
205-
collect_thread_instrumentation_->AfterLoad();
206-
collect_thread_instrumentation_->OnEnd();
207-
}
208-
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
209-
}));
210-
211-
std::future_status status;
212-
do
181+
if (collect_thread_instrumentation_ != nullptr)
213182
{
214-
status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_));
215-
if (status == std::future_status::timeout)
216-
{
217-
cancel_export_for_timeout.store(true, std::memory_order_release);
218-
break;
219-
}
220-
} while (status != std::future_status::ready);
183+
collect_thread_instrumentation_->AfterLoad();
184+
collect_thread_instrumentation_->OnEnd();
185+
}
186+
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
221187
#if OPENTELEMETRY_HAVE_EXCEPTIONS
222188
}
223189
catch (std::exception &e)
@@ -234,11 +200,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
234200
}
235201
#endif
236202

237-
if (task_thread && task_thread->joinable())
238-
{
239-
task_thread->join();
240-
}
241-
242203
std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
243204
while (notify_force_flush > notified_sequence)
244205
{

0 commit comments

Comments
 (0)