@@ -1,7 +1,7 @@
 # Contains standalone functions to accompany the index implementation and make it
 # more versatile
 # NOTE: Autodoc hates it if this is a docstring
-from git.types import PathLike
+
 from io import BytesIO
 import os
 from stat import (
@@ -13,7 +13,6 @@
     S_IFREG,
 )
 import subprocess
-from typing import List, Tuple, Union, cast
 
 from git.cmd import PROC_CREATIONFLAGS, handle_process_output
 from git.compat import (
@@ -49,6 +48,17 @@
     unpack
 )
 
+# typing -----------------------------------------------------------------------------
+
+from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast)
+
+from git.types import PathLike
+
+if TYPE_CHECKING:
+    from .base import IndexFile
+
+# ------------------------------------------------------------------------------------
+
 
 S_IFGITLINK = S_IFLNK | S_IFDIR  # a submodule
 CE_NAMEMASK_INV = ~CE_NAMEMASK
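
The typing block added above imports IndexFile under a TYPE_CHECKING guard, so the name exists for static analysis but is never bound at runtime; any annotation that mentions it must therefore be a string forward reference, as run_commit_hook does below. A minimal sketch of the pattern (the stub function is illustrative, not part of this change):

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        from git.index.base import IndexFile  # evaluated by type checkers only

    def touch_index(index: 'IndexFile') -> None:  # quoted: no runtime name lookup
        ...
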
@@ -57,23 +67,23 @@
            'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path')
 
 
-def hook_path(name, git_dir):
+def hook_path(name: str, git_dir: PathLike) -> str:
     """:return: path to the given named hook in the given git repository directory"""
     return osp.join(git_dir, 'hooks', name)
 
 
-def run_commit_hook(name, index, *args):
+def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None:
     """Run the commit hook of the given name. Silently ignores hooks that do not exist.
     :param name: name of hook, like 'pre-commit'
     :param index: IndexFile instance
     :param args: arguments passed to hook file
     :raises HookExecutionError: """
     hp = hook_path(name, index.repo.git_dir)
     if not os.access(hp, os.X_OK):
-        return
+        return None
 
     env = os.environ.copy()
-    env['GIT_INDEX_FILE'] = safe_decode(index.path)
+    env['GIT_INDEX_FILE'] = safe_decode(str(index.path))
     env['GIT_EDITOR'] = ':'
     try:
         cmd = subprocess.Popen([hp] + list(args),
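
hook_path only joins path components; it does not check that the hook actually exists. A quick usage sketch (the repository path is illustrative):

    from git.index.fun import hook_path

    print(hook_path('pre-commit', '/home/me/project/.git'))
    # -> /home/me/project/.git/hooks/pre-commit
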
@@ -86,14 +96,14 @@ def run_commit_hook(name, index, *args):
     except Exception as ex:
         raise HookExecutionError(hp, ex) from ex
     else:
-        stdout = []
-        stderr = []
-        handle_process_output(cmd, stdout.append, stderr.append, finalize_process)
-        stdout = ''.join(stdout)
-        stderr = ''.join(stderr)
+        stdout_list = []  # type: List[str]
+        stderr_list = []  # type: List[str]
+        handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process)
+        stdout_str = ''.join(stdout_list)
+        stderr_str = ''.join(stderr_list)
         if cmd.returncode != 0:
-            stdout = force_text(stdout, defenc)
-            stderr = force_text(stderr, defenc)
+            stdout = force_text(stdout_str, defenc)
+            stderr = force_text(stderr_str, defenc)
             raise HookExecutionError(hp, cmd.returncode, stderr, stdout)
         # end handle return code
 
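
The contract of run_commit_hook is: return silently when the hook file is missing or not executable, otherwise execute it and raise HookExecutionError carrying the captured output on a non-zero exit. A hedged usage sketch, assuming a GitPython Repo whose .git/hooks/pre-commit is executable:

    from git import Repo
    from git.exc import HookExecutionError
    from git.index.fun import run_commit_hook

    repo = Repo('.')
    try:
        run_commit_hook('pre-commit', repo.index)
    except HookExecutionError as err:
        print('hook failed:', err)
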
@@ -108,7 +118,9 @@ def stat_mode_to_index_mode(mode):
     return S_IFREG | 0o644 | (mode & 0o111)  # blobs with or without executable bit
 
 
-def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1Writer):
+def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes],
+                extension_data: Union[None, bytes] = None,
+                ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None:
     """Write the cache represented by entries to a stream
 
     :param entries: **sorted** list of entries
@@ -121,10 +133,10 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1Writer):
     :param extension_data: any kind of data to write as a trailer, it must begin
         a 4 byte identifier, followed by its size (4 bytes)"""
     # wrap the stream into a compatible writer
-    stream = ShaStreamCls(stream)
+    stream_sha = ShaStreamCls(stream)
 
-    tell = stream.tell
-    write = stream.write
+    tell = stream_sha.tell
+    write = stream_sha.write
 
     # header
     version = 2
136
148
beginoffset = tell ()
137
149
write (entry [4 ]) # ctime
138
150
write (entry [5 ]) # mtime
139
- path = entry [3 ]
140
- path = force_bytes (path , encoding = defenc )
151
+ path_str = entry [3 ] # type: str
152
+ path = force_bytes (path_str , encoding = defenc )
141
153
plen = len (path ) & CE_NAMEMASK # path length
142
154
assert plen == len (path ), "Path %s too long to fit into index" % entry [3 ]
143
155
flags = plen | (entry [2 ] & CE_NAMEMASK_INV ) # clear possible previous values
@@ -150,18 +162,19 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1
150
162
151
163
# write previously cached extensions data
152
164
if extension_data is not None :
153
- stream .write (extension_data )
165
+ stream_sha .write (extension_data )
154
166
155
167
# write the sha over the content
156
- stream .write_sha ()
168
+ stream_sha .write_sha ()
157
169
158
170
159
- def read_header (stream ) :
171
+ def read_header (stream : IO [ bytes ]) -> Tuple [ int , int ] :
160
172
"""Return tuple(version_long, num_entries) from the given stream"""
161
173
type_id = stream .read (4 )
162
174
if type_id != b"DIRC" :
163
175
raise AssertionError ("Invalid index file header: %r" % type_id )
164
- version , num_entries = unpack (">LL" , stream .read (4 * 2 ))
176
+ unpacked = cast (Tuple [int , int ], unpack (">LL" , stream .read (4 * 2 )))
177
+ version , num_entries = unpacked
165
178
166
179
# TODO: handle version 3: extended data, see read-cache.c
167
180
assert version in (1 , 2 )
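
read_header consumes exactly the 12-byte index header: the 4-byte b"DIRC" magic followed by two big-endian 32-bit words, version and entry count, which is what the ">LL" format decodes. A sketch reading it from a real index (path illustrative, run from a repository root):

    from git.index.fun import read_header

    with open('.git/index', 'rb') as fp:
        version, num_entries = read_header(fp)
    print(version, num_entries)  # typically 2, plus the number of tracked paths
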
@@ -180,7 +193,7 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
     # END handle entry
 
 
-def read_cache(stream):
+def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]:
     """Read a cache file from the given stream
     :return: tuple(version, entries_dict, extension_data, content_sha)
       * version is the integer version number
@@ -189,7 +202,7 @@ def read_cache(stream):
       * content_sha is a 20 byte sha on all cache file contents"""
     version, num_entries = read_header(stream)
     count = 0
-    entries = {}
+    entries = {}  # type: Dict[Tuple[PathLike, int], 'IndexEntry']
 
     read = stream.read
     tell = stream.tell
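
read_cache builds on read_header and keys each parsed IndexEntry by the (path, stage) tuple that entry_key produces. A sketch listing a few entries from an on-disk index (same assumption as above):

    from git.index.fun import read_cache

    with open('.git/index', 'rb') as fp:
        version, entries, extension_data, content_sha = read_cache(fp)
    for (path, stage), entry in sorted(entries.items())[:3]:
        print(stage, entry.hexsha, path)
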
@@ -228,7 +241,8 @@ def read_cache(stream):
     return (version, entries, extension_data, content_sha)
 
 
-def write_tree_from_cache(entries, odb, sl, si=0):
+def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
+                          ) -> Tuple[bytes, List[Tuple[str, int, str]]]:
     """Create a tree from the given sorted list of entries and put the respective
     trees into the given object database
 
@@ -238,7 +252,7 @@ def write_tree_from_cache(entries, odb, sl, si=0):
     :param sl: slice indicating the range we should process on the entries list
     :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of
         tree entries being a tuple of hexsha, mode, name"""
-    tree_items = []
+    tree_items = []  # type: List[Tuple[Union[bytes, str], int, str]]
     tree_items_append = tree_items.append
     ci = sl.start
     end = sl.stop
@@ -277,18 +291,19 @@ def write_tree_from_cache(entries, odb, sl, si=0):
 
     # finally create the tree
     sio = BytesIO()
-    tree_to_stream(tree_items, sio.write)
+    tree_to_stream(tree_items, sio.write)  # converts bytes of each item[0] to str
+    tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items)
     sio.seek(0)
 
     istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
-    return (istream.binsha, tree_items)
+    return (istream.binsha, tree_items_stringified)
 
 
-def _tree_entry_to_baseindexentry(tree_entry, stage):
+def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry:
     return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))
 
 
-def aggressive_tree_merge(odb, tree_shas) -> List[BaseIndexEntry]:
+def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
     """
     :return: list of BaseIndexEntries representing the aggressive merge of the given
         trees. All valid entries are on stage 0, whereas the conflicting ones are left
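
To tie the reading and writing halves together: write_cache serializes the sorted entries as a version-2 index through IndexFileSHA1Writer, appends any extension data verbatim, and finishes with the SHA-1 over everything written. A hedged round-trip sketch using an in-memory stream (GitPython's IndexFile.write adds further handling of extension data on top of this):

    from io import BytesIO
    from git.index.fun import read_cache, write_cache

    with open('.git/index', 'rb') as fp:
        version, entries, extension_data, content_sha = read_cache(fp)

    out = BytesIO()
    entries_sorted = sorted(entries.values(), key=lambda e: (e.path, e.stage))
    write_cache(entries_sorted, out, extension_data)
    # out now holds a freshly serialized index, trailing sha included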