diff --git a/docs/sphinx/user-docs/cluster-configuration.rst b/docs/sphinx/user-docs/cluster-configuration.rst index 9f9fdddb7..389be9bd9 100644 --- a/docs/sphinx/user-docs/cluster-configuration.rst +++ b/docs/sphinx/user-docs/cluster-configuration.rst @@ -98,6 +98,48 @@ Custom Volumes/Volume Mounts | For more information on creating Volumes and Volume Mounts with Python check out the Python Kubernetes docs (`Volumes `__, `Volume Mounts `__). | You can also find further information on Volumes and Volume Mounts by visiting the Kubernetes `documentation `__. +GCS Fault Tolerance +------------------- +By default, the state of the Ray cluster is transient to the head Pod. Whatever triggers a restart of the head Pod results in losing that state, including Ray Cluster history. To make Ray cluster state persistent you can enable Global Control Service (GCS) fault tolerance with an external Redis storage. + +To configure GCS fault tolerance you need to set the following parameters: + +.. list-table:: + :header-rows: 1 + :widths: auto + + * - Parameter + - Description + * - ``enable_gcs_ft`` + - Boolean to enable GCS fault tolerance + * - ``redis_address`` + - Address of the external Redis service, ex: "redis:6379" + * - ``redis_password_secret`` + - Dictionary with 'name' and 'key' fields specifying the Kubernetes secret for Redis password + * - ``external_storage_namespace`` + - Custom storage namespace for GCS fault tolerance (by default, KubeRay sets it to the RayCluster's UID) + +Example configuration: + +.. code:: python + + from codeflare_sdk import Cluster, ClusterConfiguration + + cluster = Cluster(ClusterConfiguration( + name='ray-cluster-with-persistence', + num_workers=2, + enable_gcs_ft=True, + redis_address="redis:6379", + redis_password_secret={ + "name": "redis-password-secret", + "key": "password" + }, + # external_storage_namespace="my-custom-namespace" # Optional: Custom namespace for GCS data in Redis + )) + +.. 
note:: + You need to have a Redis instance deployed in your Kubernetes cluster before using this feature. + Deprecating Parameters ---------------------- diff --git a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py index 215ac32e2..96e9e5f24 100644 --- a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py +++ b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py @@ -170,6 +170,31 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"): }, } + if cluster.config.enable_gcs_ft: + if not cluster.config.redis_address: + raise ValueError( + "redis_address must be provided when enable_gcs_ft is True" + ) + + gcs_ft_options = {"redisAddress": cluster.config.redis_address} + + if cluster.config.external_storage_namespace: + gcs_ft_options[ + "externalStorageNamespace" + ] = cluster.config.external_storage_namespace + + if cluster.config.redis_password_secret: + gcs_ft_options["redisPassword"] = { + "valueFrom": { + "secretKeyRef": { + "name": cluster.config.redis_password_secret["name"], + "key": cluster.config.redis_password_secret["key"], + } + } + } + + resource["spec"]["gcsFaultToleranceOptions"] = gcs_ft_options + config_check() k8s_client = get_api_client() or client.ApiClient() diff --git a/src/codeflare_sdk/ray/cluster/config.py b/src/codeflare_sdk/ray/cluster/config.py index ab64be839..4f646baaa 100644 --- a/src/codeflare_sdk/ray/cluster/config.py +++ b/src/codeflare_sdk/ray/cluster/config.py @@ -100,6 +100,14 @@ class ClusterConfiguration: A list of V1Volume objects to add to the Cluster volume_mounts: A list of V1VolumeMount objects to add to the Cluster + enable_gcs_ft: + A boolean indicating whether to enable GCS fault tolerance. + redis_address: + The address of the Redis server to use for GCS fault tolerance, required when enable_gcs_ft is True. + redis_password_secret: + Kubernetes secret reference containing Redis password. 
ex: {"name": "secret-name", "key": "password-key"} + external_storage_namespace: + The storage namespace to use for GCS fault tolerance. By default, KubeRay sets it to the UID of RayCluster. """ name: str @@ -142,6 +150,10 @@ class ClusterConfiguration: annotations: Dict[str, str] = field(default_factory=dict) volumes: list[V1Volume] = field(default_factory=list) volume_mounts: list[V1VolumeMount] = field(default_factory=list) + enable_gcs_ft: bool = False + redis_address: Optional[str] = None + redis_password_secret: Optional[Dict[str, str]] = None + external_storage_namespace: Optional[str] = None def __post_init__(self): if not self.verify_tls: @@ -149,6 +161,27 @@ def __post_init__(self): "Warning: TLS verification has been disabled - Endpoint checks will be bypassed" ) + if self.enable_gcs_ft: + if not self.redis_address: + raise ValueError( + "redis_address must be provided when enable_gcs_ft is True" + ) + + if self.redis_password_secret and not isinstance( + self.redis_password_secret, dict + ): + raise ValueError( + "redis_password_secret must be a dictionary with 'name' and 'key' fields" + ) + + if self.redis_password_secret and ( + "name" not in self.redis_password_secret + or "key" not in self.redis_password_secret + ): + raise ValueError( + "redis_password_secret must contain both 'name' and 'key' fields" + ) + self._validate_types() self._memory_to_resource() self._memory_to_string() @@ -283,10 +316,13 @@ def check_type(value, expected_type): else: return True if origin_type is dict: - return all( - check_type(k, args[0]) and check_type(v, args[1]) - for k, v in value.items() - ) + if value is not None: + return all( + check_type(k, args[0]) and check_type(v, args[1]) + for k, v in value.items() + ) + else: + return True if origin_type is tuple: return all(check_type(elem, etype) for elem, etype in zip(value, args)) if expected_type is int: diff --git a/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py 
b/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py index 7d6d3d0a6..2e5133a95 100644 --- a/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py +++ b/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py @@ -13,8 +13,9 @@ # limitations under the License. from collections import namedtuple import sys -from .build_ray_cluster import gen_names, update_image +from .build_ray_cluster import gen_names, update_image, build_ray_cluster import uuid +from codeflare_sdk.ray.cluster.cluster import ClusterConfiguration, Cluster def test_gen_names_with_name(mocker): @@ -65,3 +66,45 @@ def test_update_image_without_supported_python_version(mocker): # Assert that no image was set since the Python version is not supported assert image is None + + +def test_build_ray_cluster_with_gcs_ft(mocker): + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") + + cluster = Cluster( + ClusterConfiguration( + name="test", + namespace="ns", + enable_gcs_ft=True, + redis_address="redis:6379", + redis_password_secret={"name": "redis-password-secret", "key": "password"}, + external_storage_namespace="new-ns", + ) + ) + + mocker.patch("codeflare_sdk.ray.cluster.build_ray_cluster.config_check") + mocker.patch( + "codeflare_sdk.ray.cluster.build_ray_cluster.get_api_client", return_value=None + ) + mocker.patch( + "codeflare_sdk.ray.cluster.build_ray_cluster.update_image", return_value=None + ) + + resource = build_ray_cluster(cluster) + + assert "spec" in resource + assert "gcsFaultToleranceOptions" in resource["spec"] + + gcs_ft_options = resource["spec"]["gcsFaultToleranceOptions"] + + assert gcs_ft_options["redisAddress"] == "redis:6379" + assert gcs_ft_options["externalStorageNamespace"] == "new-ns" + assert ( + gcs_ft_options["redisPassword"]["valueFrom"]["secretKeyRef"]["name"] + == "redis-password-secret" + ) + assert ( + 
gcs_ft_options["redisPassword"]["valueFrom"]["secretKeyRef"]["key"] + == "password" + ) diff --git a/src/codeflare_sdk/ray/cluster/test_config.py b/src/codeflare_sdk/ray/cluster/test_config.py index 34cc4237b..e6bcb8bac 100644 --- a/src/codeflare_sdk/ray/cluster/test_config.py +++ b/src/codeflare_sdk/ray/cluster/test_config.py @@ -153,6 +153,64 @@ def test_cluster_config_deprecation_conversion(mocker): assert config.worker_cpu_limits == 2 +def test_gcs_fault_tolerance_config_validation(): + """Valid GCS FT settings are kept as given; invalid ones raise ValueError.""" + config = ClusterConfiguration( + name="test", + namespace="ns", + enable_gcs_ft=True, + redis_address="redis:6379", + redis_password_secret={"name": "redis-password-secret", "key": "password"}, + external_storage_namespace="new-ns", + ) + + assert config.enable_gcs_ft is True + assert config.redis_address == "redis:6379" + assert config.redis_password_secret == { + "name": "redis-password-secret", + "key": "password", + } + assert config.external_storage_namespace == "new-ns" + + try: + ClusterConfiguration(name="test", namespace="ns", enable_gcs_ft=True) + except ValueError as e: + assert str(e) == "redis_address must be provided when enable_gcs_ft is True" + else: + raise AssertionError("ValueError not raised for missing redis_address") + + try: + ClusterConfiguration( + name="test", + namespace="ns", + enable_gcs_ft=True, + redis_address="redis:6379", + redis_password_secret={"secret"}, + ) + except ValueError as e: + assert ( + str(e) + == "redis_password_secret must be a dictionary with 'name' and 'key' fields" + ) + else: + raise AssertionError("ValueError not raised for non-dict redis_password_secret") + + try: + ClusterConfiguration( + name="test", + namespace="ns", + enable_gcs_ft=True, + redis_address="redis:6379", + redis_password_secret={"wrong": "format"}, + ) + except ValueError as e: + assert ( + str(e) == "redis_password_secret must contain both 'name' and 'key' fields" + ) + else: + raise AssertionError("ValueError not raised for partial redis_password_secret") + + # Make sure to always keep this function last def test_cleanup(): os.remove(f"{aw_dir}test-all-params.yaml")