forked from DlangRen/Programming-in-D
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathparallelism.d
1217 lines (936 loc) · 38.8 KB
/
parallelism.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
Ddoc
$(DERS_BOLUMU $(IX parallelism) 并行)
$(P
$(IX core) 大多数现代的微处理器都包含一个以上的$(I 内核),其中的每一个内核都可以当作单独的处理单元使用。它们可同时执行不同程序的不同部分。灵活使用模块 $(C std.parallelism) 中的功能来尽可能地利用所有内核的运算能力,使程序能以更快的速度运行。
)
$(P
本章涉及以下范围(range)算法。只有当元素操作之间没有相互依赖时,它们才能被 $(I 并行) 执行。$(I 并行) 意味着将要执行的一组操作可以被分派到多个内核同时执行。
)
$(UL
$(LI $(C parallel):并行访问范围中的元素。)
$(LI $(C task):创建并行执行的任务。)
$(LI $(C asyncBuf):以并行半延时取值的方式迭代 $(C InputRange) 中的元素。)
$(LI $(C map):以并行半延时取值的方式对 $(C InputRange) 中的每一个元素应用指定的函数。)
$(LI $(C amap):以并行即时取值的方式对 $(C RandomAccessRange) 中的元素应用指定的函数。)
$(LI $(C reduce):使用指定的函数并行归约计算 $(C RandomAccessRange) 中的元素。)
)
$(P
对于之前我们写过的程序,我们都假定程序中的表达式都是按照确定的顺序执行的,或者说至少通常情况下它们是顺序执行的。
)
---
++i;
++j;
---
$(P
对于上面的代码我们认为 $(C i) 将在 $(C j) 之前自增 1。虽然从语义上看这种判断是正确的,但实际上我们预计的这种情况很少发生:微处理器和编译器使用的优化技术会将彼此独立的值储存在处理器寄存器中。当出现这种情况时,微处理器会并行处理上面那样的自增操作。
)
$(P
虽然这些优化效果显著,但是它们通常只能自动应用在非常低级别的操作上。只有程序员才能判断哪些高级别的操作是互相独立、可以并行化的。
)
$(P
对于循环,范围中的元素通常是一个一个按顺序被处理的,即上一个循环的操作结束后才会进行下一次循环:
)
---
auto students =
[ Student(1), Student(2), Student(3), Student(4) ];
foreach (student; students) {
student.aSlowOperation();
}
---
$(P
正常情况下程序将会在某个操作系统指派的用于运行程序的处理器内核上运行。因为 $(C foreach) 循环通常是顺序操作元素的,学生的 $(C aSlowOperation()) 也会被顺序调用。然而大部分时候这种处理的顺序并不是必须的。如果 $(C Student) 对象相互独立,不去使用那些可能处在空闲状态的微处理器核心是非常浪费的。
)
$(P
$(IX Thread.sleep) 下面的例子使用了 $(C core.thread) 模块中的 $(C Thread.sleep()) 来模拟长耗时操作。$(C Thread.sleep()) 会将操作挂起一段时间,其时间长短可在代码中指定。当然在下面的例子中 $(C Thread.sleep) 只是用来模拟长耗时的任务,因为它并不需要处理器核心处理实际的工作,它只是占用时间。虽然使用了一个和真实任务有差别的工具,但本章的示例还是能很好的展现出并行编程的威力。
)
---
import std.stdio;
import core.thread;
struct Student {
int number;
void aSlowOperation() {
writefln("The work on student %s has begun", number);
// 在此处暂停一会以模拟耗时长的操作
Thread.sleep(1.seconds);
writefln("The work on student %s has ended", number);
}
}
void main() {
auto students =
[ Student(1), Student(2), Student(3), Student(4) ];
foreach (student; students) {
student.aSlowOperation();
}
}
---
$(P
可以使用终端中的 $(C time) 来测量程序的执行时间。
)
$(SHELL
$ $(HILITE time) ./deneme
$(SHELL_OBSERVED
The work on student 1 has begun
The work on student 1 has ended
The work on student 2 has begun
The work on student 2 has ended
The work on student 3 has begun
The work on student 3 has ended
The work on student 4 has begun
The work on student 4 has ended
real 0m4.005s $(SHELL_NOTE 共 4 秒)
user 0m0.004s
sys 0m0.000s
)
)
$(P
如果处理每个学生需要花费 1 秒,那按照顺序迭代学生总共需要花费 4 秒。然而如果把这 4 个操作分配给 4 个内核执行,它们将会被同时处理。那么处理完这 4 个学生总共耗时 1 秒。
)
$(P
$(IX totalCPUs) 在了解实现方法之前,我们先来看下如何通过使用 $(C std.parallelism.totalCPUs) 来获取系统可用的处理器核心数:
)
---
import std.stdio;
import std.parallelism;
void main() {
writefln("There are %s cores on this system.", totalCPUs);
}
---
$(P
在本章编写时使用的环境上运行上面的代码将会输出这样的结果:
)
$(SHELL
此系统里有 4 个内核。
)
$(H5 $(IX parallel) $(C taskPool.parallel()))
$(P
此函数可以简单地使用 $(C parallel()) 来调用。
)
$(P
$(IX foreach, parallel) $(C parallel()) 以并行的方式访问范围中的元素。它可以在 $(C foreach) 循环中起到非常大的作用。只需要导入 $(C std.parallelism) 模块并将上面代码中的 $(C students) 换为 $(C parallel(students)) 即可充分利用系统中全部核心的运算能力。
)
---
import std.parallelism;
// ...
foreach (student; $(HILITE parallel(students))) {
---
$(P
在之前 $(LINK2 /ders/d.cn/foreach_opapply.html, $(C foreach) 结构体和类) 一章中我们了解到: $(C foreach) 块中的表达式将会被包装成委托传递给对象的 $(C opApply()) 成员函数。$(C parallel()) 返回一个范围对象,它将决定如何将处理元素的 $(C delegate) 分发给独立的处理器核心执行。
)
$(P
最后,在一个有 4 核的系统上,将 $(C Student) 范围传递给 $(C parallel()) 可以使上面的程序在 1 秒之内完成:
)
$(SHELL
$ time ./deneme
$(SHELL_OBSERVED The work on student 2 has begun
The work on student 1 has begun
The work on student 4 has begun
The work on student 3 has begun
The work on student 1 has ended
The work on student 2 has ended
The work on student 4 has ended
The work on student 3 has ended
real 0m1.005s $(SHELL_NOTE 现在只需 1 秒)
user 0m0.004s
sys 0m0.004s)
)
$(P
$(I $(B 注意:)在不同的系统上运行上面的程序可能会有不同的执行时间,但大致都是“4 秒除以内核数”的结果。)
)
$(P
$(IX thread) 执行程序某一部分的执行流被称为 $(I 执行线程) 或 $(I 线程)。程序可由多个同时执行操作的线程组成。操作系统可在一个核心上启动和执行线程,在需要时将其挂起来让出核心运算资源去执行另一个线程。每个线程的执行都可能包含多轮启动和挂起。
)
$(P
所有程序的所有在操作系统指定时间活动的线程将会在微处理器的每个核心上执行。操作系统将会决定在合适何种情况启动或挂起线程。这就是上面程序中 $(C aSlowOperation()) 输出的信息是乱序的原因。如果对 $(C Student) 对象的操作是相互独立的,那以一种不确定的顺序执行线程也不会有什么副作用。
)
$(P
只有在确定在每一次迭代中对元素的操作相互独立后,程序员才能调用 $(C parallel()) 来实现并行。例如,如果输出信息的顺序非常重要,那么上面程序中的 $(C parallel()) 调用将会使程序出错。支持线程依赖的编程模型叫做$(I 并发(concurrency))。关于并发,下一章会主要讲解它。
)
$(P
对每个元素的所有操作完成后,并行的 $(C foreach) 才算完成执行。在 $(C foreach) 循环完成后,程序可以安全地继续执行下面的代码。
)
$(H6 $(IX work unit size) 工作单元大小)
$(P
$(C parallel()) 的重载为其第二个参数赋予了多个含义,或使其在某些情况下被忽略:
)
---
/* ... */ = parallel($(I range), $(I work_unit_size) = 100);
---
$(UL
$(LI 在迭代 $(C RandomAccessRange) 范围时:
$(P
虽然将线程分派给内核的开销极小。但在某些情况下这种开销会显得极其昂贵,尤其是在每次循环的操作耗时都非常短的时候。此时,让每个线程去执行循环的多个迭代反而会更快些。工作单元大小决定了每个线程在每次迭代时应该执行的元素个数。
)
---
foreach (student; parallel(students, $(HILITE 2))) {
// ...
}
---
$(P
工作单元大小的默认值是 100。在大多数情况下这个大小是合适的。
)
)
$(LI 在迭代非 $(C RandomAccessRange) 范围时:
$(P
只有在非 $(C RandomAccessRange) 中与$(I 工作单元大小)相同数量的元素被顺序处理时,$(C parallel()) 才会开始并行执行。由于提供的默认值 100 相对较高,$(C parallel()) 会给人一种错觉:在处理较短的非 $(C RandomAccessRange) 范围时效率不高。
)
)
$(LI 在迭代 $(C asyncBuf()) 或 $(C map()) 返回的范围时(稍后本章会介绍这两个函数):
$(P
若使用 $(C parallel()) 处理 $(C asyncBuf()) 或 $(C map()) 的返回值时,它将忽略工作单元大小参数。$(C parallel()) 将会使用这两个函数返回的范围中的缓冲区。
)
)
)
$(H5 $(IX Task) $(C Task))
$(P
程序中以并行的方式执行的操作叫做$(I 任务(task))。任务的表示类型为 $(C std.parallelism.Task)。
)
$(P
实际上,$(C parallel()) 为每个工作线程构建并自动启动了一个 $(C Task)。之后 $(C parallel()) 会等待所有任务都完成后再退出循环。$(C parallel()) 用起来非常方便,因为无论是$(I 构建)、$(I 启动)还是$(I 等待任务执行完成)都是自动进行的。
)
$(P
$(IX task) $(IX executeInNewThread) $(IX yieldForce) 当任务无法与范围元素相对应或者无法通过它们来表示时,程序员可以自己按照以下三步手动处理:使用 $(C task()) 来创建任务对象;使用 $(C executeInNewThread()) 来启动任务对象;使用 $(C yieldForce()) 来等待任务对象。在下方程序的注释中有对这三个函数详细的解释。
)
$(P
在下方的程序中 $(C anOperation()) 函数被调用两次。它将打印出 $(C id) 的第一个字符,这样我们就可以通过这个字符来判断程序正在等待哪一个任务执行完成。
)
$(P
$(IX flush, std.stdio) $(I $(B 注意:)正常情况下向类似 $(C stdout) 这样的输出流直接输出的字符并不会立刻显示出来。它们将会被储存在输出缓冲区中,一直到整行输出完成为止都会显示。因为 $(C write) 并不会输出换行符,而为了能够在下面的程序中观察并行执行的情况,我们使用 $(C stdout.flush()) 使缓冲区中的数据能在未到达行尾时就被发送至 $(C stdout)。)
)
---
import std.stdio;
import std.parallelism;
import std.array;
import core.thread;
/* 每半秒打印一次‘id’的首字母函数。它
* 会随意返回值 1,以便模拟函数正在
* 进行计算。稍后会在 main 函数中使用这个结果。*/
int anOperation(string id, int duration) {
writefln("%s will take %s seconds", id, duration);
foreach (i; 0 .. (duration * 2)) {
Thread.sleep(500.msecs); /* 半秒 */
write(id.front);
stdout.flush();
}
return 1;
}
void main() {
/* 构建一个用于执行
* anOperation() 的任务对象。在此处被指定的函数实参
* 将会被作为将要并行执行的任务函数的实参
* 传递给任务函数。*/
auto theTask = $(HILITE task!anOperation)("theTask", 5);
/* 启动任务对象 */
theTask.$(HILITE executeInNewThread());
/* 在‘theTask’执行的时候,main 函数会直接调用
* ‘anOperation()’一次。*/
immutable result = anOperation("main's call", 3);
/* 此处我们可以确认
* 直接在 main 函数中启动的操作
* 已经执行完成,因为它是一个常规
* 函数调用,而不是一个任务。*/
/* 另一方面,但在此处我们无法确定
* ‘theTask’执行的操作是否已经
* 完成。yieldForce() 会等待该任务完成
* 操作,只有在任务完成时它才会
* 返回。它的返回值即为
* 任务执行的函数的返回值,如 anOperation()。*/
immutable taskResult = theTask.$(HILITE yieldForce());
writeln();
writefln("All finished; the result is %s.",
result + taskResult);
}
---
$(P
程序的输出类似下面这个样子:字符 $(C m) 和字符 $(C t) 交替输出表明这些操作是并行执行的:
)
$(SHELL
main's call will take 3 seconds
theTask will take 5 seconds
mtmttmmttmmttttt
All finished; the result is 2.
)
$(P
上面的任务函数是 $(C task()) 的一个参数模版 $(C task!anOperation) 的实参。虽然这种方法在大多数情况下都工作的很好,但就如我们在 $(LINK2 /ders/d.cn/templates.html, 模版) 一章中看到的那样:每个不同的模版实例都是一个不同的类型。两个看起来 $(I 相同) 的任务对象实际上不是同一个类型,在某些特定的情况下这种差别可能是我们不想看到的。
)
$(P
例如,虽然两个函数有相同的函数签名,但两个通过 $(C task()) 函数模版创建的 $(C Task) 示例类型是不同的。因此,它们不能被放在同一个数组里:
)
---
import std.parallelism;
double foo(int i) {
return i * 1.5;
}
double bar(int i) {
return i * 2.5;
}
void main() {
auto tasks = [ task$(HILITE !)foo(1),
task$(HILITE !)bar(2) ]; $(DERLEME_HATASI)
}
---
$(SHELL
Error: $(HILITE incompatible types) for ((task(1)) : (task(2))):
'Task!($(HILITE foo), int)*' and 'Task!($(HILITE bar), int)*'
)
$(P
$(C task()) 函数的另一个重载形式是将函数作为其第一个形参:
)
---
void someFunction(int value) {
// ...
}
auto theTask = task($(HILITE &someFunction), 42);
---
$(P
采用这种方法不会产生不同类型的 $(C Task) 模版实例,因此可以把它们放在同一个数组里:
)
---
import std.parallelism;
double foo(int i) {
return i * 1.5;
}
double bar(int i) {
return i * 2.5;
}
void main() {
auto tasks = [ task($(HILITE &)foo, 1),
task($(HILITE &)bar, 2) ]; $(CODE_NOTE 编译通过)
}
---
$(P
lambda 函数和定义了 $(C opCall) 的对象都可以被当作任务函数使用。下面这个例子就是让任务执行了一个 lambda 函数:
)
---
auto theTask = task((int value) $(HILITE {)
/* ... */
$(HILITE }), 42);
---
$(H6 $(IX exception, parallelism) 异常处理)
$(P
因为任务都是在一个单独的线程上执行的,所以启动它们的那个线程并不能捕捉到它们抛出的异常。因此,所有抛出的异常会自动被任务自已捕获到,然后在调用类似 $(C yieldForce()) 那样的 $(C Task) 成员函数时,再重新抛出。这样便可以在主线程里捕获任务中抛出的各种异常。
)
---
import std.stdio;
import std.parallelism;
import core.thread;
void mayThrow() {
writeln("mayThrow() is started");
Thread.sleep(1.seconds);
writeln("mayThrow() is throwing an exception");
throw new Exception("Error message");
}
void main() {
auto theTask = task!mayThrow();
theTask.executeInNewThread();
writeln("main is continuing");
Thread.sleep(3.seconds);
writeln("main is waiting for the task");
theTask.yieldForce();
}
---
$(P
这个程序的输出表明由任务抛出的异常并不会立刻导致整个程序终止运行(它只终止了那个任务):
)
$(SHELL
main is continuing
mayThrow() is started
mayThrow() is throwing an exception $(SHELL_NOTE 抛出)
main is waiting for the task
[email protected](10): Error message $(SHELL_NOTE 终止)
)
$(P
可以在 $(C try-catch) 语句块中调用 $(C yieldForce()) 来捕获由 task 抛出的异常。这与单线程有着极大的不同:像本章上面的程序如果写成单线程的话,应该将 $(C try-catch) 包裹住可能会抛出异常的代码。而在并行中,它只封装了 $(C yieldForce()):
)
---
try {
theTask.yieldForce();
} catch (Exception exc) {
writefln("Detected an error in the task: '%s'", exc.msg);
}
---
$(P
这次异常将会被主线程捕获而不是终止程序:
)
$(SHELL
main is continuing
mayThrow() is started
mayThrow() is throwing an exception $(SHELL_NOTE 抛出)
main is waiting for the task
Detected an error in the task: 'Error message' $(SHELL_NOTE 捕获)
)
$(H6 $(C Task)) 的成员函数
$(UL
$(LI $(C done):指明任务是否已完成;如果任务已经因为异常而中止,则重新抛出该异常。
---
if (theTask.done) {
writeln("Yes, the task has been completed");
} else {
writeln("No, the task is still going on");
}
---
)
$(LI $(C executeInNewThread()):在新线程中启动任务。)
$(LI $(C executeInNewThread(int priority)):在新线程中启动任务,并指定线程优先级。(优先级是一个操作系统概念,它决定了线程执行的优先次序。))
)
$(P
有三个用来等待任务完成的函数:
)
$(UL
$(LI $(C yieldForce()):如果任务没有启动,则启动它;如果任务已经完成,则返回任务函数的返回值;如果任务正在执行,则以不占用微处理器的方式等待该任务完成;如果任务在执行中抛出了一个异常,它将在此处重新抛出那个异常。)
$(LI $(IX spinForce) $(C spinForce()):与 $(C yieldForce()) 功能相似。不同的是在等待时它将以占用微处理器为代价换取更快的检测任务完成的速度。)
$(LI $(IX workForce) $(C workForce()):与 $(C yieldForce()) 功能相似。不同的是,它在等待当前任务完成时会将启动一个新的任务。)
)
$(P
在大多数情况下,$(C yieldForce()) 是最适合用来等待任务完成的函数;$(C yieldForce()) 会将调用它的线程挂起,一直到任务完成为止。虽然在等待时 $(C spinForce()) 会使处理器忙碌,但它非常适合等待耗时短的任务。$(C workForce()) 则适合用需要优先启动其他任务,而不是先考虑将当前线程挂起的情况。
)
$(P
要获得更多有关成 $(C Task) 员函数的信息请参见 Phobos 在线文档。
)
$(H5 $(IX asyncBuf) $(C taskPool.asyncBuf()))
$(P
$(C asyncBuf()) 与 $(C parallel()) 相似,都是用来并行迭代 $(C InputRange) 的。它将范围中的元素储存在缓冲区中,需要时用户再从缓冲区中获取元素。
)
$(P
为了防止将输入的可能为延迟取值的范围转换为即时取值的范围,它会以$(I 轮动)的方式迭代这些元素。每轮在缓冲区中载入一定数量的元素用于并行迭代。只有当上一轮缓冲的元素被 $(C popFront()) 消耗完后,它才会开始为下一轮迭代缓冲元素。
)
$(P
$(C asyncBuf()) 有两个形参:一个是范围;另一个是可选的$(I 缓冲区大小)。缓冲区大小决定了每轮将有多少元素被加载到缓冲区。
)
---
auto elements = taskPool.asyncBuf($(I range), $(I buffer_size));
---
$(P
为了能突出 $(C asyncBuf()) 的作用,示例中的范围与之前的范围有些不同:遍历一次要耗时半秒,处理一个元素也要耗费半秒。这个范围的作用就是提供一个不高于指定上限的整数。
)
---
import std.stdio;
import core.thread;
struct Range {
int limit;
int i;
bool empty() const @property {
return i >= limit;
}
int front() const @property {
return i;
}
void popFront() {
writefln("Producing the element after %s", i);
Thread.sleep(500.msecs);
++i;
}
}
void main() {
auto range = Range(10);
foreach (element; range) {
writefln("Using element %s", element);
Thread.sleep(500.msecs);
}
}
---
$(P
这些元素都是采用惰性方式生成和使用的。因为处理一个元素耗时 1 秒,整个程序总共花费 10 秒。
)
$(SHELL
$ time ./deneme
$(SHELL_OBSERVED
Using element 0
Producing the element after 0
Using element 1
Producing the element after 1
Using element 2
...
Producing the element after 8
Using element 9
Producing the element after 9
real 0m10.007s $(SHELL_NOTE 共 10 秒)
user 0m0.004s
sys 0m0.000s)
)
$(P
从输出结果来看,这些元素都被依次生成和使用。
)
$(P
但我们并不需要严格的按照顺序等待上一个元素被处理完后才开始计算下一个元素的值。如果能在上一个元素被使用时就开始计算下一个元素,那程序消耗的时间就会大大减小。
)
---
import std.parallelism;
//...
foreach (element; $(HILITE taskPool.asyncBuf)(range, $(HILITE 2))) {
---
$(P
通过上面 $(C asyncBuf()) 的调用,缓冲区中已经准备好了两个元素。在元素被使用时,剩下的元素也在被并行求值
)
$(SHELL
$ time ./deneme
$(SHELL_OBSERVED
Producing the element after 0
Producing the element after 1
Using element 0
Producing the element after 2
Using element 1
Producing the element after 3
Using element 2
Producing the element after 4
Using element 3
Producing the element after 5
Using element 4
Producing the element after 6
Producing the element after 7
Using element 5
Using element 6
Producing the element after 8
Producing the element after 9
Using element 7
Using element 8
Using element 9
real 0m6.007s $(SHELL_NOTE 现在耗时 6 秒)
user 0m0.000s
sys 0m0.004s)
)
$(P
缓冲区的默认大小为 100。能使程序获得最大性能的缓冲区大小会随着使用情况的不同而有所不同。
)
$(P
$(C asyncBuf()) 也可以在 $(C foreach) 外使用。例如,下面这个例子就将 $(C asyncBuf()) 的返回值作为一个半延迟取值的 $(C InputRange):
)
---
auto range = Range(10);
auto asyncRange = taskPool.asyncBuf(range, 2);
writeln($(HILITE asyncRange.front));
---
$(H5 $(IX map, parallel) $(C taskPool.map()))
$(P
$(IX map, std.algorithm)在介绍 $(C taskPool.map()) 之前,先了解一下 $(C std.algorithm) 模块里的 $(C map()),这样有助于理解本节的内容。在许多函数式编程语言里都能找到与 $(C std.algorithm.map) 类似的算法。它会用范围里元素一个接一个地调用函数,并返回一个范围(其组成元素是每次调用该函数时得到的结果)。这是个懒式算法:函数只有在需要时才会被调用。标准库中还有一个与之相似的 $(C std.algorithm.each)。但不同的是它并不会返回新的范围来储存结果,而是直接将结果应用到传入的范围的元素上。
)
$(P
在很多程序里,$(C std.algorithm.map) 的懒式操作都会非常有用。但如果范围中的每个元素都注定要被作为实参传递给函数而且每次操作又都是独立的的话,我们根本就没必要使用速度较慢的延迟取值而不是并行操作。$(C std.parallelism) 模块里的 $(C taskPool.map()) 和 $(C taskPool.amap()) 会充分利用多核,并在大部分情况下加快程序的运行速度。
)
$(P
下面用 $(C Student) 作为示例,对这三个算法做个比较。假设 $(C Student) 有一个计算并返回学生平均分的成员函数。为了展示并行算法到底有多快,我们需要再次使用 $(C Thread.sleep()) 将这个函数变慢。
)
$(P
$(C std.algorithm.map) 有一个模版参数用来接收函数,有一个函数形参用来接收范围。它会返回一个新的范围来储存应用函数后得到的结果:
)
---
auto $(I result_range) = map!$(I func)($(I range));
---
$(P
这个函数也可以是由 $(C =>) 语法声明的 $(I lambda 表达式)。下面这个程序使用 $(C map()) 来调用每一个元素的成员函数 $(C averageGrade()):
)
---
import std.stdio;
import std.algorithm;
import core.thread;
struct Student {
int number;
int[] grades;
double averageGrade() @property {
writefln("Started working on student %s",
number);
Thread.sleep(1.seconds);
const average = grades.sum / grades.length;
writefln("Finished working on student %s", number);
return average;
}
}
void main() {
Student[] students;
foreach (i; 0 .. 10) {
/* 每个学生有两个成绩 */
students ~= Student(i, [80 + i, 90 + i]);
}
auto results = $(HILITE map)!(a => a.averageGrade)(students);
foreach (result; results) {
writeln(result);
}
}
---
$(P
从程序的输出可以看出 $(C map()) 是延迟取值的;它调用 $(C averageGrade()) 的行为与 $(C foreach) 循环相似:
)
$(SHELL
$ time ./deneme
$(SHELL_OBSERVED
Started working on student 0
Finished working on student 0
85 $(SHELL_NOTE 像 foreach 一样迭代元素)
Started working on student 1
Finished working on student 1
86
...
Started working on student 9
Finished working on student 9
94
real 0m10.006s $(SHELL_NOTE 共 10 秒)
user 0m0.000s
sys 0m0.004s)
)
$(P
如果 $(C std.algorithm.map) 是一个即时取值的算法,那表示操作开始和操作结束的信息应该在程序最开始的时候一下全部显示出来。
)
$(P
$(C std.parallelism) 模块中的 $(C taskPool.map()) 和 $(C std.algorithm.map) 的功能相同。唯一不同的是 $(C taskPool.map()) 是以半延迟取值的方式调用函数操作元素,并将结果储存在缓冲区中。缓冲区的大小由第二个形参决定。例如,下面的代码每次都会为三个元素准备好函数调用的结果:
)
---
import std.parallelism;
// ...
double averageGrade(Student student) {
return student.averageGrade;
}
// ...
auto results = $(HILITE taskPool.map)!averageGrade(students, $(HILITE 3));
---
$(P
$(I $(B 注意:)之所以上面的代码需要一个独立的 $(C averageGrade()) 函数是因为像 $(C TaskPool.map) 这样的成员模版函数存在无法使用局部委托的限制。如果不使用独立的函数的话,程序将不能通过编译:
))
---
auto results =
taskPool.map!(a => a.averageGrade)(students, 3); $(DERLEME_HATASI)
---
$(P
这次每轮将会对三个元素进行操作:
)
$(SHELL
$ time ./deneme
$(SHELL_OBSERVED
Started working on student 1 $(SHELL_NOTE 并行)
Started working on student 2 $(SHELL_NOTE 但是顺序不确定)
Started working on student 0
Finished working on student 1
Finished working on student 2
Finished working on student 0
85
86
87
Started working on student 4
Started working on student 5
Started working on student 3
Finished working on student 4
Finished working on student 3
Finished working on student 5
88
89
90
Started working on student 7
Started working on student 8
Started working on student 6
Finished working on student 7
Finished working on student 6
Finished working on student 8
91
92
93
Started working on student 9
Finished working on student 9
94
real 0m4.007s $(SHELL_NOTE 共 4 秒)
user 0m0.000s
sys 0m0.004s)
)
$(P
$(C map()) 的第二个参数与 $(C asyncBuf()) 的第二个参数含义相同:它决定了 $(C map()) 储存结果的缓冲区的大小。第三个参数为工作单元大小,与 $(C parallel()) 中对应的参数作用相同但默认值不同;此处它的默认值为 $(C size_t.max):
)
---
/* ... */ = taskPool.map!$(I func)($(I range),
$(I buffer_size) = 100
$(I work_unit_size) = size_t.max);
---
$(H5 $(IX amap) $(C taskPool.amap()))
$(P
并行 $(C amap()) 与并行 $(C map()) 作用基本相似,但有以下两点不同:
)
$(UL
$(LI
它是即时计算。
)
$(LI
它适用于 $(C RandomAccessRange) 范围。
)
)
---
auto results = $(HILITE taskPool.amap)!averageGrade(students);
---
$(P
因为它是即时计算,所以在 $(C amap()) 返回时所有元素都已被运算完成:
)
$(SHELL
$ time ./deneme
$(SHELL_OBSERVED
Started working on student 1 $(SHELL_NOTE 所有元素已被提前准备好)
Started working on student 0
Started working on student 2
Started working on student 3
Finished working on student 1
Started working on student 4
Finished working on student 2
Finished working on student 3
Started working on student 6
Finished working on student 0
Started working on student 7
Started working on student 5
Finished working on student 4
Started working on student 8
Finished working on student 6
Started working on student 9
Finished working on student 7
Finished working on student 5
Finished working on student 8
Finished working on student 9
85
86
87
88
89
90
91
92
93
94
real 0m3.005s $(SHELL_NOTE 共 3 秒)
user 0m0.000s
sys 0m0.004s)
)
$(P
$(C amap()) 的确比 $(C map()) 的速度快。但相应的代价是它需要提前准备好一个足够大的数组来储存结果。它是通过消耗更多的内存来获得更快速度的。
)
$(P
$(C amap()) 的第二个参数也是工作单元大小,也同样是可选的:
)
---
auto results = taskPool.amap!averageGrade(students, $(HILITE 2));
---
$(P
还可以通过 $(C amap()) 的第三个参数传递一个 $(C RandomAccessRange) 进去来储存运算结果:
)
---
double[] results;
results.length = students.length;
taskPool.amap!averageGrade(students, 2, $(HILITE results));
---
$(H5 $(IX reduce, parallel) $(C taskPool.reduce()))
$(P
$(IX reduce, std.algorithm)与 $(C map()) 一样,先来了解下 $(C std.algorithm) 模块里的 $(C reduce())。
)
$(P
$(IX fold, std.algorithm) $(C reduce()) 与 $(C std.algorithm.fold) 相同。我们已经在 $(LINK2 /ders/d.cn/ranges.html, Ranges) 一章中学习到了相关知识。两者的不同点在于它们的形参顺序正好相反。(因此,我建议你最好在非并行代码中使用 $(C fold())。因为它可以利用链式范围表达式的 $(LINK2 /ders/d.cn/ufcs.html, UFCS)。
)
$(P
$(C reduce()) 也是一个经常在函数式编程中使用的高阶函数。和 $(C map()) 一样,它也可以接收一个或多个函数作为模版实参。除了用于接收函数的模版形参,它需要传入一个运算结果的初始值和一个范围。$(C reduce()) 将传入的函数作为运算方法对每个元素进行计算并按照合并到结果中。如果没有指定初始值,它会把范围的第一个元素作为初始值。
)
$(P
假设它在执行过程中定义了一个名为 $(C result) 的变量,那 $(C reduce()) 就是按照下面这几步运作的:
)
$(OL
$(LI 把初始值赋给 $(C result))
$(LI 对每一个元素执行 $(C result = func(result, element)) 这样的表达式)
$(LI 返回最终 $(C result) 的值)
)
$(P
例如,下面这个计算数组元素的平方和的程序:
)
---
import std.stdio;
import std.algorithm;
void main() {
writeln(reduce!((a, b) => a + b * b)(0, [5, 10]));
}
---
$(P
如果传入的函数是以 $(C =>) 声明的,那它的第一个参数(即上方程序中的 $(C a))代表当前运算结果(使用 reduce 的参数 $(C 0) 初始化),第二个参数(即上方程序中的 $(C b))代表当前元素。
)
$(P
这个程序将输出 5 和 10 的平方的和:
)
$(SHELL
125
)
$(P
就像我们看到的那样,$(C reduce()) 使用一个循环来实现其功能。因为通常情况下这个操作时运行在一个处理器核心上的,那么如果对每个元素的操作是独立的,这将造成不必要的执行速度的降低。在这种情况下你就可以使用 $(C std.parallelism) 模块中的 $(C taskPool.reduce()) 来利用多核心的运算能力。
)
$(P
下面来看一个示例,将 $(C reduce()) 与一个人为减慢的函数一起使用:
)
---
import std.stdio;
import std.algorithm;
import core.thread;
int aCalculation(int result, int element) {
writefln("started - element: %s, result: %s",
element, result);
Thread.sleep(1.seconds);
result += element;
writefln("finished - element: %s, result: %s",
element, result);
return result;
}
void main() {
writeln("Result: ", $(HILITE reduce)!aCalculation(0, [1, 2, 3, 4]));
}
---
$(P
$(C reduce()) 将按照顺序使用元素:
)
$(SHELL