Skip to content

Commit ad89cba

Browse files
committed
feat(BA-2753): Spawn multiple agents and route RPC appropriately
This change adds support for spawning multiple agents within the same agent server and adds an agent_id field to all relevant RPC calls in the agent server. It also ensures that the manager sends this agent_id so that the agent server can route each RPC call to the correct agent.
1 parent 82933d0 commit ad89cba

File tree

9 files changed

+650
-171
lines changed

9 files changed

+650
-171
lines changed

changes/6320.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Update agent server RPC functions to include the agent ID, for agent runtimes with multiple agents

src/ai/backend/agent/docker/agent.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,7 +1298,7 @@ class DockerAgent(AbstractAgent[DockerKernel, DockerKernelCreationContext]):
12981298
monitor_docker_task: asyncio.Task
12991299
agent_sockpath: Path
13001300
agent_sock_task: asyncio.Task
1301-
metadata_server: MetadataServer
1301+
metadata_server: Optional[MetadataServer]
13021302
docker_ptask_group: aiotools.PersistentTaskGroup
13031303
gwbridge_subnet: Optional[str]
13041304
checked_invalid_images: Set[str]
@@ -1324,6 +1324,9 @@ def __init__(
13241324
agent_public_key=agent_public_key,
13251325
)
13261326
self.checked_invalid_images = set()
1327+
# MetadataServer must be shared across all instances of DockerAgent.
1328+
# metadata_server is initialized by AgentRPCServer and assigned via assign_metadata_server()
1329+
self.metadata_server = None
13271330

13281331
async def __ainit__(self) -> None:
13291332
async with closing_async(Docker()) as docker:
@@ -1398,12 +1401,6 @@ async def __ainit__(self) -> None:
13981401
self.monitor_docker_task = asyncio.create_task(self.monitor_docker_events())
13991402
self.docker_ptask_group = aiotools.PersistentTaskGroup()
14001403

1401-
self.metadata_server = await MetadataServer.new(
1402-
self.local_config,
1403-
self.etcd,
1404-
self.kernel_registry,
1405-
)
1406-
await self.metadata_server.start_server()
14071404
# For legacy accelerator plugins
14081405
self.docker = Docker()
14091406

@@ -1416,6 +1413,9 @@ async def __ainit__(self) -> None:
14161413
blocklist=self.local_config.agent.block_network_plugins,
14171414
)
14181415

1416+
def assign_metadata_server(self, metadata_server: MetadataServer) -> None:
1417+
self.metadata_server = metadata_server
1418+
14191419
async def shutdown(self, stop_signal: signal.Signals):
14201420
# Stop handling agent sock.
14211421
if self.agent_sock_task is not None:
@@ -1432,7 +1432,6 @@ async def shutdown(self, stop_signal: signal.Signals):
14321432
self.monitor_docker_task.cancel()
14331433
await self.monitor_docker_task
14341434

1435-
await self.metadata_server.cleanup()
14361435
if self.docker:
14371436
await self.docker.close()
14381437

0 commit comments

Comments
 (0)