Skip to content

Commit e17be1c

Browse files
committed
feat(BA-2851): Add resource isolation options for multi-agent setup
This change adds configuration for partitioning resources among agents, so that each agent no longer always sees the full resource pool. Partitioning prevents unintended over-allocation that could crash kernels. Three allocation modes are available: SHARED lets every agent see the full resources (the same behavior as before; useful for stress testing), AUTO_SPLIT automatically divides resources equally among agents, and MANUAL lets users specify exact per-agent allocations for all resources. Single-agent deployments remain unaffected and retain access to all available hardware resources.
1 parent 04a0f3a commit e17be1c

File tree

17 files changed

+1628
-66
lines changed

17 files changed

+1628
-66
lines changed

changes/6498.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add resource isolation options for multi-agent setup

configs/agent/sample.toml

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,14 @@
160160
# If agents field is populated, this field indicates the default values for all
161161
# agents.
162162
[resource]
163+
# Hard CPU allocation for this agent (e.g., 8 cores).
164+
# Only used when allocation-mode is "manual".
165+
# All agents must specify this value when allocation-mode is "manual".
166+
## allocated-cpu = 8
167+
# Hard memory allocation for this agent (e.g., "32G").
168+
# Only used when allocation-mode is "manual".
169+
# All agents must specify this value when allocation-mode is "manual".
170+
## allocated-mem = "32G"
163171
# The number of CPU cores reserved for the operating system and the agent
164172
# service.
165173
reserved-cpu = 1
@@ -173,6 +181,12 @@
173181
# Currently this value is unused. In future releases, it may be used to preserve
174182
# the minimum disk space from the scratch disk allocation via loopback files.
175183
reserved-disk = "8G"
184+
# Resource allocation mode for multi-agent scenarios.
185+
# - `shared`: All agents share the full resource pool (default, backward
186+
# compatible).
187+
# - `auto-split`: Automatically divide resources equally (1/N) among all agents.
188+
# - `manual`: Manually specify per-agent resource allocations via config.
189+
allocation-mode = "shared"
176190
# The alignment of the reported main memory size to absorb tiny deviations from
177191
# per-node firmware/hardware settings. Recommended to be multiple of the
178192
# page/hugepage size (e.g., 2 MiB).
@@ -182,6 +196,10 @@
182196
# Affinity policy
183197
affinity-policy = "INTERLEAVED"
184198

199+
# Device-specific per-slot resource allocations.
200+
# Only used when allocation-mode is "manual".
201+
[resource.allocated-devices]
202+
185203
# Pyroscope configuration
186204
[pyroscope]
187205
# Whether to enable Pyroscope profiling
@@ -433,24 +451,15 @@
433451

434452
# Resource config overrides for the individual agent
435453
[agents.resource]
436-
# The number of CPU cores reserved for the operating system and the agent
437-
# service.
438-
reserved-cpu = 1
439-
# The memory space reserved for the operating system and the agent service. It
440-
# is subtracted from the reported main memory size and not available for user
441-
# workload allocation. Depending on the memory-align-size option and system
442-
# configuration, this may not be the exact value but have slightly less or more
443-
# values within the memory-align-size.
444-
reserved-mem = 1073741824
445-
# The disk space reserved for the operating system and the agent service.
446-
# Currently this value is unused. In future releases, it may be used to preserve
447-
# the minimum disk space from the scratch disk allocation via loopback files.
448-
reserved-disk = 8589934592
449-
# The alignment of the reported main memory size to absorb tiny deviations from
450-
# per-node firmware/hardware settings. Recommended to be multiple of the
451-
# page/hugepage size (e.g., 2 MiB).
452-
memory-align-size = 16777216
453-
# Resource allocation order
454-
allocation-order = [ "cuda", "rocm", "tpu", "cpu", "mem",]
455-
# Affinity policy
456-
affinity-policy = 1
454+
# Hard CPU allocation for this agent (e.g., 8 cores).
455+
# Only used when allocation-mode is "manual".
456+
# All agents must specify this value when allocation-mode is "manual".
457+
## allocated-cpu = 8
458+
# Hard memory allocation for this agent (e.g., "32G").
459+
# Only used when allocation-mode is "manual".
460+
# All agents must specify this value when allocation-mode is "manual".
461+
## allocated-mem = "32G"
462+
463+
# Device-specific per-slot resource allocations.
464+
# Only used when allocation-mode is "manual".
465+
[agents.resource.allocated-devices]

