1
1
import bz2
2
2
import gzip
3
3
import re
4
+ import warnings
4
5
import zipfile
6
+ from contextlib import contextmanager
5
7
from io import BytesIO
6
8
from pathlib import Path
7
9
from typing import Union
@@ -64,7 +66,8 @@ def decompress(content: Union[Path, str, bytes], *,
64
66
return _decompress (content , skip_strange_epochs )[1 ]
65
67
66
68
67
- def decompress_on_disk (path : Union [Path , str ], * , skip_strange_epochs : bool = False ) -> Path :
69
+ def decompress_on_disk (path : Union [Path , str ], * , delete : bool = False ,
70
+ skip_strange_epochs : bool = False ) -> Path :
68
71
"""Decompress compressed RINEX files and write the resulting file to disk.
69
72
70
73
Any RINEX files compressed with Hatanaka compression (.crx|.##d) and/or with a conventional
@@ -77,6 +80,8 @@ def decompress_on_disk(path: Union[Path, str], *, skip_strange_epochs: bool = Fa
77
80
----------
78
81
path : Path or str
79
82
Path to a compressed RINEX file.
83
+ delete : bool, default False
84
+ Delete the source file after successful decompression if no errors or warnings were raised.
80
85
skip_strange_epochs : bool, default False
81
86
For Hatanaka decompression.
82
87
Warn and skip strange epochs instead of raising an exception.
@@ -100,13 +105,18 @@ def decompress_on_disk(path: Union[Path, str], *, skip_strange_epochs: bool = Fa
100
105
For invalid file contents.
101
106
"""
102
107
path = Path (path )
103
- is_obs , txt = _decompress (path .read_bytes (), skip_strange_epochs = skip_strange_epochs )
108
+ with _record_warnings () as warning_list :
109
+ is_obs , txt = _decompress (path .read_bytes (), skip_strange_epochs = skip_strange_epochs )
104
110
out_path = get_decompressed_path (path )
105
111
if out_path == path :
106
112
# file does not need decompressing
107
113
return out_path
108
114
with out_path .open ('wb' ) as f_out :
109
115
f_out .write (txt )
116
+ assert out_path .exists ()
117
+ if delete :
118
+ if len (warning_list ) == 0 and out_path != path :
119
+ path .unlink ()
110
120
return out_path
111
121
112
122
@@ -186,7 +196,7 @@ def compress(content: Union[Path, str, bytes], *, compression: str = 'gz',
186
196
return _compress (content , compression , skip_strange_epochs , reinit_every_nth )[1 ]
187
197
188
198
189
- def compress_on_disk (path : Union [Path , str ], * , compression : str = 'gz' ,
199
+ def compress_on_disk (path : Union [Path , str ], * , compression : str = 'gz' , delete : bool = False ,
190
200
skip_strange_epochs : bool = False ,
191
201
reinit_every_nth : int = None ) -> Path :
192
202
"""Compress RINEX files.
@@ -200,6 +210,8 @@ def compress_on_disk(path: Union[Path, str], *, compression: str = 'gz',
200
210
Path to a RINEX file.
201
211
compression : 'gz' (default), 'bz2', or 'none'
202
212
Which compression (if any) to apply in addition to the Hatanaka compression.
213
+ delete : bool, default False
214
+ Delete the source file after successful compression if no errors or warnings were raised.
203
215
skip_strange_epochs : bool, default False
204
216
For Hatanaka compression. Warn and skip strange epochs instead of raising an exception.
205
217
reinit_every_nth : int, optional
@@ -225,11 +237,18 @@ def compress_on_disk(path: Union[Path, str], *, compression: str = 'gz',
225
237
if path .name .lower ().endswith (('.gz' , '.bz2' , '.z' , '.zip' )):
226
238
# already compressed
227
239
return path
228
- is_obs , txt = _compress (path .read_bytes (), compression = compression ,
229
- skip_strange_epochs = skip_strange_epochs ,
230
- reinit_every_nth = reinit_every_nth )
240
+ with _record_warnings () as warning_list :
241
+ is_obs , txt = _compress (path .read_bytes (), compression = compression ,
242
+ skip_strange_epochs = skip_strange_epochs ,
243
+ reinit_every_nth = reinit_every_nth )
231
244
out_path = get_compressed_path (path , is_obs , compression )
245
+ if out_path == path :
246
+ return out_path
232
247
out_path .write_bytes (txt )
248
+ assert out_path .exists ()
249
+ if delete :
250
+ if len (warning_list ) == 0 :
251
+ path .unlink ()
233
252
return out_path
234
253
235
254
@@ -353,3 +372,12 @@ def _compress_hatanaka(txt: bytes, skip_strange_epochs, reinit_every_nth) -> (bo
353
372
else :
354
373
is_obs = b'COMPACT RINEX' in txt [:80 ]
355
374
return is_obs , txt
375
+
376
+
377
+ @contextmanager
378
+ def _record_warnings ():
379
+ with warnings .catch_warnings (record = True ) as warning_list :
380
+ yield warning_list
381
+ for w in warning_list :
382
+ warnings .showwarning (message = w .message , category = w .category , filename = w .filename ,
383
+ lineno = w .lineno , file = w .file , line = w .line )
0 commit comments