diff --git a/invoke/context.py b/invoke/context.py index e9beaf4d1..08045967f 100644 --- a/invoke/context.py +++ b/invoke/context.py @@ -9,6 +9,7 @@ Generator, Iterator, List, + Dict, Optional, Union, ) @@ -71,6 +72,9 @@ def __init__(self, config: Optional[Config] = None) -> None: #: docs for details. command_cwds: List[str] = list() self._set(command_cwds=command_cwds) + #: Keyword arguments for each 'sudo_scope' context. + command_sudos: List[Dict] = list() + self._set(command_sudos=command_sudos) @property def config(self) -> Config: @@ -110,6 +114,7 @@ def _run( self, runner: "Runner", command: str, **kwargs: Any ) -> Optional[Result]: command = self._prefix_commands(command) + command = self._sudo_commands(command) return runner.run(command, **kwargs) def sudo(self, command: str, **kwargs: Any) -> Optional[Result]: @@ -184,14 +189,12 @@ def sudo(self, command: str, **kwargs: Any) -> Optional[Result]: runner = self.config.runners.local(self) return self._sudo(runner, command, **kwargs) - # NOTE: this is for runner injection; see NOTE above _run(). - def _sudo( - self, runner: "Runner", command: str, **kwargs: Any - ) -> Optional[Result]: - prompt = self.config.sudo.prompt - password = kwargs.pop("password", self.config.sudo.password) - user = kwargs.pop("user", self.config.sudo.user) - env = kwargs.get("env", {}) + def _get_sudo_command( + self, command: str, prompt: str, user: str, env: Dict[str, str] + ) -> str: + """ + Create the command prefixed by sudo from the arguments within kwargs. + """ # TODO: allow subclassing for 'get the password' so users who REALLY # want lazy runtime prompting can have it easily implemented. # TODO: want to print a "cleaner" echo with just 'sudo '; but @@ -210,10 +213,22 @@ def _sudo( env_flags = "" if env: env_flags = "--preserve-env='{}' ".format(",".join(env.keys())) - command = self._prefix_commands(command) cmd_str = "sudo -S -p '{}' {}{}{}".format( prompt, env_flags, user_flags, command ) + + return cmd_str + + # NOTE: this is for runner injection; see NOTE above _run(). + def _sudo( + self, runner: "Runner", command: str, **kwargs: Any + ) -> Optional[Result]: + prompt = self.config.sudo.prompt + password = kwargs.pop("password", self.config.sudo.password) + user = kwargs.pop("user", self.config.sudo.user) + env = kwargs.get("env", {}) + command = self._prefix_commands(command) + cmd_str = self._get_sudo_command(command, prompt, user, env) watcher = FailingResponder( pattern=re.escape(prompt), response="{}\n".format(password), @@ -319,6 +334,35 @@ def prefix(self, command: str) -> Generator[None, None, None]: finally: self.command_prefixes.pop() + def _sudo_commands(self, command: str) -> str: + """ + Prefixes ``command`` with the sudo found in ``command_sudos``. + """ + if self.command_sudos == []: + return command + + # Use only the nearest sudo context args + kwargs = self.command_sudos[-1] + prompt = self.config.sudo.prompt + user = kwargs.get("user", self.config.sudo.user) + env = kwargs.get("env", {}) + cmd_str = self._get_sudo_command(command, prompt, user, env) + + return cmd_str + + @contextmanager + def sudo_scope(self, **kwargs: Any) -> Generator[None, None, None]: + """ + Context manager to temporarily change sudo behavior. + + See ``sudo`` for details on keyword arguments. + """ + self.command_sudos.append(kwargs) + try: + yield + finally: + self.command_sudos.pop() + @property def cwd(self) -> str: """ diff --git a/tests/context.py b/tests/context.py index b7266042a..8758b452c 100644 --- a/tests/context.py +++ b/tests/context.py @@ -536,6 +536,84 @@ def can_be_pickled(self): assert c is not c2 assert c.foo.bar.biz is not c2.foo.bar.biz + class sudo_scope: + # Context manager tests + @patch(local_path) + def sudo_should_apply_to_run(self, Local): + runner = Local.return_value + c = Context() + with c.sudo_scope(): + c.run("whoami") + + cmd = "sudo -S -p '[sudo] password: ' whoami" + assert runner.run.called, "run() never called runner.run()!" + assert runner.run.call_args[0][0] == cmd + + @patch(local_path) + def sudo_should_apply_to_run_two_times(self, Local): + runner = Local.return_value + c = Context() + with c.sudo_scope(): + c.run("whoami") + cmd = "sudo -S -p '[sudo] password: ' whoami" + assert runner.run.called, "run() never called runner.run()!" + assert runner.run.call_args[0][0] == cmd + c.run("echo hello") + cmd = "sudo -S -p '[sudo] password: ' echo hello" + assert runner.run.called, "run() never called runner.run()!" + assert runner.run.call_args[0][0] == cmd + + @patch(local_path) + def sudo_user_should_apply_to_run_two_times(self, Local): + runner = Local.return_value + c = Context() + with c.sudo_scope(user="Joseph"): + c.run("whoami") + assert runner.run.called, "run() never called runner.run()!" + assert "-H -u Joseph" in runner.run.call_args[0][0] + + c.run("whoami") + assert runner.run.called, "run() never called runner.run()!" + assert "-H -u Joseph" in runner.run.call_args[0][0] + + @patch(local_path) + def sudo_should_use_last_context(self, Local): + runner = Local.return_value + c = Context() + with c.sudo_scope(user="Joseph"): + with c.sudo_scope(user="Marchand"): + c.run("whoami") + + cmd = "sudo -S -p '[sudo] password: ' -H -u Marchand whoami" + assert runner.run.called, "run() never called runner.run()!" + assert runner.run.call_args[0][0] == cmd + + # Option tests + @patch(local_path) + def sudo_user_should_apply_to_run(self, Local): + runner = Local.return_value + c = Context() + with c.sudo_scope(user="Joseph"): + c.run("whoami") + + assert runner.run.called, "run() never called runner.run()!" + assert "-H -u Joseph" in runner.run.call_args[0][0] + + @patch(local_path) + def sudo_env_should_apply_to_run(self, Local): + runner = Local.return_value + c = Context() + with c.sudo_scope( + env={"GRATUITOUS_ENVIRONMENT_VARIABLE": "arbitrary value"} + ): + c.run("whoami") + + assert runner.run.called, "run() never called runner.run()!" + assert ( + "--preserve-env='GRATUITOUS_ENVIRONMENT_VARIABLE'" + in runner.run.call_args[0][0] + ) + class MockContext_: def init_still_acts_like_superclass_init(self):