Skip to content

Commit 5a2b3ea

Browse files
authored
Merge pull request #42 from airflow-laminar/tkp/foo
Make supervisor independent of DAG logic for better integration in airflow-config
2 parents c77c06e + 6754120 commit 5a2b3ea

File tree

3 files changed

+118
-40
lines changed

3 files changed

+118
-40
lines changed

README.md

+80
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,86 @@ Here is a nice overview of the DAG, with annotations for code paths and the acti
3838

3939
More docs and code examples coming soon!
4040

41+
### Example DAG:
42+
43+
```python
44+
from airflow import DAG
45+
from datetime import timedelta, datetime
46+
from airflow_supervisor import (
47+
Supervisor,
48+
SupervisorAirflowConfiguration,
49+
ProgramConfiguration,
50+
AirflowConfiguration,
51+
)
52+
53+
54+
# Create supervisor configuration
55+
cfg = SupervisorAirflowConfiguration(
56+
airflow=AirflowConfiguration(port="*:9091"),
57+
working_dir="/data/airflow/supervisor",
58+
config_path="/data/airflow/supervisor/supervisor.conf",
59+
program={
60+
"test": ProgramConfiguration(
61+
command="bash -c 'sleep 14400; exit 1'",
62+
)
63+
},
64+
)
65+
66+
# Create DAG as normal
67+
with DAG(
68+
dag_id="test-supervisor",
69+
schedule=timedelta(days=1),
70+
start_date=datetime(2024, 1, 1),
71+
catchup=False,
72+
) as dag:
73+
74+
# Link supervisor config to dag
75+
supervisor = Supervisor(dag=dag, cfg=cfg)
76+
```
77+
78+
## Example DAG: [`airflow-config`](https://github.com/airflow-laminar/airflow-config)
79+
80+
81+
```yaml
82+
# @package _global_
83+
_target_: airflow_config.Configuration
84+
default_args:
85+
_target_: airflow_config.DefaultArgs
86+
retries: 0
87+
depends_on_past: false
88+
all_dags:
89+
_target_: airflow_config.DagArgs
90+
start_date: "2024-01-01"
91+
catchup: false
92+
extensions:
93+
supervisor:
94+
_target_: airflow_supervisor.SupervisorAirflowConfiguration
95+
airflow:
96+
_target_: airflow_supervisor.AirflowConfiguration
97+
port: "*:9091"
98+
working_dir: "/data/airflow/supervisor"
99+
config_path: "/data/airflow/supervisor/supervisor.conf"
100+
program:
101+
test:
102+
_target_: airflow_supervisor.ProgramConfiguration
103+
command: "bash -c 'sleep 14400; exit 1'"
104+
```
105+
106+
```python
107+
from datetime import timedelta
108+
from airflow_config import load_config, DAG
109+
from airflow_supervisor import Supervisor
110+
111+
config = load_config(config_name="airflow")
112+
113+
with DAG(
114+
dag_id="test-supervisor",
115+
schedule=timedelta(days=1),
116+
config=config,
117+
) as dag:
118+
supervisor = Supervisor(dag=dag, cfg=config.extensions["supervisor"])
119+
```
120+
41121
## How To: Use as a supervisord configuration frontend
42122
43123
This library can be used outside Airflow as a generic supervisord configuration framework, with the static typing benefits that entails. For an example, see the [hydra configuration test](./airflow_supervisor/tests/hydra/test_hydra.py), which generates a supervisor configuration file by composing independent Hydra configs.

airflow_supervisor/airflow/local.py

+34-38
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,23 @@
1313
__all__ = ("Supervisor",)
1414

1515

16-
class Supervisor(DAG):
17-
_supervisor_cfg: SupervisorAirflowConfiguration
18-
_supervisor_kill: DAG
19-
_supervisor_xmlrpc_client: SupervisorRemoteXMLRPCClient
16+
class Supervisor(object):
17+
_dag: DAG
18+
_cfg: SupervisorAirflowConfiguration
19+
_kill_dag: DAG
20+
_xmlrpc_client: SupervisorRemoteXMLRPCClient
2021

