Skip to content

Commit 93e0ddc

Browse files
committed
merge main
2 parents 5191cf9 + 4b8b436 commit 93e0ddc

38 files changed

+801
-131
lines changed

.coveragerc

+3
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ omit =
99

1010
# avoid measuring code of unittest
1111
tests/*
12+
13+
[report]
14+
ignore_errors = True

.github/workflows/deploy_sphinx_docs.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@ on:
1212
jobs:
1313
pages:
1414
runs-on: ubuntu-20.04
15+
strategy:
16+
matrix:
17+
python-version: [ "3.9", "3.10" ]
1518
steps:
1619
- name: Checkout
1720
uses: actions/checkout@v4
1821
- name: Setup Python ${{ matrix.python-version }}
1922
uses: actions/setup-python@master
2023
with:
21-
python_version: ${{ matrix.python-version }}
24+
python-version: ${{ matrix.python-version }}
2225
- name: Install dependencies
2326
run: |
2427
python -m pip install --upgrade pip

.github/workflows/perf-bench.yml

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# This workflow starts the docker compose environment on a GPU-labelled runner, installs data-juicer inside it, and runs the performance benchmark
2+
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
3+
4+
name: performance_benchmark
5+
6+
on:
7+
workflow_dispatch:
8+
push:
9+
branches:
10+
- main
11+
12+
permissions:
13+
contents: read
14+
15+
env:
16+
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
17+
18+
jobs:
19+
perf_bench:
20+
runs-on: [GPU, unittest]
21+
environment: Testing
22+
steps:
23+
- uses: actions/checkout@v3
24+
with:
25+
path: dj-${{ github.run_id }}
26+
27+
- name: Setup docker compose
28+
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
29+
run: |
30+
docker compose up -d
31+
32+
- name: Install data-juicer
33+
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
34+
run: |
35+
docker compose exec ray-head pip install -e .\[all\]
36+
37+
- name: Clean dataset cache
38+
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
39+
run: |
40+
docker compose exec ray-head rm -rf /data/huggingface/dataset
41+
42+
- name: Run performance benchmark standalone
43+
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
44+
run: |
45+
docker compose exec ray-head bash tests/benchmark_performance/run.sh ${{ secrets.INTERNAL_WANDB_URL }} ${{ secrets.INTERNAL_WANDB_API_KEY }}
46+
47+
- name: Remove docker compose
48+
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
49+
if: always()
50+
run: |
51+
docker compose down --remove-orphans
52+
53+
- name: Cleanup workspace
54+
if: always()
55+
run: |
56+
rm -rf dj-${{ github.run_id }}

configs/config_all.yaml

+10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ text_keys: 'text' # the key name of fi
1515
suffixes: [] # the suffix of files that will be read. For example: '.txt', 'txt' or ['txt', '.pdf', 'docx']
1616
use_cache: true # whether to use the cache management of Hugging Face datasets. It might take up lots of disk space when using cache
1717
ds_cache_dir: null # cache dir for Hugging Face datasets. By default, it's the same as the environment variable `HF_DATASETS_CACHE`, whose default value is usually "~/.cache/huggingface/datasets". If this argument is set to a valid path by users, it will override the default cache dir
18+
open_monitor: true # Whether to open the monitor to trace resource utilization for each OP during data processing. It's True by default.
1819
use_checkpoint: false # whether to use the checkpoint management to save the latest version of dataset to work dir when processing. Rerunning the same config will reload the checkpoint and skip ops before it. Cache will be disabled when using checkpoint. If args of ops before the checkpoint are changed, all ops will be rerun from the beginning.
1920
temp_dir: null # the path to the temp directory to store intermediate caches when cache is disabled, these cache files will be removed on-the-fly. By default, it's None, so the temp dir will be specified by system. NOTICE: you should be cautious when setting this argument because it might cause unexpected program behaviors when this path is set to an unsafe directory.
2021
open_tracer: false # whether to open the tracer to trace the changes during process. It might take more time when opening tracer
@@ -211,6 +212,7 @@ process:
211212
radius: 2 # radius of blur kernel
212213
- image_tagging_mapper: # Mapper to generate image tags.
213214
tag_field_name: '__dj__image_tags__' # the field name to store the tags. It's "__dj__image_tags__" in default.
215+
mem_required: '9GB'
214216
- nlpaug_en_mapper: # simply augment texts in English based on the nlpaug library
215217
sequential: false # whether combine all augmentation methods to a sequence. If it's True, a sample will be augmented by all opened augmentation methods sequentially. If it's False, each opened augmentation method would generate its augmented samples independently.
216218
aug_num: 1 # number of augmented samples to be generated. If `sequential` is True, there will be total aug_num augmented samples generated. If it's False, there will be (aug_num * #opened_aug_method) augmented samples generated.
@@ -257,6 +259,12 @@ process:
257259
model_params: {} # Parameters for initializing the API model.
258260
sampling_params: {} # Extra parameters passed to the API call.
259261
- punctuation_normalization_mapper: # normalize unicode punctuations to English punctuations.
262+
- python_python_mapper: # executing a Python function defined in a file.
263+
file_path: '' # The path to the Python file containing the function to be executed.
264+
function_name: 'process_single' # The name of the function defined in the file to be executed.
265+
- python_lambda_mapper: # executing Python lambda function on data samples.
266+
lambda_str: '' # A string representation of the lambda function to be executed on data samples. If empty, the identity function is used.
267+
batched: False # A boolean indicating whether to process input data in batches.
260268
- remove_bibliography_mapper: # remove bibliography from Latex text.
261269
- remove_comments_mapper: # remove comments from Latex text, code, etc.
262270
doc_type: tex # comment type you want to remove. Only support 'tex' for now.
@@ -375,6 +383,7 @@ process:
375383
frame_sampling_method: 'all_keyframes' # sampling method of extracting frame images from the videos. Should be one of ["all_keyframes", "uniform"]. The former one extracts all key frames and the latter one extract specified number of frames uniformly from the video. Default: "all_keyframes".
376384
frame_num: 3 # the number of frames to be extracted uniformly from the video. Only works when frame_sampling_method is "uniform". If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration.
377385
tag_field_name: '__dj__video_frame_tags__' # the field name to store the tags. It's "__dj__video_frame_tags__" in default.
386+
mem_required: '9GB'
378387
- whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace.
379388

380389
# Filter ops
@@ -607,6 +616,7 @@ process:
607616
frame_num: 3 # the number of frames to be extracted uniformly from the video. Only works when frame_sampling_method is "uniform". If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration.
608617
tag_field_name: '__dj__video_frame_tags__' # the field name to store the tags. It's "__dj__video_frame_tags__" in default.
609618
any_or_all: any # keep this sample when any/all videos meet the filter condition
619+
mem_required: '9GB'
610620
- words_num_filter: # filter text with number of words out of specific range
611621
lang: en # sample in which language
612622
tokenization: false # whether to use model to tokenize documents

data_juicer/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = '1.0.0'
1+
__version__ = '1.0.1'
22

33
import os
44
import subprocess

data_juicer/config/config.py

+6
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,12 @@ def init_configs(args: Optional[List[str]] = None):
230230
help='The compression method of the cache file, which can be'
231231
'specified in ["gzip", "zstd", "lz4"]. If this parameter is'
232232
'None, the cache file will not be compressed.')
233+
parser.add_argument(
234+
'--open_monitor',
235+
type=bool,
236+
default=True,
237+
help='Whether to open the monitor to trace resource utilization for '
238+
'each OP during data processing. It\'s True in default.')
233239
parser.add_argument(
234240
'--use_checkpoint',
235241
type=bool,

data_juicer/core/data.py

+39-51
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,16 @@ def __getitem__(self, key):
164164
res = super().__getitem__(key)
165165
return nested_obj_factory(res)
166166

167-
def process(self,
168-
operators,
169-
*,
170-
work_dir=None,
171-
exporter=None,
172-
checkpointer=None,
173-
tracer=None):
167+
def process(
168+
self,
169+
operators,
170+
*,
171+
work_dir=None,
172+
exporter=None,
173+
checkpointer=None,
174+
tracer=None,
175+
open_monitor=True,
176+
):
174177
if operators is None:
175178
return self
176179

@@ -179,7 +182,8 @@ def process(self,
179182
unforkable_operators = set(UNFORKABLE.modules.keys())
180183

181184
# resource utilization monitor
182-
resource_util_list = []
185+
if open_monitor:
186+
resource_util_list = []
183187

184188
dataset = self
185189
try:
@@ -196,12 +200,16 @@ def process(self,
196200
'exporter': exporter,
197201
'tracer': tracer,
198202
}
199-
dataset, resource_util_per_op = Monitor.monitor_func(
200-
op.run, args=run_args)
203+
if open_monitor:
204+
dataset, resource_util_per_op = Monitor.monitor_func(
205+
op.run, args=run_args)
206+
else:
207+
dataset = op.run(**run_args)
201208
# record processed ops
202209
if checkpointer is not None:
203210
checkpointer.record(op._op_cfg)
204-
resource_util_list.append(resource_util_per_op)
211+
if open_monitor:
212+
resource_util_list.append(resource_util_per_op)
205213
end = time()
206214
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
207215
f'Left {len(dataset)} samples.')
@@ -215,7 +223,10 @@ def process(self,
215223
'last op...')
216224
dataset.cleanup_cache_files()
217225
checkpointer.save_ckpt(dataset)
218-
if work_dir:
226+
if work_dir and open_monitor:
227+
# get the analyzed version
228+
resource_util_list = Monitor.analyze_resource_util_list(
229+
resource_util_list)
219230
monitor_dir = os.path.join(work_dir, 'monitor')
220231
os.makedirs(monitor_dir, exist_ok=True)
221232
with open(os.path.join(monitor_dir, 'monitor.json'),
@@ -225,9 +236,7 @@ def process(self,
225236
monitor_dir)
226237
return dataset
227238

228-
def map(self, *args, **kargs):
229-
"""Override the map func, which is called by most common operations,
230-
such that the processed samples can be accessed by nested manner."""
239+
def update_args(self, args, kargs, is_filter=False):
231240
if args:
232241
args = list(args)
233242
# the first positional para is function
@@ -253,15 +262,17 @@ def map(self, *args, **kargs):
253262
# batched is required for fault-tolerant or batched OP
254263
if callable(getattr(
255264
called_func.__self__,
256-
'is_batched_op')) and called_func.__self__.is_batched_op(
257-
) or not getattr(called_func.__self__, 'turbo', False):
265+
'is_batched_op')) and called_func.__self__.is_batched_op():
258266
kargs['batched'] = True
259267
kargs['batch_size'] = kargs.pop('batch_size', 1)
268+
elif not getattr(called_func.__self__, 'turbo', False):
269+
kargs['batched'] = True
270+
kargs['batch_size'] = 1
260271
else:
261272
kargs['batched'] = False
262273

263-
# rank is required for cuda model loading
264-
if callable(
274+
# rank is required for cuda model loading for map
275+
if not is_filter and callable(
265276
getattr(called_func.__self__,
266277
'use_cuda')) and called_func.__self__.use_cuda():
267278
kargs['with_rank'] = True
@@ -270,6 +281,14 @@ def map(self, *args, **kargs):
270281
new_fingerprint = generate_fingerprint(self, *args, **kargs)
271282
kargs['new_fingerprint'] = new_fingerprint
272283

284+
return args, kargs
285+
286+
def map(self, *args, **kargs):
287+
"""Override the map func, which is called by most common operations,
288+
such that the processed samples can be accessed by nested manner."""
289+
290+
args, kargs = self.update_args(args, kargs)
291+
273292
if cache_utils.CACHE_COMPRESS:
274293
decompress(self, kargs['new_fingerprint'],
275294
kargs['num_proc'] if 'num_proc' in kargs else 1)
@@ -288,38 +307,7 @@ def map(self, *args, **kargs):
288307
def filter(self, *args, **kargs):
289308
"""Override the filter func, which is called by most common operations,
290309
such that the processed samples can be accessed by nested manner."""
291-
if args:
292-
args = list(args)
293-
# the first positional para is function
294-
if args[0] is None:
295-
args[0] = lambda x: nested_obj_factory(x)
296-
else:
297-
args[0] = wrap_func_with_nested_access(args[0])
298-
called_func = args[0]
299-
else:
300-
if 'function' not in kargs or kargs['function'] is None:
301-
kargs['function'] = lambda x: nested_obj_factory(x)
302-
else:
303-
kargs['function'] = wrap_func_with_nested_access(
304-
kargs['function'])
305-
called_func = kargs['function']
306-
307-
# For wrapped function, try to get its unwrapped (bound) method
308-
while not inspect.ismethod(called_func) and hasattr(
309-
called_func, '__wrapped__'):
310-
called_func = called_func.__wrapped__
311-
312-
# Batched is always required for fault tolerance
313-
if inspect.ismethod(called_func):
314-
if callable(getattr(
315-
called_func.__self__,
316-
'is_batched_op')) and called_func.__self__.is_batched_op():
317-
kargs['batched'] = True
318-
kargs['batch_size'] = kargs.pop('batch_size', 1)
319-
320-
if 'new_fingerprint' not in kargs or kargs['new_fingerprint'] is None:
321-
new_fingerprint = generate_fingerprint(self, *args, **kargs)
322-
kargs['new_fingerprint'] = new_fingerprint
310+
args, kargs = self.update_args(args, kargs, is_filter=True)
323311

324312
# For filter, it involves a map and a filter operations, so the final
325313
# cache files includes two sets with different fingerprint (before and

data_juicer/core/executor.py

+8-5
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,14 @@ def run(self,
193193
# - If checkpoint is open, clean the cache files after each process
194194
logger.info('Processing data...')
195195
tstart = time()
196-
dataset = dataset.process(ops,
197-
work_dir=self.work_dir,
198-
exporter=self.exporter,
199-
checkpointer=self.ckpt_manager,
200-
tracer=self.tracer)
196+
dataset = dataset.process(
197+
ops,
198+
work_dir=self.work_dir,
199+
exporter=self.exporter,
200+
checkpointer=self.ckpt_manager,
201+
tracer=self.tracer,
202+
open_monitor=self.cfg.open_monitor,
203+
)
201204
tend = time()
202205
logger.info(f'All OPs are done in {tend - tstart:.3f}s.')
203206

data_juicer/core/monitor.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,10 @@ def monitor_func(func, args=None, sample_interval=0.5):
205205
resource_util_dict = {}
206206

207207
# start monitor
208-
ctx = get_context('fork')
208+
start_method = 'fork'
209+
if os.name == 'nt': # for Windows
210+
start_method = 'spawn'
211+
ctx = get_context(start_method)
209212
with ctx.Manager() as manager:
210213
mdict = manager.dict()
211214
mdict['stop'] = False

data_juicer/ops/base_op.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def wrapper(samples, *args, **kwargs):
7070
return wrapper
7171

7272

73-
def catch_map_single_exception(method):
73+
def catch_map_single_exception(method, return_sample=True):
7474
"""
7575
For single-map sample-level fault tolerance.
7676
The input sample is expected batch_size = 1.
@@ -92,8 +92,11 @@ def wrapper(sample, *args, **kwargs):
9292
if is_batched(sample):
9393
try:
9494
sample = convert_dict_list_to_list_dict(sample)[0]
95-
res_sample = method(sample, *args, **kwargs)
96-
return convert_list_dict_to_dict_list([res_sample])
95+
res = method(sample, *args, **kwargs)
96+
if return_sample:
97+
return convert_list_dict_to_dict_list([res])
98+
else:
99+
return [res]
97100
except Exception as e:
98101
from loguru import logger
99102
logger.error(
@@ -166,9 +169,8 @@ def __init__(self, *args, **kwargs):
166169
method = wrap_func_with_nested_access(method)
167170
setattr(self, name, method)
168171

169-
@classmethod
170-
def is_batched_op(cls):
171-
return cls._batched_op
172+
def is_batched_op(self):
173+
return self._batched_op
172174

173175
def process(self, *args, **kwargs):
174176
raise NotImplementedError
@@ -326,7 +328,8 @@ def __init__(self, *args, **kwargs):
326328
else:
327329
self.compute_stats = catch_map_single_exception(
328330
self.compute_stats_single)
329-
self.process = catch_map_single_exception(self.process_single)
331+
self.process = catch_map_single_exception(self.process_single,
332+
return_sample=False)
330333

331334
# set the process method is not allowed to be overridden
332335
def __init_subclass__(cls, **kwargs):

0 commit comments

Comments
 (0)