local_interactive_sdk_kind_test fix testing #829

Draft
wants to merge 9 commits into base: main
117 changes: 117 additions & 0 deletions .github/workflows/e2e_tests.yaml
@@ -70,12 +70,99 @@ jobs:
- name: Install NVidia GPU operator for KinD
uses: ./common/github-actions/nvidia-gpu-operator

- name: Verify GPU availability in KinD
run: |
echo "Checking for available GPUs in the KinD cluster..."

# Wait for GPU operator pods to be ready (with timeout)
echo "Waiting for GPU operator pods to be ready..."
TIMEOUT=300 # 5 minutes timeout
END=$((SECONDS + TIMEOUT))

ALL_READY=false
while [ $SECONDS -lt $END ]; do
  # Get total number of pods in the namespace
  TOTAL_PODS=$(kubectl get pods -n gpu-operator --no-headers | wc -l)

  # Count pods that are either running and ready or completed successfully,
  # excluding pods that are still initializing
  READY_PODS=$(kubectl get pods -n gpu-operator --no-headers | grep -E 'Running|Completed' | grep -v 'PodInitializing' | wc -l)

  if [ "$READY_PODS" -eq "$TOTAL_PODS" ] && [ "$TOTAL_PODS" -gt 0 ]; then
    echo "All GPU operator pods are ready or completed successfully!"
    ALL_READY=true
    break
  fi

  echo "Waiting for GPU operator pods to be ready... ($READY_PODS/$TOTAL_PODS)"
  echo "Pod status:"
  kubectl get pods -n gpu-operator
  sleep 10
done

# Fail if the pods never became ready within the timeout
if [ "$ALL_READY" != "true" ]; then
  echo "::error::Timeout waiting for GPU operator pods to be ready"
  echo "GPU operator pod status:"
  kubectl get pods -n gpu-operator -o wide
  echo "GPU operator pod logs:"
  kubectl logs -n gpu-operator -l app.kubernetes.io/name=gpu-operator
  echo "GPU operator pod events:"
  kubectl get events -n gpu-operator
  exit 1
fi

echo "Node details:"
kubectl describe nodes | grep -E 'nvidia.com/gpu|Allocatable:|Capacity:|Name:'

# Check if GPU operator has labeled nodes (|| true keeps a zero match count from tripping bash -e)
GPU_LABELS=$(kubectl describe nodes | grep -c "nvidia.com/gpu" || true)
if [ "$GPU_LABELS" -eq 0 ]; then
  echo "::error::No NVIDIA GPU labels found on nodes. GPU operator may not be running correctly."
  echo "Full node descriptions for debugging:"
  kubectl describe nodes
  exit 1
fi

# Check if GPUs are actually allocatable
GPU_ALLOCATABLE=$(kubectl get nodes -o jsonpath='{.items[*].status.allocatable.nvidia\.com/gpu}' | tr ' ' '\n' | grep -v '^$' | wc -l)
if [ "$GPU_ALLOCATABLE" -eq 0 ]; then
  echo "::error::GPU operator is running but no GPUs are allocatable. Check GPU operator logs."
  echo "Checking GPU operator pods:"
  kubectl get pods -n gpu-operator -o wide
  echo "GPU operator pod logs:"
  kubectl logs -n gpu-operator -l app.kubernetes.io/name=gpu-operator
  echo "GPU operator pod events:"
  kubectl get events -n gpu-operator
  echo "GPU operator pod descriptions:"
  kubectl describe pods -n gpu-operator
  exit 1
fi

echo "Successfully found $GPU_ALLOCATABLE allocatable GPU(s) in the cluster."

- name: Deploy CodeFlare stack
id: deploy
run: |
cd codeflare-operator
echo Setting up CodeFlare stack
make setup-e2e

# Create ConfigMap to disable mTLS
echo "Creating ConfigMap to disable mTLS..."
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: codeflare-operator-config
  namespace: ray-system
data:
  config.yaml: |
    kuberay:
      mTLSEnabled: false
      rayDashboardOAuthEnabled: false
      ingressDomain: "kind"
    appwrapper:
      enabled: true
EOF

echo Deploying CodeFlare operator
make deploy -e IMG="${CODEFLARE_OPERATOR_IMG}" -e ENV="e2e"
kubectl wait --timeout=120s --for=condition=Available=true deployment -n openshift-operators codeflare-operator-manager
@@ -86,6 +173,36 @@ jobs:
with:
user-name: sdk-user

- name: Grant sdk-user port-forwarding permissions
run: |
cat <<EOF | kubectl apply -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: port-forward-permissions
rules:
- apiGroups: [""]
  resources: ["services", "pods"]
  verbs: ["get", "list", "watch"]
- apiGroups: [""]
  resources: ["pods/portforward"]
  verbs: ["create"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: sdk-user-port-forward-binding
subjects:
- kind: User
  name: sdk-user
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: ClusterRole
  name: port-forward-permissions
  apiGroup: rbac.authorization.k8s.io
EOF
shell: bash
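The ClusterRole and binding above are what allow the restricted sdk-user to run the local interactive test through a port-forward. A rough sketch of the client-side pattern this enables, with the service name, namespace, and port as illustrative assumptions rather than values taken from this PR:

# Rough sketch: forward the Ray client port of the head service as sdk-user.
# Service name, namespace, and port are assumptions for illustration.
import subprocess

proc = subprocess.Popen(
    [
        "kubectl", "port-forward",
        "--namespace", "default",
        "svc/raytest-head-svc",
        "10001:10001",
    ],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)
# While the forward is active, the test can connect with ray.init("ray://localhost:10001").
# Call proc.terminate() once the interactive session is finished.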

- name: Configure RBAC for sdk user with limited permissions
run: |
kubectl create clusterrole list-ingresses --verb=get,list --resource=ingresses
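To reproduce the GPU verification step from this workflow locally, the same allocatable-GPU condition can be queried with the Kubernetes Python client. This is a minimal sketch, assuming the current kubeconfig context points at the KinD cluster; it is not part of the workflow itself.

# Minimal sketch: mirror the workflow's allocatable-GPU check with the Kubernetes Python client.
from kubernetes import client, config

config.load_kube_config()  # assumes KUBECONFIG points at the KinD cluster
v1 = client.CoreV1Api()

gpu_total = 0
for node in v1.list_node().items:
    allocatable = node.status.allocatable or {}
    gpus = int(allocatable.get("nvidia.com/gpu", "0"))
    print(f"{node.metadata.name}: nvidia.com/gpu allocatable = {gpus}")
    gpu_total += gpus

if gpu_total == 0:
    raise SystemExit("No allocatable GPUs found; check the pods in the gpu-operator namespace.")
print(f"Found {gpu_total} allocatable GPU(s) in the cluster.")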
13 changes: 13 additions & 0 deletions codeflare-kuberay.code-workspace
@@ -0,0 +1,13 @@
{
"folders": [
{
"path": "/Users/bkeane/Code/github.com/codeflare-sdk"
},
{
"path": "/Users/bkeane/Code/github.com/kuberay"
},
{
"path": "/Users/bkeane/Code/github.com/codeflare-operator"
}
]
}
9 changes: 9 additions & 0 deletions src/codeflare_sdk/common/utils/generate_cert.py
@@ -243,9 +243,18 @@ def export_env(cluster_name, namespace):
- RAY_TLS_SERVER_CERT: Path to the TLS server certificate.
- RAY_TLS_SERVER_KEY: Path to the TLS server private key.
- RAY_TLS_CA_CERT: Path to the CA certificate.
- RAY_CLIENT_SKIP_TLS_VERIFY: Skips TLS verification by the client.
"""
tls_dir = os.path.join(os.getcwd(), f"tls-{cluster_name}-{namespace}")
os.environ["RAY_USE_TLS"] = "1"
os.environ["RAY_TLS_SERVER_CERT"] = os.path.join(tls_dir, "tls.crt")
os.environ["RAY_TLS_SERVER_KEY"] = os.path.join(tls_dir, "tls.key")
os.environ["RAY_TLS_CA_CERT"] = os.path.join(tls_dir, "ca.crt")
os.environ["RAY_CLIENT_SKIP_TLS_VERIFY"] = "1" # Skip verification for E2E

# Log the exported values so they can be verified in the test output
print(f"generate_cert.export_env: RAY_USE_TLS set to: {os.environ.get('RAY_USE_TLS')}")
print(f"generate_cert.export_env: RAY_TLS_CA_CERT set to: {os.environ.get('RAY_TLS_CA_CERT')}")
print(f"generate_cert.export_env: RAY_TLS_SERVER_CERT set to: {os.environ.get('RAY_TLS_SERVER_CERT')}")
print(f"generate_cert.export_env: RAY_TLS_SERVER_KEY set to: {os.environ.get('RAY_TLS_SERVER_KEY')}")
print(f"generate_cert.export_env: RAY_CLIENT_SKIP_TLS_VERIFY set to: {os.environ.get('RAY_CLIENT_SKIP_TLS_VERIFY')}")
38 changes: 38 additions & 0 deletions src/codeflare_sdk/ray/cluster/config.py
@@ -108,6 +108,16 @@ class ClusterConfiguration:
Kubernetes secret reference containing Redis password. ex: {"name": "secret-name", "key": "password-key"}
external_storage_namespace:
The storage namespace to use for GCS fault tolerance. By default, KubeRay sets it to the UID of RayCluster.
worker_idle_timeout_seconds:
The idle timeout for worker nodes in seconds.
worker_num_of_hosts:
The number of hosts per worker replica for TPUs.
suspend:
A boolean indicating whether to suspend the cluster.
managed_by:
The controller responsible for managing the RayCluster; must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.
redis_username_secret:
Kubernetes secret reference containing the Redis username. ex: {"name": "secret-name", "key": "username-key"}
"""

name: str
@@ -134,6 +144,8 @@ class ClusterConfiguration:
max_memory: Optional[Union[int, str]] = None # Deprecating
num_gpus: Optional[int] = None # Deprecating
worker_tolerations: Optional[List[V1Toleration]] = None
worker_idle_timeout_seconds: Optional[int] = None
worker_num_of_hosts: Optional[int] = None
appwrapper: bool = False
envs: Dict[str, str] = field(default_factory=dict)
image: str = ""
@@ -150,8 +162,11 @@ class ClusterConfiguration:
annotations: Dict[str, str] = field(default_factory=dict)
volumes: list[V1Volume] = field(default_factory=list)
volume_mounts: list[V1VolumeMount] = field(default_factory=list)
suspend: Optional[bool] = None
managed_by: Optional[str] = None
enable_gcs_ft: bool = False
redis_address: Optional[str] = None
redis_username_secret: Optional[Dict[str, str]] = None
redis_password_secret: Optional[Dict[str, str]] = None
external_storage_namespace: Optional[str] = None

@@ -181,6 +196,29 @@ def __post_init__(self):
raise ValueError(
"redis_password_secret must contain both 'name' and 'key' fields"
)

if self.redis_username_secret and not isinstance(
self.redis_username_secret, dict
):
raise ValueError(
"redis_username_secret must be a dictionary with 'name' and 'key' fields"
)

if self.redis_username_secret and (
"name" not in self.redis_username_secret
or "key" not in self.redis_username_secret
):
raise ValueError(
"redis_username_secret must contain both 'name' and 'key' fields"
)

if self.managed_by and self.managed_by not in [
"ray.io/kuberay-operator",
"kueue.x-k8s.io/multikueue",
]:
raise ValueError(
"managed_by field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
)

self._validate_types()
self._memory_to_resource()
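A brief sketch of how the new ClusterConfiguration fields fit together; the cluster, secret, and Redis address values are hypothetical and mainly exercise the validation added in __post_init__:

# Hypothetical usage of the new ClusterConfiguration fields (values are illustrative).
from codeflare_sdk.ray.cluster.config import ClusterConfiguration

config = ClusterConfiguration(
    name="gcs-ft-test",
    namespace="default",
    worker_idle_timeout_seconds=300,       # scale down idle workers after 5 minutes
    suspend=False,                         # start the cluster unsuspended
    managed_by="ray.io/kuberay-operator",  # must be this or 'kueue.x-k8s.io/multikueue'
    enable_gcs_ft=True,
    redis_address="redis:6379",
    redis_username_secret={"name": "redis-creds", "key": "username"},
    redis_password_secret={"name": "redis-creds", "key": "password"},
)

# A malformed secret reference trips the new validation:
try:
    ClusterConfiguration(
        name="bad",
        enable_gcs_ft=True,
        redis_address="redis:6379",
        redis_username_secret={"name": "redis-creds"},  # missing 'key'
    )
except ValueError as err:
    print(err)  # redis_username_secret must contain both 'name' and 'key' fields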
100 changes: 100 additions & 0 deletions src/codeflare_sdk/ray/job_config.py
@@ -0,0 +1,100 @@
"""
Defines the RayJobConfiguration dataclass for specifying KubeRay RayJob custom resources.
"""

from dataclasses import dataclass, field
from typing import Any, Dict, Optional

from codeflare_sdk.ray.cluster.config import ClusterConfiguration

# Placeholder for V1PodTemplateSpec until the kubernetes client import is wired up
# from kubernetes.client.models import V1PodTemplateSpec
# For now, a generic Dict stands in for the real type
V1PodTemplateSpec = Dict[str, Any]


@dataclass
class RayJobConfiguration:
"""
Configuration for a KubeRay RayJob.

Args:
name: Name of the RayJob.
namespace: Namespace for the RayJob.
entrypoint: Command to execute for the job.
runtime_env_yaml: Runtime environment configuration as a YAML string.
job_id: Optional ID for the job. Auto-generated if not set.
active_deadline_seconds: Duration in seconds the job may be active.
backoff_limit: Number of retries before marking job as failed.
deletion_policy: Policy for resource deletion on job completion.
Valid values: "DeleteCluster", "DeleteWorkers", "DeleteSelf", "DeleteNone".
submission_mode: How the Ray job is submitted to the RayCluster.
Valid values: "K8sJobMode", "HTTPMode", "InteractiveMode".
managed_by: Controller managing the RayJob (e.g., "kueue.x-k8s.io/multikueue").
ray_cluster_spec: Specification for the RayCluster if created by this RayJob.
cluster_selector: Labels to select an existing RayCluster.
submitter_pod_template: Pod template for the job submitter (if K8sJobMode).
shutdown_after_job_finishes: Whether to delete the RayCluster after job completion.
ttl_seconds_after_finished: TTL for RayCluster cleanup after job completion.
suspend: Whether to suspend the RayJob (prevents RayCluster creation).
metadata: Metadata for the RayJob.
submitter_config_backoff_limit: BackoffLimit for the submitter Kubernetes Job.
"""
name: str
entrypoint: str
namespace: Optional[str] = None
runtime_env_yaml: Optional[str] = None
job_id: Optional[str] = None
active_deadline_seconds: Optional[int] = None
backoff_limit: int = 0 # KubeRay default is 0
deletion_policy: Optional[str] = None # Validated in __post_init__: DeleteCluster, DeleteWorkers, DeleteSelf, DeleteNone
submission_mode: str = "K8sJobMode" # KubeRay default
managed_by: Optional[str] = None
ray_cluster_spec: Optional[ClusterConfiguration] = None
cluster_selector: Dict[str, str] = field(default_factory=dict)
submitter_pod_template: Optional[V1PodTemplateSpec] = None # Kubernetes V1PodTemplateSpec
shutdown_after_job_finishes: bool = True # Common default; KubeRay does not default this directly in RayJobSpec
ttl_seconds_after_finished: int = 0 # KubeRay default
suspend: bool = False
metadata: Dict[str, str] = field(default_factory=dict)
submitter_config_backoff_limit: Optional[int] = None


def __post_init__(self):
if self.deletion_policy and self.deletion_policy not in [
"DeleteCluster",
"DeleteWorkers",
"DeleteSelf",
"DeleteNone",
]:
raise ValueError(
"deletion_policy must be one of 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
)

if self.submission_mode not in ["K8sJobMode", "HTTPMode", "InteractiveMode"]:
raise ValueError(
"submission_mode must be one of 'K8sJobMode', 'HTTPMode', or 'InteractiveMode'"
)

if self.managed_by and self.managed_by not in [
"ray.io/kuberay-operator",
"kueue.x-k8s.io/multikueue",
]:
raise ValueError(
"managed_by field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
)

if self.ray_cluster_spec and self.cluster_selector:
raise ValueError("Only one of ray_cluster_spec or cluster_selector can be provided.")

if not self.ray_cluster_spec and not self.cluster_selector and self.submission_mode != "InteractiveMode":
# In interactive mode, a cluster might already exist and the user connects to it.
# Otherwise, a RayJob needs either a spec to create a cluster or a selector to find one.
raise ValueError(
"Either ray_cluster_spec (to create a new cluster) or cluster_selector (to use an existing one) must be specified unless in InteractiveMode."
)

# TODO: Add validation for submitter_pod_template if submission_mode is K8sJobMode
# TODO: Add type validation for all fields
pass
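A hedged construction sketch for the new dataclass; the job name, entrypoint, and nested cluster spec are illustrative values, and the second construction shows the spec-versus-selector check firing:

# Illustrative construction of RayJobConfiguration (values are examples, not defaults).
from codeflare_sdk.ray.cluster.config import ClusterConfiguration
from codeflare_sdk.ray.job_config import RayJobConfiguration

job = RayJobConfiguration(
    name="smoke-test-job",
    entrypoint="python my_script.py",
    namespace="default",
    submission_mode="K8sJobMode",
    shutdown_after_job_finishes=True,
    ray_cluster_spec=ClusterConfiguration(name="smoke-test", namespace="default"),
)

# Supplying both a cluster spec and a selector is rejected in __post_init__:
try:
    RayJobConfiguration(
        name="conflicting-job",
        entrypoint="python my_script.py",
        ray_cluster_spec=ClusterConfiguration(name="c1", namespace="default"),
        cluster_selector={"ray.io/cluster": "existing-cluster"},
    )
except ValueError as err:
    print(err)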
55 changes: 55 additions & 0 deletions src/codeflare_sdk/ray/service_config.py
@@ -0,0 +1,55 @@
"""
Defines the RayServiceConfiguration dataclass for specifying KubeRay RayService custom resources.
"""

from dataclasses import dataclass, field
from typing import Any, Dict, Optional

from codeflare_sdk.ray.cluster.config import ClusterConfiguration

# Placeholder for V1Service until the kubernetes client import is wired up
# from kubernetes.client.models import V1Service
# For now, a generic Dict stands in for the real type
V1Service = Dict[str, Any]

@dataclass
class RayServiceConfiguration:
"""
Configuration for a KubeRay RayService.

Args:
name: Name of the RayService.
namespace: Namespace for the RayService.
serve_config_v2: YAML string defining the applications and deployments to deploy.
ray_cluster_spec: Specification for the RayCluster underpinning the RayService.
upgrade_strategy_type: Strategy for upgrading the RayService ("NewCluster" or "None").
serve_service: Optional Kubernetes service definition for the serve endpoints.
exclude_head_pod_from_serve_svc: If true, head pod won't be part of the K8s serve service.
metadata: Metadata for the RayService.
annotations: Annotations for the RayService.
"""
name: str
serve_config_v2: str
ray_cluster_spec: ClusterConfiguration # A RayService always needs a RayCluster spec
namespace: Optional[str] = None
upgrade_strategy_type: Optional[str] = "NewCluster" # KubeRay default if not specified, but good to be explicit
serve_service: Optional[V1Service] = None # Kubernetes V1Service
exclude_head_pod_from_serve_svc: bool = False
metadata: Dict[str, str] = field(default_factory=dict)
annotations: Dict[str, str] = field(default_factory=dict)

def __post_init__(self):
if self.upgrade_strategy_type and self.upgrade_strategy_type not in [
"NewCluster",
"None",
]:
raise ValueError(
"upgrade_strategy_type must be one of 'NewCluster' or 'None'"
)

if not self.serve_config_v2:
raise ValueError("serve_config_v2 must be provided.")

# TODO: Add type validation for all fields
pass
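And a similar hedged sketch for RayServiceConfiguration; the serve_config_v2 payload and resource names are placeholders:

# Illustrative construction of RayServiceConfiguration (serve config and names are placeholders).
from codeflare_sdk.ray.cluster.config import ClusterConfiguration
from codeflare_sdk.ray.service_config import RayServiceConfiguration

serve_config = (
    "applications:\n"
    "  - name: example-app\n"
    "    import_path: example.module:app\n"
    "    route_prefix: /\n"
)

service = RayServiceConfiguration(
    name="example-service",
    serve_config_v2=serve_config,
    ray_cluster_spec=ClusterConfiguration(name="serve-cluster", namespace="default"),
    upgrade_strategy_type="NewCluster",
)

# An empty serve_config_v2 is rejected in __post_init__:
try:
    RayServiceConfiguration(
        name="bad-service",
        serve_config_v2="",
        ray_cluster_spec=ClusterConfiguration(name="serve-cluster", namespace="default"),
    )
except ValueError as err:
    print(err)  # serve_config_v2 must be provided.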