Skip to content

Commit 70db20b

Browse files
authored
Merge pull request #473 from pikers/binance_ws_ep_update
`binance`: use built-in `anext()` add note about new ws ep URL, fix agen streaming within `NoBsWs` usage
2 parents f3b04f2 + 609b91e commit 70db20b

File tree

4 files changed

+75
-56
lines changed

4 files changed

+75
-56
lines changed

piker/brokers/binance.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
)
2727
import time
2828

29+
from trio_util import trio_async_generator
2930
import trio
3031
from trio_typing import TaskStatus
3132
import pendulum
@@ -317,7 +318,10 @@ class AggTrade(Struct):
317318
M: bool # Ignore
318319

319320

320-
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
321+
@trio_async_generator
322+
async def stream_messages(
323+
ws: NoBsWs,
324+
) -> AsyncGenerator[NoBsWs, dict]:
321325

322326
timeouts = 0
323327
while True:
@@ -529,19 +533,23 @@ async def subscribe(ws: wsproto.WSConnection):
529533
# XXX: do we need to ack the unsub?
530534
# await ws.recv_msg()
531535

532-
async with open_autorecon_ws(
533-
'wss://stream.binance.com/ws',
534-
fixture=subscribe,
535-
) as ws:
536+
async with (
537+
open_autorecon_ws(
538+
# XXX: see api docs which show diff addr?
539+
# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
540+
# 'wss://ws-api.binance.com:443/ws-api/v3',
541+
'wss://stream.binance.com/ws',
542+
fixture=subscribe,
543+
) as ws,
536544

537-
# pull a first quote and deliver
538-
msg_gen = stream_messages(ws)
539-
540-
typ, quote = await msg_gen.__anext__()
545+
# avoid stream-gen closure from breaking trio..
546+
stream_messages(ws) as msg_gen,
547+
):
548+
typ, quote = await anext(msg_gen)
541549

550+
# pull a first quote and deliver
542551
while typ != 'trade':
543-
# TODO: use ``anext()`` when it lands in 3.10!
544-
typ, quote = await msg_gen.__anext__()
552+
typ, quote = await anext(msg_gen)
545553

546554
task_status.started((init_msgs, quote))
547555

piker/brokers/ib/_util.py

Lines changed: 47 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424

2525
import tractor
2626

27+
from piker.log import get_logger
28+
29+
log = get_logger(__name__)
30+
2731

2832
_reset_tech: Literal[
2933
'vnc',
@@ -134,54 +138,54 @@ def i3ipc_xdotool_manual_click_hack() -> None:
134138
# 'IB', # gw running in i3 (newer version?)
135139
]
136140

137-
for name in win_names:
138-
results = t.find_titled(name)
139-
print(f'results for {name}: {results}')
140-
if results:
141-
con = results[0]
142-
print(f'Resetting data feed for {name}')
143-
win_id = str(con.window)
144-
w, h = con.rect.width, con.rect.height
145-
146-
# TODO: seems to be a few libs for python but not sure
147-
# if they support all the sub commands we need, order of
148-
# most recent commit history:
149-
# https://github.com/rr-/pyxdotool
150-
# https://github.com/ShaneHutter/pyxdotool
151-
# https://github.com/cphyc/pyxdotool
152-
153-
# TODO: only run the reconnect (2nd) kc on a detected
154-
# disconnect?
155-
for key_combo, timeout in [
156-
# only required if we need a connection reset.
157-
# ('ctrl+alt+r', 12),
158-
# data feed reset.
159-
('ctrl+alt+f', 6)
160-
]:
161-
subprocess.call([
162-
'xdotool',
163-
'windowactivate', '--sync', win_id,
164-
165-
# move mouse to bottom left of window (where there should
166-
# be nothing to click).
167-
'mousemove_relative', '--sync', str(w-4), str(h-4),
168-
169-
# NOTE: we may need to stick a `--retry 3` in here..
170-
'click', '--window', win_id,
171-
'--repeat', '3', '1',
172-
173-
# hackzorzes
174-
'key', key_combo,
175-
],
176-
timeout=timeout,
177-
)
141+
try:
142+
for name in win_names:
143+
results = t.find_titled(name)
144+
print(f'results for {name}: {results}')
145+
if results:
146+
con = results[0]
147+
print(f'Resetting data feed for {name}')
148+
win_id = str(con.window)
149+
w, h = con.rect.width, con.rect.height
150+
151+
# TODO: seems to be a few libs for python but not sure
152+
# if they support all the sub commands we need, order of
153+
# most recent commit history:
154+
# https://github.com/rr-/pyxdotool
155+
# https://github.com/ShaneHutter/pyxdotool
156+
# https://github.com/cphyc/pyxdotool
157+
158+
# TODO: only run the reconnect (2nd) kc on a detected
159+
# disconnect?
160+
for key_combo, timeout in [
161+
# only required if we need a connection reset.
162+
# ('ctrl+alt+r', 12),
163+
# data feed reset.
164+
('ctrl+alt+f', 6)
165+
]:
166+
subprocess.call([
167+
'xdotool',
168+
'windowactivate', '--sync', win_id,
169+
170+
# move mouse to bottom left of window (where
171+
# there should be nothing to click).
172+
'mousemove_relative', '--sync', str(w-4), str(h-4),
173+
174+
# NOTE: we may need to stick a `--retry 3` in here..
175+
'click', '--window', win_id,
176+
'--repeat', '3', '1',
177+
178+
# hackzorzes
179+
'key', key_combo,
180+
],
181+
timeout=timeout,
182+
)
178183

179184
# re-activate and focus original window
180-
try:
181185
subprocess.call([
182186
'xdotool',
183187
'windowactivate', '--sync', str(orig_win_id),
184188
'click', '--window', str(orig_win_id), '1',
185189
])
186190
except subprocess.TimeoutExpired:
187-
log.exception(f'xdotool timed out?')
191+
log.exception('xdotool timed out?')

piker/brokers/kraken/feed.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
)
2828
import time
2929

30-
from async_generator import aclosing
3130
from fuzzywuzzy import process as fuzzy
3231
import numpy as np
3332
import pendulum
3433
from trio_typing import TaskStatus
34+
from trio_util import trio_async_generator
3535
import tractor
3636
import trio
3737

@@ -122,6 +122,7 @@ async def stream_messages(
122122
yield msg
123123

124124

125+
@trio_async_generator
125126
async def process_data_feed_msgs(
126127
ws: NoBsWs,
127128
):
@@ -378,7 +379,12 @@ async def subscribe(ws: NoBsWs):
378379
'wss://ws.kraken.com/',
379380
fixture=subscribe,
380381
) as ws,
381-
aclosing(process_data_feed_msgs(ws)) as msg_gen,
382+
383+
# avoid stream-gen closure from breaking trio..
384+
# NOTE: not sure this actually works XD particularly
385+
# if we call `ws._connect()` manally in the streaming
386+
# async gen..
387+
process_data_feed_msgs(ws) as msg_gen,
382388
):
383389
# pull a first quote and deliver
384390
typ, ohlc_last = await anext(msg_gen)

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
# async
5454
'trio',
5555
'trio-websocket',
56+
'trio-util',
5657
'async_generator',
5758

5859
# from github currently (see requirements.txt)

0 commit comments

Comments
 (0)