Skip to content

Commit 0a5c4e4

Browse files
committed
remote/coordinator: prohibit multiple coordinators with the same name
Since the switch to gRPC, we only check that we don't have a connection from the same peer, but not with the same name, which would break assumptions elsewhere in the code and confuse users. Fix it by raising an exception in this case and shutting down both sides of the ExporterStream. Fixes: #1774 Signed-off-by: Jan Luebbe <[email protected]>
1 parent 70f6150 commit 0a5c4e4

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed

labgrid/remote/coordinator.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,8 @@ async def ExporterStream(self, request_iterator, context):
401401
command_queue = asyncio.Queue()
402402
pending_commands = []
403403

404+
startup_done = asyncio.Event()
405+
404406
out_msg = labgrid_coordinator_pb2.ExporterOutMessage()
405407
out_msg.hello.version = labgrid_version()
406408
yield out_msg
@@ -420,10 +422,13 @@ async def request_task():
420422
elif kind == "startup":
421423
version = in_msg.startup.version
422424
name = in_msg.startup.name
425+
if existing := self.get_exporter_by_name(name):
426+
raise ExporterError(f"exporter with name '{name}' is already connected from {existing.peer}")
423427
session = self.exporters[peer] = ExporterSession(self, peer, name, command_queue, version)
424428
logging.debug("Exporters: %s", self.exporters)
425429
logging.debug("Received startup from %s with %s", name, version)
426430
asyncio.current_task().set_name(f"exporter-{peer}-rx/started-{name}")
431+
startup_done.set()
427432
elif kind == "resource":
428433
logging.debug("Received resource from %s with %s", name, in_msg.resource)
429434
action, _ = session.set_resource(
@@ -439,10 +444,32 @@ async def request_task():
439444
logging.debug("exporter request_task done: %s", context.done())
440445
except Exception:
441446
logging.exception("error in exporter message handler")
447+
raise
442448

443449
asyncio.current_task().set_name(f"exporter-{peer}-tx")
444450
running_request_task = self.loop.create_task(request_task(), name=f"exporter-{peer}-rx/init")
445451

452+
startup_done_task = self.loop.create_task(startup_done.wait())
453+
done, _ = await asyncio.wait(
454+
{startup_done_task, running_request_task},
455+
timeout=3,
456+
return_when=asyncio.FIRST_COMPLETED,
457+
)
458+
# clean up event task
459+
startup_done.set()
460+
await startup_done_task
461+
if running_request_task in done:
462+
# we probably had an exception during startup
463+
try:
464+
await running_request_task
465+
except ExporterError as e:
466+
await context.abort(grpc.StatusCode.ALREADY_EXISTS, f"startup failed: {e}")
467+
raise
468+
elif startup_done_task in done:
469+
await startup_done_task
470+
else:
471+
raise ExporterError(f"exporter connection from {peer} timed out during startup")
472+
446473
try:
447474
async for cmd in queue_as_aiter(command_queue):
448475
logging.debug("exporter cmd %s", cmd)

0 commit comments

Comments
 (0)