21-
def __init__(self, supervisor_cfg: SupervisorAirflowConfiguration, **kwargs):
22+
def __init__(self, dag: DAG, cfg: SupervisorAirflowConfiguration, **kwargs):
2223
# store config
23-
self._supervisor_cfg = supervisor_cfg
24-
self._supervisor_xmlrpc_client = kwargs.pop(
25-
"supervisor_xmlrpc_client", SupervisorRemoteXMLRPCClient(self._supervisor_cfg)
26-
)
24+
self._cfg = cfg
2725

28-
# setup role and tweak dag id
29-
if "dag_id" not in kwargs:
30-
kwargs["dag_id"] = list(self._supervisor_cfg.program.keys())[0]
26+
# store or create client
27+
self._xmlrpc_client = kwargs.pop("xmlrpc_client", SupervisorRemoteXMLRPCClient(self._cfg))
3128

32-
# override dag kwargs that dont make sense
33-
kwargs["catchup"] = False
34-
kwargs["concurrency"] = 1
35-
kwargs["max_active_tasks"] = 1
36-
kwargs["max_active_runs"] = 1
29+
# store dag
30+
self._dag = dag
3731

38-
# init with base DAG
39-
super().__init__(**kwargs)
32+
self.setup_dag()
4033

4134
# initialize tasks
4235
self.initialize_tasks()
@@ -52,6 +45,13 @@ def __init__(self, supervisor_cfg: SupervisorAirflowConfiguration, **kwargs):
5245
# Default non running
5346
PythonOperator(task_id="skip", python_callable=skip_) >> self._force_kill
5447

48+
def setup_dag(self):
49+
# override dag kwargs that don't make sense
50+
self._dag.catchup = False
51+
self._dag.concurrency = 1
52+
self._dag.max_active_tasks = 1
53+
self._dag.max_active_runs = 1
54+
5555
def initialize_tasks(self):
5656
# tasks
5757
self._configure_supervisor = self.get_step_operator(step="configure-supervisor")
@@ -99,37 +99,35 @@ def unconfigure_supervisor(self) -> Operator:
9999

100100
@property
101101
def supervisor_client(self) -> SupervisorRemoteXMLRPCClient:
102-
return SupervisorRemoteXMLRPCClient(self._supervisor_cfg)
102+
return SupervisorRemoteXMLRPCClient(self._cfg)
103103

104104
def get_base_operator_kwargs(self) -> Dict:
105-
return dict(dag=self)
105+
return dict(dag=self._dag)
106106

107107
def get_step_kwargs(self, step: SupervisorTaskStep) -> Dict:
108108
if step == "configure-supervisor":
109109
from .commands import write_supervisor_config
110110

111-
return dict(
112-
python_callable=lambda: write_supervisor_config(self._supervisor_cfg, _exit=False), do_xcom_push=True
113-
)
111+
return dict(python_callable=lambda: write_supervisor_config(self._cfg, _exit=False), do_xcom_push=True)
114112
elif step == "start-supervisor":
115113
from .commands import start_supervisor
116114

117115
return dict(
118-
python_callable=lambda: start_supervisor(self._supervisor_cfg._pydantic_path, _exit=False),
116+
python_callable=lambda: start_supervisor(self._cfg._pydantic_path, _exit=False),
119117
do_xcom_push=True,
120118
)
121119
elif step == "start-programs":
122120
from .commands import start_programs
123121

124-
return dict(python_callable=lambda: start_programs(self._supervisor_cfg, _exit=False), do_xcom_push=True)
122+
return dict(python_callable=lambda: start_programs(self._cfg, _exit=False), do_xcom_push=True)
125123
elif step == "stop-programs":
126124
from .commands import stop_programs
127125

128-
return dict(python_callable=lambda: stop_programs(self._supervisor_cfg, _exit=False), do_xcom_push=True)
126+
return dict(python_callable=lambda: stop_programs(self._cfg, _exit=False), do_xcom_push=True)
129127
elif step == "check-programs":
130128
from .commands import check_programs
131129

132-
def _check_programs(supervisor_cfg=self._supervisor_cfg, **kwargs) -> CheckResult:
130+
def _check_programs(supervisor_cfg=self._cfg, **kwargs) -> CheckResult:
133131
# TODO formalize
134132
if check_programs(supervisor_cfg, check_done=True, _exit=False):
135133
# finish
@@ -144,35 +142,33 @@ def _check_programs(supervisor_cfg=self._supervisor_cfg, **kwargs) -> CheckResul
144142
elif step == "restart-programs":
145143
from .commands import restart_programs
146144

