-
-
Notifications
You must be signed in to change notification settings - Fork 329
obstore
-based Store implementation
#1661
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
TomAugspurger
merged 125 commits into
zarr-developers:main
from
kylebarron:kyle/object-store
Mar 24, 2025
Merged
Changes from 25 commits
Commits
Show all changes
125 commits
Select commit
Hold shift + click to select a range
14be826
Initial object-store implementation
kylebarron a492bf0
Merge branch 'v3' into kyle/object-store
d-v-b 50b6c47
Merge branch 'v3' into kyle/object-store
jhamman afa79af
Update src/zarr/v3/store/object_store.py
kylebarron c466f9f
Merge branch 'main' into kyle/object-store
kylebarron c3e7296
Merge branch 'main' into kyle/object-store
kylebarron f5c884b
update
kylebarron af2a39b
Handle list streams
kylebarron d7cfbee
Update get
kylebarron cb40015
wip refactor get_partial_values
kylebarron 619df43
Merge branch 'main' into kyle/object-store
kylebarron b976450
Fixes to _get_partial_values
kylebarron cca70d7
Merge branch 'main' into kyle/object-store
kylebarron f2c827d
Fix constructing prototype from get
kylebarron 5c8903f
lint
kylebarron 50e1dec
Merge branch 'main' into kyle/object-store
kylebarron 8bb252e
Add docstring
kylebarron 559eafd
Make names private
kylebarron 5486e69
Implement eq
kylebarron 9a05c01
Add obstore as a test dep
maxrjones 56b7a0b
Run store tests on ObjectStore
maxrjones d5d0d4d
Merge pull request #1 from maxrjones/object-store-tests
kylebarron b38ada1
import or skip
kylebarron ab00b46
Bump obstore beta version
kylebarron 9c65e4d
bump pre-commit
kylebarron 77d7c12
Add read_only param for __init__
kylebarron 4418426
Bump obstore
maxrjones 7a71174
Add fixtures for object store tests
maxrjones e73bcc9
Cast return from __eq__ as bool
maxrjones c2cd6b8
Avoid recursion error on repr
maxrjones a95ec59
Check store type at runtime
maxrjones fb8b16d
Merge pull request #2 from maxrjones/update-tests
kylebarron ca261b1
Check if store is writable for setting or deleting objects
maxrjones 0eb416a
Add test for object store repr
maxrjones 247432f
Add attribute tests
maxrjones 4b31b3c
Add get and set methods to test class
maxrjones d49d1ff
Raise an exeption for previously set key
maxrjones c2ebc8f
Update src/zarr/testing/store.py
maxrjones eb76698
Update _transform_list_dir to not remove all items
maxrjones f310260
Return bytes from GetResult
maxrjones 86951b8
Don't raise an exception on set_if_not_exists
maxrjones f989884
Remove test that stores empty file
maxrjones 40e1b25
Handle None as start or end of byte range request
maxrjones 3aa3578
Merge pull request #3 from maxrjones/check_writable
kylebarron 6da7976
Merge branch 'main' into object-store-update
maxrjones 26fa37e
Use new ByteRequest syntax
maxrjones 315e22e
Raise not implemented error on pickling
maxrjones 264eac6
Merge pull request #4 from maxrjones/object-store-update
kylebarron fc93029
Bump obstore
maxrjones 1b9f9f2
Catch allowed exceptions
maxrjones 72c9b30
Merge pull request #5 from maxrjones/object-store
kylebarron 0f8820d
Merge branch 'main' into kyle/object-store
kylebarron f1a0534
Merge branch 'main' into kyle/object-store
maxrjones b18de38
WIP
maxrjones 6976738
Fix list dir
kylebarron 65f2db4
Merge branch 'main' into kyle/object-store
kylebarron ebf7be1
Merge branch 'kyle/object-store' into kyle/fix-transform-list-dir
kylebarron 72bf9f4
Merge pull request #6 from kylebarron/kyle/fix-transform-list-dir
kylebarron f287780
Fix failing tests
maxrjones 08b7771
Make module structure consistent with other stores
maxrjones 8038b94
Remove override of pytest parameterization
maxrjones b65d439
Use store prefix
kylebarron 1fa0125
Remove xfail for pickle
kylebarron b0b9d56
Restore pickle xfail
kylebarron aa5bdac
Mark xfail
maxrjones 01de03d
Mark xfails
maxrjones fdd7f8e
Serialization should now be supported by obstore
maxrjones 73fa459
Update equality
maxrjones 6af1e85
Bump obstore
maxrjones f4bb423
Fix serialization
maxrjones 1f7df52
Improve equality checking
maxrjones fc728a4
Merge pull request #8 from maxrjones/obstore-xfails
kylebarron 12ccb49
Update src/zarr/storage/_object.py
kylebarron 25676a3
Update src/zarr/storage/_object.py
kylebarron 4d15e43
update __eq__
kylebarron 9a25057
Fix serialization
maxrjones 9e90a02
Merge pull request #9 from maxrjones/fix-serialization
kylebarron 6f5f960
Expand test coverage
maxrjones d3ce999
Merge pull request #10 from maxrjones/improve-test-coverage
kylebarron 0fab774
Merge branch 'main' into kyle/object-store
jhamman 8a01257
Add docs
maxrjones 9aa6260
Merge pull request #12 from maxrjones/add-docs
kylebarron cdd956f
Bump to 0.4.0
kylebarron 5cc3508
Satisfy mypy
kylebarron c06467c
Remove re-export from `__init__.py`
kylebarron 9a2ae39
fix imports
kylebarron b8bb0c3
Add obstore to upstream env
maxrjones 2978ce7
Update pyproject.toml
maxrjones deda2d7
Merge pull request #13 from maxrjones/patch-2
kylebarron 4465402
Import obstore inside class
kylebarron ba09040
Merge branch 'main' into kyle/object-store
kylebarron 6599d10
remove typo
kylebarron a0599b7
Merge branch 'main' into kyle/object-store
kylebarron 8c3a6f2
Merge branch 'main' into kyle/object-store
kylebarron 46efa02
Add S3 example for ObjectStore
maxrjones 43d2cb2
Add changelog entry
maxrjones 35a9819
Update docs/user-guide/storage.rst
kylebarron 9c259d2
Merge pull request #15 from maxrjones/obstore-remote-docs
kylebarron ce824a3
Bump obstore
maxrjones 46e43c6
Merge branch 'main' into kyle/object-store
maxrjones d69c0c9
Merge pull request #17 from maxrjones/bump-obstore
kylebarron f87e4d9
Simplify equality checking
maxrjones d535e50
Update syntax
maxrjones 2e727cd
Remove unnecessary check
maxrjones 3fa8b4e
Merge pull request #18 from maxrjones/simpler-eq
kylebarron fa211ce
Add stateful test.
dcherian 008c3f3
Update src/zarr/storage/_obstore.py
kylebarron 1d177c1
Update src/zarr/storage/_obstore.py
kylebarron 0735a43
Switch lstrip to removeprefix
kylebarron 6ce2258
remove test xfail
kylebarron c45c7d1
Add semaphore
kylebarron 50fd2b9
Remove buffer type runtime check
kylebarron 76b5f3f
fix pre-commit
kylebarron a767c95
fix pre-commit
kylebarron cd7b03c
Merge branch 'main' into kyle/object-store
kylebarron 7d415f5
Simpler rough obstore store check
kylebarron 0f1092a
Update get parameterization
kylebarron 4dffea5
Don't exact pin
kylebarron bcbe25e
Add partially satisiable ranges
kylebarron 6dc684d
fix doctest
kylebarron f84284e
Remove partially satisfiable tests (fails on fsspec)
kylebarron 8912e43
fix tests?
kylebarron bfb70cd
fix repr
kylebarron a3afa44
Obstore interlinking
kylebarron 622dbf2
Merge branch 'main' into kyle/object-store
dcherian File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,6 +70,7 @@ test = [ | |
"mypy", | ||
"hypothesis", | ||
"universal-pathlib", | ||
"obstore==0.3.0b8", | ||
] | ||
|
||
jupyter = [ | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,306 @@ | ||||||||||||||||||||
from __future__ import annotations | ||||||||||||||||||||
|
||||||||||||||||||||
import asyncio | ||||||||||||||||||||
from collections import defaultdict | ||||||||||||||||||||
from collections.abc import Iterable | ||||||||||||||||||||
from typing import TYPE_CHECKING, Any, TypedDict | ||||||||||||||||||||
|
||||||||||||||||||||
import obstore as obs | ||||||||||||||||||||
|
||||||||||||||||||||
from zarr.abc.store import ByteRangeRequest, Store | ||||||||||||||||||||
from zarr.core.buffer import Buffer | ||||||||||||||||||||
from zarr.core.buffer.core import BufferPrototype | ||||||||||||||||||||
|
||||||||||||||||||||
if TYPE_CHECKING: | ||||||||||||||||||||
from collections.abc import AsyncGenerator, Coroutine, Iterable | ||||||||||||||||||||
from typing import Any | ||||||||||||||||||||
|
||||||||||||||||||||
from obstore import ListStream, ObjectMeta, OffsetRange, SuffixRange | ||||||||||||||||||||
from obstore.store import ObjectStore as _ObjectStore | ||||||||||||||||||||
|
||||||||||||||||||||
from zarr.core.buffer import Buffer, BufferPrototype | ||||||||||||||||||||
from zarr.core.common import BytesLike | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
class ObjectStore(Store): | ||||||||||||||||||||
"""A Zarr store that uses obstore for fast read/write from AWS, GCP, and Azure. | ||||||||||||||||||||
|
||||||||||||||||||||
Parameters | ||||||||||||||||||||
---------- | ||||||||||||||||||||
store : obstore.store.ObjectStore | ||||||||||||||||||||
An obstore store instance that is set up with the proper credentials. | ||||||||||||||||||||
""" | ||||||||||||||||||||
|
||||||||||||||||||||
store: _ObjectStore | ||||||||||||||||||||
"""The underlying obstore instance.""" | ||||||||||||||||||||
|
||||||||||||||||||||
def __eq__(self, value: object) -> bool: | ||||||||||||||||||||
if not isinstance(value, ObjectStore): | ||||||||||||||||||||
return False | ||||||||||||||||||||
|
||||||||||||||||||||
return self.store.__eq__(value.store) | ||||||||||||||||||||
|
||||||||||||||||||||
def __init__(self, store: _ObjectStore) -> None: | ||||||||||||||||||||
self.store = store | ||||||||||||||||||||
|
||||||||||||||||||||
def __str__(self) -> str: | ||||||||||||||||||||
return f"object://{self.store}" | ||||||||||||||||||||
|
||||||||||||||||||||
def __repr__(self) -> str: | ||||||||||||||||||||
return f"ObjectStore({self!r})" | ||||||||||||||||||||
|
||||||||||||||||||||
async def get( | ||||||||||||||||||||
self, key: str, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None | ||||||||||||||||||||
) -> Buffer: | ||||||||||||||||||||
if byte_range is None: | ||||||||||||||||||||
resp = await obs.get_async(self.store, key) | ||||||||||||||||||||
return prototype.buffer.from_bytes(await resp.bytes_async()) | ||||||||||||||||||||
|
||||||||||||||||||||
start, end = byte_range | ||||||||||||||||||||
if start is not None and end is not None: | ||||||||||||||||||||
resp = await obs.get_range_async(self.store, key, start=start, end=end) | ||||||||||||||||||||
return prototype.buffer.from_bytes(memoryview(resp)) | ||||||||||||||||||||
elif start is not None: | ||||||||||||||||||||
if start >= 0: | ||||||||||||||||||||
# Offset request | ||||||||||||||||||||
resp = await obs.get_async(self.store, key, options={"range": {"offset": start}}) | ||||||||||||||||||||
else: | ||||||||||||||||||||
resp = await obs.get_async(self.store, key, options={"range": {"suffix": start}}) | ||||||||||||||||||||
|
||||||||||||||||||||
return prototype.buffer.from_bytes(await resp.bytes_async()) | ||||||||||||||||||||
else: | ||||||||||||||||||||
raise ValueError(f"Unexpected input to `get`: {start=}, {end=}") | ||||||||||||||||||||
|
||||||||||||||||||||
async def get_partial_values( | ||||||||||||||||||||
self, | ||||||||||||||||||||
prototype: BufferPrototype, | ||||||||||||||||||||
key_ranges: Iterable[tuple[str, ByteRangeRequest]], | ||||||||||||||||||||
) -> list[Buffer | None]: | ||||||||||||||||||||
return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges) | ||||||||||||||||||||
|
||||||||||||||||||||
async def exists(self, key: str) -> bool: | ||||||||||||||||||||
try: | ||||||||||||||||||||
await obs.head_async(self.store, key) | ||||||||||||||||||||
except FileNotFoundError: | ||||||||||||||||||||
return False | ||||||||||||||||||||
else: | ||||||||||||||||||||
return True | ||||||||||||||||||||
|
||||||||||||||||||||
@property | ||||||||||||||||||||
def supports_writes(self) -> bool: | ||||||||||||||||||||
return True | ||||||||||||||||||||
|
||||||||||||||||||||
async def set(self, key: str, value: Buffer) -> None: | ||||||||||||||||||||
buf = value.to_bytes() | ||||||||||||||||||||
await obs.put_async(self.store, key, buf) | ||||||||||||||||||||
|
||||||||||||||||||||
async def set_if_not_exists(self, key: str, value: Buffer) -> None: | ||||||||||||||||||||
buf = value.to_bytes() | ||||||||||||||||||||
await obs.put_async(self.store, key, buf, mode="create") | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks Joe, pre-commit recommended a slightly different version (kylebarron@86951b8) but this fixed one of the failing tests |
||||||||||||||||||||
|
||||||||||||||||||||
@property | ||||||||||||||||||||
def supports_deletes(self) -> bool: | ||||||||||||||||||||
return True | ||||||||||||||||||||
|
||||||||||||||||||||
async def delete(self, key: str) -> None: | ||||||||||||||||||||
await obs.delete_async(self.store, key) | ||||||||||||||||||||
|
||||||||||||||||||||
@property | ||||||||||||||||||||
def supports_partial_writes(self) -> bool: | ||||||||||||||||||||
return False | ||||||||||||||||||||
|
||||||||||||||||||||
async def set_partial_values( | ||||||||||||||||||||
self, key_start_values: Iterable[tuple[str, int, BytesLike]] | ||||||||||||||||||||
) -> None: | ||||||||||||||||||||
raise NotImplementedError | ||||||||||||||||||||
|
||||||||||||||||||||
@property | ||||||||||||||||||||
def supports_listing(self) -> bool: | ||||||||||||||||||||
return True | ||||||||||||||||||||
|
||||||||||||||||||||
def list(self) -> AsyncGenerator[str, None]: | ||||||||||||||||||||
objects: ListStream[list[ObjectMeta]] = obs.list(self.store) | ||||||||||||||||||||
return _transform_list(objects) | ||||||||||||||||||||
|
||||||||||||||||||||
def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: | ||||||||||||||||||||
objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) | ||||||||||||||||||||
return _transform_list(objects) | ||||||||||||||||||||
|
||||||||||||||||||||
def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: | ||||||||||||||||||||
objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) | ||||||||||||||||||||
return _transform_list_dir(objects, prefix) | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
async def _transform_list( | ||||||||||||||||||||
list_stream: AsyncGenerator[list[ObjectMeta], None], | ||||||||||||||||||||
) -> AsyncGenerator[str, None]: | ||||||||||||||||||||
async for batch in list_stream: | ||||||||||||||||||||
for item in batch: | ||||||||||||||||||||
yield item["path"] | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
async def _transform_list_dir( | ||||||||||||||||||||
list_stream: AsyncGenerator[list[ObjectMeta], None], prefix: str | ||||||||||||||||||||
) -> AsyncGenerator[str, None]: | ||||||||||||||||||||
# We assume that the underlying object-store implementation correctly handles the | ||||||||||||||||||||
# prefix, so we don't double-check that the returned results actually start with the | ||||||||||||||||||||
# given prefix. | ||||||||||||||||||||
prefix_len = len(prefix) | ||||||||||||||||||||
async for batch in list_stream: | ||||||||||||||||||||
for item in batch: | ||||||||||||||||||||
# Yield this item if "/" does not exist after the prefix. | ||||||||||||||||||||
if "/" not in item["path"][prefix_len:]: | ||||||||||||||||||||
yield item["path"] | ||||||||||||||||||||
kylebarron marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
class _BoundedRequest(TypedDict): | ||||||||||||||||||||
"""Range request with a known start and end byte. | ||||||||||||||||||||
|
||||||||||||||||||||
These requests can be multiplexed natively on the Rust side with | ||||||||||||||||||||
`obstore.get_ranges_async`. | ||||||||||||||||||||
""" | ||||||||||||||||||||
|
||||||||||||||||||||
original_request_index: int | ||||||||||||||||||||
"""The positional index in the original key_ranges input""" | ||||||||||||||||||||
|
||||||||||||||||||||
start: int | ||||||||||||||||||||
"""Start byte offset.""" | ||||||||||||||||||||
|
||||||||||||||||||||
end: int | ||||||||||||||||||||
"""End byte offset.""" | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
class _OtherRequest(TypedDict): | ||||||||||||||||||||
"""Offset or suffix range requests. | ||||||||||||||||||||
|
||||||||||||||||||||
These requests cannot be concurrent on the Rust side, and each need their own call | ||||||||||||||||||||
to `obstore.get_async`, passing in the `range` parameter. | ||||||||||||||||||||
""" | ||||||||||||||||||||
|
||||||||||||||||||||
original_request_index: int | ||||||||||||||||||||
"""The positional index in the original key_ranges input""" | ||||||||||||||||||||
|
||||||||||||||||||||
path: str | ||||||||||||||||||||
"""The path to request from.""" | ||||||||||||||||||||
|
||||||||||||||||||||
range: OffsetRange | SuffixRange | ||||||||||||||||||||
"""The range request type.""" | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
class _Response(TypedDict): | ||||||||||||||||||||
"""A response buffer associated with the original index that it should be restored to.""" | ||||||||||||||||||||
|
||||||||||||||||||||
original_request_index: int | ||||||||||||||||||||
"""The positional index in the original key_ranges input""" | ||||||||||||||||||||
|
||||||||||||||||||||
buffer: Buffer | ||||||||||||||||||||
"""The buffer returned from obstore's range request.""" | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
async def _make_bounded_requests( | ||||||||||||||||||||
store: obs.store.ObjectStore, | ||||||||||||||||||||
path: str, | ||||||||||||||||||||
requests: list[_BoundedRequest], | ||||||||||||||||||||
prototype: BufferPrototype, | ||||||||||||||||||||
) -> list[_Response]: | ||||||||||||||||||||
"""Make all bounded requests for a specific file. | ||||||||||||||||||||
|
||||||||||||||||||||
`obstore.get_ranges_async` allows for making concurrent requests for multiple ranges | ||||||||||||||||||||
within a single file, and will e.g. merge concurrent requests. This only uses one | ||||||||||||||||||||
single Python coroutine. | ||||||||||||||||||||
""" | ||||||||||||||||||||
|
||||||||||||||||||||
starts = [r["start"] for r in requests] | ||||||||||||||||||||
ends = [r["end"] for r in requests] | ||||||||||||||||||||
responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends) | ||||||||||||||||||||
|
||||||||||||||||||||
buffer_responses: list[_Response] = [] | ||||||||||||||||||||
for request, response in zip(requests, responses, strict=True): | ||||||||||||||||||||
buffer_responses.append( | ||||||||||||||||||||
{ | ||||||||||||||||||||
"original_request_index": request["original_request_index"], | ||||||||||||||||||||
"buffer": prototype.buffer.from_bytes(memoryview(response)), | ||||||||||||||||||||
} | ||||||||||||||||||||
) | ||||||||||||||||||||
|
||||||||||||||||||||
return buffer_responses | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
async def _make_other_request( | ||||||||||||||||||||
store: obs.store.ObjectStore, | ||||||||||||||||||||
request: _OtherRequest, | ||||||||||||||||||||
prototype: BufferPrototype, | ||||||||||||||||||||
) -> list[_Response]: | ||||||||||||||||||||
"""Make suffix or offset requests. | ||||||||||||||||||||
|
||||||||||||||||||||
We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all | ||||||||||||||||||||
futures can be gathered together. | ||||||||||||||||||||
""" | ||||||||||||||||||||
resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) | ||||||||||||||||||||
buffer = await resp.bytes_async() | ||||||||||||||||||||
return [ | ||||||||||||||||||||
{ | ||||||||||||||||||||
"original_request_index": request["original_request_index"], | ||||||||||||||||||||
"buffer": prototype.buffer.from_bytes(buffer), | ||||||||||||||||||||
} | ||||||||||||||||||||
] | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
async def _get_partial_values( | ||||||||||||||||||||
store: obs.store.ObjectStore, | ||||||||||||||||||||
prototype: BufferPrototype, | ||||||||||||||||||||
key_ranges: Iterable[tuple[str, ByteRangeRequest]], | ||||||||||||||||||||
) -> list[Buffer | None]: | ||||||||||||||||||||
"""Make multiple range requests. | ||||||||||||||||||||
|
||||||||||||||||||||
ObjectStore has a `get_ranges` method that will additionally merge nearby ranges, | ||||||||||||||||||||
but it's _per_ file. So we need to split these key_ranges into **per-file** key | ||||||||||||||||||||
ranges, and then reassemble the results in the original order. | ||||||||||||||||||||
|
||||||||||||||||||||
We separate into different requests: | ||||||||||||||||||||
|
||||||||||||||||||||
- One call to `obstore.get_ranges_async` **per target file** | ||||||||||||||||||||
- One call to `obstore.get_async` for each other request. | ||||||||||||||||||||
""" | ||||||||||||||||||||
key_ranges = list(key_ranges) | ||||||||||||||||||||
per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list) | ||||||||||||||||||||
other_requests: list[_OtherRequest] = [] | ||||||||||||||||||||
|
||||||||||||||||||||
for idx, (path, (start, end)) in enumerate(key_ranges): | ||||||||||||||||||||
if start is None: | ||||||||||||||||||||
raise ValueError("Cannot pass `None` for the start of the range request.") | ||||||||||||||||||||
|
||||||||||||||||||||
if end is not None: | ||||||||||||||||||||
# This is a bounded request with known start and end byte. | ||||||||||||||||||||
per_file_bounded_requests[path].append( | ||||||||||||||||||||
{"original_request_index": idx, "start": start, "end": end} | ||||||||||||||||||||
) | ||||||||||||||||||||
elif start < 0: | ||||||||||||||||||||
# Suffix request from the end | ||||||||||||||||||||
other_requests.append( | ||||||||||||||||||||
{"original_request_index": idx, "path": path, "range": {"suffix": abs(start)}} | ||||||||||||||||||||
) | ||||||||||||||||||||
elif start >= 0: | ||||||||||||||||||||
# Offset request to the end | ||||||||||||||||||||
other_requests.append( | ||||||||||||||||||||
{"original_request_index": idx, "path": path, "range": {"offset": start}} | ||||||||||||||||||||
) | ||||||||||||||||||||
else: | ||||||||||||||||||||
raise ValueError(f"Unsupported range input: {start=}, {end=}") | ||||||||||||||||||||
|
||||||||||||||||||||
futs: list[Coroutine[Any, Any, list[_Response]]] = [] | ||||||||||||||||||||
for path, bounded_ranges in per_file_bounded_requests.items(): | ||||||||||||||||||||
futs.append(_make_bounded_requests(store, path, bounded_ranges, prototype)) | ||||||||||||||||||||
|
||||||||||||||||||||
for request in other_requests: | ||||||||||||||||||||
futs.append(_make_other_request(store, request, prototype)) # noqa: PERF401 | ||||||||||||||||||||
|
||||||||||||||||||||
buffers: list[Buffer | None] = [None] * len(key_ranges) | ||||||||||||||||||||
|
||||||||||||||||||||
# TODO: this gather a list of list of Response; not sure if there's a way to | ||||||||||||||||||||
# unpack these lists inside of an `asyncio.gather`? | ||||||||||||||||||||
for responses in await asyncio.gather(*futs): | ||||||||||||||||||||
for resp in responses: | ||||||||||||||||||||
buffers[resp["original_request_index"]] = resp["buffer"] | ||||||||||||||||||||
|
||||||||||||||||||||
return buffers |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# ruff: noqa: E402 | ||
import pytest | ||
|
||
pytest.importorskip("obstore") | ||
|
||
from zarr.core.buffer import cpu | ||
from zarr.storage.object_store import ObjectStore | ||
from zarr.testing.store import StoreTests | ||
|
||
|
||
class TestObjectStore(StoreTests[ObjectStore, cpu.Buffer]): | ||
store_cls = ObjectStore |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.