Skip to content

Commit 50973df

Browse files
committed
Limit the upload hook thread pool to 64 workers
1 parent 96a9d0f commit 50973df

File tree

3 files changed

+18
-1
lines changed

3 files changed

+18
-1
lines changed

util/opentelemetry-util-genai/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3943](#3943))
1313
- Add more Semconv attributes to LLMInvocation spans.
1414
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3862](#3862))
15+
- Limit the upload hook thread pool to 64 workers
16+
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3944](#3944))
1517

1618
## Version 0.2b0 (2025-10-14)
1719

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ def __init__(
122122

123123
# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
124124
# limits the number of queued tasks. If the queue is full, data will be dropped.
125-
self._executor = ThreadPoolExecutor(max_workers=self._max_queue_size)
125+
self._executor = ThreadPoolExecutor(
126+
max_workers=min(self._max_queue_size, 64)
127+
)
126128
self._semaphore = threading.BoundedSemaphore(self._max_queue_size)
127129

128130
def _submit_all(self, upload_data: UploadData) -> None:

util/opentelemetry-util-genai/tests/test_upload.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,19 @@ def test_upload_after_shutdown_logs(self):
285285
logs.output[0],
286286
)
287287

288+
def test_threadpool_max_workers(self):
289+
for max_queue_size, expect_threadpool_workers in ((10, 10), (100, 64)):
290+
with patch(
291+
"opentelemetry.util.genai._upload.completion_hook.ThreadPoolExecutor"
292+
) as mock:
293+
hook = UploadCompletionHook(
294+
base_path=BASE_PATH, max_queue_size=max_queue_size
295+
)
296+
self.addCleanup(hook.shutdown)
297+
mock.assert_called_once_with(
298+
max_workers=expect_threadpool_workers
299+
)
300+
288301

289302
class TestUploadCompletionHookIntegration(TestBase):
290303
def setUp(self):

0 commit comments

Comments
 (0)