Skip to content

Commit 73057c8

Browse files
committed
Bugfix for helper CLI, expose client structs, implement helpers on process state
1 parent b601e8f commit 73057c8

File tree

9 files changed

+107
-29
lines changed

9 files changed

+107
-29
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -143,5 +143,8 @@ _template/labextension
143143

144144
# Supervisor
145145
supervisord.pid
146+
supervisor-2024*.cfg
147+
pydantic.json
148+
Untitled*.ipynb
146149
tmp
147150

Makefile

+3-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ annotate: ## run python type annotation checks with mypy
5454

5555
test: ## run python tests
5656
python -m pytest -v airflow_supervisor/tests --junitxml=junit.xml
57-
pgrep -iaf supervisord | xargs kill -15
57+
58+
kill:
59+
bash -c "pgrep -iaf supervisord | xargs kill -15"
5860

5961
coverage: ## run tests and collect test coverage
6062
python -m pytest -v airflow_supervisor/tests --junitxml=junit.xml --cov=airflow_supervisor --cov-branch --cov-fail-under=60 --cov-report term-missing --cov-report xml

airflow_supervisor/airflow/commands.py

+30-14
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,18 @@ def _wait_or_while(until, timeout: int = 5) -> bool:
3939
return False
4040

4141

42+
def _raise_or_exit(val: bool, exit: bool):
43+
if exit:
44+
raise Exit(int(not val))
45+
return val
46+
47+
4248
def write_supervisor_config(cfg_json: str, _exit: Annotated[bool, Argument(hidden=True)] = True):
4349
cfg_obj = SupervisorAirflowConfiguration.model_validate_json(cfg_json)
4450
if not _check_same(cfg_obj):
4551
log.critical("Configs don't match")
4652
cfg_obj._write_self()
47-
return Exit(0) if _exit else True
53+
return _raise_or_exit(True, _exit)
4854

4955

