18
18
from Queue import Queue
19
19
20
20
from decorator import decorator
21
+ import tqdm
21
22
import zmq
22
23
from zmq import MessageTracker
23
24
24
25
from IPython import get_ipython
25
- from IPython .core .display import clear_output , display , display_pretty
26
+ from IPython .core .display import display , display_pretty
26
27
from ipyparallel import error
27
28
from ipyparallel .util import utcnow , compare_datetimes , _parse_date
28
29
from ipython_genutils .py3compat import string_types
@@ -57,7 +58,8 @@ def check_ready(f, self, *args, **kwargs):
57
58
class AsyncResult (Future ):
58
59
"""Class for representing results of non-blocking calls.
59
60
60
- Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
61
+ Extends the interfaces of :py:class:`multiprocessing.pool.AsyncResult`
62
+ and :py:class:`concurrent.futures.Future`.
61
63
"""
62
64
63
65
msg_ids = None
@@ -73,6 +75,7 @@ def __init__(
73
75
fname = 'unknown' ,
74
76
targets = None ,
75
77
owner = False ,
78
+ progress = tqdm .tqdm ,
76
79
):
77
80
super (AsyncResult , self ).__init__ ()
78
81
if not isinstance (children , list ):
@@ -92,6 +95,7 @@ def __init__(
92
95
self ._fname = fname
93
96
self ._targets = targets
94
97
self .owner = owner
98
+ self .progress = progress
95
99
96
100
self ._ready = False
97
101
self ._ready_event = Event ()
@@ -566,37 +570,22 @@ def wall_time(self):
566
570
"""
567
571
return self .timedelta (self .submitted , self .received )
568
572
569
- def wait_interactive (self , interval = 1.0 , timeout = - 1 , progress = None ):
570
- """interactive wait, printing progress at regular intervals.
571
-
572
- progress can be a tqdm-like progress bar."""
573
-
574
- use_progressbar = progress is not None
573
+ def wait_interactive (self , interval = 1.0 , timeout = - 1 ):
574
+ """interactive wait, printing progress at regular intervals."""
575
575
if timeout is None :
576
576
timeout = - 1
577
577
N = len (self )
578
- tic = time .time ()
579
- if use_progressbar :
580
- progress_bar = progress (total = N )
581
- n_prev = 0
582
- while not self .ready () and (timeout < 0 or time .time () - tic <= timeout ):
578
+ tic = time .perf_counter ()
579
+ progress_bar = self .progress (total = N , unit = 'tasks' , desc = self ._fname )
580
+ n_prev = 0
581
+ while not self .ready () and (
582
+ timeout < 0 or time .perf_counter () - tic <= timeout
583
+ ):
583
584
self .wait (interval )
584
- if use_progressbar :
585
- progress_bar .update (self .progress - n_prev )
586
- n_prev = self .progress
587
- else :
588
- clear_output (wait = True )
589
- print (
590
- "%4i/%i tasks finished after %4i s"
591
- % (self .progress , N , self .elapsed ),
592
- end = "" ,
593
- )
594
- sys .stdout .flush ()
585
+ progress_bar .update (self .progress - n_prev )
586
+ n_prev = self .progress
595
587
596
- if use_progressbar :
597
- progress_bar .close ()
598
- else :
599
- print ("\n done" )
588
+ progress_bar .close ()
600
589
601
590
def _republish_displaypub (self , content , eid ):
602
591
"""republish individual displaypub content dicts"""
0 commit comments