Skip to content

Commit b147e93

Browse files
committed
help me
1 parent 02bee78 commit b147e93

File tree

5 files changed

+194
-20
lines changed

5 files changed

+194
-20
lines changed

src/codeflare_sdk/ray/cluster/config.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,16 @@ class ClusterConfiguration:
108108
Kubernetes secret reference containing Redis password. ex: {"name": "secret-name", "key": "password-key"}
109109
external_storage_namespace:
110110
The storage namespace to use for GCS fault tolerance. By default, KubeRay sets it to the UID of RayCluster.
111+
worker_idle_timeout_seconds:
112+
The idle timeout for worker nodes in seconds.
113+
worker_num_of_hosts:
114+
The number of hosts per worker replica for TPUs.
115+
suspend:
116+
A boolean indicating whether to suspend the cluster.
117+
managed_by:
118+
The managed by field value.
119+
redis_username_secret:
120+
Kubernetes secret reference containing Redis username.
111121
"""
112122

113123
name: str
@@ -134,6 +144,8 @@ class ClusterConfiguration:
134144
max_memory: Optional[Union[int, str]] = None # Deprecating
135145
num_gpus: Optional[int] = None # Deprecating
136146
worker_tolerations: Optional[List[V1Toleration]] = None
147+
worker_idle_timeout_seconds: Optional[int] = None
148+
worker_num_of_hosts: Optional[int] = None
137149
appwrapper: bool = False
138150
envs: Dict[str, str] = field(default_factory=dict)
139151
image: str = ""
@@ -150,8 +162,11 @@ class ClusterConfiguration:
150162
annotations: Dict[str, str] = field(default_factory=dict)
151163
volumes: list[V1Volume] = field(default_factory=list)
152164
volume_mounts: list[V1VolumeMount] = field(default_factory=list)
165+
suspend: Optional[bool] = None
166+
managed_by: Optional[str] = None
153167
enable_gcs_ft: bool = False
154168
redis_address: Optional[str] = None
169+
redis_username_secret: Optional[Dict[str, str]] = None
155170
redis_password_secret: Optional[Dict[str, str]] = None
156171
external_storage_namespace: Optional[str] = None
157172

@@ -181,6 +196,29 @@ def __post_init__(self):
181196
raise ValueError(
182197
"redis_password_secret must contain both 'name' and 'key' fields"
183198
)
199+
200+
if self.redis_username_secret and not isinstance(
201+
self.redis_username_secret, dict
202+
):
203+
raise ValueError(
204+
"redis_username_secret must be a dictionary with 'name' and 'key' fields"
205+
)
206+
207+
if self.redis_username_secret and (
208+
"name" not in self.redis_username_secret
209+
or "key" not in self.redis_username_secret
210+
):
211+
raise ValueError(
212+
"redis_username_secret must contain both 'name' and 'key' fields"
213+
)
214+
215+
if self.managed_by and self.managed_by not in [
216+
"ray.io/kuberay-operator",
217+
"kueue.x-k8s.io/multikueue",
218+
]:
219+
raise ValueError(
220+
"managed_by field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
221+
)
184222

185223
self._validate_types()
186224
self._memory_to_resource()

src/codeflare_sdk/ray/job_config.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""
2+
Defines the RayJobConfiguration dataclass for specifying KubeRay RayJob custom resources.
3+
"""
4+
5+
from dataclasses import dataclass, field
6+
from typing import Dict, List, Optional, Union
7+
8+
from codeflare_sdk.ray.cluster.config import ClusterConfiguration
9+
import corev1_client # Placeholder for kubernetes.client.models.V1PodTemplateSpec
10+
11+
# Placeholder for V1PodTemplateSpec until actual import is resolved
12+
# from kubernetes.client.models import V1PodTemplateSpec
13+
# For now, using a generic Dict as a placeholder
14+
V1PodTemplateSpec = Dict[str, Any]
15+
16+
17+
@dataclass
class RayJobConfiguration:
    """
    Configuration for a KubeRay RayJob.

    Required fields (``name``, ``entrypoint``) are declared before all defaulted
    fields. The original declaration placed ``entrypoint`` after the defaulted
    ``namespace``, which makes the dataclass machinery raise
    ``TypeError: non-default argument follows default argument`` at import time,
    so the class could never be instantiated.

    Args:
        name: Name of the RayJob.
        entrypoint: Command to execute for the job.
        namespace: Namespace for the RayJob.
        runtime_env_yaml: Runtime environment configuration as a YAML string.
        job_id: Optional ID for the job. Auto-generated if not set.
        active_deadline_seconds: Duration in seconds the job may be active.
        backoff_limit: Number of retries before marking job as failed.
        deletion_policy: Policy for resource deletion on job completion.
            Valid values: "DeleteCluster", "DeleteWorkers", "DeleteSelf", "DeleteNone".
        submission_mode: How the Ray job is submitted to the RayCluster.
            Valid values: "K8sJobMode", "HTTPMode", "InteractiveMode".
        managed_by: Controller managing the RayJob (e.g., "kueue.x-k8s.io/multikueue").
        ray_cluster_spec: Specification for the RayCluster if created by this RayJob.
        cluster_selector: Labels to select an existing RayCluster.
        submitter_pod_template: Pod template for the job submitter (if K8sJobMode).
        shutdown_after_job_finishes: Whether to delete the RayCluster after job completion.
        ttl_seconds_after_finished: TTL for RayCluster cleanup after job completion.
        suspend: Whether to suspend the RayJob (prevents RayCluster creation).
        metadata: Metadata for the RayJob.
        submitter_config_backoff_limit: BackoffLimit for the submitter Kubernetes Job.

    Raises:
        ValueError: From ``__post_init__`` when a field holds an invalid value
            or the cluster-source constraints are violated.
    """

    # Required fields first (dataclass rule: no non-default after default).
    name: str
    entrypoint: str
    # Optional / defaulted fields.
    namespace: Optional[str] = None
    runtime_env_yaml: Optional[str] = None
    job_id: Optional[str] = None
    active_deadline_seconds: Optional[int] = None
    backoff_limit: int = 0  # KubeRay default is 0
    # Validated in __post_init__ against the four KubeRay deletion policies.
    deletion_policy: Optional[str] = None
    submission_mode: str = "K8sJobMode"  # KubeRay default
    managed_by: Optional[str] = None
    # Project / placeholder types are quoted (forward references) so the
    # dataclass can be defined even when those names resolve lazily.
    ray_cluster_spec: Optional["ClusterConfiguration"] = None
    cluster_selector: Dict[str, str] = field(default_factory=dict)
    submitter_pod_template: Optional["V1PodTemplateSpec"] = None  # Kubernetes V1PodTemplateSpec
    shutdown_after_job_finishes: bool = True  # Common default; KubeRay itself doesn't default this in RayJobSpec directly
    ttl_seconds_after_finished: int = 0  # KubeRay default
    suspend: bool = False
    metadata: Dict[str, str] = field(default_factory=dict)
    submitter_config_backoff_limit: Optional[int] = None

    def __post_init__(self):
        """Validate enum-like string fields and the cluster-source constraints.

        Raises:
            ValueError: If ``deletion_policy``, ``submission_mode`` or
                ``managed_by`` holds an unsupported value, if both
                ``ray_cluster_spec`` and ``cluster_selector`` are given, or if
                neither is given outside of InteractiveMode.
        """
        if self.deletion_policy and self.deletion_policy not in [
            "DeleteCluster",
            "DeleteWorkers",
            "DeleteSelf",
            "DeleteNone",
        ]:
            raise ValueError(
                "deletion_policy must be one of 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
            )

        if self.submission_mode not in ["K8sJobMode", "HTTPMode", "InteractiveMode"]:
            raise ValueError(
                "submission_mode must be one of 'K8sJobMode', 'HTTPMode', or 'InteractiveMode'"
            )

        if self.managed_by and self.managed_by not in [
            "ray.io/kuberay-operator",
            "kueue.x-k8s.io/multikueue",
        ]:
            raise ValueError(
                "managed_by field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
            )

        if self.ray_cluster_spec and self.cluster_selector:
            raise ValueError("Only one of ray_cluster_spec or cluster_selector can be provided.")

        if (
            not self.ray_cluster_spec
            and not self.cluster_selector
            and self.submission_mode != "InteractiveMode"
        ):
            # In interactive mode, a cluster might already exist and the user connects to it.
            # Otherwise, a RayJob needs either a spec to create a cluster or a selector to find one.
            raise ValueError(
                "Either ray_cluster_spec (to create a new cluster) or cluster_selector (to use an existing one) must be specified unless in InteractiveMode."
            )

        # TODO: Add validation for submitter_pod_template if submission_mode is K8sJobMode
        # TODO: Add type validation for all fields
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""
2+
Defines the RayServiceConfiguration dataclass for specifying KubeRay RayService custom resources.
3+
"""
4+
5+
from dataclasses import dataclass, field
6+
from typing import Dict, List, Optional
7+
8+
from codeflare_sdk.ray.cluster.config import ClusterConfiguration
9+
import corev1_client # Placeholder for kubernetes.client.models.V1Service
10+
11+
# Placeholder for V1Service until actual import is resolved
12+
# from kubernetes.client.models import V1Service
13+
# For now, using a generic Dict as a placeholder
14+
V1Service = Dict[str, Any]
15+
16+
@dataclass
class RayServiceConfiguration:
    """
    Configuration for a KubeRay RayService.

    Required fields (``name``, ``serve_config_v2``, ``ray_cluster_spec``) are
    declared before all defaulted fields. The original declaration placed them
    after the defaulted ``namespace``, which makes the dataclass machinery raise
    ``TypeError: non-default argument follows default argument`` at import time,
    so the class could never be instantiated.

    Args:
        name: Name of the RayService.
        serve_config_v2: YAML string defining the applications and deployments to deploy.
        ray_cluster_spec: Specification for the RayCluster underpinning the RayService.
        namespace: Namespace for the RayService.
        upgrade_strategy_type: Strategy for upgrading the RayService ("NewCluster" or "None").
        serve_service: Optional Kubernetes service definition for the serve endpoints.
        exclude_head_pod_from_serve_svc: If true, head pod won't be part of the K8s serve service.
        metadata: Metadata for the RayService.
        annotations: Annotations for the RayService.

    Raises:
        ValueError: From ``__post_init__`` when ``upgrade_strategy_type`` is
            invalid or ``serve_config_v2`` is empty.
    """

    # Required fields first (dataclass rule: no non-default after default).
    name: str
    serve_config_v2: str
    ray_cluster_spec: "ClusterConfiguration"  # A RayService always needs a RayClusterSpec
    # Optional / defaulted fields.
    namespace: Optional[str] = None
    upgrade_strategy_type: Optional[str] = "NewCluster"  # KubeRay default if not specified, but good to be explicit.
    serve_service: Optional["V1Service"] = None  # Kubernetes V1Service
    exclude_head_pod_from_serve_svc: bool = False
    metadata: Dict[str, str] = field(default_factory=dict)
    annotations: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self):
        """Validate the upgrade strategy and that a serve config was supplied.

        Raises:
            ValueError: If ``upgrade_strategy_type`` is not "NewCluster"/"None",
                or ``serve_config_v2`` is empty.
        """
        if self.upgrade_strategy_type and self.upgrade_strategy_type not in [
            "NewCluster",
            "None",
        ]:
            raise ValueError(
                "upgrade_strategy_type must be one of 'NewCluster' or 'None'"
            )

        if not self.serve_config_v2:
            raise ValueError("serve_config_v2 must be provided.")

        # TODO: Add type validation for all fields

tests/e2e/local_interactive_sdk_kind_test.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ def run_local_interactives(
7777
cluster.status()
7878
logger.info("Cluster is ready")
7979

80-
logger.info("Waiting for head and worker pods to be fully ready...")
8180
TIMEOUT = 300 # 5 minutes timeout
8281
END = time.time() + TIMEOUT
8382

@@ -118,11 +117,6 @@ def run_local_interactives(
118117
if worker_pod_name:
119118
worker_status = kubectl_get_pod_status(self.namespace, worker_pod_name)
120119

121-
logger.info(f"Head pod ({head_pod_name or 'N/A'}) status: {head_status}")
122-
logger.info(
123-
f"Worker pod ({worker_pod_name or 'N/A'}) status: {worker_status}"
124-
)
125-
126120
if (
127121
head_pod_name
128122
and worker_pod_name
@@ -216,7 +210,7 @@ def run_local_interactives(
216210

217211
logger.info("Initializing Ray connection...")
218212
try:
219-
ray.init(address=client_url, logging_level="INFO")
213+
ray.init(address=client_url, logging_level="INFO", local_mode=True)
220214
logger.info("Ray initialization successful")
221215
except Exception as e:
222216
logger.error(f"Ray initialization failed: {str(e)}")

tests/e2e/support.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -483,18 +483,6 @@ def kubectl_get_pod_status(namespace, pod_name):
483483
text=True,
484484
check=True,
485485
)
486-
print(f"Pod {pod_name} conditions: {conditions.stdout}")
487-
488-
# Get pod events for more context
489-
events = subprocess.run(
490-
["kubectl", "describe", "pod", pod_name, "-n", namespace],
491-
capture_output=True,
492-
text=True,
493-
check=False,
494-
)
495-
if events.returncode == 0:
496-
print(f"Pod {pod_name} details:")
497-
print(events.stdout)
498486

499487
return status
500488
except subprocess.CalledProcessError as e:
@@ -521,7 +509,6 @@ def kubectl_get_pod_ready(namespace, pod_name):
521509
text=True,
522510
check=True,
523511
)
524-
print(f"Container statuses for {pod_name}: {result.stdout}")
525512

526513
# Get ready status
527514
result = subprocess.run(

0 commit comments

Comments
 (0)