Skip to content

Commit b5f4721

Browse files
committed
fix(BA-2753): Disaggregate components not supposed to be shared
This change pull out many connections/components like etcd connections etc. that are not supposed to be shared between the multi agent instances out such that they are distinct from each other.
1 parent ad89cba commit b5f4721

File tree

7 files changed

+309
-139
lines changed

7 files changed

+309
-139
lines changed

src/ai/backend/agent/agent.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,8 @@ async def __ainit__(self) -> None:
888888
"AbstractAgent.__ainit__": "Redis runtime configuration is not set."
889889
})
890890

891+
self.local_config.agent.image_commit_path.mkdir(parents=True, exist_ok=True)
892+
891893
redis_profile_target = self.local_config.redis.to_redis_profile_target()
892894
stream_redis_target = redis_profile_target.profile_target(RedisRole.STREAM)
893895
mq = await self._make_message_queue(stream_redis_target)

src/ai/backend/agent/config/unified.py

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,46 @@ class OverridableAgentConfig(BaseConfigSchema):
687687
extra="allow",
688688
)
689689

690+
def with_derived_fields(
691+
self,
692+
default_agent: AgentConfig,
693+
agent_idx: int,
694+
) -> OverridableAgentConfig:
695+
if self.id is None and default_agent.id is None:
696+
return self
697+
698+
agent_id = self.id if self.id is not None else f"{default_agent.id}-{agent_idx}"
699+
700+
if "ipc_base_path" in self.model_fields_set:
701+
ipc_base_path = self.ipc_base_path
702+
else:
703+
ipc_base_path = default_agent.ipc_base_path / agent_id
704+
705+
if "var_base_path" in self.model_fields_set:
706+
var_base_path = self.var_base_path
707+
else:
708+
var_base_path = default_agent.var_base_path / agent_id
709+
710+
if "image_commit_path" in self.model_fields_set:
711+
image_commit_path = self.image_commit_path
712+
else:
713+
image_commit_path = default_agent.image_commit_path / agent_id
714+
715+
if default_agent.abuse_report_path is None or "abuse_report_path" in self.model_fields_set:
716+
abuse_report_path = self.abuse_report_path
717+
else:
718+
abuse_report_path = default_agent.abuse_report_path / agent_id
719+
720+
return self.model_copy(
721+
update={
722+
"id": agent_id,
723+
"ipc_base_path": ipc_base_path,
724+
"var_base_path": var_base_path,
725+
"image_commit_path": image_commit_path,
726+
"abuse_report_path": abuse_report_path,
727+
}
728+
)
729+
690730

691731
class AgentConfig(CommonAgentConfig, OverridableAgentConfig):
692732
pass
@@ -1130,10 +1170,16 @@ class AgentOverrideConfig(BaseConfigSchema):
11301170
description="Resource config overrides for the individual agent",
11311171
)
11321172

1133-
def construct_unified_config(self, *, default: AgentUnifiedConfig) -> AgentUnifiedConfig:
1173+
def construct_unified_config(
1174+
self,
1175+
*,
1176+
default: AgentUnifiedConfig,
1177+
agent_idx: int,
1178+
) -> AgentUnifiedConfig:
11341179
agent_updates: dict[str, Any] = {}
11351180
if self.agent is not None:
1136-
agent_override_fields = self.agent.model_dump(include=self.agent.model_fields_set)
1181+
agent = self.agent.with_derived_fields(default.agent, agent_idx)
1182+
agent_override_fields = agent.model_dump(include=agent.model_fields_set)
11371183
agent_updates["agent"] = default.agent.model_copy(update=agent_override_fields)
11381184
if self.container is not None:
11391185
container_override_fields = self.container.model_dump(
@@ -1169,9 +1215,20 @@ class AgentUnifiedConfig(AgentGlobalConfig, AgentSpecificConfig):
11691215
extra="allow",
11701216
)
11711217

1218+
@property
1219+
def agent_common(self) -> CommonAgentConfig:
1220+
return self.agent
1221+
1222+
@property
1223+
def agent_default(self) -> OverridableAgentConfig:
1224+
return self.agent
1225+
11721226
@property
11731227
def agent_configs(self) -> Sequence[AgentUnifiedConfig]:
1174-
return self._for_each_agent(lambda x: x)
1228+
agent_configs = self._for_each_agent(lambda x: x)
1229+
if not agent_configs:
1230+
raise ValueError("There must be at least one agent config")
1231+
return agent_configs
11751232

11761233
def with_updates(
11771234
self,
@@ -1257,7 +1314,10 @@ def validate(config: AgentSpecificConfig) -> None:
12571314
return self
12581315

12591316
def _for_each_agent(self, func: Callable[[AgentUnifiedConfig], R]) -> list[R]:
1260-
agents = [agent.construct_unified_config(default=self) for agent in self.agents]
1317+
agents = [
1318+
agent.construct_unified_config(default=self, agent_idx=i)
1319+
for i, agent in enumerate(self.agents)
1320+
]
12611321
if not agents:
12621322
agents.append(self)
12631323

src/ai/backend/agent/etcd.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from typing import Collection
2+
3+
from ai.backend.agent.config.unified import AgentUnifiedConfig
4+
from ai.backend.common.data.config.types import EtcdConfigData
5+
from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
6+
from ai.backend.common.types import AgentId
7+
8+
9+
class EtcdClientRegistry:
10+
_etcd_config: EtcdConfigData
11+
_etcd_clients: dict[AgentId, AsyncEtcd]
12+
_global_etcd: AsyncEtcd
13+
14+
@property
15+
def global_etcd(self) -> AsyncEtcd:
16+
return self._global_etcd
17+
18+
def __init__(self, etcd_config: EtcdConfigData) -> None:
19+
self._etcd_config = etcd_config
20+
self._etcd_clients = {}
21+
self._global_etcd = self._create_client(agent_id=None, scaling_group=None)
22+
23+
def get_client(self, agent_id: AgentId) -> AsyncEtcd:
24+
return self._etcd_clients[agent_id]
25+
26+
def prefill_clients(self, prefill_data: Collection[tuple[AgentId, AgentUnifiedConfig]]) -> None:
27+
for agent_id, agent_config in prefill_data:
28+
self._etcd_clients[agent_id] = self._create_client(
29+
agent_id, agent_config.agent.scaling_group
30+
)
31+
32+
def _create_client(self, agent_id: AgentId | None, scaling_group: str | None) -> AsyncEtcd:
33+
scope_prefix_map = {ConfigScopes.GLOBAL: ""}
34+
if scaling_group is not None:
35+
scope_prefix_map[ConfigScopes.SGROUP] = f"sgroup/{scaling_group}"
36+
if agent_id is not None:
37+
scope_prefix_map[ConfigScopes.NODE] = f"nodes/agents/{agent_id}"
38+
39+
if self._etcd_config.user is not None and self._etcd_config.password is not None:
40+
etcd_credentials = {
41+
"user": self._etcd_config.user,
42+
"password": self._etcd_config.password,
43+
}
44+
else:
45+
etcd_credentials = None
46+
47+
return AsyncEtcd(
48+
[addr.to_legacy() for addr in self._etcd_config.addrs],
49+
self._etcd_config.namespace,
50+
scope_prefix_map,
51+
credentials=etcd_credentials,
52+
)

0 commit comments

Comments
 (0)