-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_ogc_process.py
More file actions
315 lines (261 loc) · 11.4 KB
/
run_ogc_process.py
File metadata and controls
315 lines (261 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
"""
DAG with custom SPSOGCOperator that subclasses KubernetesPodOperator
for OGC process execution with SPS-specific functionality.
"""
import json
import logging
import re
from datetime import datetime
import requests
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator, get_current_context
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret as AirflowK8sSecret
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s
from unity_sps_utils import POD_LABEL, POD_NAMESPACE, get_affinity
# Base URL of the MAAP UAT API; all OGC endpoint paths below are appended to this.
API_HOST = "https://api.uat.maap-project.org/api/"
def fetch_ogc_processes():
    """Fetch available processes from the OGC API and build a selection mapping.

    Returns:
        tuple[dict, list]: A pair of
        (mapping of "process_id:version" display name -> numerical process ID,
        list of display names for the DAG's dropdown Param).
        On any failure a single-entry placeholder is returned so DAG parsing
        never breaks.
    """
    fallback = ({"example-process:1.0": 1}, ["example-process:1.0"])
    try:
        response = requests.get(API_HOST + "ogc/processes", timeout=30)
        response.raise_for_status()
        processes_data = response.json()

        process_mapping = {}
        dropdown_options = []
        for process in processes_data.get("processes", []):
            process_id = process.get("id")
            process_version = process.get("version")
            # Extract numerical ID from the "self" link, e.g. "/ogc/processes/7"
            numerical_id = None
            for link in process.get("links", []):
                if link.get("rel") == "self":
                    match = re.search(r"/processes/(\d+)$", link.get("href", ""))
                    if match:
                        numerical_id = int(match.group(1))
                        break
            # Explicit None check: a numerical ID of 0 is falsy but still valid.
            if process_id and numerical_id is not None:
                display_name = f"{process_id}:{process_version}" if process_version else process_id
                dropdown_options.append(display_name)
                process_mapping[display_name] = numerical_id
        return process_mapping, dropdown_options
    except requests.RequestException as e:
        # Network/HTTP failure: log and fall back so the DAG still renders.
        logging.error("Failed to fetch processes: %s", e)
        return fallback
    except Exception as e:
        # Malformed/unexpected payload structure: same fallback behavior.
        logging.error("Error processing OGC processes: %s", e)
        return fallback
# Constants
K8S_SECRET_NAME = "sps-app-credentials"
# This docker image is generated by the files in docker/run_ogc_process
DOCKER_IMAGE = "jplmdps/ogc-job-runner:v1.0.0"
# NOTE(review): this performs a network call at module import time, i.e. on
# every DAG parse by the scheduler -- consider caching if parse latency or
# API load becomes an issue.
PROCESS_MAPPING, DROPDOWN_OPTIONS = fetch_ogc_processes()
# SPS-specific secrets: inject the MAAP PGT token from the k8s secret as the
# MAAP_PGT environment variable inside each pod this DAG launches.
secret_env_vars = [
    AirflowK8sSecret(
        deploy_type="env",
        deploy_target="MAAP_PGT",
        secret=K8S_SECRET_NAME,
        key="MAAP_PGT",
    )
]
class SPSOGCOperator(KubernetesPodOperator):
    """
    Custom operator for SPS OGC process execution that subclasses KubernetesPodOperator.

    This operator encapsulates all SPS-specific configuration and provides a clean
    interface for OGC process submission and monitoring. The actual HTTP calls to
    the OGC API are performed by the container (DOCKER_IMAGE), driven entirely by
    the environment variables built here.
    """

    def __init__(
        self,
        operation_type: str,
        selected_process: str = None,
        job_inputs: str = None,
        job_queue: str = None,
        job_id: str = None,
        **kwargs,
    ):
        """
        Initialize the SPSOGCOperator.

        Args:
            operation_type: Either "submit" or "monitor"
            selected_process: Process selection for submit operations
            job_inputs: JSON string of job inputs for submit operations
            job_queue: Queue name for submit operations
            job_id: Job ID for monitor operations

        Raises:
            ValueError: If operation_type is not "submit" or "monitor".
        """
        self.operation_type = operation_type
        self.selected_process = selected_process
        self.job_inputs = job_inputs
        self.job_queue = job_queue
        self.job_id = job_id
        # Set SPS-specific defaults; setdefault lets callers override any of them.
        kwargs.setdefault("namespace", POD_NAMESPACE)
        kwargs.setdefault("image", DOCKER_IMAGE)
        kwargs.setdefault("service_account_name", "airflow-worker")
        kwargs.setdefault("secrets", secret_env_vars)  # injects MAAP_PGT
        kwargs.setdefault("in_cluster", True)
        kwargs.setdefault("get_logs", True)
        kwargs.setdefault("startup_timeout_seconds", 600)
        kwargs.setdefault("container_security_context", {"privileged": True})
        kwargs.setdefault("container_logs", True)
        kwargs.setdefault("labels", {"pod": POD_LABEL})
        # Prevent Karpenter from disrupting the pod mid-run (spot consolidation).
        kwargs.setdefault("annotations", {"karpenter.sh/do-not-disrupt": "true"})
        kwargs.setdefault(
            "affinity",
            get_affinity(
                capacity_type=["spot"],
                anti_affinity_label=POD_LABEL,
            ),
        )
        # NOTE(review): "keep_pod" plus is_delete_operator_pod=False means
        # finished pods are never cleaned up automatically -- confirm this is
        # intentional (debugging aid) vs. a resource-accumulation risk.
        kwargs.setdefault("on_finish_action", "keep_pod")
        kwargs.setdefault("is_delete_operator_pod", False)
        # Build operation-specific environment variables
        if operation_type == "submit":
            kwargs["env_vars"] = self._build_submit_env_vars()
            kwargs["name"] = f"ogc-submit-pod-{kwargs.get('task_id', 'unknown')}"
            kwargs.setdefault("do_xcom_push", True)  # Submit tasks need to return job ID
        elif operation_type == "monitor":
            kwargs["env_vars"] = self._build_monitor_env_vars()
            kwargs["name"] = f"ogc-monitor-pod-{kwargs.get('task_id', 'unknown')}"
        else:
            raise ValueError(f"Invalid operation_type: {operation_type}. Must be 'submit' or 'monitor'")
        super().__init__(**kwargs)

    def _build_submit_env_vars(self):
        """Build environment variables for job submission.

        NOTE(review): SUBMIT_JOB_URL contains a literal "{process_id}"
        placeholder -- presumably the container substitutes it using the
        PROCESS_ID env var; confirm against docker/run_ogc_process.
        """
        # Resolve numerical process ID from selected process
        numerical_process_id = self._resolve_process_id()
        return [
            k8s.V1EnvVar(
                name="SUBMIT_JOB_URL",
                value=API_HOST + "ogc/processes/{process_id}/execution",
            ),
            k8s.V1EnvVar(name="PROCESS_ID", value=str(numerical_process_id)),
            k8s.V1EnvVar(name="JOB_INPUTS", value=self.job_inputs or "{}"),
            k8s.V1EnvVar(name="QUEUE", value=self.job_queue or "maap-dps-worker-cardamom"),
            k8s.V1EnvVar(name="SUBMIT_JOB", value="true"),
        ]

    def _build_monitor_env_vars(self):
        """Build environment variables for job monitoring.

        NOTE(review): MONITOR_JOB_URL contains a literal "{job_id}" placeholder --
        presumably filled in by the container from the JOB_ID env var; confirm.
        """
        return [
            k8s.V1EnvVar(
                name="MONITOR_JOB_URL",
                value=API_HOST + "ogc/jobs/{job_id}",
            ),
            # job_id is typically a Jinja template resolved at render time.
            k8s.V1EnvVar(name="JOB_ID", value=self.job_id),
            k8s.V1EnvVar(name="SUBMIT_JOB", value="false"),
        ]

    def _resolve_process_id(self):
        """Resolve the selected process to a numerical process ID.

        Returns either an int (direct lookup in PROCESS_MAPPING) or a Jinja
        template string that pulls the ID from the Setup task's XCom when the
        selected process itself is templated.

        Raises:
            ValueError: If selected_process is empty/None.
        """
        if not self.selected_process:
            raise ValueError("selected_process is required for submit operations")
        # Handle templated values - they won't be resolved yet during __init__
        if "{{" in str(self.selected_process):
            # Return a template that will be resolved at runtime
            return "{{ ti.xcom_pull(task_ids='Setup', key='return_value')['numerical_process_id'] }}"
        # Direct lookup for non-templated values
        numerical_id = PROCESS_MAPPING.get(self.selected_process)
        if numerical_id is None:
            # Silent fallback to ID 1 keeps the task running; the warning is the
            # only signal that the mapping lookup failed.
            self.log.warning(f"Process '{self.selected_process}' not found in mapping, defaulting to ID 1")
            return 1
        return numerical_id

    def execute(self, context):
        """Execute the operator with additional SPS-specific logging."""
        self.log.info(f"Starting SPS OGC {self.operation_type} operation")
        if self.operation_type == "submit":
            self.log.info(f"Selected process: {self.selected_process}")
            self.log.info(f"Job queue: {self.job_queue}")
            self.log.info(f"Job inputs: {self.job_inputs}")
        elif self.operation_type == "monitor":
            self.log.info(f"Monitoring job ID: {self.job_id}")
        # Call parent execute method
        result = super().execute(context)
        self.log.info(f"SPS OGC {self.operation_type} operation completed")
        return result
# Default arguments applied to every task in the DAG.
dag_default_args = {
    "owner": "unity-sps",
    "depends_on_past": False,
    # Epoch start date; the DAG has schedule=None so this only satisfies
    # Airflow's requirement that a start_date exist.
    # NOTE(review): datetime.utcfromtimestamp is deprecated since Python 3.12;
    # consider datetime.fromtimestamp(0, tz=timezone.utc) -- confirm Airflow's
    # naive-vs-aware timezone handling before changing.
    "start_date": datetime.utcfromtimestamp(0),
}
# --- DAG Definition ---
dag = DAG(
    dag_id="run_ogc_process",
    description="Submits a job to an OGC process and monitors (using custom SPSOGCOperator)",
    dag_display_name="Run an OGC Process (Custom Operator from KubernetesPodOperator)",
    tags=["ogc", "job", "custom-operator"],
    is_paused_upon_creation=False,
    catchup=False,
    schedule=None,  # manual trigger only
    max_active_runs=10,
    default_args=dag_default_args,
    params={
        # Dropdown populated from the OGC API at DAG-parse time; falls back to
        # a placeholder entry when the fetch failed (see fetch_ogc_processes).
        "selected_process": Param(
            default=DROPDOWN_OPTIONS[0] if DROPDOWN_OPTIONS else "Error loading dropdown",
            enum=DROPDOWN_OPTIONS,
            title="Process Selection",
            description="Select a process to execute.",
        ),
        "queue": Param(
            "maap-dps-worker-8gb",
            type="string",
            title="Queue",
            description="The MAAP queue to submit the job to",
        ),
        # Passed through verbatim to the JOB_INPUTS env var of the submit pod.
        "job_inputs": Param(
            "{}",
            type="string",
            title="Job Inputs",
            description="A JSON string representing the inputs payload for the job.",
        ),
    },
)
# --- Task Definitions ---
def setup(ti=None, **context):
    """Log DAG parameters and resolve the selected process to its numerical ID.

    Returns:
        dict: ``{"numerical_process_id": int}`` pushed to XCom for downstream
        tasks; defaults to 1 when the selected process is not in the mapping.
    """
    logging.info("Starting OGC job submission and monitoring DAG (Custom Operator Version).")
    # The context passed by PythonOperator already contains the rendered
    # params -- no need to call get_current_context() and log them twice.
    params = context["params"]
    logging.info("DAG Run parameters: %s", json.dumps(params, sort_keys=True, indent=4))
    logging.info("Available processes: %d", len(DROPDOWN_OPTIONS))
    logging.info("Process mapping: %s", json.dumps(PROCESS_MAPPING, indent=2))

    selected_process = params.get("selected_process")
    if selected_process in PROCESS_MAPPING:
        numerical_id = PROCESS_MAPPING[selected_process]
        logging.info("Selected process '%s' maps to numerical ID: %s", selected_process, numerical_id)
        return {"numerical_process_id": numerical_id}
    logging.warning("Selected process '%s' not found in mapping", selected_process)
    return {"numerical_process_id": 1}
# Logs params and pushes the resolved numerical process ID to XCom.
setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)

