-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathssh_manager.py
134 lines (112 loc) · 4.86 KB
/
ssh_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from pydbus import SystemBus
import subprocess
import re
from config import MeticulousConfig, CONFIG_SYSTEM, ROOT_PASSWORD
from machine import Machine
from log import MeticulousLogger
# Module-level logger, obtained from the project's MeticulousLogger and named after this module.
logger = MeticulousLogger.getLogger(__name__)
class SSHManager:
    """Manage the SSH systemd unit and the root password of the machine.

    The class is a pure namespace: every method is static. The root password
    is stored in ``MeticulousConfig[CONFIG_SYSTEM][ROOT_PASSWORD]``, applied to
    the system through ``chpasswd`` and mirrored into ``ISSUE_PATH``.
    """

    # File into which the "Root password: ..." entry is written.
    ISSUE_PATH = "/etc/issue"

    # Matches a password entry previously written by update_issue_file().
    _ISSUE_PASSWORD_RE = re.compile(r"\nRoot password: .+\n\n")

    @staticmethod
    def set_ssh_state(enabled: bool) -> bool:
        """Enable or disable the SSH service using the systemd D-Bus interface.

        Args:
            enabled: True to enable and start ``ssh.service``; False to stop
                and disable it.

        Returns:
            True when every D-Bus call succeeded, False if any of them raised
            (the error is logged, never propagated).
        """
        try:
            bus = SystemBus()
            systemd = bus.get(".systemd1")

            if enabled:
                systemd.EnableUnitFiles(["ssh.service"], False, False)
                systemd.StartUnit("ssh.service", "fail")
                logger.info("SSH service enabled and started")
            else:
                systemd.StopUnit("ssh.service", "fail")
                systemd.DisableUnitFiles(["ssh.service"], False)
                logger.info("SSH service stopped and disabled")

            # Make systemd pick up the changed unit-file state.
            systemd.Reload()
            return True
        except Exception as e:
            logger.error(f"Error while managing SSH service: {e}")
            return False

    @staticmethod
    def init():
        """Bring the system root password in line with the configuration.

        Does nothing beyond logging when the machine is emulated. When the
        machine is not in manufacturing mode and no password is configured
        yet, a new one is generated. If a password is available afterwards it
        is written to the issue file and applied to the root account.
        """
        current_password = SSHManager.get_root_password()

        logger.info(
            f"Initializing SSH manager... Manufactoring mode: {Machine.enable_manufacturing}. Password already set: {current_password is not None}"
        )

        # Don't touch the real system when running emulated.
        if Machine.emulated:
            return

        # This depends on the Machine class having been initialized. Leaving
        # manufacturing mode without a configured password means a fresh
        # password has to be generated on this boot.
        if not Machine.enable_manufacturing and current_password is None:
            logger.info("Detected exit from manufacturing mode and generating password")
            if SSHManager.generate_root_password():
                logger.warning("Root password generated successfully")
                # Re-read so the freshly generated password is used below.
                current_password = SSHManager.get_root_password()
            else:
                logger.error("Root password generation failed")

        if current_password is None:
            return

        # Both helpers log their own failures; only report success when the
        # operation actually succeeded.
        if SSHManager.update_issue_file(current_password):
            logger.info("Updated /etc/issue with current root password")
        if SSHManager.set_root_password(current_password):
            logger.info("The root password matches the configuration")

    @staticmethod
    def generate_root_password() -> bool:
        """Generate a random root password, apply it and persist it.

        The password is only written to the configuration after it has been
        applied to the root account, so the two cannot get out of sync.

        Returns:
            True when the password was set and saved, False otherwise.
        """
        try:
            new_password = SSHManager.generate_random_password()

            # set_root_password() reports failure through its return value,
            # not through an exception, so it has to be checked explicitly.
            if not SSHManager.set_root_password(new_password):
                logger.error(
                    "Error generating or setting root password: chpasswd failed"
                )
                return False

            MeticulousConfig[CONFIG_SYSTEM][ROOT_PASSWORD] = new_password
            MeticulousConfig.save()
            logger.info("Successfully generated and set new root password")
            return True
        except Exception as e:
            logger.error(f"Error generating or setting root password:{e}")
            return False

    @staticmethod
    def generate_random_password() -> str:
        """Return a random 12-character base64 password (9 random bytes).

        Uses the standard library CSPRNG instead of spawning ``openssl``; the
        output format is the same as ``openssl rand -base64 9``.
        """
        # Function-scope imports keep this method self-contained.
        import base64
        import secrets

        return base64.b64encode(secrets.token_bytes(9)).decode("ascii")

    @staticmethod
    def set_root_password(password: str) -> bool:
        """Set the root account password through ``chpasswd``.

        Args:
            password: The new password. Empty values and values containing a
                line break are rejected, because ``chpasswd`` consumes one
                ``user:password`` entry per input line.

        Returns:
            True on success, False when the password was rejected or
            ``chpasswd`` failed (the error is logged, never propagated).
        """
        if not password or "\n" in password or "\r" in password:
            logger.error("Error setting the root password: invalid password value")
            return False

        try:
            subprocess.run(["chpasswd"], input=f"root:{password}".encode(), check=True)
            return True
        except Exception as e:
            logger.error(f"Error setting the root password: {e}")
            return False

    @staticmethod
    def get_root_password() -> "str | None":
        """Return the root password stored in the configuration.

        The value may be None when no password has been configured yet;
        init() checks for exactly that case.
        """
        return MeticulousConfig[CONFIG_SYSTEM][ROOT_PASSWORD]

    @staticmethod
    def _render_issue(content: str, password: str) -> str:
        """Return *content* with its root-password entry set to *password*.

        Every existing entry is replaced; when there is none, a new entry is
        appended to the end of the content.
        """
        password_info = f"\nRoot password: {password}\n\n"
        # A callable replacement inserts the text literally, so backslashes
        # or group references inside the password are not interpreted.
        updated, replaced = SSHManager._ISSUE_PASSWORD_RE.subn(
            lambda _match: password_info, content
        )
        if replaced:
            return updated
        return content + password_info

    @staticmethod
    def update_issue_file(password: str) -> bool:
        """Write the root password entry into ``ISSUE_PATH``.

        An existing entry is replaced, otherwise one is appended. A missing
        file is created containing only the entry.

        Args:
            password: The password to show in the issue file.

        Returns:
            True when the file was written, False on any error (the error is
            logged, never propagated).
        """
        try:
            try:
                with open(SSHManager.ISSUE_PATH, "r") as issue_file:
                    existing = issue_file.read()
            except FileNotFoundError:
                existing = ""

            with open(SSHManager.ISSUE_PATH, "w") as issue_file:
                issue_file.write(SSHManager._render_issue(existing, password))

            logger.info(
                f"Successfully updated {SSHManager.ISSUE_PATH} with root password"
            )
            return True
        except Exception as e:
            logger.error(f"Error updating {SSHManager.ISSUE_PATH} file: {e}")
            return False