import hashlib
import inspect
import json
+ import os
+ import random
+ import time
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, List, Optional, Union

+ import filelock
from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict
from datasets.exceptions import DatasetGenerationError
+ from huggingface_hub import constants as hf_constants

from .artifact import fetch_artifact
from .card import TaskCard
@@ -171,16 +176,21 @@ def _source_to_dataset(
    streaming=False,
    lock_timeout=60,  # Timeout in seconds for acquiring the lock
):
-     import json
-     import os
-
-     import filelock
-
    from .dataset import Dataset as UnitxtDataset

    # Generate a unique signature for the source
    source_signature = json.dumps(to_dict(source, object_to_str_without_addresses), sort_keys=True)
    config_name = "recipe-" + short_hex_hash(source_signature)
+     hf_cache_home = hf_constants.HF_HOME
+     lock_dir = os.path.join(hf_cache_home, "locks")
+     os.makedirs(lock_dir, exist_ok=True)
+
+     # Create a lock file path based on the dataset configuration
+     lock_file = os.path.join(lock_dir, f"unitxt_{config_name}.lock")
+
+     # Add retry logic
+     max_attempts = 5
+     base_wait = 5  # seconds

    stream = source()

@@ -196,28 +206,32 @@ def _source_to_dataset(

        ds_builder._generators = stream

-         # Create a lock file path based on the dataset configuration
-         lock_file = os.path.join(os.path.expanduser("~"), ".cache", "unitxt", f"{config_name}.lock")
-         os.makedirs(os.path.dirname(lock_file), exist_ok=True)

-         # Create a file lock
-         lock = filelock.FileLock(lock_file, timeout=lock_timeout)
+         for attempt in range(max_attempts):
+             # Create a file lock with appropriate timeout
+             lock = filelock.FileLock(lock_file, timeout=300)  # 5 minutes

-         # Only protect the download_and_prepare operation with the lock
-         try:
-             with lock:
-                 ds_builder.download_and_prepare(
-                     verification_mode="no_checks",
-                     download_mode=None if use_cache else "force_redownload",
+             try:
+                 with lock:
+                     ds_builder.download_and_prepare(
+                         verification_mode="no_checks",
+                         download_mode=None if use_cache else "force_redownload",
+                     )
+
+                 # If we reach here, the lock was successfully acquired and released
+                 if streaming:
+                     return ds_builder.as_streaming_dataset(split=split)
+                 return ds_builder.as_dataset(
+                     split=split, run_post_process=False, verification_mode="no_checks"
                )
-     except filelock.Timeout:
-         raise TimeoutError(f"Could not acquire lock for {config_name} within {lock_timeout} seconds. Another process may be preparing the same dataset.")

-     if streaming:
-         return ds_builder.as_streaming_dataset(split=split)
-     return ds_builder.as_dataset(
-         split=split, run_post_process=False, verification_mode="no_checks"
-     )
+             except filelock.Timeout:
+                 if attempt < max_attempts - 1:  # Not the last attempt
+                     wait_time = base_wait * (2 ** attempt) + random.uniform(0, 1)
+                     time.sleep(wait_time)
+                 else:
+                     raise TimeoutError(f"Could not acquire lock for {config_name} after {max_attempts} attempts")
+
    except DatasetGenerationError as e:
        raise e.__cause__
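
For reference, below is a minimal, self-contained sketch of the locking pattern this diff introduces: acquire a filelock on a per-config lock file, retry on timeout with exponential backoff plus jitter, and give up after a fixed number of attempts. The lock path, the prepare_dataset callback, and the parameter values are illustrative assumptions for the sketch, not unitxt code.

import os
import random
import time

import filelock


def prepare_with_lock(lock_file, prepare_dataset, max_attempts=5, base_wait=5):
    """Run prepare_dataset() while holding lock_file, retrying with backoff on contention."""
    os.makedirs(os.path.dirname(lock_file), exist_ok=True)
    for attempt in range(max_attempts):
        lock = filelock.FileLock(lock_file, timeout=300)  # wait up to 5 minutes per attempt
        try:
            with lock:
                # Critical section: only one process prepares the dataset at a time.
                return prepare_dataset()
        except filelock.Timeout:
            if attempt < max_attempts - 1:
                # Exponential backoff with jitter: roughly 5s, 10s, 20s, 40s between attempts.
                time.sleep(base_wait * (2 ** attempt) + random.uniform(0, 1))
            else:
                raise TimeoutError(
                    f"Could not acquire lock {lock_file} after {max_attempts} attempts"
                )


# Hypothetical usage: the lock file name and the callback are placeholders.
if __name__ == "__main__":
    result = prepare_with_lock(
        "/tmp/unitxt_locks/recipe-example.lock",
        lambda: "prepared",
    )
    print(result)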