Skip to content

Commit 2e2f49e

Browse files
committed
Rework NoBsWs to avoid agen/trio incompatibility
`trio`'s internals don't allow for async generator (and thus by consequence dynamic reset of async exit stacks containing `@acm`s) interleaving since doing so corrupts the cancel-scope stack. See details in: - python-trio/trio#638 - https://trio-util.readthedocs.io/en/latest/#trio_util.trio_async_generator We originally tried to address this using `@trio_util.trio_async_generator` in backend streaming code but for whatever reason stopped working recently (at least for me) and it's more or less implemented the same way as this patch but with more layers and an extra dep. I also don't want us to have to address this problem again if/when that lib isn't able to keep up to date with wtv `trio` is doing.. So instead this is a complete rewrite of the conc design of our auto-reconnect ws API to move all reset logic and msg relay into a bg task which is respawned on reset-requiring events: user spec-ed msg recv latency, network errors, roaming events. Deatz: - drop all usage of `AsyncExitStack` and no longer require client code to (hackily) call `NoBsWs._connect()` on msg latency conditions, intead this is all done behind the scenes and the user can instead pass in a `msg_recv_timeout: float`. - massively simplify impl of `NoBsWs` and move all reset logic into a new `_reconnect_forever()` task. - offer use of `reset_after: int` a count value that determines how many `msg_recv_timeout` events are allowed to occur before reconnecting the entire ws from scratch again.
1 parent ac4a8a3 commit 2e2f49e

File tree

1 file changed

+231
-81
lines changed

1 file changed

+231
-81
lines changed

piker/data/_web_bs.py

Lines changed: 231 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# piker: trading gear for hackers
2-
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
2+
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
33

