1
+ import ray
2
+ import time
3
+ import timeit
4
+
5
+ # simulate remote functions with
6
+ # different execution times
7
+ @ray .remote
8
+ def remote_task (x ):
9
+ time .sleep (x )
10
+ return x
11
+
12
+ # create a list of things
13
+ things = list (range (10 ))
14
+ # ensure that the futures won’t complete in order
15
+ things .sort (reverse = True )
16
+
17
+
18
+ # # GET
19
+
20
+ # ## get results when all results are available
21
+ # def in_order():
22
+ # ### use remote function to retrieve futures
23
+ # futures = list(map(lambda x: remote_task.remote(x), things))
24
+ # ### ray.get will block your function until all futures are returned
25
+ # values = ray.get(futures)
26
+ # ### loop over results and print
27
+ # for v in values:
28
+ # print(f" Completed {v}")
29
+ # ### simulate some business logic
30
+ # time.sleep(1)
31
+
32
+
33
+ # ## call order and see how long it takes to complete
34
+ # print("GET took: ", timeit.timeit(lambda: in_order(), number=1))
35
+
36
+
37
+ # WAIT
38
+
39
+ # ## process as results become available
40
+ # def as_available():
41
+ # ### use remote function to retrieve futures
42
+ # futures = list(map(lambda x: remote_task.remote(x), things))
43
+ # ### while still futures left
44
+ # while len(futures) > 0:
45
+ # ### call ray.wait to get the next future
46
+ # ready_futures, rest_futures = ray.wait(futures)
47
+ # ### show progress
48
+ # print(f"Ready {len(ready_futures)} rest {len(rest_futures)}")
49
+ # ### print results
50
+ # for id in ready_futures:
51
+ # print(f'completed value {id}, result {ray.get(id)}')
52
+ # ### simulate some business logic
53
+ # time.sleep(1)
54
+ # ### wait on the ones that are not yet available
55
+ # futures = rest_futures
56
+
57
+ # ## call order and see how long it takes to complete
58
+ # print("WAIT took: ", timeit.timeit(lambda: as_available(), number=1))
59
+
60
+
61
+
62
+ ## same as above but with timeout and remote cancel
63
+ def as_available ():
64
+ futures = list (map (lambda x : remote_task .remote (x ), things ))
65
+ ### while still futures left
66
+ while len (futures ) > 0 :
67
+ ### call ray.wait to get the next future
68
+ ### but with a 10s timeout and always collect 5 results before returning anything
69
+ ready_futures , rest_futures = ray .wait (futures , timeout = 10 , num_returns = 5 )
70
+ # if we get back less than num_returns
71
+ if len (ready_futures ) < 1 :
72
+ print (f"Timed out on { rest_futures } " )
73
+ # cancel remote function, e.g. if task is using a lot of resources
74
+ ray .cancel (* rest_futures )
75
+ break
76
+ for id in ready_futures :
77
+ print (f'completed value { id } , result { ray .get (id )} ' )
78
+ futures = rest_futures
79
+
80
+ ## call order and see how long it takes to complete
81
+ print ("WAIT took: " , timeit .timeit (lambda : as_available (), number = 1 ))
0 commit comments