Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ There are some (relatively obsolete) documents from our exploration of zarr inte
## Storage types

PyActiveStorage is designed to interact with various storage backends.
The storage backend is automatically detected, but can still be specified using the `storage_type` argument to the `Active` constructor.
The storage backend is automatically detected, but can still be specified using the `interface_type` argument to the `Active` constructor.
There are two main integration points for a storage backend:

#. Load netCDF metadata
Expand All @@ -78,7 +78,7 @@ There are two main integration points for a storage backend:
### Local file

The default storage backend is a local file.
To use a local file, use a `storage_type` of `None`, which is its default value.
To use a local file, use an `interface_type` of `None`, which is its default value.
netCDF metadata is loaded using the [netCDF4](https://pypi.org/project/netCDF4/) library.
The chunk reductions are implemented in `activestorage.storage` using NumPy.

Expand All @@ -87,7 +87,7 @@ The chunk reductions are implemented in `activestorage.storage` using NumPy.
We now have support for Active runs with netCDF4 files on S3, from [PR 89](https://github.com/NCAS-CMS/PyActiveStorage/pull/89).
To achieve this we integrate with [Reductionist](https://github.com/stackhpc/reductionist-rs), an S3 Active Storage Server.
Reductionist is typically deployed "near" to an S3-compatible object store and provides an API to perform numerical reductions on object data.
To use Reductionist, use a `storage_type` of `s3`.
To use Reductionist, use an `interface_type` of `s3`.

To load metadata, netCDF files are opened using `s3fs`, with `h5netcdf` used to put the open file (which is nothing more than a memory view of the netCDF file) into an hdf5/netCDF-like object format.
Chunk reductions are implemented in `activestorage.reductionist`, with each operation resulting in an API request to the Reductionist server.
Expand Down
46 changes: 23 additions & 23 deletions activestorage/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from activestorage.storage import reduce_chunk, reduce_opens3_chunk


def return_storage_type(uri):
def return_interface_type(uri):
"""
Extract the gateway-protocol to infer what type of storage
"""
Expand Down Expand Up @@ -187,7 +187,7 @@ def __init__(self,
dataset: Optional[str | Path | object],
ncvar: str = None,
axis: tuple = None,
storage_type: str = None,
interface_type: str = None,
max_threads: int = 100,
storage_options: dict = None,
active_storage_url: str = None) -> None:
Expand Down Expand Up @@ -218,9 +218,9 @@ def __init__(self,
self.ds = dataset
self.uri = dataset

# determine the storage_type
# determine the interface_type
# based on what we have available
if not storage_type:
if not interface_type:
if not input_variable:
check_uri = self.uri
else:
Expand All @@ -236,20 +236,20 @@ def __init__(self,
else:
check_uri = os.path.join(base_url,
self.ds.id._filename)
storage_type = return_storage_type(check_uri)
interface_type = return_interface_type(check_uri)

# still allow for a passable storage_type
# still allow for a passable interface_type
# for special cases eg "special-POSIX" ie DDN
if not storage_type and storage_options is not None:
storage_type = urllib.parse.urlparse(dataset).scheme
self.storage_type = storage_type
if not interface_type and storage_options is not None:
interface_type = urllib.parse.urlparse(dataset).scheme
self.interface_type = interface_type

# set correct filename attr
if input_variable and not self.storage_type:
if input_variable and not self.interface_type:
self.filename = self.ds
elif input_variable and self.storage_type == "s3":
elif input_variable and self.interface_type == "s3":
self.filename = self.ds.id._filename
elif input_variable and self.storage_type == "https":
elif input_variable and self.interface_type == "https":
self.filename = self.ds

# get storage_options
Expand All @@ -258,7 +258,7 @@ def __init__(self,

# basic check on file
if not input_variable:
if not os.path.isfile(self.uri) and not self.storage_type:
if not os.path.isfile(self.uri) and not self.interface_type:
raise ValueError(
f"Must use existing file for uri. {self.uri} not found")

Expand Down Expand Up @@ -294,11 +294,11 @@ def __load_nc_file(self):
and `_filename` attribute.
"""
ncvar = self.ncvar
if self.storage_type is None:
if self.interface_type is None:
nc = pyfive.File(self.uri)
elif self.storage_type == "s3":
elif self.interface_type == "s3":
nc = load_from_s3(self.uri, self.storage_options)
elif self.storage_type == "https":
elif self.interface_type == "https":
nc = load_from_https(self.uri, self.storage_options)
self.filename = self.uri
self.ds = nc[ncvar]
Expand Down Expand Up @@ -512,7 +512,7 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
out = np.ma.empty(out_shape, dtype=out_dtype, order=ds._order)

# Create a shared session object.
if self.storage_type == "s3" and self._version == 2:
if self.interface_type == "s3" and self._version == 2:
if self.storage_options is not None:
key, secret = None, None
if self.storage_options.get("anon", None) is True:
Expand All @@ -533,7 +533,7 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
session = reductionist.get_session(S3_ACCESS_KEY,
S3_SECRET_KEY,
S3_ACTIVE_STORAGE_CACERT)
elif self.storage_type == "https" and self._version == 2:
elif self.interface_type == "https" and self._version == 2:
username, password = None, None
if self.storage_options is not None:
username = self.storage_options.get("username", None)
Expand Down Expand Up @@ -660,7 +660,7 @@ def _process_chunk(self,
# Axes over which to apply a reduction
axis = self._axis

if self.storage_type == 's3' and self._version == 1:
if self.interface_type == 's3' and self._version == 1:
tmp, count = reduce_opens3_chunk(ds._fh,
offset,
size,
Expand All @@ -674,7 +674,7 @@ def _process_chunk(self,
axis=axis,
method=self.method)

elif self.storage_type == "s3" and self._version == 2:
elif self.interface_type == "s3" and self._version == 2:
# S3: pass in pre-configured storage options (credentials)
parsed_url = urllib.parse.urlparse(self.filename)
bucket = parsed_url.netloc
Expand Down Expand Up @@ -723,7 +723,7 @@ def _process_chunk(self,
chunk_selection,
axis,
operation=self._method)
elif self.storage_type == "https" and self._version == 2:
elif self.interface_type == "https" and self._version == 2:
tmp, count = reductionist.reduce_chunk(session,
self.active_storage_url,
f"{self.uri}",
Expand All @@ -738,9 +738,9 @@ def _process_chunk(self,
chunk_selection,
axis,
operation=self._method,
storage_type="https")
interface_type="https")

elif self.storage_type == 'ActivePosix' and self.version == 2:
elif self.interface_type == 'ActivePosix' and self.version == 2:
# This is where the DDN Fuse and Infinia wrappers go
raise NotImplementedError
else:
Expand Down
10 changes: 5 additions & 5 deletions activestorage/reductionist.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def reduce_chunk(session,
chunk_selection,
axis,
operation,
storage_type=None):
interface_type=None):
"""Perform a reduction on a chunk using Reductionist.

:param server: Reductionist server URL
Expand All @@ -70,7 +70,7 @@ def reduce_chunk(session,
obtained or operated upon.
:param axis: tuple of the axes to be reduced (non-negative integers)
:param operation: name of operation to perform
:param storage_type: optional testing flag to allow HTTPS reduction
:param interface_type: optional testing flag to allow HTTPS reduction
:returns: the reduced data as a numpy array or scalar
:raises ReductionistError: if the request to Reductionist fails
"""
Expand All @@ -86,7 +86,7 @@ def reduce_chunk(session,
order,
chunk_selection,
axis,
storage_type=storage_type)
interface_type=interface_type)
if DEBUG:
print(f"Reductionist request data dictionary: {request_data}")
api_operation = "sum" if operation == "mean" else operation or "select"
Expand Down Expand Up @@ -184,10 +184,10 @@ def build_request_data(url: str,
order,
selection,
axis,
storage_type=None) -> dict:
interface_type=None) -> dict:
"""Build request data for Reductionist API."""
request_data = {
'interface_type': storage_type if storage_type else "s3",
'interface_type': interface_type if interface_type else "s3",
'url': url,
'dtype': dtype.name,
'byte_order': encode_byte_order(dtype),
Expand Down
6 changes: 3 additions & 3 deletions tests/s3_exploratory/test_s3_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_Active_s3_v0():
"""
# run Active on s3 file
s3_file = "s3://pyactivestorage/s3_test_bizarre_large.nc"
active = Active(s3_file, "data", storage_type="s3")
active = Active(s3_file, "data", interface_type="s3")
active._version = 0
active.components = True
result1 = active[0:2, 4:6, 7:9]
Expand All @@ -127,7 +127,7 @@ def test_Active_s3_v1():
"""
# run Active on s3 file
s3_file = "s3://pyactivestorage/s3_test_bizarre_large.nc"
active = Active(s3_file, "data", storage_type="s3")
active = Active(s3_file, "data", interface_type="s3")
active._version = 1
active.method = "mean"
active.components = True
Expand All @@ -140,7 +140,7 @@ def test_Active_s3_v2():
"""
# run Active on s3 file
s3_file = "s3://pyactivestorage/s3_test_bizarre_large.nc"
active = Active(s3_file, "data", storage_type="s3")
active = Active(s3_file, "data", interface_type="s3")
active._version = 2
active.method = "mean"
active.components = True
Expand Down
4 changes: 2 additions & 2 deletions tests/s3_exploratory/test_s3_reduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_Active():
print("S3 file uri", s3_testfile_uri)

# run Active on s3 file
active = Active(s3_testfile_uri, "data", storage_type="s3")
active = Active(s3_testfile_uri, "data", interface_type="s3")
active.method = "mean"
result1 = active[0:2, 4:6, 7:9]
print(result1)
Expand Down Expand Up @@ -116,7 +116,7 @@ def test_with_valid_netCDF_file(test_data_path):
print("S3 file uri", s3_testfile_uri)

# run Active on s3 file
active = Active(s3_testfile_uri, "TREFHT", storage_type="s3")
active = Active(s3_testfile_uri, "TREFHT", interface_type="s3")
active._version = 2
active.method = "mean"
active.components = True
Expand Down
Loading
Loading