Skip to content

Add Context.sudo_scope context manager #999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 53 additions & 9 deletions invoke/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Generator,
Iterator,
List,
Dict,
Optional,
Union,
)
Expand Down Expand Up @@ -71,6 +72,9 @@ def __init__(self, config: Optional[Config] = None) -> None:
#: docs for details.
command_cwds: List[str] = list()
self._set(command_cwds=command_cwds)
#: Keyword arguments for each 'sudo_scope' context.
command_sudos: List[Dict] = list()
self._set(command_sudos=command_sudos)

@property
def config(self) -> Config:
Expand Down Expand Up @@ -110,6 +114,7 @@ def _run(
self, runner: "Runner", command: str, **kwargs: Any
) -> Optional[Result]:
command = self._prefix_commands(command)
command = self._sudo_commands(command)
return runner.run(command, **kwargs)

def sudo(self, command: str, **kwargs: Any) -> Optional[Result]:
Expand Down Expand Up @@ -184,14 +189,12 @@ def sudo(self, command: str, **kwargs: Any) -> Optional[Result]:
runner = self.config.runners.local(self)
return self._sudo(runner, command, **kwargs)

# NOTE: this is for runner injection; see NOTE above _run().
def _sudo(
self, runner: "Runner", command: str, **kwargs: Any
) -> Optional[Result]:
prompt = self.config.sudo.prompt
password = kwargs.pop("password", self.config.sudo.password)
user = kwargs.pop("user", self.config.sudo.user)
env = kwargs.get("env", {})
def _get_sudo_command(
self, command: str, prompt: str, user: str, env: Dict[str, str]
) -> str:
"""
Create the command prefixed by sudo from the arguments within kwargs.
"""
# TODO: allow subclassing for 'get the password' so users who REALLY
# want lazy runtime prompting can have it easily implemented.
# TODO: want to print a "cleaner" echo with just 'sudo <command>'; but
Expand All @@ -210,10 +213,22 @@ def _sudo(
env_flags = ""
if env:
env_flags = "--preserve-env='{}' ".format(",".join(env.keys()))
command = self._prefix_commands(command)
cmd_str = "sudo -S -p '{}' {}{}{}".format(
prompt, env_flags, user_flags, command
)

return cmd_str

# NOTE: this is for runner injection; see NOTE above _run().
def _sudo(
self, runner: "Runner", command: str, **kwargs: Any
) -> Optional[Result]:
prompt = self.config.sudo.prompt
password = kwargs.pop("password", self.config.sudo.password)
user = kwargs.pop("user", self.config.sudo.user)
env = kwargs.get("env", {})
command = self._prefix_commands(command)
cmd_str = self._get_sudo_command(command, prompt, user, env)
watcher = FailingResponder(
pattern=re.escape(prompt),
response="{}\n".format(password),
Expand Down Expand Up @@ -319,6 +334,35 @@ def prefix(self, command: str) -> Generator[None, None, None]:
finally:
self.command_prefixes.pop()

def _sudo_commands(self, command: str) -> str:
"""
Prefixes ``command`` with the sudo found in ``command_sudos``.
"""
if self.command_sudos == []:
return command

# Use only the nearest sudo context args
kwargs = self.command_sudos[-1]
prompt = self.config.sudo.prompt
user = kwargs.get("user", self.config.sudo.user)
env = kwargs.get("env", {})
cmd_str = self._get_sudo_command(command, prompt, user, env)

return cmd_str

@contextmanager
def sudo_scope(self, **kwargs: Any) -> Generator[None, None, None]:
"""
Context manager to temporarily change sudo behavior.

See ``sudo`` for details on keyword arguments.
"""
self.command_sudos.append(kwargs)
try:
yield
finally:
self.command_sudos.pop()

@property
def cwd(self) -> str:
"""
Expand Down
78 changes: 78 additions & 0 deletions tests/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,84 @@ def can_be_pickled(self):
assert c is not c2
assert c.foo.bar.biz is not c2.foo.bar.biz

class sudo_scope:
# Context manager tests
@patch(local_path)
def sudo_should_apply_to_run(self, Local):
runner = Local.return_value
c = Context()
with c.sudo_scope():
c.run("whoami")

cmd = "sudo -S -p '[sudo] password: ' whoami"
assert runner.run.called, "run() never called runner.run()!"
assert runner.run.call_args[0][0] == cmd

@patch(local_path)
def sudo_should_apply_to_run_two_times(self, Local):
runner = Local.return_value
c = Context()
with c.sudo_scope():
c.run("whoami")
cmd = "sudo -S -p '[sudo] password: ' whoami"
assert runner.run.called, "run() never called runner.run()!"
assert runner.run.call_args[0][0] == cmd
c.run("echo hello")
cmd = "sudo -S -p '[sudo] password: ' echo hello"
assert runner.run.called, "run() never called runner.run()!"
assert runner.run.call_args[0][0] == cmd

@patch(local_path)
def sudo_user_should_apply_to_run_two_times(self, Local):
runner = Local.return_value
c = Context()
with c.sudo_scope(user="Joseph"):
c.run("whoami")
assert runner.run.called, "run() never called runner.run()!"
assert "-H -u Joseph" in runner.run.call_args[0][0]

c.run("whoami")
assert runner.run.called, "run() never called runner.run()!"
assert "-H -u Joseph" in runner.run.call_args[0][0]

@patch(local_path)
def sudo_should_use_last_context(self, Local):
runner = Local.return_value
c = Context()
with c.sudo_scope(user="Joseph"):
with c.sudo_scope(user="Marchand"):
c.run("whoami")

cmd = "sudo -S -p '[sudo] password: ' -H -u Marchand whoami"
assert runner.run.called, "run() never called runner.run()!"
assert runner.run.call_args[0][0] == cmd

# Option tests
@patch(local_path)
def sudo_user_should_apply_to_run(self, Local):
runner = Local.return_value
c = Context()
with c.sudo_scope(user="Joseph"):
c.run("whoami")

assert runner.run.called, "run() never called runner.run()!"
assert "-H -u Joseph" in runner.run.call_args[0][0]

@patch(local_path)
def sudo_env_should_apply_to_run(self, Local):
runner = Local.return_value
c = Context()
with c.sudo_scope(
env={"GRATUITOUS_ENVIRONMENT_VARIABLE": "arbitrary value"}
):
c.run("whoami")

assert runner.run.called, "run() never called runner.run()!"
assert (
"--preserve-env='GRATUITOUS_ENVIRONMENT_VARIABLE'"
in runner.run.call_args[0][0]
)


class MockContext_:
def init_still_acts_like_superclass_init(self):
Expand Down