2
2
3
3
from __future__ import annotations
4
4
5
+ import os
5
6
from os import PathLike
6
- from os . path import join
7
+ from pathlib import Path
7
8
from shutil import copyfileobj , move , rmtree
8
9
from tempfile import TemporaryFile , mkdtemp
9
10
from types import TracebackType
10
11
from typing import IO , Literal
11
12
from zipfile import ZIP_STORED , ZipFile , ZipInfo
12
13
14
+ from typing_extensions import Self
15
+
13
16
14
17
class MutableZipFile (ZipFile ):
15
18
"""
@@ -24,12 +27,43 @@ class DeleteMarker:
24
27
25
28
def __init__ (
26
29
self ,
27
- file : str | IO [bytes ],
30
+ file : str | IO [bytes ] | os . PathLike ,
28
31
mode : Literal ["r" , "w" , "x" , "a" ] = "r" ,
29
32
compression : int = ZIP_STORED ,
30
- allowZip64 : bool = False ,
33
+ allowZip64 : bool = True , # noqa: FBT001, FBT002 # Normally, I'd address the boolean
34
+ # typed issue but here we need to maintain compat with ZipFile
35
+ compresslevel : int | None = None ,
36
+ * ,
37
+ strict_timestamps : bool = True ,
31
38
) -> None :
32
- super ().__init__ (file , mode = mode , compression = compression , allowZip64 = allowZip64 )
39
+ """Open a ZIP file, where file can be a path to a file (a string), a
40
+ file-like object or a path-like object.
41
+
42
+ :param str | IO[bytes] | os.PathLike file: can be a path to a file (a string), a
43
+ file-like object or a path-like object.
44
+ :param Literal["r", "w", "x", "a"] mode: parameter should be 'r' to read an
45
+ existing file, 'w' to truncate and write a new file, 'a' to append to an existing
46
+ file, or 'x' to exclusively create and write a new file
47
+ :param int compression: the ZIP compression method to use when writing the
48
+ archive, and should be ZIP_STORED, ZIP_DEFLATED, ZIP_BZIP2 or ZIP_LZMA
49
+ :param bool allowZip64: s True (the default) zipfile will create ZIP files
50
+ that use the ZIP64 extensions when the zipfile is larger than 4 GiB.
51
+ :param int | None compresslevel: controls the compression level to use when
52
+ writing files to the archive. When using ZIP_STORED or ZIP_LZMA it has no effect.
53
+ When using ZIP_DEFLATED integers 0 through 9 are accepted
54
+ :param bool strict_timestamps: when set to False, allows to zip files older than
55
+ 1980-01-01 and newer than 2107-12-31, defaults to True
56
+
57
+ https://docs.python.org/3/library/zipfile.html
58
+ """
59
+ super ().__init__ (
60
+ file ,
61
+ mode = mode ,
62
+ compression = compression ,
63
+ allowZip64 = allowZip64 ,
64
+ compresslevel = compresslevel ,
65
+ strict_timestamps = strict_timestamps ,
66
+ )
33
67
# track file to override in zip
34
68
self ._replace = {}
35
69
# Whether the with statement was called
@@ -48,6 +82,14 @@ def writestr(
48
82
compress_type : int | None = None ,
49
83
compresslevel : int | None = None ,
50
84
) -> None :
85
+ """Write a file into the archive. The contents is data, which may be either a
86
+ str or a bytes instance; if it is a str, it is encoded as UTF-8 first.
87
+
88
+ zinfo_or_arcname is either the file name it will be given in the archive, or a
89
+ ZipInfo instance. If it's an instance, at least the filename, date, and time
90
+ must be given. If it's a name, the date and time is set to the current date and
91
+ time. The archive must be opened with mode 'w', 'x' or 'a'.
92
+ """
51
93
if isinstance (zinfo_or_arcname , ZipInfo ):
52
94
name = zinfo_or_arcname .filename
53
95
else :
@@ -56,7 +98,7 @@ def writestr(
56
98
# mark the entry, and create a temp-file for it
57
99
# we allow this only if the with statement is used
58
100
if self ._allowUpdates and name in self .namelist ():
59
- tempFile = self ._replace [ name ] = self . _replace . get (name , TemporaryFile ())
101
+ tempFile = self ._replace . setdefault (name , TemporaryFile ())
60
102
if isinstance (data , str ):
61
103
tempFile .write (data .encode ("utf-8" )) # strings are unicode
62
104
else :
@@ -77,14 +119,22 @@ def write(
77
119
compress_type : int | None = None ,
78
120
compresslevel : int | None = None ,
79
121
) -> None :
122
+ """Write the file named filename to the archive, giving it the archive name
123
+ arcname (by default, this will be the same as filename, but without a drive
124
+ letter and with leading path separators removed). If given, compress_type
125
+ overrides the value given for the compression parameter to the constructor
126
+ for the new entry. Similarly, compresslevel will override the constructor if
127
+ given. The archive must be open with mode 'w', 'x' or 'a'.
128
+
129
+ """
80
130
arcname = arcname or filename
81
131
# If the file exits, and needs to be overridden,
82
132
# mark the entry, and create a temp-file for it
83
133
# we allow this only if the with statement is used
84
134
if self ._allowUpdates and arcname in self .namelist ():
85
- tempFile = self ._replace [arcname ] = self ._replace .get (arcname , TemporaryFile ())
86
- with open (filename , "rb" ) as source :
135
+ with TemporaryFile () as tempFile , Path (filename ).open ("rb" ) as source :
87
136
copyfileobj (source , tempFile )
137
+
88
138
# Behave normally
89
139
else :
90
140
super ().write (
@@ -94,7 +144,7 @@ def write(
94
144
compresslevel = compresslevel ,
95
145
)
96
146
97
- def __enter__ (self ):
147
+ def __enter__ (self ) -> Self :
98
148
# Allow updates
99
149
self ._allowUpdates = True
100
150
return self
@@ -104,7 +154,7 @@ def __exit__(
104
154
exc_type : type [BaseException ] | None ,
105
155
exc_val : BaseException | None ,
106
156
exc_tb : TracebackType | None ,
107
- ):
157
+ ) -> None :
108
158
# Call base to close zip
109
159
try :
110
160
super ().__exit__ (exc_type , exc_val , exc_tb )
@@ -128,37 +178,40 @@ def removeFile(self, path: str | PathLike[str]) -> None:
128
178
def _rebuildZip (self ) -> None :
129
179
tempdir = mkdtemp ()
130
180
try :
131
- tempZipPath = join (tempdir , "new.zip" )
132
- with ZipFile (self .file , "r" ) as zipRead :
133
- # Create new zip with assigned properties
134
- with ZipFile (
135
- tempZipPath ,
136
- "w" ,
137
- compression = self .compression ,
138
- allowZip64 = self .allowZip64 ,
139
- ) as zipWrite :
140
- for item in zipRead .infolist ():
141
- # Check if the file should be replaced / or deleted
142
- replacement = self ._replace .get (item .filename , None )
143
- # If marked for deletion, do not copy file to new zipfile
144
- if isinstance (replacement , self .DeleteMarker ):
145
- del self ._replace [item .filename ]
146
- continue
147
- # If marked for replacement, copy temp_file, instead of old file
148
- if replacement is not None :
149
- del self ._replace [item .filename ]
150
- # Write replacement to archive,
151
- # and then close it (deleting the temp file)
152
- replacement .seek (0 )
153
- data = replacement .read ()
154
- replacement .close ()
155
- else :
156
- data = zipRead .read (item .filename )
157
- zipWrite .writestr (item , data )
181
+ tempZipPath = Path (tempdir ) / "new.zip"
182
+ with ZipFile (self .file , "r" ) as zipRead , ZipFile (
183
+ tempZipPath ,
184
+ "w" ,
185
+ compression = self .compression ,
186
+ allowZip64 = self .allowZip64 ,
187
+ ) as zipWrite :
188
+ for item in zipRead .infolist ():
189
+ # Check if the file should be replaced / or deleted
190
+ replacement = self ._replace .get (item .filename , None )
191
+ # If marked for deletion, do not copy file to new zipfile
192
+ if isinstance (replacement , self .DeleteMarker ):
193
+ del self ._replace [item .filename ]
194
+ continue
195
+ # If marked for replacement, copy temp_file, instead of old file
196
+ if replacement is not None :
197
+ del self ._replace [item .filename ]
198
+ # Write replacement to archive,
199
+ # and then close it ,deleting the temp file
200
+ replacement .seek (0 )
201
+ data = replacement .read ()
202
+ replacement .close ()
203
+ else :
204
+ data = zipRead .read (item .filename )
205
+ zipWrite .writestr (item , data )
158
206
# Override the archive with the updated one
159
207
if isinstance (self .file , str ):
160
- move (tempZipPath , self .file )
208
+ move (tempZipPath .as_posix (), self .file )
209
+ elif hasattr (self .file , "name" ):
210
+ move (tempZipPath .as_posix (), self .file .name )
211
+ elif hasattr (self .file , "write" ):
212
+ self .file .write (tempZipPath .read_bytes ())
161
213
else :
162
- move (tempZipPath , self .file .name )
214
+ msg = f"Sorry but { type (self .file ).__name__ } is not supported at this time!"
215
+ raise RuntimeError (msg )
163
216
finally :
164
217
rmtree (tempdir )
0 commit comments