src/ai/backend/accelerator/cuda_open/plugin.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ async def gather_node_measures(
251251
MetricTypes.GAUGE,
252252
unit_hint="bytes",
253253
stats_filter=frozenset({"max"}),
254+
measurement_scale_factor=ctx.agent.resource_partitioner.get_resource_scaling_factor(
255+
SlotName("cuda.device")
256+
),
254257
per_node=Measurement(Decimal(mem_used_total), Decimal(mem_avail_total)),
255258
per_device=mem_stats,
256259
),
@@ -259,6 +262,9 @@ async def gather_node_measures(
259262
MetricTypes.UTILIZATION,
260263
unit_hint="percent",
261264
stats_filter=frozenset({"avg", "max"}),
265+
measurement_scale_factor=ctx.agent.resource_partitioner.get_resource_scaling_factor(
266+
SlotName("cuda.device")
267+
),
262268
per_node=Measurement(Decimal(util_total), Decimal(dev_count * 100)),
263269
per_device=util_stats,
264270
),

src/ai/backend/agent/agent.py

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from dataclasses import dataclass
3131
from decimal import Decimal
3232
from io import SEEK_END, BytesIO
33+
from itertools import chain
3334
from pathlib import Path
3435
from types import TracebackType
3536
from typing import (
@@ -174,7 +175,6 @@
174175
from ai.backend.common.types import (
175176
MODEL_SERVICE_RUNTIME_PROFILES,
176177
AbuseReportValue,
177-
AcceleratorMetadata,
178178
AgentId,
179179
AutoPullBehavior,
180180
BinarySize,
@@ -233,11 +233,11 @@
233233
from .observer.heartbeat import HeartbeatObserver
234234
from .observer.host_port import HostPortObserver
235235
from .resources import (
236-
AbstractComputeDevice,
237236
AbstractComputePlugin,
238237
ComputerContext,
239238
KernelResourceSpec,
240239
Mount,
240+
ResourcePartitioner,
241241
align_memory,
242242
allocate,
243243
known_slot_types,
@@ -765,7 +765,10 @@ class AbstractAgent(
765765
etcd: AsyncEtcd
766766
local_instance_id: str
767767
kernel_registry: MutableMapping[KernelId, AbstractKernel]
768+
resource_partitioner: ResourcePartitioner
768769
computers: MutableMapping[DeviceName, ComputerContext]
770+
total_slots: Mapping[SlotName, Decimal]
771+
reserved_slots: Mapping[SlotName, Decimal]
769772
images: Mapping[ImageCanonical, ScannedImage]
770773
port_pool: set[int]
771774

@@ -836,6 +839,7 @@ def __init__(
836839
error_monitor: ErrorPluginContext,
837840
skip_initial_scan: bool = False,
838841
agent_public_key: Optional[PublicKey],
842+
resource_partitioner: ResourcePartitioner,
839843
) -> None:
840844
self._skip_initial_scan = skip_initial_scan
841845
self.loop = current_loop()
@@ -845,7 +849,10 @@ def __init__(
845849
self.local_instance_id = generate_local_instance_id(__file__)
846850
self.agent_public_key = agent_public_key
847851
self.kernel_registry = {}
852+
self.resource_partitioner = resource_partitioner
848853
self.computers = {}
854+
self.total_slots = {}
855+
self.reserved_slots = {}
849856
self.images = {}
850857
self.restarting_kernels = {}
851858
self.stat_ctx = StatContext(
@@ -934,28 +941,34 @@ async def __ainit__(self) -> None:
934941
alloc_map_mod.log_alloc_map = self.local_config.debug.log_alloc_map
935942
computers = await self.load_resources()
936943

937-
all_devices: list[AbstractComputeDevice] = []
938-
metadatas: list[AcceleratorMetadata] = []
939944
for name, computer in computers.items():
940945
devices = await computer.list_devices()
941-
all_devices.extend(devices)
942946
alloc_map = await computer.create_alloc_map()
943947
self.computers[name] = ComputerContext(computer, devices, alloc_map)
944-
metadatas.append(computer.get_metadata())
945948

949+
self.total_slots = self.resource_partitioner.calculate_total_slots(
950+
self.computers, self.local_config.resource_common
951+
)
952+
self.reserved_slots = self.resource_partitioner.restrict_computer_resources(
953+
self.computers, self.total_slots
954+
)
946955
self.slots = await self.update_slots()
947956
log.info("Resource slots: {!r}", self.slots)
948957
log.info("Slot types: {!r}", known_slot_types)
949958
self.timer_tasks.append(aiotools.create_timer(self.update_slots_periodically, 30.0))
950959

951960
# Use ValkeyStatClient batch operations for better performance
952961
field_value_map = {}
953-
for metadata in metadatas:
962+
for computer_ctx in self.computers.values():
963+
metadata = computer_ctx.instance.get_metadata()
954964
field_value_map[metadata["slot_name"]] = dump_json_str(metadata).encode()
955965

956966
if field_value_map:
957967
await self.valkey_stat_client.store_computer_metadata(field_value_map)
958968

969+
all_devices = list(
970+
chain.from_iterable(computer.devices for computer in self.computers.values())
971+
)
959972
self.affinity_map = AffinityMap.build(all_devices)
960973

961974
if not self._skip_initial_scan:
@@ -1949,6 +1962,7 @@ async def load_resources(
19491962
"""
19501963
Detect available resources attached on the system and load corresponding device plugin.
19511964
"""
1965+
raise NotImplementedError
19521966

19531967
@abstractmethod
19541968
async def scan_available_resources(
@@ -1957,6 +1971,7 @@ async def scan_available_resources(
19571971
"""
19581972
Scan and define the amount of available resource slots in this node.
19591973
"""
1974+
raise NotImplementedError
19601975

19611976
async def update_slots(
19621977
self,
@@ -1967,14 +1982,9 @@ async def update_slots(
19671982
"""
19681983
scanned_slots = await self.scan_available_resources()
19691984
usable_slots: dict[SlotName, Decimal] = {}
1970-
reserved_slots = {
1971-
SlotName("cpu"): Decimal(self.local_config.resource.reserved_cpu),
1972-
SlotName("mem"): Decimal(self.local_config.resource.reserved_mem),
1973-
SlotName("disk"): Decimal(self.local_config.resource.reserved_disk),
1974-
}
19751985
for slot_name, slot_capacity in scanned_slots.items():
19761986
if slot_name == SlotName("mem"):
1977-
mem_reserved = int(reserved_slots.get(slot_name, 0))
1987+
mem_reserved = int(self.reserved_slots.get(slot_name, 0))
19781988
mem_align = int(self.local_config.resource.memory_align_size)
19791989
mem_usable, mem_reserved = align_memory(
19801990
int(slot_capacity), mem_reserved, align=mem_align
@@ -1988,7 +1998,7 @@ async def update_slots(
19881998
)
19891999
else:
19902000
usable_capacity = max(
1991-
Decimal(0), slot_capacity - reserved_slots.get(slot_name, Decimal(0))
2001+
Decimal(0), slot_capacity - self.reserved_slots.get(slot_name, Decimal(0))
19922002
)
19932003
usable_slots[slot_name] = usable_capacity
19942004
return usable_slots
@@ -2100,6 +2110,7 @@ async def scan_images(self) -> ScanImagesResult:
21002110
This is called periodically to keep the image list up-to-date and allow
21012111
manual image addition and deletions by admins.
21022112
"""
2113+
raise NotImplementedError
21032114

21042115
async def _scan_images_wrapper(self, interval: float) -> None:
21052116
result = await self.scan_images()
@@ -2120,6 +2131,7 @@ async def push_image(
21202131
"""
21212132
Push the given image to the given registry.
21222133
"""
2134+
raise NotImplementedError
21232135

21242136
@abstractmethod
21252137
async def pull_image(
@@ -2132,12 +2144,14 @@ async def pull_image(
21322144
"""
21332145
Pull the given image from the given registry.
21342146
"""
2147+
raise NotImplementedError
21352148

21362149
@abstractmethod
21372150
async def purge_images(self, request: PurgeImagesReq) -> PurgeImagesResp:
21382151
"""
21392152
Purge the given images from the agent.
21402153
"""
2154+
raise NotImplementedError
21412155

21422156
async def check_and_pull(
21432157
self,
@@ -2269,7 +2283,7 @@ async def check_image(
22692283
Check the availability of the image and return a boolean flag that indicates whether
22702284
the agent should try pulling the image from a registry.
22712285
"""
2272-
return False
2286+
raise NotImplementedError
22732287

22742288
async def scan_running_kernels(self) -> None:
22752289
"""
@@ -3491,6 +3505,7 @@ async def destroy_kernel(
34913505
* Send SIGTERM to the kernel's main process.
34923506
* Send SIGKILL if it's not terminated within a few seconds.
34933507
"""
3508+
raise NotImplementedError
34943509

34953510
@abstractmethod
34963511
async def clean_kernel(
@@ -3514,6 +3529,7 @@ async def clean_kernel(
35143529
The ``container_id`` may be ``None`` if the container has already gone away.
35153530
In such cases, skip container-specific cleanups.
35163531
"""
3532+
raise NotImplementedError
35173533

35183534
@abstractmethod
35193535
async def create_local_network(self, network_name: str) -> None:
@@ -3525,6 +3541,7 @@ async def create_local_network(self, network_name: str) -> None:
35253541
It may raise :exc:`NotImplementedError` and then the manager
35263542
will cancel creation of the session.
35273543
"""
3544+
raise NotImplementedError
35283545

35293546
@abstractmethod
35303547
async def destroy_local_network(self, network_name: str) -> None:
@@ -3533,6 +3550,7 @@ async def destroy_local_network(self, network_name: str) -> None:
35333550
35343551
This is called by the manager after kernel destruction.
35353552
"""
3553+
raise NotImplementedError
35363554

35373555
@abstractmethod
35383556
async def restart_kernel__load_config(
@@ -3543,7 +3561,7 @@ async def restart_kernel__load_config(
35433561
"""
35443562
Restore the cluster config from a previous launch of the kernel.
35453563
"""
3546-
pass
3564+
raise NotImplementedError
35473565

35483566
@abstractmethod
35493567
async def restart_kernel__store_config(
@@ -3556,7 +3574,7 @@ async def restart_kernel__store_config(
35563574
Store the cluster config to a kernel-related storage (e.g., scratch space),
35573575
so that restarts of this kernel can reuse the configuration.
35583576
"""
3559-
pass
3577+
raise NotImplementedError
35603578

35613579
async def restart_kernel(
35623580
self,

src/ai/backend/agent/alloc_map.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,17 @@ def update_affinity_hint(
236236
hint_for_next_allocation.append(dev)
237237
affinity_hint.devices = hint_for_next_allocation
238238

239+
@final
240+
def update_device_slot_amounts(self, slot_amounts: Mapping[SlotName, Decimal]) -> None:
241+
self.device_slots = {
242+
device_id: DeviceSlotInfo(
243+
slot_type=slot_info.slot_type,
244+
slot_name=slot_info.slot_name,
245+
amount=slot_amounts[slot_info.slot_name],
246+
)
247+
for device_id, slot_info in self.device_slots.items()
248+
}
249+
239250
@abstractmethod
240251
def allocate(
241252
self,

0 commit comments

Comments
 (0)