5056
def start_supervisor(
@@ -57,13 +63,13 @@ def start_supervisor(
5763
if not _check_same(cfg_obj):
5864
log.critical("Configs don't match")
5965
if _check_running(cfg_obj):
60-
return Exit(0) if _exit else True
66+
return _raise_or_exit(True, _exit)
6167
cfg_obj.start(daemon=True)
6268
running = _wait_or_while(until=lambda: cfg_obj.running(), timeout=30)
6369
if not running:
6470
log.critical("Still not running 30s after start command!")
65-
return Exit(1) if _exit else False
66-
return Exit(0) if _exit else True
71+
return _raise_or_exit(False, _exit)
72+
return _raise_or_exit(True, _exit)
6773

6874

6975
def start_programs(
@@ -77,21 +83,31 @@ def start_programs(
7783
# TODO
7884
ret = client.startAllProcesses()
7985
log.info(ret)
80-
return Exit(0) if _exit else True
86+
return _raise_or_exit(True, _exit)
8187

8288

8389
def check_programs(
8490
cfg: Annotated[
8591
Path, Option(exists=True, file_okay=True, dir_okay=False, writable=False, readable=True, resolve_path=True)
8692
],
93+
check_running: bool = False,
8794
_exit: Annotated[bool, Argument(hidden=True)] = True,
8895
):
8996
cfg_obj = SupervisorAirflowConfiguration.model_validate_json(cfg.read_text())
9097
client = SupervisorRemoteXMLRPCClient(cfg=cfg_obj)
9198
# TODO
9299
ret = client.getAllProcessInfo()
93-
log.info(ret)
94-
return Exit(0) if _exit else True
100+
for r in ret:
101+
log.info(r.model_dump_json())
102+
if check_running:
103+
meth = "running"
104+
else:
105+
meth = "ok"
106+
if all(getattr(p, meth)() for p in ret):
107+
log.info("all processes ok")
108+
return _raise_or_exit(True, _exit)
109+
log.info("processes not ok")
110+
return _raise_or_exit(False, _exit)
95111

96112

97113
def restart_programs(
@@ -107,7 +123,7 @@ def restart_programs(
107123
log.info(ret1)
108124
ret2 = client.startAllProcesses()
109125
log.info(ret2)
110-
return Exit(0) if _exit else True
126+
return _raise_or_exit(True, _exit)
111127

112128

113129
def stop_supervisor(
@@ -121,8 +137,8 @@ def stop_supervisor(
121137
not_running = _wait_or_while(until=lambda: not cfg_obj.running(), timeout=30)
122138
if not not_running:
123139
log.critical("Still running 30s after stop command!")
124-
return Exit(1) if _exit else False
125-
return Exit(0) if _exit else True
140+
return _raise_or_exit(False, _exit)
141+
return _raise_or_exit(True, _exit)
126142

127143

128144
def kill_supervisor(
@@ -136,8 +152,8 @@ def kill_supervisor(
136152
still_running = _wait_or_while(until=lambda: not cfg_obj.running(), timeout=30)
137153
if still_running:
138154
log.critical("Still running 30s after kill command!")
139-
return Exit(1) if _exit else False
140-
return Exit(0) if _exit else True
155+
return _raise_or_exit(False, _exit)
156+
return _raise_or_exit(True, _exit)
141157

142158

143159
def remove_supervisor_config(
@@ -152,14 +168,14 @@ def remove_supervisor_config(
152168
still_running = kill_supervisor(cfg_obj, _exit=False)
153169

154170
if still_running:
155-
return Exit(1) if _exit else True
171+
return _raise_or_exit(False, _exit)
156172

157173
# TODO move to config
158174
sleep(5)
159175

160176
# TODO make optional
161177
cfg_obj.rmdir()
162-
return Exit(0) if _exit else True
178+
return _raise_or_exit(True, _exit)
163179

164180

165181
def _add_to_typer(app, command: _SupervisorTaskStep, foo):

airflow_supervisor/client/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from .xmlrpc import SupervisorRemoteXMLRPCClient
1+
from .xmlrpc import *

airflow_supervisor/client/xmlrpc.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66

77
from ..config import SupervisorAirflowConfiguration
88

9-
__all__ = ("SupervisorRemoteXMLRPCClient",)
9+
__all__ = ("SupervisorRemoteXMLRPCClient", "ProcessState", "SupervisorState", "SupervisorMethodResult", "ProcessInfo")
1010

1111

12-
class State(Enum):
12+
class ProcessState(Enum):
1313
STOPPED = 0
1414
STARTING = 10
1515
RUNNING = 20
@@ -66,7 +66,7 @@ class SupervisorMethodResult(Enum):
6666
class ProcessInfo(BaseModel):
6767
name: str
6868
group: str
69-
state: State
69+
state: ProcessState
7070
description: str
7171
start: datetime
7272
stop: datetime
@@ -78,6 +78,17 @@ class ProcessInfo(BaseModel):
7878
stderr_logfile: str
7979
pid: int
8080

81+
def running(self):
82+
return self.state in (ProcessState.STARTING, ProcessState.RUNNING, ProcessState.STOPPING)
83+
84+
def ok(self):
85+
return self.state in (
86+
ProcessState.STARTING,
87+
ProcessState.RUNNING,
88+
ProcessState.STOPPING,
89+
ProcessState.STOPPED,
90+
) or (self.state == ProcessState.EXITED and self.exitstatus == 0)
91+
8192

8293
class SupervisorRemoteXMLRPCClient(object):
8394
"""A light wrapper over the supervisor xmlrpc api: http://supervisord.org/api.html"""

airflow_supervisor/config/supervisor.py

+8-10
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,18 @@ def _setup_config_and_working_dir(self):
8787
else:
8888
self.working_dir = tempdir / f"supervisor-{now}"
8989
self.supervisord.directory = self.working_dir
90-
91-
# force pidfile to be in working dir if not otherwise set
92-
if not self.supervisord.pidfile:
93-
self.supervisord.pidfile = self.working_dir / "supervisord.pid"
94-
95-
# force logfile to be in working dir if not otherwise set
96-
if not self.supervisord.logfile:
97-
self.supervisord.logfile = self.working_dir / "supervisord.log"
98-
9990
using_default_working_dir = True
100-
10191
else:
10292
using_default_working_dir = False
10393

94+
# force pidfile to be in working dir if not otherwise set
95+
if not self.supervisord.pidfile:
96+
self.supervisord.pidfile = self.working_dir / "supervisord.pid"
97+
98+
# force logfile to be in working dir if not otherwise set
99+
if not self.supervisord.logfile:
100+
self.supervisord.logfile = self.working_dir / "supervisord.log"
101+
104102
if self.config_path == "":
105103
if using_default_working_dir:
106104
self.config_path = self.working_dir / "supervisor.cfg"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from datetime import datetime
2+
3+
from airflow_supervisor.client import ProcessInfo, ProcessState
4+
5+
6+
def _gen() -> ProcessInfo:
7+
return ProcessInfo(
8+
name="test",
9+
group="test",
10+
state=ProcessState.UNKNOWN,
11+
description="",
12+
start=datetime.now(),
13+
stop=datetime.now(),
14+
now=datetime.now(),
15+
spawner="",
16+
exitstatus=0,
17+
logfile="",
18+
stdout_logfile="",
19+
stderr_logfile="",
20+
pid=0,
21+
)
22+
23+
24+
def test_ok():
25+
x = _gen()
26+
x.state = ProcessState.RUNNING
27+
assert x.ok()
28+
x.state = ProcessState.EXITED
29+
x.exitstatus = 0
30+
assert x.ok()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from pathlib import Path
2+
3+
from airflow_supervisor import AirflowConfiguration, ProgramConfiguration, SupervisorAirflowConfiguration
4+
5+
if __name__ == "__main__":
6+
path = Path(__file__).parent.parent.parent
7+
cfg = SupervisorAirflowConfiguration(
8+
airflow=AirflowConfiguration(port="*:9090"),
9+
working_dir=path,
10+
path=path,
11+
program={
12+
"test": ProgramConfiguration(
13+
command="sleep 1 && exit 1",
14+
)
15+
},
16+
)
17+
print(cfg._pydantic_path)
18+
cfg._write_self()

0 commit comments

Comments
 (0)