diff --git a/src/drunc/data/process_manager/k8s-CERN.json b/src/drunc/data/process_manager/k8s-CERN.json index 8e5d5fbbb..3aff4e0be 100644 --- a/src/drunc/data/process_manager/k8s-CERN.json +++ b/src/drunc/data/process_manager/k8s-CERN.json @@ -33,9 +33,6 @@ "drunc_label": "drunc.daq" }, "home_path_base": "/nfs/home", - "connection_server": { - "name": "local-connection-server" - }, "service": { "headless_discovery_port": 80 }, diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json index 00ddfefef..7e047b850 100644 --- a/src/drunc/data/process_manager/k8s.json +++ b/src/drunc/data/process_manager/k8s.json @@ -33,9 +33,6 @@ "drunc_label": "drunc.daq" }, "home_path_base": "/nfs/home", - "connection_server": { - "name": "local-connection-server" - }, "service": { "headless_discovery_port": 80 }, diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 2e57a2548..13f5ebdfc 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -35,7 +35,10 @@ DruncK8sNodeException, DruncK8sPodException, ) -from drunc.process_manager.configuration import PROCESS_SHUTDOWN_ORDERING +from drunc.process_manager.configuration import ( + PROCESS_SHUTDOWN_ORDERING, + ProcessManagerConfHandler, +) from drunc.process_manager.process_manager import ProcessManager from drunc.process_manager.utils import on_parent_exit, validate_k8s_session_name from drunc.utils.utils import get_logger, resolve_localhost_to_hostname @@ -43,12 +46,26 @@ class K8sPodWatcherThread(threading.Thread): def __init__(self, pm) -> None: + """ + Initialize the pod watcher thread that monitors and notifies on pod events. + + Args: + pm: The K8sProcessManager instance to watch. + """ threading.Thread.__init__(self) self.pm = pm self.daemon = True self.processed_uuids = set() def run(self) -> None: + """ + Run the pod watcher loop. + + Continuously watches for Kubernetes pod events across all namespaces + managed by the process manager. Detects terminal pod states (Succeeded, + Failed, or Deleted) and notifies the process manager of terminations. + Automatically restarts the watch stream on API errors or disconnections. + """ self.pm.log.info("K8sPodWatcherThread started") while True: try: @@ -131,18 +148,32 @@ def run(self) -> None: class K8sProcessManager(ProcessManager): - def __init__(self, configuration, **kwargs) -> None: + def __init__(self, configuration: ProcessManagerConfHandler, **kwargs) -> None: """ Manages processes as Kubernetes Pods. This ProcessManager interfaces with the Kubernetes API to start, stop, and monitor - applications running in Pods. It includes special handling for a local connectivity - service, which involves: - 1. Using a NodePort service for the orchestrator for external access. + applications running in Pods. + + Args: + configuration: The process manager configuration object containing image, + settings (labels, service, pod_management, volumes, cleanup, checking), + and other runtime parameters. + **kwargs: Additional keyword arguments passed to the parent ProcessManager. + + Raises: + ConfigException: If the Kubernetes configuration cannot be loaded. """ + + # Get the username for the session. 
This is needed as k8s does not pass the + # username through to the pod self.session = getpass.getuser() super().__init__(configuration=configuration, session=self.session, **kwargs) + + # Set up the logger self.log = get_logger("process_manager.k8s-process-manager") + # Validate that the host this process manager is running on is part of a + # kubernetes cluster try: config.load_kube_config() except ConfigException as e: @@ -154,12 +185,14 @@ def __init__(self, configuration, **kwargs) -> None: self.log.critical("----------------------------------------------") raise + # Set up the hooks to the k8s API, which makes later setup easier self._k8s_client = client self._core_v1_api = client.CoreV1Api() self._meta_v1_api = client.V1ObjectMeta self._pod_spec_v1_api = client.V1PodSpec self._api_error_v1_api = client.rest.ApiException + # Storage for process orchestrator parameters self.managed_sessions = set() self.watchers = [] self._start_watcher() @@ -173,41 +206,40 @@ def __init__(self, configuration, **kwargs) -> None: self._host_cache = {} self._host_cache_lock = threading.Lock() - # Get settings from configuration + # Get settings from configuration JSON file + # Any comments following this one will relate to the parameters retrieved from + # the configuration file if the comment starts with "CONFIGURATION -" settings = getattr(self.configuration.data, "settings", {}) - # Labels + # CONFIGURATION - label defaults labels = settings.get("labels", {}) self.drunc_label = labels.get("drunc_label", "drunc.daq") - # Connection server - connection_server = settings.get("connection_server", {}) - self.connection_server_name = connection_server.get( - "name", "local-connection-server" - ) + # CONFIGURATION - connection server connection port numbers self.connection_server_port = None self.connection_server_node_port = None - # Service + # CONFIGURATION - per-pod service port number service = settings.get("service", {}) self.headless_discovery_port = service.get("headless_discovery_port", 80) - # Pod management + + # CONFIGURATION - pod startup management parameters pod_management = settings.get("pod_management", {}) self.kill_timeout = pod_management.get("kill_timeout", 30) self.pod_ready_timeout = pod_management.get("pod_ready_timeout", 60) - # Volume mounts - self.volume_configs = settings.get("volumes", []) - - # Home path configuration - self.home_path_base = settings.get("home_path_base", None) - - # Cleanup + # CONFIGURATION - restart cleanup parameters cleanup = settings.get("cleanup", {}) self.restart_cleanup_time = cleanup.get("restart_cleanup_time", 10.0) self.restart_cleanup_polling = cleanup.get("restart_cleanup_polling", 0.5) - # Checking + # CONFIGURATION - volume mounts + self.volume_configs = settings.get("volumes", []) + + # CONFIGURATION - home path definition + self.home_path_base = settings.get("home_path_base", None) + + # CONFIGURATION - timeouts and check parameters checking = settings.get("checking", {}) self.watcher_retry_sleep = checking.get("watcher_retry_sleep", 5) self.pod_status_check_sleep = checking.get("pod_status_check_sleep", 1) @@ -215,14 +247,12 @@ def __init__(self, configuration, **kwargs) -> None: self.grpc_startup_timeout = checking.get("grpc_startup_timeout", 30) self.socket_retry_timeout = checking.get("socket_retry_timeout", 1.0) - self.log.debug(f"Using kill_timeout of {self.kill_timeout} seconds.") - + # Get and print the list of active namespaces managed by drunc namespaces = self._core_v1_api.list_namespace( 
label_selector=f"creator.{self.drunc_label}={self.__class__.__name__}" ) namespace_names = [ns.metadata.name for ns in namespaces.items] namespace_list_str = "\n - ".join(namespace_names) - if namespace_list_str: self.log.info( f"Active namespaces created by drunc:\n - {namespace_list_str}" @@ -234,14 +264,27 @@ def __init__(self, configuration, **kwargs) -> None: self._setup_signal_handlers() def _start_watcher(self) -> None: - """Starts the background thread that watches for Pod status changes.""" + """ + Start the background pod watcher thread. + + Creates and starts a K8sPodWatcherThread daemon thread that monitors + pod lifecycle events and notifies the process manager of terminations. + The thread reference is stored in self.watchers. + """ self.log.debug("Starting K8s pod watcher thread") t = K8sPodWatcherThread(pm=self) t.start() self.watchers.append(t) def _setup_signal_handlers(self) -> None: - """Set up signal handlers to clean up pods when the process manager is terminated.""" + """ + Set up signal handlers to clean up pods when the process manager is terminated. + + Registers handlers for SIGTERM, SIGHUP, and SIGQUIT that trigger full + cleanup of all managed pods before exiting. Also attempts to configure + a parent death signal (Linux only) so that pods are cleaned up if the + parent process dies unexpectedly. + """ def signal_handler(signum, frame): self.log.info(f"Received signal {signum}, cleaning up all pods...") @@ -266,20 +309,37 @@ def signal_handler(signum, frame): f"Could not set parent death signal (may not be supported on this platform): {e}" ) - def notify_termination(self, proc_uuid, exit_code, reason, session) -> None: - """Callback for when a pod terminates.""" + def notify_termination( + self, proc_uuid: str, exit_code: int, reason: str, session: str + ) -> None: + """ + Callback for when a pod terminates. + + Updates the final exit code, broadcasts a status update, and signals + the termination_complete_event when all pending deletions are confirmed. + + Args: + proc_uuid: The UUID string of the terminated process. + exit_code: The integer exit code of the terminated pod. + reason: A string describing the termination reason (e.g. 'GracefulShutdown', 'PodDeleted'). + session: The Kubernetes namespace (session) the pod belonged to. + """ self.log.debug( f"notify_termination called for '{proc_uuid}'. Pending={self.uuids_pending_deletion}" ) + # Publish a log message and to kafka for each process that is terminated if proc_uuid in self.boot_request: + # Get the exit data, and compose a message for tty viewing self.final_exit_codes[proc_uuid] = exit_code - meta = self.boot_request[proc_uuid].process_description.metadata end_str = f"Pod '{meta.name}' (session: '{session}', user: '{meta.user}', uuid: {proc_uuid}) terminated with exit code {exit_code}. Reason: {reason}" + + # Publish this information self.log.info(end_str) self.broadcast(end_str, BroadcastType.SUBPROCESS_STATUS_UPDATE) + # Clear the list of processes being removed if proc_uuid in self.uuids_pending_deletion: self.uuids_pending_deletion.remove(proc_uuid) self.log.debug( @@ -289,28 +349,68 @@ def notify_termination(self, proc_uuid, exit_code, reason, session) -> None: self.log.debug("All pending pods terminated, setting event.") self.termination_complete_event.set() - def is_alive(self, podname, session) -> bool: - """Checks if a pod is currently in the 'Running' phase.""" + def is_alive(self, podname: str, session: str) -> bool: + """ + Checks if a pod is currently in the 'Running' phase. 
+ + Args: + podname: The name of the pod to check. + session: The Kubernetes namespace (session) containing the pod. + + Returns: + True if the pod exists and its phase is 'Running', False otherwise. + """ + try: + # Attempt to get the pod status; if this succeeds, the pod is alive pod_status = self._core_v1_api.read_namespaced_pod_status(podname, session) return pod_status.status.phase == "Running" except self._api_error_v1_api as e: + # A 404 error means the pod was not found, i.e. it is not alive if e.status == 404: return False + # If some other exception occurs, the pod is treated as not alive and the + # cause of the exception is logged. self.log.error(f"Error checking status for pod {session}.{podname}: {e}") return False - def _add_label(self, obj_name, obj_type, key, label, session=None) -> None: - """Adds a label to a Kubernetes object (Pod or Namespace).""" + def _add_label( + self, + obj_name: str, + obj_type: str, + key: str, + label: str, + session: str | None = None, + ) -> None: + """ + Constructs a label in the format '{key}.{drunc_label}: {label}' and patches + the specified Kubernetes object. + + Args: + obj_name: The name of the Kubernetes object to label. + obj_type: The type of object, either 'pod' or 'namespace'. + key: The label key prefix (combined with drunc_label). + label: The label value to apply. + session: The Kubernetes namespace (required when obj_type is 'pod', + ignored for 'namespace'). + + Raises: + DruncK8sNamespaceException: If obj_type is 'pod' and session is not provided. + DruncK8sException: If obj_type is not 'pod' or 'namespace'. + """ + # Construct the body of the metadata to allocate to the object body = {"metadata": {"labels": {f"{key}.{self.drunc_label}": label}}} + # Apply the metadata to the object if obj_type == "pod": + # Ensure all required information has been provided for the pod if not session: raise DruncK8sNamespaceException( "Session (namespace) must be provided to label a pod." ) try: + # Add the label, and log the entry self._core_v1_api.patch_namespaced_pod( name=obj_name, namespace=session, body=body ) @@ -323,6 +423,7 @@ def _add_label(self, obj_name, obj_type, key, label, session=None) -> None: ) elif obj_type == "namespace": try: + # Add the label, and log the entry self._core_v1_api.patch_namespace(name=obj_name, body=body) self.log.info( f'Added label "{key}.{self.drunc_label}:{label}" to namespace "{obj_name}"' ) @@ -332,27 +433,99 @@ def _add_label(self, obj_name, obj_type, key, label, session=None) -> None: else: raise DruncK8sException(f"Cannot add label to object type: {obj_type}") - def _add_creator_label(self, obj_name, obj_type) -> None: - """Adds a 'creator' label to a Kubernetes object.""" + def _add_creator_label(self, obj_name: str, obj_type: str) -> None: + """ + Sets the label 'creator.{drunc_label}' to the class name on the given object. + + Args: + obj_name: The name of the Kubernetes object to label. + obj_type: The type of object, either 'pod' or 'namespace'. + """ self._add_label(obj_name, obj_type, "creator", self.__class__.__name__) def _get_creator_label_selector(self) -> str: - """Returns the label selector for objects created by this class.""" + """ + Returns the label selector for objects created by this class. + + Returns: + A label selector string in the format 'creator.{drunc_label}={class_name}'. 
+ """ return f"creator.{self.drunc_label}={self.__class__.__name__}" - def _is_host_cached(self, host): - """Check if host is cached and not expired.""" + def _is_local_connection_server( + self, tree_labels: dict[str, str], podname: str + ) -> bool: + """ + Check if a pod is the local connection server by inspecting its role label and name. + + Args: + tree_labels: Dictionary of labels assigned to the pod (including role labels). + podname: The name of the pod. + + Returns: + True if the pod has the 'infrastructure-applications' role and + 'local-connection-server' appears in the pod name, False otherwise. + """ + role_key = f"role.{self.drunc_label}" + return ( + tree_labels.get(role_key) == "infrastructure-applications" + and "local-connection-server" in podname + ) + + def _is_root_controller(self, tree_labels: dict[str, str]) -> bool: + """ + Check if a pod is the root controller by inspecting its role label. + + Args: + tree_labels: Dictionary of labels assigned to the pod (including role labels). + + Returns: + True if the pod has the 'root-controller' role label, False otherwise. + """ + return tree_labels.get(f"role.{self.drunc_label}") == "root-controller" + + def _is_host_cached(self, host: str) -> None | bool: + """ + Check if host is cached and not expired. + + Args: + host: The hostname string to look up in the cache. + + Returns: + True if the host is cached and valid, False if cached and invalid, + or None if not in the cache or the cache entry has expired. + """ with self._host_cache_lock: + # If the host has not been cached already, ignore it if host not in self._host_cache: return None + + # Retrieve the currently stored metadata, validate that it has not expired is_valid, timestamp = self._host_cache[host] if time() - timestamp > self._host_cache_expiry: del self._host_cache[host] return None return is_valid - def _verify_host_in_cluster(self, target_host): - """Verifies that the target host is available in the Kubernetes cluster.""" + def _verify_host_in_cluster(self, target_host: str) -> bool: + """ + Verifies that the target host is available in the Kubernetes cluster. + + Checks the host cache first, then queries the Kubernetes API to confirm + the node exists, is Ready, and is schedulable. Caches the result for + future lookups. + + Args: + target_host: The hostname of the Kubernetes node to verify. + + Returns: + True if the host is available, Ready, and schedulable. + + Raises: + DruncK8sNodeException: If the host is not part of the cluster, not ready, or cordoned. + DruncK8sException: If there is a permission error or other API failure. 
+ """ + # If the host has already been cached, check if it is valid and return that state cached = self._is_host_cached(target_host) if cached is not None: if cached: @@ -363,43 +536,62 @@ def _verify_host_in_cluster(self, target_host): f"Host '{target_host}' was previously verified as unavailable" ) + # The host has not already been checked, check it and assign the data to the + # cache try: - target_node = self._core_v1_api.read_node(name=target_host) # Check node is ready and schedulable + target_node = self._core_v1_api.read_node(name=target_host) is_ready = any( c.type == "Ready" and c.status == "True" for c in target_node.status.conditions or [] ) is_schedulable = not (target_node.spec and target_node.spec.unschedulable) + # Host is not usable, store this metadata, raise the exception if not is_ready or not is_schedulable: with self._host_cache_lock: self._host_cache[target_host] = (False, time()) reason = "not ready" if not is_ready else "cordoned" raise DruncK8sNodeException(f"Host '{target_host}' {reason}") + # Host is usable, store this information with self._host_cache_lock: self._host_cache[target_host] = (True, time()) self.log.info(f"Host '{target_host}' verified and available") return True except self._api_error_v1_api as e: + # If the host is not part of the cluster, this will raise the 404 if e.status == 404: with self._host_cache_lock: self._host_cache[target_host] = (False, time()) raise DruncK8sNodeException( f"Target host '{target_host}' is not part of the Kubernetes cluster" ) + # If permissions are denied elif e.status in [401, 403]: raise DruncK8sException( f"Permission denied accessing cluster to verify '{target_host}': {e}" ) + # Otherwise raise DruncK8sException(f"Failed to verify host '{target_host}': {e}") except Exception as e: raise DruncK8sException(f"Error verifying host '{target_host}': {e}") def _create_namespace_and_wait_for_active(self, session: str) -> None: - """Creates a namespace manifest, calls the API to create it, and waits for it to become Active.""" + """ + Constructs a V1Namespace with privileged pod-security enforcement, creates it + via the Kubernetes API, then polls until its phase becomes 'Active' (up to + restart_cleanup_time seconds). On success, applies the creator label and adds + the session to managed_sessions. + + Args: + session: The name of the Kubernetes namespace to create. + + Raises: + DruncK8sException: If there is an API error while reading the namespace status. + DruncK8sNamespaceException: If the namespace does not become Active within the timeout. + """ self.log.info(f'Creating "{session}" namespace.') namespace_manifest = client.V1Namespace( api_version="v1", @@ -433,7 +625,20 @@ def _create_namespace_and_wait_for_active(self, session: str) -> None: self.managed_sessions.add(session) def _prepare_namespace(self, session) -> None: - """Ensures a Kubernetes namespace exists, handling edge cases like terminating namespaces.""" + """ + If the namespace already exists and is in 'Terminating' state, waits for it to + be fully deleted before recreating it. If the namespace exists and is active, + raises an error. If it does not exist (404), creates it from scratch. + + Args: + session: The name of the Kubernetes namespace to prepare. + + Raises: + DruncK8sNamespaceException: If the namespace already exists and is active, or if + a terminating namespace does not complete deletion within the timeout. + DruncK8sException: If an unexpected API error occurs while checking or waiting + for the namespace. 
+ """ if session in self.sessions_pending_deletion: self.sessions_pending_deletion.remove(session) @@ -484,7 +689,22 @@ def _prepare_namespace(self, session) -> None: raise DruncK8sException(f"Failed to check namespace '{session}': {e}") def _create_headless_service(self, podname, session, pod_uid) -> None: - """Creates a headless service for a pod.""" + """ + Create a headless Kubernetes Service for inter-pod DNS discovery. + + Builds and creates a headless Service (clusterIP=None) that selects the + pod by its 'app' label. The service is owned by the pod via an + OwnerReference so it is automatically garbage-collected when the pod + is deleted. Silently ignores 409 Conflict errors (service already exists). + + Args: + podname - the name of the pod (also used as the service name) + session - the Kubernetes namespace (session) to create the service in + pod_uid - the UID of the owning pod for the OwnerReference + + Raises: + DruncK8sException - if the service creation fails with a non-409 error + """ service_manifest = client.V1Service( api_version="v1", kind="Service", @@ -524,7 +744,23 @@ def _create_headless_service(self, podname, session, pod_uid) -> None: self.log.error(f"Failed to create headless service for {podname}: {e}") def _create_nodeport_service(self, podname, session, pod_uid) -> None: - """Creates a NodePort service for the connection server (external + internal access).""" + """ + Create a NodePort Kubernetes Service for external access. + + Builds and creates a NodePort Service with externalTrafficPolicy=Local, + mapping the connection_server_port to a fixed NodePort + (connection_server_node_port). The service is owned by the pod via an + OwnerReference. Raises a DruncK8sException if the NodePort is already + allocated or another API error occurs. + + Args: + podname - the name of the pod (also used as the service name) + session - the Kubernetes namespace (session) to create the service in + pod_uid - the UID of the owning pod for the OwnerReference + + Raises: + DruncK8sException - if the NodePort is already in use or another API error occurs + """ service_manifest = client.V1Service( api_version="v1", kind="Service", @@ -597,6 +833,19 @@ def _get_pod_volumes_and_mounts( """ Prepares all pod volumes and container mounts, including static configs and the dynamic data_mount. + + Assembles volumes from JSON configuration, auto-mounts the user's home + directory if configured, adds a log mount from process_logs_path, and + attaches the data_mount from the boot request's process restriction. + + Args: + boot_request: The BootRequest containing process description (for log path) + and process restriction (for data_mount path). + + Returns: + A tuple of (pod_volumes, container_volume_mounts) where pod_volumes is a + list of V1Volume objects and container_volume_mounts is a list of + V1VolumeMount objects. """ pod_volumes = [] container_volume_mounts = [] @@ -729,6 +978,17 @@ def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]: """ Determines the role of a pod based on its tree_id, and returns a dictionary of labels to be applied. + + Role mapping: tree_id '0' -> root-controller, depth 0 -> infrastructure-applications, + depth 1 -> segment-controller, depth 2 -> application, otherwise 'unknown'. + + Args: + tree_id: The dot-separated tree identifier string (e.g. '0', '1', '0.1', '0.1.2'). + podname: The name of the pod (used for logging). 
+ + Returns: + A dictionary of labels containing 'tree-id.{drunc_label}' and + 'role.{drunc_label}' keys with their corresponding values. """ role = "unknown" @@ -738,12 +998,12 @@ def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]: role = "unknown" elif tree_id == "0": role = "root-controller" - elif tree_id == "1": - role = "local-connection-server" else: # Count the depth depth = tree_id.count(".") - if depth == 1: + if depth == 0: + role = "infrastructure-applications" + elif depth == 1: role = "segment-controller" elif depth == 2: role = "application" @@ -757,15 +1017,36 @@ def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]: def _build_container_env( self, boot_request: BootRequest, tree_labels: dict[str, str] ) -> list[client.V1EnvVar]: - """Builds the list of environment variables for the container.""" - env_vars = boot_request.process_description.env + """ + Builds the list of environment variables for the container. + Sets USER and HOME based on the boot request or host configuration, + defaults DOTDRUNC if not provided, and adds POD_IP via the Kubernetes + Downward API for root-controller pods. + + Args: + boot_request: The BootRequest containing the process description with + environment variables and user metadata. + tree_labels: Dictionary of labels assigned to the pod (used to determine + if POD_IP should be injected). + + Returns: + A list of V1EnvVar objects representing the container environment variables. + """ + env_vars = boot_request.process_description.env + username_bq = boot_request.process_description.metadata.user host_username = None - if "USER" not in env_vars or self.home_path_base: + + if username_bq is not None: + env_vars["USER"] = username_bq + self.log.debug( + f"Setting USER environment variable from boot request: {username_bq}" + ) + elif self.home_path_base: host_username = self._get_host_username() # Only set USER if not already present in environment - if "USER" not in env_vars and host_username: + if username_bq is None and host_username: env_vars["USER"] = host_username self.log.debug(f"Setting USER environment variable to: {host_username}") @@ -784,7 +1065,7 @@ def _build_container_env( container_env = [client.V1EnvVar(name=k, value=v) for k, v in env_vars.items()] # Add POD_IP environment variable via Downward API for root-controller - if "root-controller" in tree_labels["role." + self.drunc_label]: + if self._is_root_controller(tree_labels): pod_ip_env = client.V1EnvVar( name="POD_IP", value_from=client.V1EnvVarSource( @@ -806,7 +1087,27 @@ def _build_pod_main_container( container_volume_mounts: list[client.V1VolumeMount], tree_labels: dict[str, str], ) -> client.V1Container: - """Builds the primary V1Container manifest, including command and preStop hook.""" + """ + Build the primary pod container manifest from a boot request. + + Parse the executable and arguments, prepend 'exec' to the final C++ + application command, expose the connectivity service port for LCS pods, + add preStop hooks to send SIGQUIT to daq_applications, redirect log + output to file via tee, add signal traps for the local connectivity + service (gunicorn), replace hostnames with $POD_IP for root controllers, + and assemble the final V1Container with environment, security context, + and volume mounts. 
+ + Args: + podname - name of the pod to generate + boot_request - definition of the environment and executable to run + lcs_port - port number of the local connectivity service (None if not LCS) + container_volume_mounts - list of volumes to mount in this container + tree_labels - the labels defining the application tree ID and role + + Returns: + main_container - the fully configured V1Container object + """ pod_image = self.configuration.data.image exec_and_args_list = boot_request.process_description.executable_and_arguments @@ -820,14 +1121,16 @@ def _build_pod_main_container( if ( is_last_command and e_and_a.exec != "source" - and self.connection_server_name - not in tree_labels["role." + self.drunc_label] + and not self._is_local_connection_server(tree_labels, podname) ): prefix = "exec " - if "root-controller" in tree_labels["role." + self.drunc_label]: + if self._is_root_controller(tree_labels): # Replace hostname with $POD_IP environment variable in protocol://hostname:port addresses # POD_IP will be injected via Kubernetes Downward API + # The other pods need to use the pod IP to connect to the root-controller + # This is because the root-controller uses NodePort and can not use host network + # The other pods use Headless and can use host network modified_args = [] for arg in e_and_a.args: modified_arg = re.sub( @@ -848,20 +1151,19 @@ def _build_pod_main_container( container_ports = [] if ( - self.connection_server_name in tree_labels["role." + self.drunc_label] + self._is_local_connection_server(tree_labels, podname) and lcs_port is not None ): + self.connection_server_name = podname container_ports.append( client.V1ContainerPort(container_port=lcs_port, name="http-port") ) # Only add preStop hook for C++ applications (non-controllers) lifecycle_hook = None - if ( - "controller" not in tree_labels["role." + self.drunc_label] - and self.connection_server_name - not in tree_labels["role." + self.drunc_label] - ): + if "controller" not in tree_labels[ + "role." + self.drunc_label + ] and not self._is_local_connection_server(tree_labels, podname): self.log.debug( f"'{podname}' identified as a C++ app, adding preStop hook with SIGQUIT." ) @@ -890,7 +1192,7 @@ def _build_pod_main_container( else: log_redirect_cmd = "" - if self.connection_server_name in tree_labels["role." + self.drunc_label]: + if self._is_local_connection_server(tree_labels, podname): # LCS (gunicorn) needs a shell trap to handle SIGTERM grace final_command_args = ( f"{log_redirect_cmd} " @@ -924,7 +1226,22 @@ def _build_pod_main_container( def _get_pod_node_selector( self, podname: str, restriction: ProcessRestriction ) -> dict: - """Verifies the target host and returns the Kubernetes node selector.""" + """ + Build the Kubernetes node selector for a pod based on host restrictions. + + If the boot request specifies allowed hosts, resolves 'localhost' to + the actual hostname, verifies the target host is available in the + cluster, and returns a node selector dictionary. Returns an empty + dictionary if no host restriction is specified. + + Args: + podname - the name of the pod (used for logging) + restriction - the ProcessRestriction containing allowed_hosts + + Returns: + node_selector - a dictionary for the pod spec's nodeSelector field + (e.g. 
{'kubernetes.io/hostname': 'node-01'}) + """ node_selector = {} if restriction.allowed_hosts: target_host = restriction.allowed_hosts[0] @@ -946,10 +1263,26 @@ def _get_pod_node_selector( def _get_pod_host_aliases( self, podname: str, session: str, tree_labels: dict[str, str] ) -> list[client.V1HostAlias] | None: - """Gets the ClusterIP of the connection server and prepares host aliases.""" + """ + Build host aliases to redirect localhost to the connection server ClusterIP. + + For non-LCS pods when a local connection server is booted, retrieves the + connection server's ClusterIP and creates a host alias mapping 'localhost' + to that IP. This allows pods to reach the connection server via localhost. + Retries up to 10 times if the ClusterIP is not immediately available. + + Args: + podname - the name of the pod (used for logging) + session - the Kubernetes namespace (session) to look up the service in + tree_labels - the labels defining the application tree ID and role + + Returns: + host_aliases - a list containing a single V1HostAlias mapping localhost + to the connection server IP, or None if not applicable + """ host_aliases = None if ( - self.connection_server_name not in tree_labels["role." + self.drunc_label] + not self._is_local_connection_server(tree_labels, podname) and self.local_connection_server_is_booted ): connection_server_ip = None @@ -978,14 +1311,26 @@ def _determine_service_type( self, podname: str, boot_request: BootRequest, tree_labels: dict[str, str] ) -> str: """ - Determines the correct K8s service type for a pod ("NodePort" or "Headless"). - This logic is centralized here to be used by both pod creation (for hostNetwork) - and service creation. + Determine the correct Kubernetes service type for a pod. + + Centralizes the service type decision used by both pod creation (for + hostNetwork configuration) and service creation. The local connection + server always uses NodePort; the root controller uses NodePort if a + valid port is extracted from the command, otherwise falls back to + Headless. All other pods use Headless. + + Args: + podname - the name of the pod (used for logging) + boot_request - the boot request to extract port information from + tree_labels - the labels defining the application tree ID and role + + Returns: + service_type - either "NodePort" or "Headless" """ - if self.connection_server_name in tree_labels["role." + self.drunc_label]: + if self._is_local_connection_server(tree_labels, podname): return "NodePort" - if "root-controller" in tree_labels["role." + self.drunc_label]: + if self._is_root_controller(tree_labels): port = self._extract_port_from_cmd(boot_request) if port is not None and port != 0: return "NodePort" @@ -999,7 +1344,15 @@ def _determine_service_type( return "Headless" def _get_host_username(self) -> str: - """Resolves the username of the user running the process manager.""" + """ + Resolves the username of the user running the process manager. + + Tries getpass.getuser() first, then falls back to pwd lookup by UID, + and finally returns the numeric UID as a string if both fail. + + Returns: + The resolved username string, or the numeric UID as a string on failure. + """ try: return getpass.getuser() except KeyError: @@ -1021,7 +1374,26 @@ def _build_pod_manifest( extra_labels: dict[str, str] | None = None, use_host_network: bool = True, ) -> client.V1Pod: - """Assembles the final V1Pod object.""" + """ + Assemble the final V1Pod manifest from its component parts. 
+ + Combines the main container, node selector, host aliases, volumes, + and labels into a complete V1Pod object with the configured + termination grace period and restart policy. + + Args: + podname - the name of the pod + session - the Kubernetes namespace (session) for the pod + main_container - the pre-built V1Container for the pod + node_selector - dictionary for node scheduling constraints + host_aliases - optional list of V1HostAlias entries for DNS overrides + pod_volumes - list of V1Volume objects to attach to the pod + extra_labels - optional additional labels to merge into the pod metadata + use_host_network - whether to enable hostNetwork on the pod (default True) + + Returns: + pod - the fully assembled V1Pod object ready for creation + """ # Get pod labels pod_labels = { @@ -1053,7 +1425,23 @@ def _build_pod_manifest( def _execute_pod_creation_api( self, session: str, podname: str, pod_manifest: client.V1Pod ) -> str: - """Executes the API call to create the pod, handling 409 conflict during restarts.""" + """ + Attempts to create the pod via the API. If a 409 Conflict error occurs + (indicating a previous pod with the same name has not yet been fully + deleted), retries with polling until restart_cleanup_time is exceeded. + + Args: + session - the Kubernetes namespace (session) to create the pod in + podname - the name of the pod to create + pod_manifest - the fully assembled V1Pod manifest + + Returns: + pod_uid - the UID string of the newly created pod + + Raises: + DruncK8sException - if the 409 conflict persists beyond the timeout + ApiException - if a non-409 API error occurs + """ start_time = time() while True: @@ -1092,9 +1480,27 @@ def _create_associated_service( service_type: str, tree_labels: dict[str, str], ) -> None: - """Calls the appropriate service creation method based on pod type.""" + """ + Routes to _create_nodeport_service or _create_headless_service based + on the determined service_type. For NodePort services, handles both + the local connection server case (using the pre-extracted lcs_port) + and the root controller case (extracting the port from the boot request). + Falls back to headless if the root controller port cannot be determined. + + Args: + podname - the name of the pod (also used as the service name) + session - the Kubernetes namespace (session) to create the service in + pod_uid - the UID of the owning pod for the OwnerReference + boot_request - the boot request (used to extract ports for root controller) + lcs_port - the port number for the local connectivity service (None if not LCS) + service_type - either "NodePort" or "Headless" + tree_labels - the labels defining the application tree ID and role + + Raises: + DruncK8sException - if LCS service creation is requested but lcs_port is None + """ if service_type == "NodePort": - if self.connection_server_name in tree_labels["role." + self.drunc_label]: + if self._is_local_connection_server(tree_labels, podname): if lcs_port is None: raise DruncK8sException( "LCS service creation failed: port was not extracted." @@ -1102,7 +1508,7 @@ def _create_associated_service( # This call uses class variables set in _create_pod self._create_nodeport_service(podname, session, pod_uid) - elif "root-controller" in tree_labels["role." + self.drunc_label]: + elif self._is_root_controller(tree_labels): self.log.info( f"'{podname}' is the root controller, checking for NodePort service." 
) @@ -1128,11 +1534,26 @@ def _create_associated_service( def _create_pod( self, podname, session, boot_request: BootRequest, tree_labels: dict[str, str] ) -> None: - """Constructs and creates a Kubernetes Pod manifest and its associated service.""" + """ + Orchestrates the full pod creation pipeline: extracts the LCS port if + applicable, prepares volume mounts, builds the main container manifest, + determines the service type and hostNetwork setting, constructs the node + selector and host aliases, assembles the pod manifest, creates the pod + via the API, and creates the associated service (NodePort or Headless). + + Args: + podname - the name of the pod to create + session - the Kubernetes namespace (session) to create the pod in + boot_request - the boot request defining the executable, environment, and restrictions + tree_labels - the labels defining the application tree ID and role + + Raises: + DruncK8sException - if pod or service creation fails for any reason + """ try: lcs_port = None # Early Port Extraction and Class Variable Setup for LCS - if self.connection_server_name in tree_labels["role." + self.drunc_label]: + if self._is_local_connection_server(tree_labels, podname): lcs_port = self._extract_port_from_cmd(boot_request) if lcs_port: self.connection_server_port = lcs_port @@ -1214,8 +1635,20 @@ def _create_pod( f"Failed to create pod '{session}.{podname}': {e}" ) from e - def _get_connection_server_cluster_ip(self, session) -> str: - """Gets the ClusterIP of the connection server service.""" + def _get_connection_server_cluster_ip(self, session: str) -> str: + """ + Get the ClusterIP of the connection server's Kubernetes Service. + + Reads the named service from the session namespace and returns its + clusterIP. Returns None if the service cannot be found or an API + error occurs. + + Args: + session - the Kubernetes namespace (session) containing the service + + Returns: + cluster_ip - the ClusterIP string, or None on failure + """ try: service = self._core_v1_api.read_namespaced_service( name=self.connection_server_name, namespace=session @@ -1228,7 +1661,17 @@ def _get_connection_server_cluster_ip(self, session) -> str: def _extract_port_from_cmd(self, boot_request) -> int | None: """ Parses the boot request's command arguments to find a port. + It must cover Gunicorn (hardcoded and env var) and drunc-controller. + Checks for gunicorn --bind syntax (both hardcoded ports and environment + variable references), drunc-controller --port syntax, and drunc-controller + -c grpc://host:port syntax. + + Args: + boot_request: The BootRequest containing executable_and_arguments to parse. + + Returns: + The extracted port as an integer, or None if no valid port is found. """ # Check all command parts for a port argument for e_and_a in boot_request.process_description.executable_and_arguments: @@ -1304,8 +1747,19 @@ def _get_process_uid(self, query: ProcessQuery, order_by: str = None) -> list[st """ Finds process UUIDs matching a query. - If order_by is "leaf_first", it sorts the UUIDs so that child processes - (which have a longer tree_id) come before their parents. + Searches all stored boot requests for processes matching the query criteria + (UUIDs, names, session, user). An empty query matches all processes. If + order_by is "leaf_first", sorts the UUIDs so that child processes (which + have a longer tree_id) come before their parents. + + Args: + query: A ProcessQuery protobuf with optional uuids, names, session, and user + filters. + order_by: Optional sorting mode. 
Use 'leaf_first' to sort by tree depth + (deepest first). Defaults to None (unsorted). + + Returns: + A list of UUID strings matching the query, optionally sorted by tree depth. """ initial_match = set() for proc_uuid, boot_req in self.boot_request.items(): @@ -1339,7 +1793,19 @@ def _get_process_uid(self, query: ProcessQuery, order_by: str = None) -> list[st return sorted_uuids def _logs_impl(self, log_request: LogRequest) -> LogLines: - """Handles the 'logs' command.""" + """ + Handles the 'logs' command. + + Resolves the target process from the query, retrieves the pod's log tail + from the Kubernetes API, and returns the lines. + + Args: + log_request: A LogRequest protobuf containing the query to identify the + process and how_far (number of tail lines to retrieve). + + Returns: + A LogLines protobuf containing the process UUID and its log lines. + """ uuids = self._get_process_uid(log_request.query) uuid = self._ensure_one_process(uuids, in_boot_request=True) podname = self.boot_request[uuid].process_description.metadata.name @@ -1358,7 +1824,17 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: ) def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: - """Handles the 'boot' command from the gRPC interface.""" + """ + Handles the 'boot' command from the gRPC interface. + + Generates a new UUID and delegates to __boot to create the pod. + + Args: + boot_request: A BootRequest protobuf defining the process to start. + + Returns: + A ProcessInstanceList containing a single ProcessInstance for the booted process. + """ self.log.debug(f"{self.name} running boot command") this_uuid = str(uuid.uuid4()) process = self.__boot(boot_request, this_uuid) @@ -1367,7 +1843,17 @@ def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: def _run_pre_boot_checks( self, session: str, podname: str, boot_request: BootRequest ) -> None: - """Performs initial validation.""" + """ + Validates that the session name conforms to Kubernetes RFC1123 label rules. + + Args: + session: The Kubernetes namespace (session) name to validate. + podname: The name of the pod to boot (reserved for future checks). + boot_request: The BootRequest protobuf (reserved for future checks). + + Raises: + DruncK8sNamespaceException: If the session name is not a valid RFC1123 label. + """ if not validate_k8s_session_name(session): raise DruncK8sNamespaceException( f'Invalid session/namespace name "{session}". Must match RFC1123 label: ' @@ -1378,10 +1864,20 @@ def _wait_for_pod_api_ready( self, podname: str, session: str, timeout: float ) -> str: """ - [HELPER] Blocking wait for a pod to be 'Running' and 'Ready' - in the K8s API. - Returns the node_name on success. - Raises DruncK8sException on timeout. + Polls the pod status at pod_status_check_sleep intervals until the pod's + phase is 'Running' and its 'Ready' condition is 'True', or the timeout + is exceeded. + + Args: + podname: The name of the pod to wait for. + session: The Kubernetes namespace (session) containing the pod. + timeout: Maximum number of seconds to wait before raising an exception. + + Returns: + The node_name string where the pod is running on success. + + Raises: + DruncK8sException: If the pod does not become API Ready within the timeout. """ self.log.info( f"Stage 1: Waiting for '{podname}' pod to be Running and Ready..." 
@@ -1425,8 +1921,15 @@ def _wait_for_pod_api_ready( def _wait_for_nodeport_http_ready(self, url: str, timeout: float) -> None: """ - [HELPER] Blocking wait for a NodePort URL to be reachable via HTTP. - Raises DruncK8sException on timeout. + Polls the URL at pod_status_check_sleep intervals using urllib until a + successful HTTP response is received, or the timeout is exceeded. + + Args: + url: The full HTTP URL to poll (e.g. 'http://node-01:31000'). + timeout: Maximum number of seconds to wait before raising an exception. + + Raises: + DruncK8sException: If the URL does not become reachable within the timeout. """ self.log.info(f"Stage 2: Waiting for NodePort {url} to be reachable...") start_time = time() @@ -1453,8 +1956,17 @@ def _wait_for_nodeport_tcp_ready( self, node_name: str, port: int, timeout: float ) -> None: """ - [HELPER] Blocking wait for a NodePort to be reachable via TCP socket. - Raises DruncK8sException on timeout. + Polls the node_name:port combination at pod_status_check_sleep intervals + using a TCP socket connect until a connection succeeds, or the timeout + is exceeded. + + Args: + node_name: The hostname of the Kubernetes node to connect to. + port: The NodePort number to test connectivity on. + timeout: Maximum number of seconds to wait before raising an exception. + + Raises: + DruncK8sException: If the NodePort does not become reachable within the timeout. """ self.log.info( f"Stage 2: Waiting for NodePort {node_name}:{port} to be reachable..." @@ -1493,7 +2005,20 @@ def _wait_for_nodeport_tcp_ready( ) def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: - """Blocking two-stage wait for the Local Connection Server (NodePort) to be fully ready.""" + """ + Perform a two-stage blocking wait for the Local Connection Server to be fully ready. + + Stage 1: waits for the pod to be Running and Ready in the Kubernetes API. + Stage 2: waits for the NodePort to be externally reachable via HTTP. + Sets local_connection_server_is_booted to True on success. + + Args: + podname - the name of the LCS pod to wait for + session - the Kubernetes namespace (session) of the pod + + Raises: + DruncK8sException - if either stage times out within pod_ready_timeout seconds + """ self.log.info(f"Waiting for LCS '{podname}' to be fully ready...") start_time = time() total_timeout = self.pod_ready_timeout @@ -1521,7 +2046,22 @@ def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: def _wait_for_controller_readiness( self, podname: str, session: str, boot_request: BootRequest ) -> None: - """Blocking two-stage wait for Drunc Controller (NodePort) to be fully ready.""" + """ + Perform a two-stage blocking wait for the Drunc Controller to be fully ready. + + Stage 1: waits for the pod to be Running and Ready in the Kubernetes API + (up to pod_ready_timeout seconds). + Stage 2: waits for the NodePort to be reachable via TCP socket connection + (up to grpc_startup_timeout seconds). + + Args: + podname - the name of the controller pod to wait for + session - the Kubernetes namespace (session) of the pod + boot_request - the boot request used to extract the controller port + + Raises: + DruncK8sException - if the port is 0 or missing, or if either stage times out + """ self.log.info( f"Waiting for controller '{podname}' (NodePort) to become ready..." ) @@ -1546,7 +2086,19 @@ def _wait_for_controller_readiness( def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: """ - Internal boot method. 
Handles pre-checks, pod creation, and blocking wait for critical services. + Internal boot method for creating a pod and waiting for critical services. + + Orchestrates the full boot sequence: determines tree labels and roles, + runs pre-boot validation, prepares the namespace, stores the boot request, + creates the pod and its service, adds the UUID label, and performs + blocking readiness waits for the LCS or root controller if applicable. + + Args: + boot_request - the BootRequest protobuf defining the process to start + uuid - the UUID string to assign to this process + + Returns: + process_instance - a ProcessInstance protobuf with RUNNING status """ session = boot_request.process_description.metadata.session podname = boot_request.process_description.metadata.name @@ -1566,9 +2118,9 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: self.log.info(f'"{session}.{podname}":{uuid} boot request sent.') # Special handling and blocking wait for critical processes - if self.connection_server_name in tree_labels["role." + self.drunc_label]: + if self._is_local_connection_server(tree_labels, podname): self._wait_for_lcs_readiness(podname, session) - elif "root-controller" in tree_labels["role." + self.drunc_label]: + elif self._is_root_controller(tree_labels): self._wait_for_controller_readiness(podname, session, boot_request) # Post-Process @@ -1588,7 +2140,20 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: ) def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: - """Handles the 'ps' command.""" + """ + Handles the 'ps' command. + + Queries matching process UUIDs, fetches their current pod status from + the Kubernetes API, and builds a list of ProcessInstance entries with + the live status code, return code, and hostname. + + Args: + query: A ProcessQuery protobuf specifying which processes to list. + + Returns: + A ProcessInstanceList containing a ProcessInstance for each matched process, + with status set to RUNNING or DEAD and optional return_code. + """ queried_uuids = self._get_process_uid(query) if not queried_uuids: return ProcessInstanceList(values=[]) @@ -1651,7 +2216,23 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: return ProcessInstanceList(values=ret) def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: - """Handles the 'restart' command.""" + """ + Handles the 'restart' command. + + Kills each matched process and re-boots it using the original boot request. + Handles race conditions where a pod may be in a terminal state but not yet + fully deleted. Failed restarts are included in the result with DEAD status. + + Args: + query: A ProcessQuery specifying which processes to restart. + + Returns: + A ProcessInstanceList containing a ProcessInstance for each process, + with RUNNING status on success or DEAD status on failure. + + Raises: + DruncK8sPodException: If no processes match the query. + """ uuids = self._get_process_uid(query) if not uuids: raise DruncK8sPodException("No processes found matching the query.") @@ -1724,7 +2305,21 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: return ProcessInstanceList(values=ret) def _kill_pod(self, podname, session, grace_period_seconds=None) -> None: - """Deletes a specific pod from a namespace.""" + """ + Deletes a specific pod from a namespace. + + Calls the Kubernetes API to delete the named pod. Silently ignores 404 + errors (pod already deleted). + + Args: + podname: The name of the pod to delete. 
+ session: The Kubernetes namespace (session) containing the pod. + grace_period_seconds: Optional override for the termination grace period + in seconds. None uses the pod's configured default. + + Raises: + DruncK8sException: If a non-404 API error occurs during deletion. + """ try: self._core_v1_api.delete_namespaced_pod( name=podname, @@ -1739,8 +2334,21 @@ def _kill_pod(self, podname, session, grace_period_seconds=None) -> None: def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList: """ - Handles the 'kill' command with staged, role-based shutdown - by querying pod labels. + Handle the 'kill' gRPC command with staged, role-based shutdown. + + Performs an ordered shutdown of matched processes by their role labels: + unknown → application → segment-controller → root-controller → + infrastructure-applications. Each stage issues delete requests and + blocks until the watcher thread confirms all pods in that stage have + terminated (or a timeout is reached). After all pods are killed, + cleans up managed namespaces if no tracked processes remain. + + Args: + query - a ProcessQuery specifying which processes to kill + + Returns: + process_list - a ProcessInstanceList with DEAD status and exit codes + for all terminated processes """ # Get all UUIDs @@ -1803,7 +2411,7 @@ def kill_and_wait(uuids, grace_period=None) -> None: "application": [], "segment-controller": [], "root-controller": [], - "local-connection-server": [], + "infrastructure-applications": [], } uuid_label_key = f"uuid.{self.drunc_label}" @@ -1860,7 +2468,16 @@ def kill_and_wait(uuids, grace_period=None) -> None: return ProcessInstanceList(values=final_ret) def _terminate_impl(self) -> ProcessInstanceList: - """Handles the 'terminate' command, killing all known processes.""" + """ + Handles the 'terminate' command, killing all known processes. + + Issues a kill command matching all process names ('.*') to shut down + every tracked process. If no processes are tracked, returns an empty list. + + Returns: + A ProcessInstanceList containing DEAD-status entries for all terminated + processes, or an empty list if there were no processes to terminate. + """ self.log.info("Terminating all known K8s processes.") if not self.boot_request: self.log.info("No processes to terminate.") @@ -1869,7 +2486,18 @@ def _terminate_impl(self) -> ProcessInstanceList: return self._kill_impl(all_processes_query) def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList: - """Handles the 'flush' command (no-op for Kubernetes).""" + """ + Handles the 'flush' command (no-op for Kubernetes). + + Cleanup of dead processes is handled automatically in real-time by the + pod watcher thread, so this command performs no action. + + Args: + query: A ProcessQuery specifying which processes to flush (ignored). + + Returns: + An empty ProcessInstanceList. + """ self.log.info( "The 'flush' command is not needed for the K8sProcessManager. " "Cleanup of dead processes is handled automatically in real-time."