Skip to content

Commit cc3a528

Browse files
authored
Merge pull request #473 from minrk/broadcast_outstanding
[BroadcastView] hook up derivative message futures before send
2 parents 24f06f0 + 92ff99c commit cc3a528

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

ipyparallel/client/client.py

+21-2
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,7 @@ def _send(
11041104
header=None,
11051105
metadata=None,
11061106
track_outstanding=False,
1107+
message_future_hook=None,
11071108
):
11081109
"""Send a message in the IO thread
11091110
@@ -1136,6 +1137,8 @@ def _send(
11361137
async_result=msg_type in {'execute_request', 'apply_request'},
11371138
track=track,
11381139
)
1140+
if message_future_hook is not None:
1141+
message_future_hook(futures[0])
11391142

11401143
def cleanup(f):
11411144
"""Purge caches on Future resolution"""
@@ -1574,7 +1577,15 @@ def _maybe_raise(self, result):
15741577
return result
15751578

15761579
def send_apply_request(
1577-
self, socket, f, args=None, kwargs=None, metadata=None, track=False, ident=None
1580+
self,
1581+
socket,
1582+
f,
1583+
args=None,
1584+
kwargs=None,
1585+
metadata=None,
1586+
track=False,
1587+
ident=None,
1588+
message_future_hook=None,
15781589
):
15791590
"""construct and send an apply message via a socket.
15801591
@@ -1617,13 +1628,20 @@ def send_apply_request(
16171628
metadata=metadata,
16181629
track=track,
16191630
track_outstanding=True,
1631+
message_future_hook=message_future_hook,
16201632
)
16211633
msg_id = future.msg_id
16221634

16231635
return future
16241636

16251637
def send_execute_request(
1626-
self, socket, code, silent=True, metadata=None, ident=None
1638+
self,
1639+
socket,
1640+
code,
1641+
silent=True,
1642+
metadata=None,
1643+
ident=None,
1644+
message_future_hook=None,
16271645
):
16281646
"""construct and send an execute request via a socket."""
16291647

@@ -1650,6 +1668,7 @@ def send_execute_request(
16501668
ident=ident,
16511669
metadata=metadata,
16521670
track_outstanding=True,
1671+
message_future_hook=message_future_hook,
16531672
)
16541673

16551674
return future

ipyparallel/client/view.py

+27-8
Original file line numberDiff line numberDiff line change
@@ -912,11 +912,23 @@ def _really_apply(
912912
s_idents = [ident.decode("utf8") for ident in idents]
913913

914914
metadata = self._init_metadata(s_idents)
915-
message_future = self.client.send_apply_request(
916-
self._socket, pf, pargs, pkwargs, track=track, metadata=metadata
917-
)
918-
ar = self._make_async_result(
919-
message_future, s_idents, fname=getname(f), targets=targets
915+
916+
ar = None
917+
918+
def make_asyncresult(message_future):
919+
nonlocal ar
920+
ar = self._make_async_result(
921+
message_future, s_idents, fname=getname(f), targets=_targets
922+
)
923+
924+
self.client.send_apply_request(
925+
self._socket,
926+
pf,
927+
pargs,
928+
pkwargs,
929+
track=track,
930+
metadata=metadata,
931+
message_future_hook=make_asyncresult,
920932
)
921933

922934
if block:
@@ -948,14 +960,21 @@ def execute(self, code, silent=True, targets=None, block=None):
948960
s_idents = [ident.decode("utf8") for ident in _idents]
949961

950962
metadata = self._init_metadata(s_idents)
963+
964+
ar = None
965+
966+
def make_asyncresult(message_future):
967+
nonlocal ar
968+
ar = self._make_async_result(
969+
message_future, s_idents, fname='execute', targets=_targets
970+
)
971+
951972
message_future = self.client.send_execute_request(
952973
self._socket,
953974
code,
954975
silent=silent,
955976
metadata=metadata,
956-
)
957-
ar = self._make_async_result(
958-
message_future, s_idents, fname='execute', targets=_targets
977+
message_future_hook=make_asyncresult,
959978
)
960979
if block:
961980
try:

0 commit comments

Comments
 (0)