Skip to content

Commit 3c10162

Browse files
committed
remove attr _ray_mode and optimize api
1 parent a002f4c commit 3c10162

File tree

5 files changed

+36
-30
lines changed

5 files changed

+36
-30
lines changed

data_juicer/core/ray_data.py

+4-10
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,7 @@ def _run_single_op(self, op):
118118
batch_size = getattr(op, 'batch_size',
119119
1) if op.is_batched_op() else 1
120120
if isinstance(op, Mapper):
121-
if op.use_ray_actor():
122-
# TODO: auto calculate concurrency
123-
concurrency = getattr(op, 'concurrency', 1)
124-
121+
if op.use_cuda():
125122
init_params = op._init_parameters
126123
op_args = init_params.pop('args', ())
127124
op_kwargs = init_params.pop('kwargs', {})
@@ -135,18 +132,15 @@ def _run_single_op(self, op):
135132
fn_constructor_kwargs=op_kwargs,
136133
batch_size=batch_size,
137134
num_gpus=num_gpus,
138-
concurrency=concurrency,
135+
concurrency=op_proc,
139136
batch_format='pyarrow')
140137
else:
141138
self.data = self.data.map_batches(op.process,
142139
batch_size=batch_size,
143140
batch_format='pyarrow',
144141
num_gpus=num_gpus)
145142
elif isinstance(op, Filter):
146-
if op.use_ray_actor():
147-
# TODO: auto calculate concurrency
148-
concurrency = getattr(op, 'concurrency', 1)
149-
143+
if op.use_cuda():
150144
init_params = op._init_parameters
151145
op_args = init_params.pop('args', ())
152146
op_kwargs = init_params.pop('kwargs', {})
@@ -160,7 +154,7 @@ def _run_single_op(self, op):
160154
fn_constructor_kwargs=op_kwargs,
161155
batch_size=batch_size,
162156
num_gpus=num_gpus,
163-
concurrency=concurrency,
157+
concurrency=op_proc,
164158
batch_format='pyarrow')
165159
else:
166160
self.data = self.data.map_batches(op.compute_stats,

data_juicer/ops/base_op.py

-4
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ class OP:
118118

119119
_accelerator = 'cpu'
120120
_batched_op = False
121-
_ray_mode = 'task'
122121

123122
def __init__(self, *args, **kwargs):
124123
"""
@@ -174,9 +173,6 @@ def __init__(self, *args, **kwargs):
174173
def is_batched_op(self):
175174
return self._batched_op
176175

177-
def use_ray_actor(self):
178-
return self._ray_mode == 'actor'
179-
180176
def process(self, *args, **kwargs):
181177
raise NotImplementedError
182178

data_juicer/ops/filter/image_nsfw_filter.py

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ class ImageNSFWFilter(Filter):
1919
"""Filter to keep samples whose images have low nsfw scores."""
2020

2121
_accelerator = 'cuda'
22-
_ray_mode = 'actor'
2322

2423
def __init__(self,
2524
hf_nsfw_model: str = 'Falconsai/nsfw_image_detection',

data_juicer/ops/mapper/image_captioning_mapper.py

-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class ImageCaptioningMapper(Mapper):
3030

3131
_accelerator = 'cuda'
3232
_batched_op = True
33-
_ray_mode = 'actor'
3433

3534
def __init__(self,
3635
hf_img2seq: str = 'Salesforce/blip2-opt-2.7b',

data_juicer/utils/process_utils.py

+32-14
Original file line numberDiff line numberDiff line change
@@ -57,32 +57,50 @@ def calculate_np(name,
5757
"""Calculate the optimum number of processes for the given OP"""
5858
eps = 1e-9 # about 1 byte
5959

60-
if num_proc is None:
61-
num_proc = psutil.cpu_count()
62-
6360
if use_cuda:
61+
auto_num_proc = None
6462
cuda_mem_available = get_min_cuda_memory() / 1024
65-
op_proc = min(
66-
num_proc,
67-
math.floor(cuda_mem_available / (mem_required + eps)) *
68-
cuda_device_count())
69-
if use_cuda and mem_required == 0:
63+
if mem_required == 0:
7064
logger.warning(f'The required cuda memory of Op[{name}] '
7165
f'has not been specified. '
7266
f'Please specify the mem_required field in the '
7367
f'config file, or you might encounter CUDA '
7468
f'out of memory error. You can reference '
7569
f'the mem_required field in the '
7670
f'config_all.yaml file.')
77-
if op_proc < 1.0:
78-
logger.warning(f'The required cuda memory:{mem_required}GB might '
79-
f'be more than the available cuda memory:'
80-
f'{cuda_mem_available}GB.'
81-
f'This Op[{name}] might '
82-
f'require more resource to run.')
71+
else:
72+
auto_num_proc = math.floor(
73+
cuda_mem_available / mem_required) * cuda_device_count()
74+
if cuda_mem_available / mem_required < 1.0:
75+
logger.warning(
76+
f'The required cuda memory:{mem_required}GB might '
77+
f'be more than the available cuda memory:'
78+
f'{cuda_mem_available}GB.'
79+
f'This Op[{name}] might '
80+
f'require more resource to run.')
81+
82+
if auto_num_proc and num_proc:
83+
op_proc = min(auto_num_proc, num_proc)
84+
if num_proc > auto_num_proc:
85+
logger.warning(
86+
f'The given num_proc: {num_proc} is greater than '
87+
f'the value {auto_num_proc} auto calculated based '
88+
f'on the mem_required of Op[{name}]. '
89+
f'Set the `num_proc` to {auto_num_proc}.')
90+
elif not auto_num_proc and not num_proc:
91+
op_proc = cuda_device_count()
92+
logger.warning(
93+
f'Both mem_required and num_proc of Op[{name}] are not set.'
94+
f'Set the `num_proc` to {op_proc}.')
95+
else:
96+
op_proc = auto_num_proc if auto_num_proc else num_proc
97+
8398
op_proc = max(op_proc, 1)
8499
return op_proc
85100
else:
101+
if num_proc is None:
102+
num_proc = psutil.cpu_count()
103+
86104
op_proc = num_proc
87105
cpu_available = psutil.cpu_count()
88106
mem_available = psutil.virtual_memory().available

0 commit comments

Comments
 (0)