Skip to content

Commit a5558cc

Browse files
colorzzrZhiren Zhan
and
Zhiren Zhan
authored
PILOT-6724: add hard limit for queueing jobs into thread pool (#199)
* add hard limit for queueing jobs into thread pool * fixup the test cases * remove unused variable --------- Co-authored-by: Zhiren Zhan <[email protected]>
1 parent 6e84495 commit a5558cc

File tree

4 files changed

+12
-2
lines changed

4 files changed

+12
-2
lines changed

app/configs/app_config.py

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ class Env:
2828
greenroom_bucket_prefix = 'gr'
2929
# the number of items to active interative mode
3030
interative_threshold = 10
31+
# set hard limit for pending jobs, otherwise cli will consume all memory
32+
# to cache jobs. If later on the speed of chunk deliver become faster, we
33+
# can increase the concurrency number.
34+
num_of_jobs = 20
3135

3236
github_url = 'PilotDataPlatform/cli'
3337

app/services/file_manager/file_upload/upload_client.py

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import json
88
import math
99
import os
10+
import threading
1011
import time
1112
from logging import getLogger
1213
from multiprocessing.pool import ApplyResult
@@ -302,6 +303,10 @@ def stream_upload(self, file_object: FileObject, pool: ThreadPool) -> List[Apply
302303
been uploaded.
303304
"""
304305
count = 0
306+
semaphore = threading.Semaphore(AppConfig.Env.num_of_jobs)
307+
308+
def on_complete(result):
309+
semaphore.release()
305310

306311
# process on the file content
307312
f = open(file_object.local_path, 'rb')
@@ -327,9 +332,11 @@ def stream_upload(self, file_object: FileObject, pool: ThreadPool) -> List[Apply
327332
chunk_size = chunk_info.get('chunk_size', self.chunk_size)
328333
file_object.update_progress(chunk_size)
329334
else:
335+
semaphore.acquire()
330336
res = pool.apply_async(
331337
self.upload_chunk,
332338
args=(file_object, count + 1, chunk, local_chunk_etag, len(chunk)),
339+
callback=on_complete,
333340
)
334341
chunk_result.append(res)
335342

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "app"
3-
version = "3.11.0"
3+
version = "3.12.0"
44
description = "This service is designed to support pilot platform"
55
authors = ["Indoc Systems"]
66

tests/app/commands/test_dataset.py

-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ def test_dataset_list_total_less_than_10(mocker, cli_runner):
3131
question_mock = mocker.patch.object(questionary, 'select', return_value=questionary.select)
3232

3333
result = cli_runner.invoke(dataset_list, ['--page', 0, '--page-size', page_size])
34-
3534
assert result.exit_code == 0
3635
assert '' == result.output
3736
assert question_mock.call_count == 0

0 commit comments

Comments
 (0)