Streaming support for load_from_disk #5838

Closed
Nilabhra opened this issue May 10, 2023 · 12 comments
Labels
enhancement New feature or request

Comments

@Nilabhra

Feature request

Support for streaming datasets stored in object stores in load_from_disk.

Motivation

The load_from_disk function supports fetching datasets stored in object stores such as S3. In many cases, these datasets are very large, so being able to stream the data from the buckets becomes essential.

Your contribution

I'd be happy to contribute this feature if I could get some guidance on how to do so.

@Nilabhra Nilabhra added the enhancement New feature or request label May 10, 2023
@lhoestq
Member

lhoestq commented May 10, 2023

As the name says, load_from_disk loads the data from your disk. If the data is hosted on S3, it is first downloaded locally and then loaded from your disk.

There is a discussion on streaming data from S3 here though: #5281

@Nilabhra
Author

Nilabhra commented May 11, 2023

@lhoestq
Thanks for your comment. I had checked out the discussion before and attempted to replicate the mentioned changes in the main branch (#5580). What I found was that if a dataset is saved using save_to_disk, it cannot be read by load_dataset. The error message asks me to use load_from_disk instead. What would be the correct way of saving the data in this scenario?

@lhoestq
Member

lhoestq commented May 11, 2023

Using push_to_hub you can save the dataset on the HF Hub as parquet files, and reload it / stream it using load_dataset :)

If you want to save your dataset somewhere else, you can use .to_parquet to get a Parquet file. If your dataset is big, it's usually recommended to shard it into multiple Parquet files (around 1GB each).
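A minimal sketch of the sharding advice above, assuming `ds` is a `datasets.Dataset` already in memory. The helper names `num_shards_for` and `write_parquet_shards`, the file naming pattern, and the 1 GB default are illustrative assumptions, not part of the library. The sketch relies only on `Dataset.shard` and `Dataset.to_parquet`.

```python
import math
import os


def num_shards_for(total_bytes: int, target_bytes: int = 1_000_000_000) -> int:
    """Return how many shards keep each shard at roughly `target_bytes` or less."""
    return max(1, math.ceil(total_bytes / target_bytes))


def write_parquet_shards(ds, out_dir: str, num_shards: int) -> list:
    """Write `ds` (assumed to be a datasets.Dataset) as `num_shards` Parquet files.

    Returns the list of written file paths. The naming scheme is hypothetical.
    """
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for index in range(num_shards):
        # contiguous=True gives each shard a consecutive block of rows
        shard = ds.shard(num_shards=num_shards, index=index, contiguous=True)
        path = os.path.join(out_dir, f"data-{index:05d}-of-{num_shards:05d}.parquet")
        shard.to_parquet(path)
        paths.append(path)
    return paths
```

For example, `write_parquet_shards(ds, "out", num_shards_for(size_in_bytes))` with whatever size estimate you trust for `size_in_bytes`. Note that each file gets a `.parquet` extension, and that the on-disk Parquet size can differ from the in-memory size because of compression, so the 1 GB target is only approximate.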

@Nilabhra
Author

@lhoestq
Thanks for the explanation. Appreciate it. I'll try this out.

@Nilabhra
Author

@lhoestq
I tried the method you mentioned. This is the current scenario I'm facing:

  • The parquet file can be read from disk and streaming can be enabled.
  • The parquet file can be read from S3 (local MinIO).
  • When streaming=True is enabled for S3, I get the error mentioned below:
File ~/.../lib/python3.8/site-packages/s3fs/core.py:502, in S3FileSystem.set_session(self, refresh, kwargs)
    500 conf = AioConfig(**config_kwargs)
    501 if self.session is None:
--> 502     self.session = aiobotocore.session.AioSession(**self.kwargs)
    504 for parameters in (config_kwargs, self.kwargs, init_kwargs, client_kwargs):
    505     for option in ("region_name", "endpoint_url"):

TypeError: __init__() got an unexpected keyword argument 'headers'

Does this mean there is a bug in the main branch?

@lhoestq
Member

lhoestq commented May 11, 2023

Streaming from S3 is still experimental, so there might be a few bugs, unfortunately.

Can you share the full stack trace?

@Nilabhra
Author

@lhoestq
Sure, here you go:

TypeError                                 Traceback (most recent call last)
Cell In[8], line 1
----> 1 dataset = load_dataset("parquet", data_files=["s3://<bucket name>/<data folder>/data-parquet"], storage_options=fs.storage_options, streaming=True)

File ~/.../datasets/src/datasets/load.py:1790, in load_dataset(path, name, data_dir, data_files, split, cache_dir, features, download_config, download_mode, verification_mode, ignore_verifications, keep_in_memory, save_infos, revision, use_auth_token, task, streaming, num_proc, storage_options, **config_kwargs)
   1788 # Return iterable dataset in case of streaming
   1789 if streaming:
-> 1790     return builder_instance.as_streaming_dataset(split=split)
   1792 # Some datasets are already processed on the HF google storage
   1793 # Don't try downloading from Google storage for the packaged datasets as text, json, csv or pandas
   1794 try_from_hf_gcs = path not in _PACKAGED_DATASETS_MODULES

File ~/.../datasets/src/datasets/builder.py:1264, in DatasetBuilder.as_streaming_dataset(self, split, base_path)
   1257 dl_manager = StreamingDownloadManager(
   1258     base_path=base_path or self.base_path,
   1259     download_config=DownloadConfig(use_auth_token=self.use_auth_token, storage_options=self.storage_options),
   1260     dataset_name=self.name,
   1261     data_dir=self.config.data_dir,
   1262 )
   1263 self._check_manual_download(dl_manager)
-> 1264 splits_generators = {sg.name: sg for sg in self._split_generators(dl_manager)}
   1265 # By default, return all splits
   1266 if split is None:

File ~/.../datasets/src/datasets/packaged_modules/parquet/parquet.py:34, in Parquet._split_generators(self, dl_manager)
     32 if not self.config.data_files:
     33     raise ValueError(f"At least one data file must be specified, but got data_files={self.config.data_files}")
---> 34 data_files = dl_manager.download_and_extract(self.config.data_files)
     35 if isinstance(data_files, (str, list, tuple)):
     36     files = data_files

File ~/.../datasets/src/datasets/download/streaming_download_manager.py:1087, in StreamingDownloadManager.download_and_extract(self, url_or_urls)
   1069 def download_and_extract(self, url_or_urls):
   1070     """Prepare given `url_or_urls` for streaming (add extraction protocol).
   1071 
   1072     This is the lazy version of `DownloadManager.download_and_extract` for streaming.
   (...)
   1085         url(s): (`str` or `list` or `dict`), URL(s) to stream data from matching the given input `url_or_urls`.
   1086     """
-> 1087     return self.extract(self.download(url_or_urls))

File ~/.../datasets/src/datasets/download/streaming_download_manager.py:1039, in StreamingDownloadManager.extract(self, url_or_urls)
   1020 def extract(self, url_or_urls):
   1021     """Add extraction protocol for given url(s) for streaming.
   1022 
   1023     This is the lazy version of `DownloadManager.extract` for streaming.
   (...)
   1037     ```
   1038     """
-> 1039     urlpaths = map_nested(self._extract, url_or_urls, map_tuple=True)
   1040     return urlpaths

File ~/.../datasets/src/datasets/utils/py_utils.py:443, in map_nested(function, data_struct, dict_only, map_list, map_tuple, map_numpy, num_proc, parallel_min_length, types, disable_tqdm, desc)
    441     num_proc = 1
    442 if num_proc <= 1 or len(iterable) < parallel_min_length:
--> 443     mapped = [
    444         _single_map_nested((function, obj, types, None, True, None))
    445         for obj in logging.tqdm(iterable, disable=disable_tqdm, desc=desc)
    446     ]
    447 else:
    448     num_proc = num_proc if num_proc <= len(iterable) else len(iterable)

File ~/.../datasets/src/datasets/utils/py_utils.py:444, in <listcomp>(.0)
    441     num_proc = 1
    442 if num_proc <= 1 or len(iterable) < parallel_min_length:
    443     mapped = [
--> 444         _single_map_nested((function, obj, types, None, True, None))
    445         for obj in logging.tqdm(iterable, disable=disable_tqdm, desc=desc)
    446     ]
    447 else:
    448     num_proc = num_proc if num_proc <= len(iterable) else len(iterable)

File ~/.../datasets/src/datasets/utils/py_utils.py:363, in _single_map_nested(args)
    361     return {k: _single_map_nested((function, v, types, None, True, None)) for k, v in pbar}
    362 else:
--> 363     mapped = [_single_map_nested((function, v, types, None, True, None)) for v in pbar]
    364     if isinstance(data_struct, list):
    365         return mapped

File ~/.../datasets/src/datasets/utils/py_utils.py:363, in <listcomp>(.0)
    361     return {k: _single_map_nested((function, v, types, None, True, None)) for k, v in pbar}
    362 else:
--> 363     mapped = [_single_map_nested((function, v, types, None, True, None)) for v in pbar]
    364     if isinstance(data_struct, list):
    365         return mapped

File ~/.../datasets/src/datasets/utils/py_utils.py:346, in _single_map_nested(args)
    344 # Singleton first to spare some computation
    345 if not isinstance(data_struct, dict) and not isinstance(data_struct, types):
--> 346     return function(data_struct)
    348 # Reduce logging to keep things readable in multiprocessing with tqdm
    349 if rank is not None and logging.get_verbosity() < logging.WARNING:

File ~/.../datasets/src/datasets/download/streaming_download_manager.py:1044, in StreamingDownloadManager._extract(self, urlpath)
   1042 def _extract(self, urlpath: str) -> str:
   1043     urlpath = str(urlpath)
-> 1044     protocol = _get_extraction_protocol(urlpath, use_auth_token=self.download_config.use_auth_token)
   1045     # get inner file: zip://train-00000.json.gz::https://foo.bar/data.zip -> zip://train-00000.json.gz
   1046     path = urlpath.split("::")[0]

File ~/.../datasets/src/datasets/download/streaming_download_manager.py:433, in _get_extraction_protocol(urlpath, use_auth_token)
    431 else:
    432     urlpath, kwargs = urlpath, {}
--> 433 with fsspec.open(urlpath, **kwargs) as f:
    434     return _get_extraction_protocol_with_magic_number(f)

File ~/.../lib/python3.8/site-packages/fsspec/core.py:102, in OpenFile.__enter__(self)
     99 def __enter__(self):
    100     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 102     f = self.fs.open(self.path, mode=mode)
    104     self.fobjects = [f]
    106     if self.compression is not None:

File ~/.../lib/python3.8/site-packages/fsspec/spec.py:1199, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1197 else:
   1198     ac = kwargs.pop("autocommit", not self._intrans)
-> 1199     f = self._open(
   1200         path,
   1201         mode=mode,
   1202         block_size=block_size,
   1203         autocommit=ac,
   1204         cache_options=cache_options,
   1205         **kwargs,
   1206     )
   1207     if compression is not None:
   1208         from fsspec.compression import compr

File ~/.../lib/python3.8/site-packages/s3fs/core.py:659, in S3FileSystem._open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, cache_options, **kwargs)
    656 if cache_type is None:
    657     cache_type = self.default_cache_type
--> 659 return S3File(
    660     self,
    661     path,
    662     mode,
    663     block_size=block_size,
    664     acl=acl,
    665     version_id=version_id,
    666     fill_cache=fill_cache,
    667     s3_additional_kwargs=kw,
    668     cache_type=cache_type,
    669     autocommit=autocommit,
    670     requester_pays=requester_pays,
    671     cache_options=cache_options,
    672 )

File ~/.../lib/python3.8/site-packages/s3fs/core.py:2043, in S3File.__init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays, cache_options)
   2041         self.details = s3.info(path)
   2042         self.version_id = self.details.get("VersionId")
-> 2043 super().__init__(
   2044     s3,
   2045     path,
   2046     mode,
   2047     block_size,
   2048     autocommit=autocommit,
   2049     cache_type=cache_type,
   2050     cache_options=cache_options,
   2051 )
   2052 self.s3 = self.fs  # compatibility
   2054 # when not using autocommit we want to have transactional state to manage

File ~/.../lib/python3.8/site-packages/fsspec/spec.py:1555, in AbstractBufferedFile.__init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, size, **kwargs)
   1553         self.size = size
   1554     else:
-> 1555         self.size = self.details["size"]
   1556     self.cache = caches[cache_type](
   1557         self.blocksize, self._fetch_range, self.size, **cache_options
   1558     )
   1559 else:

File ~/.../lib/python3.8/site-packages/fsspec/spec.py:1568, in AbstractBufferedFile.details(self)
   1565 @property
   1566 def details(self):
   1567     if self._details is None:
-> 1568         self._details = self.fs.info(self.path)
   1569     return self._details

File ~/.../lib/python3.8/site-packages/fsspec/asyn.py:115, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    112 @functools.wraps(func)
    113 def wrapper(*args, **kwargs):
    114     self = obj or args[0]
--> 115     return sync(self.loop, func, *args, **kwargs)

File ~/.../lib/python3.8/site-packages/fsspec/asyn.py:100, in sync(loop, func, timeout, *args, **kwargs)
     98     raise FSTimeoutError from return_result
     99 elif isinstance(return_result, BaseException):
--> 100     raise return_result
    101 else:
    102     return return_result

File ~/.../lib/python3.8/site-packages/fsspec/asyn.py:55, in _runner(event, coro, result, timeout)
     53     coro = asyncio.wait_for(coro, timeout=timeout)
     54 try:
---> 55     result[0] = await coro
     56 except Exception as ex:
     57     result[0] = ex

File ~/.../lib/python3.8/site-packages/s3fs/core.py:1248, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
   1246 if key:
   1247     try:
-> 1248         out = await self._call_s3(
   1249             "head_object",
   1250             self.kwargs,
   1251             Bucket=bucket,
   1252             Key=key,
   1253             **version_id_kw(version_id),
   1254             **self.req_kw,
   1255         )
   1256         return {
   1257             "ETag": out.get("ETag", ""),
   1258             "LastModified": out["LastModified"],
   (...)
   1264             "ContentType": out.get("ContentType"),
   1265         }
   1266     except FileNotFoundError:

File ~/.../lib/python3.8/site-packages/s3fs/core.py:341, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
    340 async def _call_s3(self, method, *akwarglist, **kwargs):
--> 341     await self.set_session()
    342     s3 = await self.get_s3(kwargs.get("Bucket"))
    343     method = getattr(s3, method)

File ~/.../lib/python3.8/site-packages/s3fs/core.py:502, in S3FileSystem.set_session(self, refresh, kwargs)
    500 conf = AioConfig(**config_kwargs)
    501 if self.session is None:
--> 502     self.session = aiobotocore.session.AioSession(**self.kwargs)
    504 for parameters in (config_kwargs, self.kwargs, init_kwargs, client_kwargs):
    505     for option in ("region_name", "endpoint_url"):

TypeError: __init__() got an unexpected keyword argument 'headers'

@lhoestq
Member

lhoestq commented May 12, 2023

Is "data-parquet" a file? In data_files you should pass the paths to the parquet files (not to a directory). Glob patterns are not supported yet for S3 URLs.

The bug seems to happen because the data file you provided has no extension. Because of that, the loader tries to infer the format from the file content, but fails because _get_extraction_protocol doesn't support S3 URLs yet.
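Since glob patterns are not supported for S3 URLs here, one workaround is to list the files yourself and pass explicit URLs that end in `.parquet`. The following is a rough sketch under that assumption. The helper names `parquet_urls` and `stream_parquet_from_s3` are hypothetical, and the `load_dataset` call mirrors the one in the stack trace above.

```python
def parquet_urls(keys, protocol: str = "s3"):
    """Keep only keys ending in .parquet and turn them into full URLs."""
    urls = []
    for key in sorted(keys):
        if key.endswith(".parquet"):
            urls.append(f"{protocol}://{key}")
    return urls


def stream_parquet_from_s3(prefix: str, storage_options: dict):
    """List Parquet files under `prefix` ("bucket/folder") and stream them.

    `storage_options` is assumed to hold the same credentials and endpoint
    settings you would pass to s3fs.S3FileSystem.
    """
    # Imported lazily so parquet_urls can be used without these packages.
    import s3fs
    from datasets import load_dataset

    fs = s3fs.S3FileSystem(**storage_options)
    # fs.find returns keys without the protocol, e.g. "bucket/folder/part-0.parquet"
    urls = parquet_urls(fs.find(prefix))
    return load_dataset(
        "parquet",
        data_files=urls,
        storage_options=storage_options,
        streaming=True,
    )
```

Filtering on the extension avoids the code path that tries to guess the format from file content, which is where the `headers` error in the trace comes from.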

@Nilabhra
Author

@lhoestq
Thank you for your answer. Saving the file with a .parquet extension solved the issue! This is really great! Really appreciate all the help!

Let me know if I should close the issue or feel free to close it if you want.

@lhoestq
Member

lhoestq commented May 12, 2023

Cool! I'm glad it worked out :)

Sure, feel free to close the issue, since the original question about streaming with load_from_disk has been answered anyway.

@TianduoWang

As the name says, load_from_disk loads the data from your disk. If the data is hosted on S3, it is first downloaded locally and then loaded from your disk.

There is a discussion on streaming data from S3 here though: #5281

Hi @lhoestq,

Thanks for your answer here! I would like to know if it is possible to use load_from_disk from S3 without downloading it locally. For now my dataset is quite large, and my local machine doesn't have such big storage.

@lhoestq
Member

lhoestq commented Oct 28, 2024

Hi! Have you considered hosting your dataset on HF instead? This way you can use load_dataset with streaming=True. Streaming is not available in load_from_disk, which is meant for memory-mapping Arrow files on disk.
