18
18
from Queue import Queue
19
19
20
20
from decorator import decorator
21
+ import tqdm
21
22
import zmq
22
23
from zmq import MessageTracker
23
24
24
25
from IPython import get_ipython
25
- from IPython .core .display import clear_output , display , display_pretty
26
+ from IPython .core .display import display , display_pretty
26
27
from ipyparallel import error
27
28
from ipyparallel .util import utcnow , compare_datetimes , _parse_date
28
29
from ipython_genutils .py3compat import string_types
@@ -57,7 +58,8 @@ def check_ready(f, self, *args, **kwargs):
57
58
class AsyncResult (Future ):
58
59
"""Class for representing results of non-blocking calls.
59
60
60
- Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
61
+ Extends the interfaces of :py:class:`multiprocessing.pool.AsyncResult`
62
+ and :py:class:`concurrent.futures.Future`.
61
63
"""
62
64
63
65
msg_ids = None
@@ -73,6 +75,7 @@ def __init__(
73
75
fname = 'unknown' ,
74
76
targets = None ,
75
77
owner = False ,
78
+ progress_bar = tqdm .tqdm ,
76
79
):
77
80
super (AsyncResult , self ).__init__ ()
78
81
if not isinstance (children , list ):
@@ -92,6 +95,7 @@ def __init__(
92
95
self ._fname = fname
93
96
self ._targets = targets
94
97
self .owner = owner
98
+ self .progress_bar = progress_bar
95
99
96
100
self ._ready = False
97
101
self ._ready_event = Event ()
@@ -567,20 +571,21 @@ def wall_time(self):
567
571
return self .timedelta (self .submitted , self .received )
568
572
569
573
def wait_interactive (self , interval = 1.0 , timeout = - 1 ):
570
- """interactive wait, printing progress at regular intervals"""
574
+ """interactive wait, printing progress at regular intervals. """
571
575
if timeout is None :
572
576
timeout = - 1
573
577
N = len (self )
574
- tic = time .time ()
575
- while not self .ready () and (timeout < 0 or time .time () - tic <= timeout ):
578
+ tic = time .perf_counter ()
579
+ progress_bar = self .progress_bar (total = N , unit = 'tasks' , desc = self ._fname )
580
+ n_prev = 0
581
+ while not self .ready () and (
582
+ timeout < 0 or time .perf_counter () - tic <= timeout
583
+ ):
576
584
self .wait (interval )
577
- clear_output (wait = True )
578
- print (
579
- "%4i/%i tasks finished after %4i s" % (self .progress , N , self .elapsed ),
580
- end = "" ,
581
- )
582
- sys .stdout .flush ()
583
- print ("\n done" )
585
+ progress_bar .update (self .progress - n_prev )
586
+ n_prev = self .progress
587
+
588
+ progress_bar .close ()
584
589
585
590
def _republish_displaypub (self , content , eid ):
586
591
"""republish individual displaypub content dicts"""
0 commit comments