@@ -30,7 +30,6 @@ def decompress(content: Union[Path, str, bytes], *,
30
30
Already decompressed files are returned as is.
31
31
32
32
Compression type is deduced automatically from the file contents.
33
- File extensions are not validated (as is done in decompress_on_disk()).
34
33
35
34
Parameters
36
35
----------
@@ -73,8 +72,6 @@ def decompress_on_disk(path: Union[Path, str], *, skip_strange_epochs: bool = Fa
73
72
Already decompressed files are ignored.
74
73
75
74
Compression type is deduced automatically from the file contents.
76
- Input file extensions are loosely validated to assign a correct extension
77
- to the decompressed file.
78
75
79
76
Parameters
80
77
----------
@@ -104,7 +101,7 @@ def decompress_on_disk(path: Union[Path, str], *, skip_strange_epochs: bool = Fa
104
101
"""
105
102
path = Path (path )
106
103
is_obs , txt = _decompress (path .read_bytes (), skip_strange_epochs = skip_strange_epochs )
107
- out_path = get_decompressed_path (path , is_obs )
104
+ out_path = get_decompressed_path (path )
108
105
if out_path == path :
109
106
# file does not need decompressing
110
107
return out_path
@@ -113,16 +110,13 @@ def decompress_on_disk(path: Union[Path, str], *, skip_strange_epochs: bool = Fa
113
110
return out_path
114
111
115
112
116
- def get_decompressed_path (path : Union [Path , str ], is_obs : bool ) -> Path :
113
+ def get_decompressed_path (path : Union [Path , str ]) -> Path :
117
114
"""Get the decompressed path corresponding to a compressed RINEX file after decompression.
118
115
119
116
Parameters
120
117
----------
121
118
path : path or str
122
- Path to the compression RINEX file.
123
- is_obs : bool
124
- Whether the RINEX file contains observation data.
125
- Needed for correct renaming of Hatanaka-compressed files.
119
+ Path to the compressed RINEX file.
126
120
127
121
Returns
128
122
-------
@@ -136,18 +130,14 @@ def get_decompressed_path(path: Union[Path, str], is_obs: bool) -> Path:
136
130
if parts [- 1 ].lower () in ['z' , 'gz' , 'bz2' , 'zip' ]:
137
131
parts .pop ()
138
132
suffix = parts [- 1 ]
139
- if is_obs :
140
- is_valid = re .match (r'^(?:crx|rnx|\d\d[od])$' , suffix , flags = re .I )
141
- if not is_valid :
142
- raise ValueError (f"'{ str (path )} ' is not a valid RINEX file name" )
143
- if suffix [2 ] == 'd' :
144
- suffix = suffix [:2 ] + 'o'
145
- elif suffix [2 ] == 'D' :
146
- suffix = suffix [:2 ] + 'O'
147
- elif suffix == 'crx' :
148
- suffix = 'rnx'
149
- elif suffix == 'CRX' :
150
- suffix = 'RNX'
133
+ if re .fullmatch (r'\d\dd' , suffix ):
134
+ suffix = suffix [:2 ] + 'o'
135
+ elif re .fullmatch (r'\d\dD' , suffix ):
136
+ suffix = suffix [:2 ] + 'O'
137
+ elif suffix == 'crx' :
138
+ suffix = 'rnx'
139
+ elif suffix == 'CRX' :
140
+ suffix = 'RNX'
151
141
out_path = path .parent / '.' .join (parts [:- 1 ] + [suffix ])
152
142
return out_path
153
143
@@ -241,16 +231,17 @@ def compress_on_disk(path: Union[Path, str], *, compression: str = 'gz',
241
231
return out_path
242
232
243
233
244
- def get_compressed_path (path , is_obs , compression = 'gz' ):
234
+ def get_compressed_path (path , is_obs = None , compression = 'gz' ):
245
235
"""Get the compressed path corresponding to a RINEX file after compression.
246
236
247
237
Parameters
248
238
----------
249
239
path : path or str
250
- Path to the compression RINEX file.
251
- is_obs : bool
240
+ Path to the RINEX file being compressed.
241
+ is_obs : bool, optional
252
242
Whether the RINEX file contains observation data.
253
- Needed for correct renaming of Hatanaka-compressed files.
243
+ Needed for correct renaming of files with .rnx suffix,
244
+ which will be Hatanaka-compressed if they contain observation data.
254
245
compression : 'gz' (default), 'bz2', or 'none'
255
246
Compression (if any) applied in addition to the Hatanaka compression.
256
247
@@ -264,18 +255,19 @@ def get_compressed_path(path, is_obs, compression='gz'):
264
255
if len (parts ) < 2 :
265
256
raise ValueError (f"'{ str (path )} ' is not a valid RINEX file name" )
266
257
suffix = parts [- 1 ]
267
- if is_obs :
268
- is_valid = re .match (r'^(?:crx|rnx|\d\d[od])$' , suffix , flags = re .I )
269
- if not is_valid :
270
- raise ValueError (f"'{ str (path )} ' is not a valid RINEX file name" )
271
- if suffix [2 ] == 'o' :
272
- suffix = suffix [:2 ] + 'd'
273
- elif suffix [2 ] == 'O' :
274
- suffix = suffix [:2 ] + 'D'
275
- elif suffix == 'rnx' :
276
- suffix = 'crx'
277
- elif suffix == 'RNX' :
278
- suffix = 'CRX'
258
+ if re .fullmatch (r'\d\do' , suffix ):
259
+ suffix = suffix [:2 ] + 'd'
260
+ elif re .fullmatch (r'\d\dO' , suffix ):
261
+ suffix = suffix [:2 ] + 'D'
262
+ elif suffix .lower () == 'rnx' :
263
+ if is_obs is None :
264
+ raise ValueError (f'whether { path .name } contains observation data is ambiguous, '
265
+ 'need to specify is_obs argument' )
266
+ elif is_obs :
267
+ if suffix == 'RNX' :
268
+ suffix = 'CRX'
269
+ else :
270
+ suffix = 'crx'
279
271
out_parts = parts [:- 1 ] + [suffix ]
280
272
if compression != 'none' :
281
273
out_parts .append (compression )
0 commit comments