Skip to content

Commit 8679da7

Browse files
authored
Add Actors for Ray Dedup. (modelscope#526)
1 parent d785ee9 commit 8679da7

6 files changed

+109
-45
lines changed

configs/config_all.yaml

+7-7
Original file line numberDiff line numberDiff line change
@@ -782,15 +782,15 @@ process:
782782
- video_deduplicator: # deduplicator to deduplicate samples at document-level using exact matching of videos between documents.
783783
consider_text: false # whether to consider text hash together with video hash when applying deduplication.
784784
- ray_video_deduplicator: # the simple video deduplicator that can run on multi-nodes using md5 hashing exact matching method
785-
redis_host: 'redis_host' # the host of the redis instance
786-
redis_port: 6380 # the port of redis instance, please note that the default port of redis is 6379 which is the same as default port for ray, so we need to modify the default redis config to use it in other port
785+
backend: 'ray_actor' # the backend for dedup, either 'ray_actor' or 'redis'
786+
redis_address: 'redis://localhost:6379' # the address of redis server
787787
- ray_image_deduplicator: # the simple image deduplicator that can deduplicate samples at document-level using exact matching of images between documents.
788-
redis_host: 'redis_host' # the host of the redis instance
789-
redis_port: 6380 # the port of redis instance, please note that the default port of redis is 6379 which is the same as default port for ray, so we need to modify the default redis config to use it in other port
790-
method: phash # hash method for image. One of [phash, dhash, whash, ahash]
788+
backend: 'ray_actor' # the backend for dedup, either 'ray_actor' or 'redis'
789+
redis_address: 'redis://localhost:6379' # the address of redis server
790+
method: phash # hash method for image. One of [phash, dhash, whash, ahash]
791791
- ray_document_deduplicator: # the simple document deduplicator that can run on multi-nodes using md5 hashing exact matching method
792-
redis_host: 'redis_host' # the host of the redis instance
793-
redis_port: 6380 # the port of redis instance, please note that the default port of redis is 6379 which is the same as default port for ray, so we need to modify the default redis config to use it in other port
792+
backend: 'ray_actor' # the backend for dedup, either 'ray_actor' or 'redis'
793+
redis_address: 'redis://localhost:6379' # the address of redis server
794794
lowercase: false # whether to convert text to lower case
795795
ignore_non_character: false # whether to ignore non-alphabet characters, including whitespaces, digits, and punctuations
796796
- ray_bts_minhash_deduplicator: # the document deduplicator that can run on multi-nodes using minhashLSH algorithm
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from pydantic import PositiveInt
1+
from abc import ABC, abstractmethod
2+
3+
import ray
24

35
from data_juicer.utils.constant import HashKeys
46
from data_juicer.utils.lazy_loader import LazyLoader
@@ -7,6 +9,69 @@
79

810
redis = LazyLoader('redis', 'redis')
911

12+
MERSENNE_PRIME = (1 << 61) - 1
13+
14+
15+
@ray.remote(scheduling_strategy='SPREAD')
16+
class DedupSet:
17+
18+
def __init__(self):
19+
self.hash_record = set()
20+
21+
def is_unique(self, key):
22+
if key not in self.hash_record:
23+
self.hash_record.add(key)
24+
return True
25+
else:
26+
return False
27+
28+
29+
class Backend(ABC):
30+
"""
31+
Backend for deduplicator.
32+
"""
33+
34+
@abstractmethod
35+
def __init__(self, *args, **kwargs):
36+
pass
37+
38+
@abstractmethod
39+
def is_unique(self, md5_value: str):
40+
pass
41+
42+
43+
class ActorBackend(Backend):
44+
"""
45+
Ray actor backend for deduplicator.
46+
"""
47+
48+
def __init__(self, dedup_set_num: int):
49+
self.dedup_set_num = dedup_set_num
50+
self.dedup_sets = [
51+
DedupSet.remote() for _ in range(self.dedup_set_num)
52+
]
53+
54+
def is_unique(self, md5_value: str):
55+
dedup_set_id = int.from_bytes(
56+
md5_value.encode(),
57+
byteorder='little') % MERSENNE_PRIME % self.dedup_set_num
58+
return ray.get(
59+
self.dedup_sets[dedup_set_id].is_unique.remote(md5_value))
60+
61+
62+
class RedisBackend(Backend):
63+
"""
64+
Redis backend for deduplicator.
65+
"""
66+
67+
def __init__(self, redis_address: str):
68+
self.redis_address = redis_address
69+
self.redis_client = redis.from_url(url=self.redis_address)
70+
self.redis_client.flushdb(0)
71+
72+
def is_unique(self, md5_value: str):
73+
return self.redis_client.setnx(md5_value, 1)
74+
1075

1176
class RayBasicDeduplicator(Filter):
1277
"""
@@ -19,37 +84,40 @@ class RayBasicDeduplicator(Filter):
1984
EMPTY_HASH_VALUE = 'EMPTY'
2085

2186
def __init__(self,
22-
redis_host: str = 'localhost',
23-
redis_port: PositiveInt = 6380,
87+
backend: str = 'ray_actor',
88+
redis_address: str = 'redis://localhost:6379',
2489
*args,
2590
**kwargs):
2691
"""
2792
Initialization.
28-
:param redis_host: the hostname of redis server
29-
:param redis_port: the port of redis server
93+
:param backend: the backend for dedup, either 'ray_actor' or 'redis'
94+
:param redis_address: the address of redis server
3095
:param args: extra args
3196
:param kwargs: extra args
3297
"""
3398
super().__init__(*args, **kwargs)
34-
self.redis_host = redis_host
35-
self.redis_port = redis_port
36-
# TODO: add a barrier to ensure that flushdb is performed before
37-
# the operator is called
38-
r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0)
39-
r.flushdb(0)
99+
self.redis_address = redis_address
100+
self.backend = backend
101+
if backend == 'ray_actor':
102+
dedup_set_num = int(ray.cluster_resources().get('CPU') / 2)
103+
self.backend = ActorBackend(dedup_set_num)
104+
elif backend == 'redis':
105+
# TODO: add a barrier to ensure that flushdb is performed before
106+
# the operator is called
107+
self.backend = RedisBackend(redis_address)
108+
else:
109+
raise ValueError(f'Unknown backend: {backend}')
40110

41111
def calculate_hash(self, sample, context=False):
42112
"""Calculate hash value for the sample."""
43113
raise NotImplementedError
44114

45115
def compute_stats_single(self, sample, context=False):
46-
# init redis client
47-
r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0)
48116
# compute hash
49117
md5_value = self.calculate_hash(sample, context)
50118
# check existing
51-
sample[HashKeys.is_duplicate] = r.setnx(md5_value, 1)
119+
sample[HashKeys.is_unique] = self.backend.is_unique(md5_value)
52120
return sample
53121

54122
def process_single(self, sample):
55-
return sample[HashKeys.is_duplicate]
123+
return sample[HashKeys.is_unique]

data_juicer/ops/deduplicator/ray_document_deduplicator.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import string
33

44
import regex as re
5-
from pydantic import PositiveInt
65

76
from ..base_op import OPERATORS
87
from .ray_basic_deduplicator import RayBasicDeduplicator
@@ -17,24 +16,24 @@ class RayDocumentDeduplicator(RayBasicDeduplicator):
1716
"""
1817

1918
def __init__(self,
20-
redis_host: str = 'localhost',
21-
redis_port: PositiveInt = 6380,
19+
backend: str = 'ray_actor',
20+
redis_address: str = 'redis://localhost:6379',
2221
lowercase: bool = False,
2322
ignore_non_character: bool = False,
2423
*args,
2524
**kwargs):
2625
"""
2726
Initialization method.
28-
:param redis_host: the hostname of redis server
29-
:param redis_port: the port of redis server
27+
:param backend: the backend for dedup, either 'ray_actor' or 'redis'
28+
:param redis_address: the address of redis server
3029
:param lowercase: Whether to convert sample text to lower case
3130
:param ignore_non_character: Whether to ignore non-alphabet
3231
characters, including whitespaces, digits, and punctuations
3332
:param args: extra args
3433
:param kwargs: extra args.
3534
"""
36-
super().__init__(redis_host=redis_host,
37-
redis_port=redis_port,
35+
super().__init__(backend=backend,
36+
redis_address=redis_address,
3837
*args,
3938
**kwargs)
4039
self.lowercase = lowercase

data_juicer/ops/deduplicator/ray_image_deduplicator.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import numpy as np
2-
from pydantic import PositiveInt
32

43
from data_juicer.utils.lazy_loader import LazyLoader
54
from data_juicer.utils.mm_utils import load_data_with_context, load_image
@@ -36,20 +35,20 @@ class RayImageDeduplicator(RayBasicDeduplicator):
3635
"""
3736

3837
def __init__(self,
39-
redis_host: str = 'localhost',
40-
redis_port: PositiveInt = 6380,
38+
backend: str = 'ray_actor',
39+
redis_address: str = 'redis://localhost:6379',
4140
method: str = 'phash',
4241
*args,
4342
**kwargs):
4443
"""
4544
Initialization.
46-
:param redis_host: the hostname of redis server
47-
:param redis_port: the port of redis server
45+
:param backend: the backend for dedup, either 'ray_actor' or 'redis'
46+
:param redis_address: the address of redis server
4847
:param args: extra args
4948
:param kwargs: extra args
5049
"""
51-
super().__init__(redis_host=redis_host,
52-
redis_port=redis_port,
50+
super().__init__(backend=backend,
51+
redis_address=redis_address,
5352
*args,
5453
**kwargs)
5554
if method not in HASH_METHOD:

data_juicer/ops/deduplicator/ray_video_deduplicator.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import hashlib
22

3-
from pydantic import PositiveInt
4-
53
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
64
load_video)
75

@@ -21,19 +19,19 @@ class RayVideoDeduplicator(RayBasicDeduplicator):
2119
"""
2220

2321
def __init__(self,
24-
redis_host: str = 'localhost',
25-
redis_port: PositiveInt = 6380,
22+
backend: str = 'ray_actor',
23+
redis_address: str = 'redis://localhost:6379',
2624
*args,
2725
**kwargs):
2826
"""
2927
Initialization.
30-
:param redis_host: the hostname of redis server
31-
:param redis_port: the port of redis server
28+
:param backend: the backend for dedup, either 'ray_actor' or 'redis'
29+
:param redis_address: the address of redis server
3230
:param args: extra args
3331
:param kwargs: extra args
3432
"""
35-
super().__init__(redis_host=redis_host,
36-
redis_port=redis_port,
33+
super().__init__(backend=backend,
34+
redis_address=redis_address,
3735
*args,
3836
**kwargs)
3937

data_juicer/utils/constant.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ class HashKeys(object):
268268
videohash = DEFAULT_PREFIX + 'videohash'
269269

270270
# duplicate flag
271-
is_duplicate = DEFAULT_PREFIX + 'is_duplicate'
271+
is_unique = DEFAULT_PREFIX + 'is_unique'
272272

273273

274274
class InterVars(object):

0 commit comments

Comments
 (0)