16
16
from typing import Any
17
17
from typing import Callable
18
18
from typing import Dict
19
- from typing import List
20
19
from typing import NoReturn
21
20
from typing import Optional
22
21
from typing import Union
25
24
from ansible .module_utils ._text import to_bytes
26
25
from ansible .module_utils ._text import to_text
27
26
28
- """
29
- - The methods related to SSM session management (e.g., start_session, exec_command, _connect) are moved into a new class named SSMSessionManager.
30
- - The Connection class should now delegate SSM session tasks to the SSMSessionManager.
31
- - The SSMSessionManager class is moved into its own file for better modularization.
32
- - Python type hints should be added to method signatures for better clarity and static analysis.
33
- - Docstrings should describe each method's purpose, inputs, outputs, and any special handling.
34
- - Unit tests for SSMSessionManager should be written to ensure it functions as expected and independently.
35
- - The code should pass all existing and new unit tests.
36
- """
37
-
38
27
verbosity_display_type = Callable [[int , str ], None ]
39
28
40
29
41
30
class SSMProcessManagerTimeOutFailure (AnsibleConnectionFailure ):
42
31
pass
43
32
44
33
34
+ def _create_polling_obj (fd : Any ) -> Any :
35
+ """create polling object using select.poll, this is helpful for unit testing"""
36
+ poller = select .poll ()
37
+ poller .register (fd , select .POLLIN )
38
+ return poller
39
+
40
+
45
41
class ProcessManager :
46
- def __init__ (self , session : Any , stdout : Any , timeout : int , verbosity_display : verbosity_display_type ) -> None :
42
+ def __init__ (
43
+ self , instance_id : str , session : Any , stdout : Any , timeout : int , verbosity_display : verbosity_display_type
44
+ ) -> None :
47
45
self ._session = session
48
46
self ._stdout = stdout
49
47
self .verbosity_display = verbosity_display
50
48
self ._timeout = timeout
51
49
self ._poller = None
52
-
53
- self ._poller = select . poll ()
54
- self ._poller . register (self ._stdout , select . POLLIN )
50
+ self . instance_id = instance_id
51
+ self ._session_id = None
52
+ self ._poller = _create_polling_obj (self ._stdout )
55
53
56
54
def stdin_write (self , command : Union [bytes , str ]) -> None :
57
55
self ._session .stdin .write (command )
@@ -65,10 +63,8 @@ def stdout_readline(self) -> str:
65
63
def flush_stderr (self ) -> str :
66
64
"""read and return stderr with minimal blocking"""
67
65
68
- poller = select .poll ()
69
- poller .register (self ._session .stderr , select .POLLIN )
66
+ poller = _create_polling_obj (self ._session .stderr )
70
67
stderr = ""
71
-
72
68
while self ._session .poll () is None :
73
69
if not poller .poll (1 ):
74
70
break
@@ -77,23 +73,27 @@ def flush_stderr(self) -> str:
77
73
stderr = stderr + line
78
74
return stderr
79
75
76
+ def poll_stdout (self , length : int = 1000 ) -> bool :
77
+ return bool (self ._poller .poll (length ))
78
+
80
79
def poll (self , label : str , cmd : str ) -> NoReturn :
81
80
"""Poll session to retrieve content from stdout.
82
81
83
82
:param label: A label for the display (EXEC, PRE...)
84
83
:param cmd: The command being executed
85
84
"""
86
85
start = round (time .time ())
87
- yield bool ( self ._poller . poll ( 1000 ) )
86
+ yield self .poll_stdout ( )
88
87
while self ._session .poll () is None :
89
88
remaining = start + self ._timeout - round (time .time ())
90
89
self .verbosity_display (4 , f"{ label } remaining: { remaining } second(s)" )
91
90
if remaining < 0 :
92
91
raise SSMProcessManagerTimeOutFailure (f"{ label } command '{ cmd } ' timeout on host: { self .instance_id } " )
93
- yield bool ( self ._poller . poll ( 1000 ) )
92
+ yield self .poll_stdout ( )
94
93
95
94
def wait_for_match (self , label : str , cmd : str , match : Union [str , Callable [[str ], bool ]]) -> None :
96
95
stdout = ""
96
+ self .verbosity_display (4 , f"{ label } WAIT FOR: { match } - Command = { cmd } " )
97
97
for result in self .poll (label = label , cmd = cmd ):
98
98
if result :
99
99
text = self .stdout_read_text ()
@@ -109,13 +109,17 @@ def wait_for_match(self, label: str, cmd: str, match: Union[str, Callable[[str],
109
109
class SSMSessionManager :
110
110
MARK_LENGTH = 26
111
111
112
- def __init__ (self , ssm_client : Any , instance_id : str , verbosity_display : verbosity_display_type ) -> None :
112
+ def __init__ (
113
+ self , ssm_client : Any , instance_id : str , ssm_timeout : int , verbosity_display : verbosity_display_type
114
+ ) -> None :
113
115
self ._session_id = None
114
116
self ._instance_id = instance_id
115
117
self .verbosity_display = verbosity_display
116
118
self ._client = ssm_client
117
119
self ._has_timeout = False
118
120
self ._process_mgr = None
121
+ self ._timeout = ssm_timeout
122
+ self ._session = None
119
123
120
124
@property
121
125
def instance_id (self ) -> str :
@@ -137,9 +141,10 @@ def session(self) -> Any:
137
141
def stdout (self ) -> Any :
138
142
return self ._stdout
139
143
140
- @property
141
- def process_manager (self ) -> ProcessManager :
142
- return self ._process_mgr
144
+ def __getattr__ (self , attr : str ) -> Callable :
145
+ if self ._process_mgr and hasattr (self ._process_mgr , attr ):
146
+ return getattr (self ._process_mgr , attr )
147
+ raise AttributeError (f"class SSMSessionManager has no attribute '{ attr } '" )
143
148
144
149
def start_session (
145
150
self ,
@@ -148,11 +153,13 @@ def start_session(
148
153
region_name : Optional [str ],
149
154
profile_name : str ,
150
155
prepare_terminal : bool ,
151
- parameters : Dict [str , Any ] = {} ,
156
+ parameters : Dict [str , Any ] = None ,
152
157
) -> None :
153
158
"""Start SSM Session manager session and eventually prepare terminal"""
154
159
155
160
self .verbosity_display (3 , f"ESTABLISH SSM CONNECTION TO: { self .instance_id } " )
161
+ if parameters is None :
162
+ parameters = {}
156
163
start_session_args = dict (Target = self .instance_id , Parameters = parameters )
157
164
if document_name is not None :
158
165
start_session_args ["DocumentName" ] = document_name
@@ -173,12 +180,18 @@ def start_session(
173
180
self .verbosity_display (4 , f"SSM COMMAND: { to_text (cmd )} " )
174
181
175
182
stdout_r , stdout_w = openpty ()
176
- session = Popen (cmd , stdin = PIPE , stdout = stdout_w , stderr = PIPE , close_fds = True , bufsize = 0 )
183
+ self . _session = Popen (cmd , stdin = PIPE , stdout = stdout_w , stderr = PIPE , close_fds = True , bufsize = 0 )
177
184
178
185
os .close (stdout_w )
179
186
stdout = os .fdopen (stdout_r , "rb" , 0 )
180
187
181
- self ._process_mgr = ProcessManager (session = session , stdout = stdout , verbosity_display = self .verbosity_display )
188
+ self ._process_mgr = ProcessManager (
189
+ instance_id = self .instance_id ,
190
+ session = self ._session ,
191
+ stdout = stdout ,
192
+ timeout = self ._timeout ,
193
+ verbosity_display = self .verbosity_display ,
194
+ )
182
195
183
196
# For non-windows Hosts: Ensure the session has started, and disable command echo and prompt.
184
197
if prepare_terminal :
@@ -198,7 +211,9 @@ def _disable_prompt_command(self) -> None:
198
211
self ._process_mgr .stdin_write (disable_prompt_cmd )
199
212
200
213
# Read output until we got expression
201
- self .wait_for_match (label = "DISABLE PROMPT" , cmd = disable_prompt_cmd , match = disable_prompt_reply .search )
214
+ self ._process_mgr .wait_for_match (
215
+ label = "DISABLE PROMPT" , cmd = disable_prompt_cmd , match = disable_prompt_reply .search
216
+ )
202
217
203
218
def _disable_echo_command (self ) -> None :
204
219
"""Disable echo command from the host"""
@@ -209,12 +224,15 @@ def _disable_echo_command(self) -> None:
209
224
self ._process_mgr .stdin_write (disable_echo_cmd )
210
225
211
226
# Read output until we got expression
212
- self .wait_for_match (label = "DISABLE ECHO" , cmd = disable_echo_cmd , match = "stty -echo" )
227
+ self ._process_mgr . wait_for_match (label = "DISABLE ECHO" , cmd = disable_echo_cmd , match = "stty -echo" )
213
228
214
229
def _prepare_terminal (self ) -> None :
215
230
"""perform any one-time terminal settings"""
231
+ self .verbosity_display (4 , "PREPARE TERMINAL" )
216
232
# Ensure SSM Session has started
217
- self .wait_for_match (label = "START SSM SESSION" , cmd = "start_session" , match = "Starting session with SessionId" )
233
+ self ._process_mgr .wait_for_match (
234
+ label = "START SSM SESSION" , cmd = "start_session" , match = "Starting session with SessionId"
235
+ )
218
236
219
237
# Disable echo command
220
238
self ._disable_echo_command () # pylint: disable=unreachable
0 commit comments