 //! [scoped](https://doc.rust-lang.org/stable/std/thread/fn.scope.html)
 //! threads equal to the number of cores on the machine. Unlike normal threads, scoped threads
 //! can borrow data from their environment.
+use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
 use std::thread::*;
 
+// Usually the number of physical cores.
+fn threads() -> usize {
+    available_parallelism().unwrap().get()
+}
+
 /// Spawn `n` scoped threads, where `n` is the available parallelism.
-pub fn spawn<F, T>(f: F)
+pub fn spawn<F>(f: F)
 where
-    F: FnOnce() -> T + Copy + Send,
-    T: Send,
+    F: Fn() + Copy + Send,
 {
     scope(|scope| {
         for _ in 0..threads() {
@@ -17,31 +22,139 @@ where
     });
 }
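As a rough usage sketch of the revised `spawn` signature (the counter and closure below are illustrative, not part of this commit, and this module's `spawn` is assumed to be in scope rather than `std::thread::spawn`), a worker thread can borrow directly from the caller's stack because `scope` joins every thread before `spawn` returns:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread::available_parallelism;

fn count_workers() {
    // Borrowed by every worker thread; no `Arc` is needed because the threads are scoped.
    let counter = AtomicUsize::new(0);

    // The closure is `Fn() + Copy + Send`, so each worker thread runs its own copy.
    spawn(|| {
        counter.fetch_add(1, Relaxed);
    });

    // All threads have been joined by the time `spawn` returns.
    assert_eq!(counter.load(Relaxed), available_parallelism().unwrap().get());
}
```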
 
-/// Splits `items` into batches, one per thread. Items are assigned in a round robin fashion,
-/// to achieve a crude load balancing in case some items are more complex to process than others.
-pub fn spawn_batches<F, T, U>(mut items: Vec<U>, f: F)
+/// Spawns `n` scoped threads that each receive a
+/// [work stealing](https://en.wikipedia.org/wiki/Work_stealing) iterator.
+/// Work stealing is an efficient strategy that keeps each CPU core busy when some items take longer
+/// than others to process, used by popular libraries such as [rayon](https://github.com/rayon-rs/rayon).
+/// Processing at different rates also happens on many modern CPUs with
+/// [heterogeneous performance and efficiency cores](https://en.wikipedia.org/wiki/ARM_big.LITTLE).
+pub fn spawn_parallel_iterator<F, T>(items: &[T], f: F)
 where
-    F: FnOnce(Vec<U>) -> T + Copy + Send,
-    T: Send,
-    U: Send,
+    F: Fn(ParIter<'_, T>) + Copy + Send,
+    T: Sync,
 {
     let threads = threads();
-    let mut batches: Vec<_> = (0..threads).map(|_| Vec::new()).collect();
-    let mut index = 0;
+    let size = items.len().div_ceil(threads);
 
-    // Round robin items over each thread.
-    while let Some(next) = items.pop() {
-        batches[index % threads].push(next);
-        index += 1;
-    }
+    // Initially divide work as evenly as possible amongst each worker thread.
+    let workers: Vec<_> = (0..threads)
+        .map(|id| {
+            let start = (id * size).min(items.len());
+            let end = (start + size).min(items.len());
+            CachePadding::new(pack(start, end))
+        })
+        .collect();
+    let workers = workers.as_slice();
 
     scope(|scope| {
-        for batch in batches {
-            scope.spawn(move || f(batch));
+        for id in 0..threads {
+            scope.spawn(move || f(ParIter { id, items, workers }));
         }
     });
 }
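A minimal sketch of driving the new iterator, assuming this module's `spawn_parallel_iterator` is in scope (the slice and the summing closure are only for illustration):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

fn parallel_sum(items: &[usize]) -> usize {
    let total = AtomicUsize::new(0);

    // Each worker thread receives its own `ParIter`; once its local range is
    // exhausted, iteration transparently steals items from the other workers.
    spawn_parallel_iterator(items, |iter| {
        for item in iter {
            total.fetch_add(*item, Relaxed);
        }
    });

    total.load(Relaxed)
}
```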
 
-fn threads() -> usize {
-    available_parallelism().unwrap().get()
+pub struct ParIter<'a, T> {
+    id: usize,
+    items: &'a [T],
+    workers: &'a [CachePadding],
+}
+
+impl<'a, T> Iterator for ParIter<'a, T> {
+    type Item = &'a T;
+
+    fn next(&mut self) -> Option<&'a T> {
+        // First try taking from our own queue.
+        let worker = &self.workers[self.id];
+        let current = worker.increment();
+        let (start, end) = unpack(current);
+
+        // There are still items to process.
+        if start < end {
+            return Some(&self.items[start]);
+        }
+
+        // Steal from another worker, [spinlocking](https://en.wikipedia.org/wiki/Spinlock)
+        // until we acquire new items to process or there's nothing left to do.
+        loop {
+            // Find worker with the most remaining items.
+            let available = self
+                .workers
+                .iter()
+                .filter_map(|other| {
+                    let current = other.load();
+                    let (start, end) = unpack(current);
+                    let size = end.saturating_sub(start);
+
+                    (size > 0).then_some((other, current, size))
+                })
+                .max_by_key(|t| t.2);
+
+            if let Some((other, current, size)) = available {
+                // Split the work items into two roughly equal piles.
+                let (start, end) = unpack(current);
+                let middle = start + size.div_ceil(2);
+
+                let next = pack(middle, end);
+                let stolen = pack(start + 1, middle);
+
+                // We could be preempted by another thread stealing or by the owning worker
+                // thread finishing an item, so check indices are still unmodified.
+                if other.compare_exchange(current, next) {
+                    worker.store(stolen);
+                    break Some(&self.items[start]);
+                }
+            } else {
+                // No work remaining.
+                break None;
+            }
+        }
+    }
+}
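To make the splitting arithmetic concrete, here is a small illustrative check (the numbers are arbitrary and the test is a sketch placed in the same module so it can reach the private `pack` and `unpack` helpers): if the victim's packed range is `(4, 10)`, the thief returns item 4 immediately, queues `5..7` for itself, and leaves `7..10` with the victim.

```rust
#[test]
fn steal_splits_remaining_range() {
    // Victim currently owns items 4..10, i.e. 6 remaining.
    let current = pack(4, 10);
    let (start, end) = unpack(current);
    let middle = start + (end - start).div_ceil(2);

    assert_eq!(unpack(pack(middle, end)), (7, 10)); // stays with the victim
    assert_eq!(unpack(pack(start + 1, middle)), (5, 7)); // queued by the thief
    assert_eq!(start, 4); // returned immediately by `next`
}
```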
+
+/// Intentionally force alignment to 128 bytes to make a best effort attempt to place each atomic
+/// on its own cache line. This reduces contention and improves performance for common
+/// CPU caching protocols such as [MESI](https://en.wikipedia.org/wiki/MESI_protocol).
+#[repr(align(128))]
+pub struct CachePadding {
+    atomic: AtomicUsize,
+}
+
+/// Convenience wrapper methods around atomic operations. Both start and end indices are packed
+/// into a single atomic so that we can use the fastest and easiest to reason about `Relaxed`
+/// ordering.
+impl CachePadding {
+    #[inline]
+    fn new(n: usize) -> Self {
+        CachePadding { atomic: AtomicUsize::new(n) }
+    }
+
+    #[inline]
+    fn increment(&self) -> usize {
+        self.atomic.fetch_add(1, Relaxed)
+    }
+
+    #[inline]
+    fn load(&self) -> usize {
+        self.atomic.load(Relaxed)
+    }
+
+    #[inline]
+    fn store(&self, n: usize) {
+        self.atomic.store(n, Relaxed);
+    }
+
+    #[inline]
+    fn compare_exchange(&self, current: usize, new: usize) -> bool {
+        self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
+    }
+}
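A quick sanity check of the padding effect, written as a sketch test in the same module (the constant 128 simply mirrors the `repr(align(128))` attribute above):

```rust
#[test]
fn cache_padding_fills_an_aligned_block() {
    use std::mem::{align_of, size_of};

    // The AtomicUsize itself is 8 bytes, but the alignment attribute rounds the
    // struct size up to 128, so adjacent `workers` entries never share a cache line.
    assert_eq!(align_of::<CachePadding>(), 128);
    assert_eq!(size_of::<CachePadding>(), 128);
}
```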
+
+#[inline]
+fn pack(start: usize, end: usize) -> usize {
+    (end << 32) | start
+}
+
+#[inline]
+fn unpack(both: usize) -> (usize, usize) {
+    (both & 0xffffffff, both >> 32)
 }
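Worth noting as an assumption of this packing scheme: each index occupies 32 bits of the `usize`, so it relies on a 64-bit target and on slices shorter than 2^32 items. A round-trip sketch, again as a test in the same module:

```rust
#[test]
fn pack_then_unpack_round_trips() {
    // `start` occupies the low 32 bits, `end` the high 32 bits.
    assert_eq!(unpack(pack(0, 0)), (0, 0));
    assert_eq!(unpack(pack(123, 456)), (123, 456));
    assert_eq!(unpack(pack(0, u32::MAX as usize)), (0, u32::MAX as usize));
}
```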