147-
return dict(python_callable=lambda: restart_programs(self._supervisor_cfg, _exit=False), do_xcom_push=True)
145+
return dict(python_callable=lambda: restart_programs(self._cfg, _exit=False), do_xcom_push=True)
148146
elif step == "stop-supervisor":
149147
from .commands import stop_supervisor
150148

151-
return dict(python_callable=lambda: stop_supervisor(self._supervisor_cfg, _exit=False), do_xcom_push=True)
149+
return dict(python_callable=lambda: stop_supervisor(self._cfg, _exit=False), do_xcom_push=True)
152150
elif step == "unconfigure-supervisor":
153151
from .commands import remove_supervisor_config
154152

155-
return dict(
156-
python_callable=lambda: remove_supervisor_config(self._supervisor_cfg, _exit=False), do_xcom_push=True
157-
)
153+
return dict(python_callable=lambda: remove_supervisor_config(self._cfg, _exit=False), do_xcom_push=True)
158154
elif step == "force-kill":
159155
from .commands import kill_supervisor
160156

161-
return dict(python_callable=lambda: kill_supervisor(self._supervisor_cfg, _exit=False), do_xcom_push=True)
157+
return dict(python_callable=lambda: kill_supervisor(self._cfg, _exit=False), do_xcom_push=True)
162158
raise NotImplementedError
163159

164160
def get_step_operator(self, step: SupervisorTaskStep) -> Operator:
165161
if step == "check-programs":
166162
return HighAvailabilityOperator(
167163
**{
168-
"task_id": f"{self.dag_id}-{step}",
169-
"poke_interval": self._supervisor_cfg.check_interval.total_seconds(),
170-
"timeout": self._supervisor_cfg.check_timeout.total_seconds(),
164+
"task_id": f"{self._dag.dag_id}-{step}",
165+
"poke_interval": self._cfg.check_interval.total_seconds(),
166+
"timeout": self._cfg.check_timeout.total_seconds(),
171167
"mode": "poke",
172168
**self.get_base_operator_kwargs(),
173169
**self.get_step_kwargs(step),
174170
}
175171
)
176172
return PythonOperator(
177-
**{"task_id": f"{self.dag_id}-{step}", **self.get_base_operator_kwargs(), **self.get_step_kwargs(step)}
173+
**{"task_id": f"{self._dag.dag_id}-{step}", **self.get_base_operator_kwargs(), **self.get_step_kwargs(step)}
178174
)

airflow_supervisor/airflow/ssh.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from shlex import quote
22
from typing import Dict, List, Optional, Union
33

4+
from airflow.models.dag import DAG
45
from airflow.models.operator import Operator
56
from airflow.providers.ssh.hooks.ssh import SSHHook
67
from airflow.providers.ssh.operators.ssh import SSHOperator
@@ -17,7 +18,8 @@ class SupervisorSSH(Supervisor):
1718
# Mimic SSH Operator: https://airflow.apache.org/docs/apache-airflow-providers-ssh/stable/_api/airflow/providers/ssh/operators/ssh/index.html
1819
def __init__(
1920
self,
20-
supervisor_cfg: SupervisorAirflowConfiguration,
21+
dag: DAG,
22+
cfg: SupervisorAirflowConfiguration,
2123
command_prefix: str = "",
2224
command_noescape: str = "",
2325
ssh_hook: Optional[SSHHook] = None,
@@ -52,7 +54,7 @@ def __init__(
5254
self._ssh_operator_kwargs["banner_timeout"] = banner_timeout
5355
if skip_on_exit_code:
5456
self._ssh_operator_kwargs["skip_on_exit_code"] = skip_on_exit_code
55-
super().__init__(supervisor_cfg=supervisor_cfg, **kwargs)
57+
super().__init__(dag=dag, cfg=cfg, **kwargs)
5658

5759
def get_base_operator_kwargs(self) -> Dict:
5860
return dict(dag=self, **self._ssh_operator_kwargs)

0 commit comments

Comments
 (0)