19
19
20
20
21
21
def decompress (content : Union [Path , str , bytes ], * ,
22
- skip_strange_epochs : bool = False ) -> bytes :
22
+ skip_strange_epochs : bool = False , strict : bool = False ) -> bytes :
23
23
"""Decompress compressed RINEX files.
24
24
25
25
Any RINEX files compressed with Hatanaka compression (.crx|.##d) and/or with a conventional
@@ -41,6 +41,8 @@ def decompress(content: Union[Path, str, bytes], *,
41
41
Using this together with of reinit_every_nth option of rnx2crx may be effective.
42
42
Caution: It is assumed that no change in the list of data types happens in the
43
43
lost part of the data.
44
+ strict : bool, default False
45
+ If True, a ValueError is raised if the decoded file is not RINEX.
44
46
45
47
Returns
46
48
-------
@@ -55,14 +57,14 @@ def decompress(content: Union[Path, str, bytes], *,
55
57
For invalid file contents.
56
58
"""
57
59
if isinstance (content , (Path , str )):
58
- content = _decompress (Path (content ).read_bytes (), skip_strange_epochs )[1 ]
60
+ content = _decompress (Path (content ).read_bytes (), skip_strange_epochs , strict )[1 ]
59
61
elif not isinstance (content , bytes ):
60
62
raise ValueError ('input must be either a path or a binary string' )
61
- return _decompress (content , skip_strange_epochs )[1 ]
63
+ return _decompress (content , skip_strange_epochs , strict )[1 ]
62
64
63
65
64
66
def decompress_on_disk (path : Union [Path , str ], * , delete : bool = False ,
65
- skip_strange_epochs : bool = False ) -> Path :
67
+ skip_strange_epochs : bool = False , strict : bool = False ) -> Path :
66
68
"""Decompress compressed RINEX files and write the resulting file to disk.
67
69
68
70
Any RINEX files compressed with Hatanaka compression (.crx|.##d) and/or with a conventional
@@ -86,6 +88,8 @@ def decompress_on_disk(path: Union[Path, str], *, delete: bool = False,
86
88
Using this together with of reinit_every_nth option of rnx2crx may be effective.
87
89
Caution: It is assumed that no change in the list of data types happens in the
88
90
lost part of the data.
91
+ strict : bool, default False
92
+ If True, a ValueError is raised if the decoded file is not RINEX.
89
93
90
94
Returns
91
95
-------
@@ -101,7 +105,7 @@ def decompress_on_disk(path: Union[Path, str], *, delete: bool = False,
101
105
"""
102
106
path = Path (path )
103
107
with _record_warnings () as warning_list :
104
- is_obs , txt = _decompress (path .read_bytes (), skip_strange_epochs = skip_strange_epochs )
108
+ is_obs , txt = _decompress (path .read_bytes (), skip_strange_epochs , strict )
105
109
out_path = get_decompressed_path (path )
106
110
if out_path == path :
107
111
# file does not need decompressing
@@ -307,15 +311,15 @@ def _is_bz2(magic_bytes: bytes) -> bool:
307
311
return magic_bytes == b'\x42 \x5A '
308
312
309
313
310
- def _decompress (txt : bytes , skip_strange_epochs : bool ) -> (bool , bytes ):
314
+ def _decompress (txt : bytes , skip_strange_epochs : bool , strict : bool ) -> (bool , bytes ):
311
315
if len (txt ) < 2 :
312
316
raise ValueError ('empty file' )
313
317
magic_bytes = txt [:2 ]
314
318
315
319
if _is_gz (magic_bytes ):
316
- return _decompress_hatanaka (gzip .decompress (txt ), skip_strange_epochs )
320
+ return _decompress_hatanaka (gzip .decompress (txt ), skip_strange_epochs , strict )
317
321
if _is_bz2 (magic_bytes ):
318
- return _decompress_hatanaka (bz2 .decompress (txt ), skip_strange_epochs )
322
+ return _decompress_hatanaka (bz2 .decompress (txt ), skip_strange_epochs , strict )
319
323
elif _is_zip (magic_bytes ):
320
324
with zipfile .ZipFile (BytesIO (txt ), 'r' ) as z :
321
325
flist = z .namelist ()
@@ -324,21 +328,21 @@ def _decompress(txt: bytes, skip_strange_epochs: bool) -> (bool, bytes):
324
328
elif len (flist ) > 1 :
325
329
raise ValueError ('more than one file in zip archive' )
326
330
with z .open (flist [0 ], 'r' ) as f :
327
- return _decompress_hatanaka (f .read (), skip_strange_epochs )
331
+ return _decompress_hatanaka (f .read (), skip_strange_epochs , strict )
328
332
elif _is_lzw (magic_bytes ):
329
- return _decompress_hatanaka (lzw .decompress (txt ), skip_strange_epochs )
333
+ return _decompress_hatanaka (lzw .decompress (txt ), skip_strange_epochs , strict )
330
334
else :
331
- return _decompress_hatanaka (txt , skip_strange_epochs )
335
+ return _decompress_hatanaka (txt , skip_strange_epochs , strict )
332
336
333
337
334
- def _decompress_hatanaka (txt : bytes , skip_strange_epochs ) -> (bool , bytes ):
338
+ def _decompress_hatanaka (txt : bytes , skip_strange_epochs , strict ) -> (bool , bytes ):
335
339
if len (txt ) < 80 :
336
340
raise ValueError ('file is too short to be a valid RINEX file' )
337
341
header = txt [:80 ]
338
342
is_crinex = b'COMPACT RINEX' in header
339
343
if is_crinex :
340
344
txt = crx2rnx (txt , skip_strange_epochs = skip_strange_epochs )
341
- elif not header .endswith (b'RINEX VERSION / TYPE' ):
345
+ elif strict and not header .endswith (b'RINEX VERSION / TYPE' ):
342
346
raise ValueError ('not a valid RINEX file' )
343
347
is_obs = b'OBSERVATION DATA' in txt [:80 ]
344
348
return is_obs , txt
0 commit comments