# Submits the job; all three values are Jinja templates rendered at runtime
# (see SPSOGCOperator._resolve_process_id for the templated-process path).
submit_job_task = SPSOGCOperator(
    task_id="submit_job_task",
    operation_type="submit",
    selected_process="{{ params.selected_process }}",
    job_inputs="{{ params.job_inputs }}",
    job_queue="{{ params.queue }}",
    dag=dag,
)

# Monitors the submitted job; job_id comes from the submit task's XCom result.
monitor_job_task = SPSOGCOperator(
    task_id="monitor_job_task",
    operation_type="monitor",
    job_id="{{ ti.xcom_pull(task_ids='submit_job_task', key='return_value')['job_id'] }}",
    dag=dag,
)
def cleanup(**context):
    """A placeholder cleanup task"""
    logging.info("Cleanup executed.")
    # Log final results if available
    task_instance = context["ti"]
    results = {
        "Job submission result": task_instance.xcom_pull(task_ids="submit_job_task", key="return_value"),
        "Job monitoring result": task_instance.xcom_pull(task_ids="monitor_job_task", key="return_value"),
    }
    for label, value in results.items():
        if value:
            logging.info(f"{label}: {value}")
# ALL_DONE ensures cleanup runs regardless of upstream success or failure.
cleanup_task = PythonOperator(
    task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE
)

# Linear pipeline: setup -> submit -> monitor -> cleanup.
chain(setup_task, submit_job_task, monitor_job_task, cleanup_task)