Skip to content

Commit b05d1f2

Browse files
committed
add virtual cluster
Signed-off-by: 黑驰 <[email protected]>
1 parent d644135 commit b05d1f2

File tree

114 files changed

+8119
-196
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

114 files changed

+8119
-196
lines changed

BUILD.bazel

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ ray_cc_library(
493493
name = "gcs_pub_sub_lib",
494494
srcs = [
495495
"src/ray/gcs/pubsub/gcs_pub_sub.cc",
496+
"src/ray/gcs/pubsub/gcs_pub_sub.ant.cc",
496497
],
497498
hdrs = [
498499
"src/ray/gcs/pubsub/gcs_pub_sub.h",
@@ -1833,6 +1834,34 @@ ray_cc_test(
18331834
],
18341835
)
18351836

1837+
ray_cc_test(
1838+
name = "gcs_virtual_cluster_manager_test",
1839+
size = "small",
1840+
srcs = [
1841+
"src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc",
1842+
],
1843+
tags = ["team:core"],
1844+
deps = [
1845+
":gcs_server_lib",
1846+
":gcs_server_test_util",
1847+
":gcs_test_util_lib",
1848+
":ray_mock",
1849+
"@com_google_googletest//:gtest_main",
1850+
],
1851+
)
1852+
1853+
ray_cc_test(
1854+
name = "virtual_cluster_manager_test",
1855+
size = "small",
1856+
srcs = ["src/ray/raylet/virtual_cluster_manager_test.cc"],
1857+
tags = ["team:core"],
1858+
deps = [
1859+
":ray_mock",
1860+
":raylet_lib",
1861+
"@com_google_googletest//:gtest_main",
1862+
],
1863+
)
1864+
18361865
ray_cc_library(
18371866
name = "gcs_table_storage_lib",
18381867
srcs = glob(
@@ -1956,6 +1985,7 @@ ray_cc_library(
19561985
name = "gcs_client_lib",
19571986
srcs = [
19581987
"src/ray/gcs/gcs_client/accessor.cc",
1988+
"src/ray/gcs/gcs_client/accessor.ant.cc",
19591989
"src/ray/gcs/gcs_client/gcs_client.cc",
19601990
],
19611991
hdrs = [

python/ray/_private/ray_constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ def env_set_by_user(key):
118118
# the local working_dir and py_modules to be uploaded, or these files might get
119119
# garbage collected before the job starts.
120120
RAY_RUNTIME_ENV_URI_PIN_EXPIRATION_S_DEFAULT = 10 * 60
121+
# Environment variable to specify the virtual cluster ID a Ray job belongs to.
122+
RAY_VIRTUAL_CLUSTER_ID_ENV_VAR = "VIRTUAL_CLUSTER_ID"
121123
# If set to 1, then `.gitignore` files will not be parsed and loaded into "excludes"
122124
# when using a local working_dir or py_modules.
123125
RAY_RUNTIME_ENV_IGNORE_GITIGNORE = "RAY_RUNTIME_ENV_IGNORE_GITIGNORE"

python/ray/_private/state.py

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -158,15 +158,15 @@ def _gen_actor_info(self, actor_table_data):
158158
}
159159
return actor_info
160160

161-
def node_table(self):
161+
def node_table(self, virtual_cluster_id=None):
162162
"""Fetch and parse the Gcs node info table.
163163
164164
Returns:
165165
Information about the node in the cluster.
166166
"""
167167
self._check_connected()
168168

169-
return self.global_state_accessor.get_node_table()
169+
return self.global_state_accessor.get_node_table(virtual_cluster_id)
170170

171171
def job_table(self):
172172
"""Fetch and parse the gcs job table.
@@ -724,7 +724,7 @@ def update_worker_num_paused_threads(self, worker_id, num_paused_threads_delta):
724724
worker_id, num_paused_threads_delta
725725
)
726726

727-
def cluster_resources(self):
727+
def cluster_resources(self, virtual_cluster_id=None):
728728
"""Get the current total cluster resources.
729729
730730
Note that this information can grow stale as nodes are added to or
@@ -738,23 +738,25 @@ def cluster_resources(self):
738738

739739
# Calculate total resources.
740740
total_resources = defaultdict(int)
741-
for node_total_resources in self.total_resources_per_node().values():
741+
for node_total_resources in self.total_resources_per_node(
742+
virtual_cluster_id
743+
).values():
742744
for resource_id, value in node_total_resources.items():
743745
total_resources[resource_id] += value
744746

745747
return dict(total_resources)
746748

747-
def _live_node_ids(self):
749+
def _live_node_ids(self, virtual_cluster_id=None):
748750
"""Returns a set of node IDs corresponding to nodes still alive."""
749-
return set(self.total_resources_per_node().keys())
751+
return set(self.total_resources_per_node(virtual_cluster_id).keys())
750752

751-
def available_resources_per_node(self):
753+
def available_resources_per_node(self, virtual_cluster_id=None):
752754
"""Returns a dictionary mapping node id to available resources."""
753755
self._check_connected()
754756
available_resources_by_id = {}
755757

756758
all_available_resources = (
757-
self.global_state_accessor.get_all_available_resources()
759+
self.global_state_accessor.get_all_available_resources(virtual_cluster_id)
758760
)
759761
for available_resource in all_available_resources:
760762
message = gcs_pb2.AvailableResources.FromString(available_resource)
@@ -769,11 +771,15 @@ def available_resources_per_node(self):
769771
return available_resources_by_id
770772

771773
# returns a dict that maps node_id(hex string) to a dict of {resource_id: capacity}
772-
def total_resources_per_node(self) -> Dict[str, Dict[str, int]]:
774+
def total_resources_per_node(
775+
self, virtual_cluster_id=None
776+
) -> Dict[str, Dict[str, int]]:
773777
self._check_connected()
774778
total_resources_by_node = {}
775779

776-
all_total_resources = self.global_state_accessor.get_all_total_resources()
780+
all_total_resources = self.global_state_accessor.get_all_total_resources(
781+
virtual_cluster_id
782+
)
777783
for node_total_resources in all_total_resources:
778784
message = gcs_pb2.TotalResources.FromString(node_total_resources)
779785
# Calculate total resources for this node.
@@ -786,7 +792,7 @@ def total_resources_per_node(self) -> Dict[str, Dict[str, int]]:
786792

787793
return total_resources_by_node
788794

789-
def available_resources(self):
795+
def available_resources(self, virtual_cluster_id=None):
790796
"""Get the current available cluster resources.
791797
792798
This is different from `cluster_resources` in that this will return
@@ -802,7 +808,9 @@ def available_resources(self):
802808
"""
803809
self._check_connected()
804810

805-
available_resources_by_id = self.available_resources_per_node()
811+
available_resources_by_id = self.available_resources_per_node(
812+
virtual_cluster_id
813+
)
806814

807815
# Calculate total available resources.
808816
total_available_resources = defaultdict(int)
@@ -879,13 +887,19 @@ def next_job_id():
879887

880888
@DeveloperAPI
881889
@client_mode_hook
882-
def nodes():
890+
def nodes(virtual_cluster_id=None):
883891
"""Get a list of the nodes in the cluster (for debugging only).
884892
885893
Returns:
886894
Information about the Ray clients in the cluster.
887895
"""
888-
return state.node_table()
896+
if not virtual_cluster_id:
897+
virtual_cluster_id = ray.get_runtime_context().virtual_cluster_id
898+
elif type(virtual_cluster_id) is not str:
899+
raise TypeError(
900+
f"virtual_cluster_id must be a string, got {type(virtual_cluster_id)}"
901+
)
902+
return state.node_table(virtual_cluster_id)
889903

890904

891905
def workers():
@@ -999,7 +1013,7 @@ def object_transfer_timeline(filename=None):
9991013

10001014
@DeveloperAPI
10011015
@client_mode_hook
1002-
def cluster_resources():
1016+
def cluster_resources(virtual_cluster_id=None):
10031017
"""Get the current total cluster resources.
10041018
10051019
Note that this information can grow stale as nodes are added to or removed
@@ -1009,12 +1023,18 @@ def cluster_resources():
10091023
A dictionary mapping resource name to the total quantity of that
10101024
resource in the cluster.
10111025
"""
1012-
return state.cluster_resources()
1026+
if not virtual_cluster_id:
1027+
virtual_cluster_id = ray.get_runtime_context().virtual_cluster_id
1028+
elif type(virtual_cluster_id) is not str:
1029+
raise TypeError(
1030+
f"virtual_cluster_id must be a string, got {type(virtual_cluster_id)}"
1031+
)
1032+
return state.cluster_resources(virtual_cluster_id)
10131033

10141034

10151035
@DeveloperAPI
10161036
@client_mode_hook
1017-
def available_resources():
1037+
def available_resources(virtual_cluster_id=None):
10181038
"""Get the current available cluster resources.
10191039
10201040
This is different from `cluster_resources` in that this will return idle
@@ -1028,7 +1048,13 @@ def available_resources():
10281048
is currently not available (i.e., quantity is 0), it will not
10291049
be included in this dictionary.
10301050
"""
1031-
return state.available_resources()
1051+
if not virtual_cluster_id:
1052+
virtual_cluster_id = ray.get_runtime_context().virtual_cluster_id
1053+
elif type(virtual_cluster_id) is not str:
1054+
raise TypeError(
1055+
f"virtual_cluster_id must be a string, got {type(virtual_cluster_id)}"
1056+
)
1057+
return state.available_resources(virtual_cluster_id)
10321058

10331059

10341060
@DeveloperAPI
@@ -1045,7 +1071,7 @@ def available_resources_per_node():
10451071

10461072

10471073
@DeveloperAPI
1048-
def total_resources_per_node():
1074+
def total_resources_per_node(virtual_cluster_id=None):
10491075
"""Get the current total resources of each live node.
10501076
10511077
Note that this information can grow stale as tasks start and finish.
@@ -1054,6 +1080,12 @@ def total_resources_per_node():
10541080
A dictionary mapping node hex id to total resources dictionary.
10551081
"""
10561082

1083+
if not virtual_cluster_id:
1084+
virtual_cluster_id = ray.get_runtime_context().virtual_cluster_id
1085+
elif type(virtual_cluster_id) is not str:
1086+
raise TypeError(
1087+
f"virtual_cluster_id must be a string, got {type(virtual_cluster_id)}"
1088+
)
10571089
return state.total_resources_per_node()
10581090

10591091

python/ray/_private/worker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,10 @@ def current_cluster_and_job(self):
570570
assert isinstance(self.current_job_id, ray.JobID)
571571
return self.node.cluster_id, self.current_job_id
572572

573+
@property
574+
def current_virtual_cluster_id(self):
575+
return os.environ.get(ray_constants.RAY_VIRTUAL_CLUSTER_ID_ENV_VAR, "")
576+
573577
@property
574578
def runtime_env(self):
575579
"""Get the runtime env in json format"""

python/ray/_raylet.pyx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ from ray.includes.unique_ids cimport (
144144
CObjectID,
145145
CPlacementGroupID,
146146
ObjectIDIndexType,
147+
CVirtualClusterID,
147148
)
148149
from ray.includes.libcoreworker cimport (
149150
ActorHandleSharedPtr,

python/ray/dashboard/modules/job/common.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,17 @@
99

1010
from ray._private import ray_constants
1111
from ray._private.event.export_event_logger import get_export_event_logger
12-
from ray._private.gcs_utils import GcsAioClient
12+
from ray._private.gcs_utils import GcsAioClient, GcsChannel
1313
from ray._private.runtime_env.packaging import parse_uri
14+
from ray.core.generated import gcs_service_pb2_grpc
1415
from ray.core.generated.export_event_pb2 import ExportEvent
1516
from ray.core.generated.export_submission_job_event_pb2 import (
1617
ExportSubmissionJobEventData,
1718
)
19+
from ray.core.generated.gcs_service_pb2 import (
20+
CreateJobClusterRequest,
21+
RemoveVirtualClusterRequest,
22+
)
1823
from ray.util.annotations import PublicAPI
1924

2025
# NOTE(edoakes): these constants should be considered a public API because
@@ -104,6 +109,8 @@ class JobInfo:
104109
#: The driver process exit code after the driver executed. Return None if driver
105110
#: doesn't finish executing
106111
driver_exit_code: Optional[int] = None
112+
#: The job cluster this job belongs to.
113+
job_cluster_id: Optional[str] = None
107114

108115
def __post_init__(self):
109116
if isinstance(self.status, str):
@@ -378,6 +385,62 @@ async def get_job_info(job_id: str):
378385
}
379386

380387

388+
class VirtualClusterClient:
389+
def __init__(self, gcs_address):
390+
self._gcs_channel = GcsChannel(gcs_address=gcs_address, aio=True)
391+
self._gcs_channel.connect()
392+
393+
self._gcs_virtual_cluster_info_stub = (
394+
gcs_service_pb2_grpc.VirtualClusterInfoGcsServiceStub(
395+
self._gcs_channel.channel()
396+
)
397+
)
398+
399+
def build_job_cluster_id(self, job_id, virtual_cluster_id):
400+
"""
401+
Constructs a unique job cluster ID by combining
402+
the virtual cluster ID and job ID.
403+
Note:
404+
The format needs to remain consistent with `virtual_cluster_id.h`.
405+
"""
406+
return f"{virtual_cluster_id}##{job_id}"
407+
408+
async def create_job_cluster(self, job_id, virtual_cluster_id, replica_sets):
409+
request = CreateJobClusterRequest(
410+
job_id=job_id,
411+
virtual_cluster_id=virtual_cluster_id,
412+
replica_sets=replica_sets,
413+
)
414+
415+
reply = await self._gcs_virtual_cluster_info_stub.CreateJobCluster(request)
416+
417+
if reply.status.code != 0:
418+
logger.warning(
419+
f"Failed to create job cluster for job ID '{job_id}' in "
420+
f"virtual cluster '{virtual_cluster_id}', "
421+
f"message: {reply.status.message}"
422+
)
423+
return None, reply.status.message
424+
425+
return reply.job_cluster_id, None
426+
427+
async def remove_job_cluster(self, job_cluster_id):
428+
if job_cluster_id is None:
429+
return
430+
431+
request = RemoveVirtualClusterRequest(
432+
virtual_cluster_id=job_cluster_id,
433+
)
434+
435+
reply = await self._gcs_virtual_cluster_info_stub.RemoveVirtualCluster(request)
436+
437+
if reply.status.code != 0:
438+
logger.warning(
439+
f"Failed to remove job cluster '{job_cluster_id}',"
440+
f" message: {reply.status.message}"
441+
)
442+
443+
381444
def uri_to_http_components(package_uri: str) -> Tuple[str, str]:
382445
suffix = Path(package_uri).suffix
383446
if suffix not in {".zip", ".whl"}:
@@ -426,6 +489,10 @@ class JobSubmitRequest:
426489
# to reserve for the entrypoint command, separately from any Ray tasks
427490
# or actors that are created by it.
428491
entrypoint_resources: Optional[Dict[str, float]] = None
492+
# Optional virtual cluster ID for job.
493+
virtual_cluster_id: Optional[str] = None
494+
# Optional replica sets for job
495+
replica_sets: Optional[Dict[str, int]] = None
429496

430497
def __post_init__(self):
431498
if not isinstance(self.entrypoint, str):
@@ -511,6 +578,31 @@ def __post_init__(self):
511578
f"got {type(v)}"
512579
)
513580

581+
if self.virtual_cluster_id is not None and not isinstance(
582+
self.virtual_cluster_id, str
583+
):
584+
raise TypeError(
585+
"virtual_cluster_id must be a string if provided, "
586+
f"got {type(self.virtual_cluster_id)}"
587+
)
588+
589+
if self.replica_sets is not None:
590+
if not isinstance(self.replica_sets, dict):
591+
raise TypeError(
592+
"replica_sets must be a dict, " f"got {type(self.replica_sets)}"
593+
)
594+
else:
595+
for k in self.replica_sets.keys():
596+
if not isinstance(k, str):
597+
raise TypeError(
598+
"replica_sets keys must be strings, " f"got {type(k)}"
599+
)
600+
for v in self.replica_sets.values():
601+
if not isinstance(v, int):
602+
raise TypeError(
603+
"replica_sets values must be integers, " f"got {type(v)}"
604+
)
605+
514606

515607
@dataclass
516608
class JobSubmitResponse:

0 commit comments

Comments
 (0)