Skip to content

Commit b94bfba

Browse files
committed
Added async/sync queue listener implementations; improved tests; minor code tweaks.
1 parent 6414a03 commit b94bfba

File tree

5 files changed

+166
-15
lines changed

5 files changed

+166
-15
lines changed

CHANGELOG.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
# Changelog
22

3-
* Updated and rewrote Notification 2.0 listener implementation. Added additional parameters for more control:
4-
`consumer_name`, `shared`, `auto_ack` and `auto_unsubscribe`. Added `unsubscribe` function for removing
5-
subscribers on demand. Both `AsyncListener` and `Listener` now provide consistent `start`/`stop` functions
6-
which take care of coroutine and thread creation. The `listen ` function can still be invoked directly if
7-
necessary.
3+
* Added `QueueListener` and `AsyncQueueListener` classes to the Notification 2.0 toolkit. These pre-defined
4+
listener implementation append new notifications to standard queues that can be monitored/listened to which
5+
makes Notification 2.0 solutions even simpler to implement.
6+
* Updated and rewrote Notification 2.0 listener implementation. Added additional parameters for more control:
7+
`consumer_name`, `shared`, `auto_ack` and `auto_unsubscribe`. Added `unsubscribe` function for removing
8+
subscribers on demand. Both `AsyncListener` and `Listener` now provide consistent `start`/`stop` functions
9+
which take care of coroutine and thread creation. The `listen ` function can still be invoked directly if
10+
necessary.
811

912
## Version 3.3.0
1013

c8y_tk/notification2/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
from c8y_tk.notification2.listener import *
44

5-
__all__ = ['Listener', 'AsyncListener']
5+
__all__ = ['Listener', 'AsyncListener', 'AsyncQueueListener', 'QueueListener']

c8y_tk/notification2/listener.py

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import threading
99
import uuid
1010
from itertools import count
11+
import queue as sync_queue
1112
from typing import Callable, Awaitable
1213

1314
import certifi
@@ -231,7 +232,7 @@ async def _callback(msg):
231232
self._is_connected.clear()
232233
if self._connection:
233234
with contextlib.suppress(Exception):
234-
await self._connection.close() # TODO: add code and reason
235+
await self._connection.close()
235236
self._connection = None
236237

237238
self._is_running.clear()
@@ -586,7 +587,7 @@ def wait(self, timeout=None) -> bool:
586587
timeout (float): Timeout in seconds.
587588
588589
Returns:
589-
True if the listener has stopped (before timeout), False otherwise.
590+
Whether the listener has stopped (before timeout).
590591
"""
591592
self._thread.join(timeout=timeout)
592593
return not self._thread.is_alive()
@@ -621,3 +622,101 @@ def ack(self, msg_id: str = None, payload: str = None) -> None:
621622
"""
622623
# assuming that we are already listening ...
623624
asyncio.run_coroutine_threadsafe(self._listener.ack(msg_id, payload), self._event_loop)
625+
626+
627+
class AsyncQueueListener(object):
628+
"""Special listener implementation which pushes notification messages
629+
into a standard (async) queue which can be monitored and read."""
630+
631+
def __init__(
632+
self,
633+
c8y: CumulocityApi,
634+
subscription_name: str,
635+
subscriber_name: str = None,
636+
consumer_name: str = None,
637+
shared: bool = False,
638+
auto_unsubscribe: bool = True,
639+
queue: asyncio.Queue = None
640+
):
641+
self.queue = queue or asyncio.Queue()
642+
self.listener = AsyncListener(
643+
c8y=c8y,
644+
subscription_name=subscription_name,
645+
subscriber_name=subscriber_name,
646+
consumer_name=consumer_name,
647+
shared=shared,
648+
auto_ack=True,
649+
auto_unsubscribe=auto_unsubscribe,
650+
)
651+
652+
def start(self):
653+
"""Start the listener."""
654+
async def push_message(msg: AsyncListener.Message):
655+
self.queue.put_nowait(msg)
656+
657+
self.listener.start(push_message)
658+
659+
def stop(self):
660+
"""Stop the listener."""
661+
self.listener.stop()
662+
663+
async def wait(self, timeout=None):
664+
"""Wait for the listener task to finish.
665+
666+
Args:
667+
timeout (int): The number of seconds to wait for the listener
668+
to finish. The listener will be cancelled if the timeout
669+
occurs.
670+
"""
671+
await self.listener.wait(timeout=timeout)
672+
673+
674+
class QueueListener(object):
675+
"""Special listener implementation which pushes notification messages
676+
into a standard (sync) queue which can be monitored and read."""
677+
678+
def __init__(
679+
self,
680+
c8y: CumulocityApi,
681+
subscription_name: str,
682+
subscriber_name: str = None,
683+
consumer_name: str = None,
684+
shared: bool = False,
685+
auto_unsubscribe: bool = True,
686+
queue: sync_queue.Queue = None
687+
):
688+
self.queue = queue or sync_queue.Queue()
689+
self.listener = Listener(
690+
c8y=c8y,
691+
subscription_name=subscription_name,
692+
subscriber_name=subscriber_name,
693+
consumer_name=consumer_name,
694+
shared=shared,
695+
auto_ack=True,
696+
auto_unsubscribe=auto_unsubscribe,
697+
)
698+
699+
def start(self):
700+
"""Start the listener."""
701+
702+
def push_message(msg: AsyncListener.Message):
703+
self.queue.put(msg)
704+
705+
self.listener.start(push_message)
706+
707+
def stop(self):
708+
"""Stop the listener."""
709+
self.listener.stop()
710+
711+
def wait(self, timeout=None) -> bool:
712+
"""Wait for the listener task to finish.
713+
714+
Args:
715+
timeout (int): The number of seconds to wait for the listener
716+
to finish. The listener will be cancelled if the timeout
717+
occurs.
718+
719+
Returns:
720+
Whether the listener has stopped (before timeout).
721+
"""
722+
return self.listener.wait(timeout=timeout)

