Skip to content

Commit 06bee20

Browse files
authored
fix: Handle errors from event stream callbacks (#1302)
If a callback raises an exception, it shouldn't prevent other callbacks receiving the same event. Instead, catch any exceptions and re-raise them after all callbacks have been called as a single ExceptionGroup. This allows the task to be aborted if any of the callbacks fail but still allows callbacks that require the "plan failed" events to run.
1 parent a3b679a commit 06bee20

2 files changed

Lines changed: 27 additions & 2 deletions

File tree

src/blueapi/core/event.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import itertools
2+
import logging
23
from abc import ABC, abstractmethod
34
from collections.abc import Callable
45
from typing import Generic, TypeVar
@@ -9,6 +10,8 @@
910
#: Subscription token type
1011
S = TypeVar("S")
1112

13+
LOGGER = logging.getLogger(__name__)
14+
1215

1316
class EventStream(ABC, Generic[E, S]):
1417
"""
@@ -75,6 +78,11 @@ def publish(self, event: E, correlation_id: str | None = None) -> None:
7578
correlation_id: An optional ID that may be used to correlate this
7679
event with other events
7780
"""
78-
81+
errs = []
7982
for callback in list(self._subscriptions.values()):
80-
callback(event, correlation_id)
83+
try:
84+
callback(event, correlation_id)
85+
except Exception as e:
86+
errs.append(e)
87+
if errs:
88+
raise ExceptionGroup(f"Error(s) publishing event: {event}", errs)

tests/unit_tests/core/test_event.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from concurrent.futures import Future
33
from dataclasses import dataclass
44
from queue import Queue
5+
from unittest import mock
56

67
import pytest
78

@@ -76,6 +77,22 @@ def test_correlation_id(publisher: EventPublisher[MyEvent]) -> None:
7677
assert f.result(timeout=_TIMEOUT) == correlation_id
7778

7879

80+
def test_callback_exceptions_are_contained(publisher: EventPublisher[MyEvent]):
81+
event = MyEvent("foo")
82+
c_id = "bar"
83+
84+
# First call should raise exception, next should be fine
85+
handler = mock.Mock(side_effect=[ValueError("Bad Event"), ()])
86+
publisher.subscribe(handler)
87+
publisher.subscribe(handler)
88+
89+
with pytest.RaisesGroup(ValueError):
90+
publisher.publish(event, c_id)
91+
92+
# Both handlers should be called but the exception should still be raised
93+
handler.assert_has_calls([mock.call(event, c_id), mock.call(event, c_id)])
94+
95+
7996
def _drain(queue: Queue) -> Iterable:
8097
while not queue.empty():
8198
yield queue.get_nowait()

0 commit comments

Comments
 (0)