44
# This program is free software: you can redistribute it and/or modify
55
# it under the terms of the GNU Affero General Public License as published by
@@ -18,23 +18,29 @@
1818
ToOlS fOr CoPInG wITh "tHE wEB" protocols.
1919
2020
"""
21+
from __future__ import annotations
2122
from contextlib import (
22-
asynccontextmanager,
23-
AsyncExitStack,
23+
asynccontextmanager as acm,
2424
)
2525
from itertools import count
26+
from functools import partial
2627
from types import ModuleType
2728
from typing import (
2829
Any,
2930
Optional,
3031
Callable,
32+
AsyncContextManager,
3133
AsyncGenerator,
3234
Iterable,
3335
)
3436
import json
3537

3638
import trio
37-
import trio_websocket
39+
from trio_typing import TaskStatus
40+
from trio_websocket import (
41+
WebSocketConnection,
42+
open_websocket_url,
43+
)
3844
from wsproto.utilities import LocalProtocolError
3945
from trio_websocket._impl import (
4046
ConnectionClosed,
@@ -52,9 +58,15 @@ class NoBsWs:
5258
'''
5359
Make ``trio_websocket`` sockets stay up no matter the bs.
5460
55-
You can provide a ``fixture`` async-context-manager which will be
56-
enter/exitted around each reconnect operation.
61+
A shim interface that allows client code to stream from some
62+
``WebSocketConnection`` but where any connectivy bs is handled
63+
automatcially and entirely in the background.
64+
65+
NOTE: this type should never be created directly but instead is
66+
provided via the ``open_autorecon_ws()`` factor below.
67+
5768
'''
69+
# apparently we can QoS for all sorts of reasons..so catch em.
5870
recon_errors = (
5971
ConnectionClosed,
6072
DisconnectionTimeout,
@@ -67,115 +79,253 @@ class NoBsWs:
6779
def __init__(
6880
self,
6981
url: str,
70-
stack: AsyncExitStack,
71-
fixture: Optional[Callable] = None,
82+
rxchan: trio.MemoryReceiveChannel,
83+
msg_recv_timeout: float,
84+
7285
serializer: ModuleType = json
7386
):
7487
self.url = url
75-
self.fixture = fixture
76-
self._stack = stack
77-
self._ws: 'WebSocketConnection' = None # noqa
78-
79-
# TODO: is there some method we can call
80-
# on the underlying `._ws` to get this?
81-
self._connected: bool = False
82-
83-
async def _connect(
84-
self,
85-
tries: int = 1000,
86-
) -> None:
87-
88-
self._connected = False
89-
while True:
90-
try:
91-
await self._stack.aclose()
92-
except self.recon_errors:
93-
await trio.sleep(0.5)
94-
else:
95-
break
96-
97-
last_err = None
98-
for i in range(tries):
99-
try:
100-
self._ws = await self._stack.enter_async_context(
101-
trio_websocket.open_websocket_url(self.url)
102-
)
88+
self._rx = rxchan
89+
self._timeout = msg_recv_timeout
10390

104-
if self.fixture is not None:
105-
# rerun user code fixture
106-
ret = await self._stack.enter_async_context(
107-
self.fixture(self)
108-
)
91+
# signaling between caller and relay task which determines when
92+
# socket is connected (and subscribed).
93+
self._connected: trio.Event = trio.Event()
10994

110-
assert ret is None
95+
# dynamically reset by the bg relay task
96+
self._ws: WebSocketConnection | None = None
97+
self._cs: trio.CancelScope | None = None
11198

112-
log.info(f'Connection success: {self.url}')
99+
# interchange codec methods
100+
# TODO: obviously the method API here may be different
101+
# for another interchange format..
102+
self._dumps: Callable = serializer.dumps
103+
self._loads: Callable = serializer.loads
113104

114-
self._connected = True
115-
return self._ws
105+
def connected(self) -> bool:
106+
return self._connected.is_set()
116107

117-
except self.recon_errors as err:
118-
last_err = err
119-
log.error(
120-
f'{self} connection bail with '
121-
f'{type(err)}...retry attempt {i}'
122-
)
123-
await trio.sleep(0.5)
124-
self._connected = False
125-
continue
126-
else:
127-
log.exception('ws connection fail...')
128-
raise last_err
108+
async def reset(self) -> None:
109+
'''
110+
Reset the underlying ws connection by cancelling
111+
the bg relay task and waiting for it to signal
112+
a new connection.
129113
130-
def connected(self) -> bool:
131-
return self._connected
114+
'''
115+
self._connected = trio.Event()
116+
self._cs.cancel()
117+
await self._connected.wait()
132118

133119
async def send_msg(
134120
self,
135121
data: Any,
136122
) -> None:
137123
while True:
138124
try:
139-
return await self._ws.send_message(json.dumps(data))
125+
msg: Any = self._dumps(data)
126+
return await self._ws.send_message(msg)
140127
except self.recon_errors:
141-
await self._connect()
128+
await self.reset()
142129

143-
async def recv_msg(
144-
self,
145-
) -> Any:
146-
while True:
147-
try:
148-
return json.loads(await self._ws.get_message())
149-
except self.recon_errors:
150-
await self._connect()
130+
async def recv_msg(self) -> Any:
131+
msg: Any = await self._rx.receive()
132+
data = self._loads(msg)
133+
return data
151134

152135
def __aiter__(self):
153136
return self
154137

155138
async def __anext__(self):
156139
return await self.recv_msg()
157140

141+
def set_recv_timeout(
142+
self,
143+
timeout: float,
144+
) -> None:
145+
self._timeout = timeout
146+
147+
148+
async def _reconnect_forever(
149+
url: str,
150+
snd: trio.MemorySendChannel,
151+
nobsws: NoBsWs,
152+
reset_after: int, # msg recv timeout before reset attempt
153+
154+
fixture: AsyncContextManager | None = None,
155+
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
158156

159-
@asynccontextmanager
157+
) -> None:
158+
159+
async def proxy_msgs(
160+
ws: WebSocketConnection,
161+
pcs: trio.CancelScope, # parent cancel scope
162+
):
163+
'''
164+
Receive (under `timeout` deadline) all msgs from from underlying
165+
websocket and relay them to (calling) parent task via ``trio``
166+
mem chan.
167+
168+
'''
169+
# after so many msg recv timeouts, reset the connection
170+
timeouts: int = 0
171+
172+
while True:
173+
with trio.move_on_after(
174+
# can be dynamically changed by user code
175+
nobsws._timeout,
176+
) as cs:
177+
try:
178+
msg: Any = await ws.get_message()
179+
await snd.send(msg)
180+
except nobsws.recon_errors:
181+
log.exception(
182+
f'{url} connection bail with:'
183+
)
184+
await trio.sleep(0.5)
185+
pcs.cancel()
186+
187+
# go back to reonnect loop in parent task
188+
return
189+
190+
if cs.cancelled_caught:
191+
timeouts += 1
192+
if timeouts > reset_after:
193+
log.error(
194+
'WS feed seems down and slow af? .. resetting\n'
195+
)
196+
pcs.cancel()
197+
198+
# go back to reonnect loop in parent task
199+
return
200+
201+
async def open_fixture(
202+
fixture: AsyncContextManager,
203+
nobsws: NoBsWs,
204+
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
205+
):
206+
'''
207+
Open user provided `@acm` and sleep until any connection
208+
reset occurs.
209+
210+
'''
211+
async with fixture(nobsws) as ret:
212+
assert ret is None
213+
task_status.started()
214+
await trio.sleep_forever()
215+
216+
# last_err = None
217+
nobsws._connected = trio.Event()
218+
task_status.started()
219+
220+
while not snd._closed:
221+
log.info(f'{url} trying (RE)CONNECT')
222+
223+
async with trio.open_nursery() as n:
224+
cs = nobsws._cs = n.cancel_scope
225+
ws: WebSocketConnection
226+
async with open_websocket_url(url) as ws:
227+
nobsws._ws = ws
228+
log.info(f'Connection success: {url}')
229+
230+
# begin relay loop to forward msgs
231+
n.start_soon(
232+
proxy_msgs,
233+
ws,
234+
cs,
235+
)
236+
237+
if fixture is not None:
238+
log.info(f'Entering fixture: {fixture}')
239+
240+
# TODO: should we return an explicit sub-cs
241+
# from this fixture task?
242+
await n.start(
243+
open_fixture,
244+
fixture,
245+
nobsws,
246+
)
247+
248+
# indicate to wrapper / opener that we are up and block
249+
# to let tasks run **inside** the ws open block above.
250+
nobsws._connected.set()
251+
await trio.sleep_forever()
252+
253+
# ws open block end
254+
# nursery block end
255+
nobsws._connected = trio.Event()
256+
if cs.cancelled_caught:
257+
log.cancel(
258+
f'{url} connection cancelled!'
259+
)
260+
# if wrapper cancelled us, we expect it to also
261+
# have re-assigned a new event
262+
assert (
263+
nobsws._connected
264+
and not nobsws._connected.is_set()
265+
)
266+
267+
# -> from here, move to next reconnect attempt
268+
269+
else:
270+
log.exception('ws connection closed by client...')
271+
272+
273+
@acm
160274
async def open_autorecon_ws(
161275
url: str,
162276

163-
# TODO: proper type cannot smh
164-
fixture: Optional[Callable] = None,
277+
fixture: AsyncContextManager | None = None,
278+
279+
# time in sec
280+
msg_recv_timeout: float = 3,
281+
282+
# count of the number of above timeouts before connection reset
283+
reset_after: int = 3,
165284

166285
) -> AsyncGenerator[tuple[...], NoBsWs]:
167-
"""Apparently we can QoS for all sorts of reasons..so catch em.
286+
'''
287+
An auto-reconnect websocket (wrapper API) around
288+
``trio_websocket.open_websocket_url()`` providing automatic
289+
re-connection on network errors, msg latency and thus roaming.
168290
169-
"""
170-
async with AsyncExitStack() as stack:
171-
ws = NoBsWs(url, stack, fixture=fixture)
172-
await ws._connect()
291+
Here we implement a re-connect websocket interface where a bg
292+
nursery runs ``WebSocketConnection.receive_message()``s in a loop
293+
and restarts the full http(s) handshake on catches of certain
294+
connetivity errors, or some user defined recv timeout.
173295
174-
try:
175-
yield ws
296+
You can provide a ``fixture`` async-context-manager which will be
297+
entered/exitted around each connection reset; eg. for (re)requesting
298+
subscriptions without requiring streaming setup code to rerun.
176299
300+
'''
301+
snd: trio.MemorySendChannel
302+
rcv: trio.MemoryReceiveChannel
303+
snd, rcv = trio.open_memory_channel(616)
304+
305+
async with trio.open_nursery() as n:
306+
nobsws = NoBsWs(
307+
url,
308+
rcv,
309+
msg_recv_timeout=msg_recv_timeout,
310+
)
311+
await n.start(
312+
partial(
313+
_reconnect_forever,
314+
url,
315+
snd,
316+
nobsws,
317+
fixture=fixture,
318+
reset_after=reset_after,
319+
)
320+
)
321+
await nobsws._connected.wait()
322+
assert nobsws._cs
323+
assert nobsws.connected()
324+
325+
try:
326+
yield nobsws
177327
finally:
178-
await stack.aclose()
328+
n.cancel_scope.cancel()
179329

180330

181331
'''
@@ -192,7 +342,7 @@ class JSONRPCResult(Struct):
192342
error: Optional[dict] = None
193343

194344

195-
@asynccontextmanager
345+
@acm
196346
async def open_jsonrpc_session(
197347
url: str,
198348
start_id: int = 0,

0 commit comments

Comments
 (0)