@@ -87,7 +87,6 @@ class Sampler:
8787 # holds all the ``tractor.Context`` remote subscriptions for
8888 # a particular sample period increment event: all subscribers are
8989 # notified on a step.
90- # subscribers: dict[int, list[tractor.MsgStream]] = {}
9190 subscribers : defaultdict [
9291 float ,
9392 list [
@@ -240,8 +239,11 @@ async def broadcast(
240239 subscribers for a given sample period.
241240
242241 '''
242+ pair : list [float , set ]
243243 pair = self .subscribers [period_s ]
244244
245+ last_ts : float
246+ subs : set
245247 last_ts , subs = pair
246248
247249 task = trio .lowlevel .current_task ()
@@ -253,25 +255,35 @@ async def broadcast(
253255 # f'consumers: {subs}'
254256 )
255257 borked : set [tractor .MsgStream ] = set ()
256- for stream in subs :
258+ sent : set [tractor .MsgStream ] = set ()
259+ while True :
257260 try :
258- await stream .send ({
259- 'index' : time_stamp or last_ts ,
260- 'period' : period_s ,
261- })
262- except (
263- trio .BrokenResourceError ,
264- trio .ClosedResourceError
265- ):
266- log .error (
267- f'{ stream ._ctx .chan .uid } dropped connection'
268- )
269- borked .add (stream )
261+ for stream in (subs - sent ):
262+ try :
263+ await stream .send ({
264+ 'index' : time_stamp or last_ts ,
265+ 'period' : period_s ,
266+ })
267+ sent .add (stream )
268+
269+ except (
270+ trio .BrokenResourceError ,
271+ trio .ClosedResourceError
272+ ):
273+ log .error (
274+ f'{ stream ._ctx .chan .uid } dropped connection'
275+ )
276+ borked .add (stream )
277+ else :
278+ break
279+ except RuntimeError :
280+ log .warning (f'Client subs { subs } changed while broadcasting' )
281+ continue
270282
271283 for stream in borked :
272284 try :
273285 subs .remove (stream )
274- except ValueError :
286+ except KeyError :
275287 log .warning (
276288 f'{ stream ._ctx .chan .uid } sub already removed!?'
277289 )
@@ -419,7 +431,7 @@ async def maybe_open_samplerd(
419431 loglevel : str | None = None ,
420432 ** kwargs ,
421433
422- ) -> tractor ._portal . Portal : # noqa
434+ ) -> tractor .Portal : # noqa
423435 '''
424436 Client-side helper to maybe startup the ``samplerd`` service
425437 under the ``pikerd`` tree.
@@ -609,6 +621,14 @@ async def sample_and_broadcast(
609621 fqsn = f'{ broker_symbol } .{ brokername } '
610622 lags : int = 0
611623
624+ # TODO: speed up this loop in an AOT compiled lang (like
625+ # rust or nim or zig) and/or instead of doing a fan out to
626+ # TCP sockets here, we add a shm-style tick queue which
627+ # readers can pull from instead of placing the burden of
628+ # broadcast on solely on this `brokerd` actor. see issues:
629+ # - https://github.com/pikers/piker/issues/98
630+ # - https://github.com/pikers/piker/issues/107
631+
612632 for (stream , tick_throttle ) in subs .copy ():
613633 try :
614634 with trio .move_on_after (0.2 ) as cs :
@@ -738,9 +758,6 @@ def frame_ticks(
738758 ticks_by_type [ttype ].append (tick )
739759
740760
741- # TODO: a less naive throttler, here's some snippets:
742- # token bucket by njs:
743- # https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
744761async def uniform_rate_send (
745762
746763 rate : float ,
@@ -750,8 +767,22 @@ async def uniform_rate_send(
750767 task_status : TaskStatus = trio .TASK_STATUS_IGNORED ,
751768
752769) -> None :
770+ '''
771+ Throttle a real-time (presumably tick event) stream to a uniform
772+ transmissiom rate, normally for the purposes of throttling a data
773+ flow being consumed by a graphics rendering actor which itself is limited
774+ by a fixed maximum display rate.
753775
754- # try not to error-out on overruns of the subscribed (chart) client
776+ Though this function isn't documented (nor was intentially written
777+ to be) a token-bucket style algo, it effectively operates as one (we
778+ think?).
779+
780+ TODO: a less naive throttler, here's some snippets:
781+ token bucket by njs:
782+ https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
783+
784+ '''
785+ # try not to error-out on overruns of the subscribed client
755786 stream ._ctx ._backpressure = True
756787
757788 # TODO: compute the approx overhead latency per cycle
@@ -848,6 +879,16 @@ async def uniform_rate_send(
848879 # rate timing exactly lul
849880 try :
850881 await stream .send ({sym : first_quote })
882+ except tractor .RemoteActorError as rme :
883+ if rme .type is not tractor ._exceptions .StreamOverrun :
884+ raise
885+ ctx = stream ._ctx
886+ chan = ctx .chan
887+ log .warning (
888+ 'Throttled quote-stream overrun!\n '
889+ f'{ sym } :{ ctx .cid } @{ chan .uid } '
890+ )
891+
851892 except (
852893 # NOTE: any of these can be raised by ``tractor``'s IPC
853894 # transport-layer and we want to be highly resilient
0 commit comments