integration_tests/test_notification2.py

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
import pytest
1414

1515
from c8y_api import CumulocityApi
16-
from c8y_api.model import Device, ManagedObject, Subscription, Measurement, Value, Event, Alarm
17-
from c8y_tk.notification2 import AsyncListener, Listener
18-
from tests.utils import assert_in_any, assert_no_failures, assert_all_in
16+
from c8y_api.model import Device, ManagedObject, Subscription, Measurement, Value, Event, Alarm, Operation
17+
from c8y_tk.notification2 import AsyncListener, Listener, AsyncQueueListener, QueueListener
18+
from tests.utils import assert_in_any, assert_no_failures
1919

2020
from util.testing_util import RandomNameGenerator
2121

@@ -86,9 +86,8 @@ def build():
8686
return build
8787

8888

89-
# TODO: Add Operation
9089
@pytest.mark.parametrize("api_filters, expected", [
91-
('*', 'M,E,EwC,A,AwC,MO'),
90+
('*', 'M,E,EwC,A,AwC,MO,O'),
9291
('M', 'M'),
9392
('E', 'E'),
9493
('EwC', 'E,EwC'),
@@ -126,6 +125,7 @@ def test_api_filters(live_c8y: CumulocityApi, sample_object, api_filters, expect
126125

127126
mo = sample_object
128127
mo['c8y_IsDevice'] = {}
128+
mo['com_cumulocity_model_Agent'] = {}
129129
mo.update()
130130
sub = Subscription(
131131
live_c8y,
@@ -153,7 +153,7 @@ def receive_notification(m:Listener.Message):
153153
e_id = Event(live_c8y, source=mo.id, type="c8y_TestEvent", time='now', text='text').create().id
154154
a_id = Alarm(live_c8y, source=mo.id, type="c8y_TestAlarm", time='now', text='text',
155155
severity=Alarm.Severity.WARNING).create().id
156-
# o_id = Operation(live_c8y, device_id=mo.id, c8y_Operation={}).create().id
156+
o_id = Operation(live_c8y, device_id=mo.id, c8y_Operation={}).create().id
157157
mo.apply({'some_tag': {}})
158158

159159
time.sleep(1)
@@ -169,6 +169,8 @@ def receive_notification(m:Listener.Message):
169169
assert e_id in ids
170170
if 'alarms' in expected:
171171
assert a_id in ids
172+
if 'operations' in expected:
173+
assert o_id in ids
172174
finally:
173175
# (99) cleanup
174176
listener.stop()
@@ -666,3 +668,51 @@ async def receive_notification(m:AsyncListener.Message):
666668
== (n_listeners-1 if shared else 0))
667669
assert sum("cancelled" in x for x in log_messages) == n_listeners
668670
assert_no_failures(caplog)
671+
672+
673+
@pytest.mark.asyncio(loop_scope='function')
674+
async def test_asyncio_queue_listener(live_c8y: CumulocityApi, sample_object):
675+
"""Verify that the queue listener works as expected."""
676+
mo = sample_object
677+
sub = create_managed_object_subscription(live_c8y, mo)
678+
679+
q = asyncio.Queue()
680+
listener = AsyncQueueListener(
681+
c8y=live_c8y,
682+
subscription_name=sub.name,
683+
queue=q,
684+
)
685+
686+
listener.start()
687+
try:
688+
await asyncio.sleep(5) # ensure creation
689+
mo.apply({'test_CustomFragment': {'num': 42}})
690+
msg = await q.get()
691+
assert msg.json['test_CustomFragment']['num'] == 42
692+
finally:
693+
listener.stop()
694+
await listener.wait()
695+
696+
697+
@pytest.mark.asyncio(loop_scope='function')
698+
async def test_queue_listener(live_c8y: CumulocityApi, sample_object):
699+
"""Verify that the queue listener works as expected."""
700+
mo = sample_object
701+
sub = create_managed_object_subscription(live_c8y, mo)
702+
703+
q = queue.Queue()
704+
listener = QueueListener(
705+
c8y=live_c8y,
706+
subscription_name=sub.name,
707+
queue=q,
708+
)
709+
710+
listener.start()
711+
try:
712+
time.sleep(5) # ensure creation
713+
mo.apply({'test_CustomFragment': {'num': 42}})
714+
msg = q.get()
715+
assert msg.json['test_CustomFragment']['num'] == 42
716+
finally:
717+
listener.stop()
718+
listener.wait()

samples/notification2_synchronous.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
# pylint: disable=missing-function-docstring
44

5-
import threading
65
import time
76

87
from c8y_api.app import SimpleCumulocityApp

0 commit comments

Comments
 (0)