Skip to content

Commit 0aa4d87

Browse files
authored
Merge pull request #291 from plugwise/alt_prio_queue
Use PriorityQueue.qsize() to obtain the queue depth
2 parents c8fa3ea + e34bb76 commit 0aa4d87

File tree

8 files changed

+10
-37
lines changed

8 files changed

+10
-37
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# Changelog
22

3-
## Ongoing / v0.44.8a0
3+
## v0.44.8
44

5+
- PR [291](https://github.com/plugwise/python-plugwise-usb/pull/291): Collect send-queue depth via PriorityQueue.qsize(), this provides a more accurate result
56
- Fix for [#288](https://github.com/plugwise/plugwise_usb-beta/issues/288) via PR [293](https://github.com/plugwise/python-plugwise-usb/pull/293)
67
- Chores move module publishing on (test)pypi to Trusted Publishing (and using uv) - released as alpha 0.44.8a0 to demonstrate functionality
78

plugwise_usb/connection/manager.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,6 @@ def __init__(self) -> None:
3636
] = {}
3737
self._unsubscribe_stick_events: Callable[[], None] | None = None
3838

39-
@property
40-
def queue_depth(self) -> int:
41-
"""Return estimated size of pending responses."""
42-
return self._sender.processed_messages - self._receiver.processed_messages
43-
44-
def correct_received_messages(self, correction: int) -> None:
45-
"""Correct received messages count."""
46-
self._receiver.correct_processed_messages(correction)
47-
4839
@property
4940
def serial_path(self) -> str:
5041
"""Return current port."""

plugwise_usb/connection/queue.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import logging
99

1010
from ..api import StickEvent
11+
from ..constants import REPORT_QUEUE_FILLING_UP
1112
from ..exceptions import MessageError, NodeTimeout, StickError, StickTimeout
1213
from ..messages import Priority
1314
from ..messages.requests import NodePingRequest, PlugwiseCancelRequest, PlugwiseRequest
@@ -112,13 +113,11 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse | None:
112113
_LOGGER.warning("%s, cancel request", exc) # type: ignore[unreachable]
113114
except StickError as exc:
114115
_LOGGER.error(exc)
115-
self._stick.correct_received_messages(1)
116116
raise StickError(
117117
f"No response received for {request.__class__.__name__} "
118118
+ f"to {request.mac_decoded}"
119119
) from exc
120120
except BaseException as exc:
121-
self._stick.correct_received_messages(1)
122121
raise StickError(
123122
f"No response received for {request.__class__.__name__} "
124123
+ f"to {request.mac_decoded}"
@@ -145,12 +144,12 @@ async def _send_queue_worker(self) -> None:
145144
self._submit_queue.task_done()
146145
return
147146

148-
if self._stick.queue_depth > 3:
147+
qsize = self._submit_queue.qsize()
148+
if qsize > REPORT_QUEUE_FILLING_UP:
149+
# When the queue size grows, rate-limit the sending of requests to avoid overloading the network
149150
await sleep(0.125)
150-
if self._stick.queue_depth > 3:
151-
_LOGGER.warning(
152-
"Awaiting plugwise responses %d", self._stick.queue_depth
153-
)
151+
if qsize > REPORT_QUEUE_FILLING_UP:
152+
_LOGGER.warning("Awaiting plugwise responses %d", qsize)
154153

155154
await self._stick.write_to_stick(request)
156155
self._submit_queue.task_done()

plugwise_usb/connection/receiver.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ def __init__(
9999
self._data_worker_task: Task[None] | None = None
100100

101101
# Message processing
102-
self._processed_msgs = 0
103102
self._message_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue()
104103
self._last_processed_messages: list[bytes] = []
105104
self._current_seq_id: bytes | None = None
@@ -138,20 +137,11 @@ def connection_lost(self, exc: Exception | None = None) -> None:
138137
self._transport = None
139138
self._connection_state = False
140139

141-
@property
142-
def processed_messages(self) -> int:
143-
"""Return the number of processed messages."""
144-
return self._processed_msgs
145-
146140
@property
147141
def is_connected(self) -> bool:
148142
"""Return current connection state of the USB-Stick."""
149143
return self._connection_state
150144

151-
def correct_processed_messages(self, correction: int) -> None:
152-
"""Correct the number of processed messages."""
153-
self._processed_msgs += correction
154-
155145
def connection_made(self, transport: SerialTransport) -> None:
156146
"""Call when the serial connection to USB-Stick is established."""
157147
_LOGGER.info("Connection made")
@@ -291,7 +281,6 @@ async def _message_queue_worker(self) -> None:
291281
await self._notify_stick_subscribers(response)
292282
else:
293283
await self._notify_node_response_subscribers(response)
294-
self._processed_msgs += 1
295284
self._message_queue.task_done()
296285
await sleep(0)
297286
_LOGGER.debug("Message queue worker stopped")

plugwise_usb/connection/sender.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,11 @@ def __init__(self, stick_receiver: StickReceiver, transport: Transport) -> None:
3838
self._loop = get_running_loop()
3939
self._receiver = stick_receiver
4040
self._transport = transport
41-
self._processed_msgs = 0
4241
self._stick_response: Future[StickResponse] | None = None
4342
self._stick_lock = Lock()
4443
self._current_request: None | PlugwiseRequest = None
4544
self._unsubscribe_stick_response: Callable[[], None] | None = None
4645

47-
@property
48-
def processed_messages(self) -> int:
49-
"""Return the number of processed messages."""
50-
return self._processed_msgs
51-
5246
async def start(self) -> None:
5347
"""Start the sender."""
5448
# Subscribe to ACCEPT stick responses, which contain the seq_id we need.
@@ -149,7 +143,6 @@ async def write_request_to_port(self, request: PlugwiseRequest) -> None:
149143
finally:
150144
self._stick_response.cancel()
151145
self._stick_lock.release()
152-
self._processed_msgs += 1
153146

154147
async def _process_stick_response(self, response: StickResponse) -> None:
155148
"""Process stick response."""

plugwise_usb/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
# Value limits
2020
MAX_UINT_2: Final = 255 # 8-bit unsigned integer max
2121
MAX_UINT_4: Final = 65535 # 16-bit unsigned integer max
22+
REPORT_QUEUE_FILLING_UP: Final = 8
2223

2324
# Time
2425
DAY_IN_HOURS: Final = 24

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "plugwise_usb"
7-
version = "0.44.7"
7+
version = "0.44.8a8"
88
license = "MIT"
99
keywords = ["home", "automation", "plugwise", "module", "usb"]
1010
classifiers = [

tests/test_usb.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1691,7 +1691,6 @@ async def makedirs(cache_dir: str, exist_ok: bool) -> None:
16911691
"FEDCBA9876543210": pw_api.NodeType.CIRCLE,
16921692
"1298347650AFBECD": pw_api.NodeType.SCAN,
16931693
}
1694-
pw_nw_cache.update_nodetypes("1234ABCD4321FEDC", pw_api.NodeType.STEALTH)
16951694

16961695
with patch("aiofiles.threadpool.sync_open", return_value=mock_file_stream):
16971696
# await pw_nw_cache.save_cache()

0 commit comments

Comments
 (0)