This repository was archived by the owner on Aug 6, 2024. It is now read-only.
forked from PyFilesystem/pyfilesystem2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathregistry.py
297 lines (240 loc) · 9.71 KB
/
registry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# coding: utf-8
"""`Registry` class mapping protocols and FS URLs to their `Opener`.
"""
from __future__ import absolute_import, print_function, unicode_literals
import typing
import collections
import contextlib
from importlib.metadata import entry_points
from ..errors import ResourceReadOnly
from .base import Opener
from .errors import EntryPointError, UnsupportedProtocol
from .parse import parse_fs_url
if typing.TYPE_CHECKING:
from typing import Callable, Dict, Iterator, List, Text, Tuple, Type, Union
from ..base import FS
class Registry(object):
"""A registry for `Opener` instances."""
def __init__(self, default_opener="osfs", load_extern=False):
# type: (Text, bool) -> None
"""Create a registry object.
Arguments:
default_opener (str, optional): The protocol to use, if one
is not supplied. The default is to use 'osfs', so that the
FS URL is treated as a system path if no protocol is given.
load_extern (bool, optional): Set to `True` to load openers from
PyFilesystem2 extensions. Defaults to `False`.
"""
self.default_opener = default_opener
self.load_extern = load_extern
self._protocols = {} # type: Dict[Text, Opener]
def __repr__(self):
# type: () -> Text
return "<fs-registry {!r}>".format(self.protocols)
def install(self, opener):
# type: (Union[Type[Opener], Opener, Callable[[], Opener]]) -> Opener
"""Install an opener.
Arguments:
opener (`Opener`): an `Opener` instance, or a callable that
returns an opener instance.
Note:
May be used as a class decorator. For example::
registry = Registry()
@registry.install
class ArchiveOpener(Opener):
protocols = ['zip', 'tar']
"""
_opener = opener if isinstance(opener, Opener) else opener()
assert isinstance(_opener, Opener), "Opener instance required"
assert _opener.protocols, "must list one or more protocols"
for protocol in _opener.protocols:
self._protocols[protocol] = _opener
return _opener
@property
def protocols(self):
# type: () -> List[Text]
"""`list`: the list of supported protocols."""
_protocols = list(self._protocols)
if self.load_extern:
eps = entry_points()
_protocols.extend(
entry_point.name
for entry_point in eps.get("fs.opener") or []
)
_protocols = list(collections.OrderedDict.fromkeys(_protocols))
return _protocols
def get_opener(self, protocol):
# type: (Text) -> Opener
"""Get the opener class associated to a given protocol.
Arguments:
protocol (str): A filesystem protocol.
Returns:
Opener: an opener instance.
Raises:
~fs.opener.errors.UnsupportedProtocol: If no opener
could be found for the given protocol.
EntryPointLoadingError: If the returned entry point
is not an `Opener` subclass or could not be loaded
successfully.
"""
protocol = protocol or self.default_opener
if self.load_extern:
eps = entry_points(group="fs.opener")
entry_point = next(
(ep for ep in eps if ep.name == protocol), None
)
else:
entry_point = None
# If not entry point was loaded from the extensions, try looking
# into the registered protocols
if entry_point is None:
if protocol in self._protocols:
opener_instance = self._protocols[protocol]
else:
raise UnsupportedProtocol(
"protocol '{}' is not supported".format(protocol)
)
# If an entry point was found in an extension, attempt to load it
else:
try:
opener = entry_point.load()
except Exception as exception:
raise EntryPointError(
"could not load entry point; {}".format(exception)
)
if not issubclass(opener, Opener):
raise EntryPointError("entry point did not return an opener")
try:
opener_instance = opener()
except Exception as exception:
raise EntryPointError(
"could not instantiate opener; {}".format(exception)
)
return opener_instance
def open(
self,
fs_url, # type: Text
writeable=True, # type: bool
create=False, # type: bool
cwd=".", # type: Text
default_protocol="osfs", # type: Text
):
# type: (...) -> Tuple[FS, Text]
"""Open a filesystem from a FS URL.
Returns a tuple of a filesystem object and a path. If there is
no path in the FS URL, the path value will be `None`.
Arguments:
fs_url (str): A filesystem URL.
writeable (bool, optional): `True` if the filesystem must be
writeable.
create (bool, optional): `True` if the filesystem should be
created if it does not exist.
cwd (str): The current working directory.
Returns:
(FS, str): a tuple of ``(<filesystem>, <path from url>)``
"""
if "://" not in fs_url:
# URL may just be a path
fs_url = "{}://{}".format(default_protocol, fs_url)
parse_result = parse_fs_url(fs_url)
protocol = parse_result.protocol
open_path = parse_result.path
opener = self.get_opener(protocol)
open_fs = opener.open_fs(fs_url, parse_result, writeable, create, cwd)
return open_fs, open_path
def open_fs(
self,
fs_url, # type: Union[FS, Text]
writeable=False, # type: bool
create=False, # type: bool
cwd=".", # type: Text
default_protocol="osfs", # type: Text
):
# type: (...) -> FS
"""Open a filesystem from a FS URL (ignoring the path component).
Arguments:
fs_url (str): A filesystem URL. If a filesystem instance is
given instead, it will be returned transparently.
writeable (bool, optional): `True` if the filesystem must
be writeable.
create (bool, optional): `True` if the filesystem should be
created if it does not exist.
cwd (str): The current working directory (generally only
relevant for OS filesystems).
default_protocol (str): The protocol to use if one is not
supplied in the FS URL (defaults to ``"osfs"``).
Returns:
~fs.base.FS: A filesystem instance.
Caution:
The ``writeable`` parameter only controls whether the
filesystem *needs* to be writable, which is relevant for
some archive filesystems. Passing ``writeable=False`` will
**not** make the return filesystem read-only. For this,
consider using `fs.wrap.read_only` to wrap the returned
instance.
"""
from ..base import FS
if isinstance(fs_url, FS):
_fs = fs_url
else:
_fs, _path = self.open(
fs_url,
writeable=writeable,
create=create,
cwd=cwd,
default_protocol=default_protocol,
)
return _fs
@contextlib.contextmanager
def manage_fs(
self,
fs_url, # type: Union[FS, Text]
create=False, # type: bool
writeable=False, # type: bool
cwd=".", # type: Text
):
# type: (...) -> Iterator[FS]
"""Get a context manager to open and close a filesystem.
Arguments:
fs_url (FS or str): A filesystem instance or a FS URL.
create (bool, optional): If `True`, then create the filesystem if
it doesn't already exist.
writeable (bool, optional): If `True`, then the filesystem
must be writeable.
cwd (str): The current working directory, if opening a
`~fs.osfs.OSFS`.
Sometimes it is convenient to be able to pass either a FS object
*or* an FS URL to a function. This context manager handles the
required logic for that.
Example:
The `~Registry.manage_fs` method can be used to define a small
utility function::
>>> def print_ls(list_fs):
... '''List a directory.'''
... with manage_fs(list_fs) as fs:
... print(' '.join(fs.listdir()))
This function may be used in two ways. You may either pass
a ``str``, as follows::
>>> print_list('zip://projects.zip')
Or, an filesystem instance::
>>> from fs.osfs import OSFS
>>> projects_fs = OSFS('~/')
>>> print_list(projects_fs)
"""
from ..base import FS
def assert_writeable(fs):
if fs.getmeta().get("read_only", True):
raise ResourceReadOnly(path="/")
if isinstance(fs_url, FS):
if writeable:
assert_writeable(fs_url)
yield fs_url
else:
_fs = self.open_fs(fs_url, create=create, writeable=writeable, cwd=cwd)
if writeable:
assert_writeable(_fs)
try:
yield _fs
finally:
_fs.close()
registry = Registry(load_extern=True)