/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism. `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`. A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`. Using `Task`
directly allows programming with a future/promise paradigm. All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`. They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads. A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution. A worker thread is a thread that
is associated with exactly one task queue. It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
more worker threads. If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning: Unless marked as `@trusted` or `@safe`, artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low level data races.

Source: $(PHOBOSSRC std/parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = (i - 0.5) * delta;
        return delta / (1.0 + x * x);
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue`, which must not be a legitimate value
of the constant. Upon the first call the situation is detected and the global is
initialized by calling `initializer`. The initializer is assumed to be pure
(even if not marked as such), i.e. to return the same value upon repeated calls.
For that reason, no special precautions are taken, so `initializer` may be called
more than once, leading to benign races on the cached value.

In the quiescent state the cost of the function is an atomic load from a global.

Params:
    T = The type of the pseudo-constant (may be qualified)
    outOfBandValue = A value that cannot be valid, it is used for initialization
    initializer = The function performing initialization; must be `nothrow`

Returns:
    The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}

/* Atomics code. These forward to core.atomic, but are written like this
   for two reasons:

   1. They used to actually contain ASM code and I don't want to have to change
      to directly calling core.atomic in a zillion different places.

   2. core.atomic has some misc. issues that make my use cases difficult
      without wrapping it. If I didn't wrap it, casts would be required
      basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}
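
// A minimal sketch of the wrappers above: they accept unshared lvalues
// and perform the shared casts internally.
@system unittest
{
    ubyte flag = TaskStatus.notStarted;
    atomicSetUbyte(flag, cast(ubyte) TaskStatus.inProgress);
    assert(atomicReadUbyte(flag) == TaskStatus.inProgress);
    assert(atomicCasUbyte(flag, cast(ubyte) TaskStatus.inProgress,
        cast(ubyte) TaskStatus.done));
    assert(flag == TaskStatus.done);
}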

/*--------------------- Generic helper functions, etc.------------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread(). There is an additional
// requirement for executing it via a TaskPool. (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This function decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage. When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue. It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}
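
@safe unittest
{
    // A minimal sketch: a pure function pointer counts as safe-return even
    // though its uint[] result has unshared aliasing, since a pure function
    // cannot have stashed a second reference in thread-local storage.
    alias T = Task!(run, uint[] function(uint) pure @safe, uint);
    static assert(isSafeReturn!T);
}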

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}
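
@safe unittest
{
    import std.typecons : tuple;

    // A minimal sketch: with multiple functions, the accumulator is a
    // tuple and each field is reduced by its own function.
    auto acc = tuple(0, 1);
    acc = reduceAdjoin!("a + b", "a * b")(acc, 5);
    assert(acc[0] == 5 && acc[1] == 5);
}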

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}
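
@safe unittest
{
    import std.typecons : tuple;

    // A minimal sketch: reduceFinish merges two per-work-unit tuple
    // accumulators field by field.
    auto merged = reduceFinish!("a + b", "a * b")(tuple(2, 3), tuple(4, 5));
    assert(merged[0] == 6 && merged[1] == 15);
}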

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks. Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}

/**
`Task` represents the fundamental unit of work. A `Task` may be
executed in parallel with any other `Task`. Using this struct directly
allows future/promise parallelism. In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from. The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any. These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct. See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref. If `fun` returns by ref, the reference will point
to the returned reference of `fun`. Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs: Changes to `ref` and `out` arguments are not propagated to the
call site, only to `args` in this struct.
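
Example:
---
// A minimal sketch: run a free function asynchronously and fetch its
// result with yieldForce.
import std.parallelism;

int add(int a, int b) { return a + b; }

void main()
{
    auto addTask = task!add(40, 2);
    addTask.executeInNewThread();
    assert(addTask.yieldForce == 42);
}
---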
*/
struct Task(alias fun, Args...)
{
    AbstractTask base = {runTask : &impl};
    alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with. Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code. See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
                (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG: Should check this for delegates too, but std.traits
            // apparently doesn't allow this. isPure is irrelevant
            // for delegates, at least for now since shared delegates
            // don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`. This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    busy spin until it's done, then return the return value. If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
    */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    wait on a condition variable. If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
    */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread. If it is finished, return its result. If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished. If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
    */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done) // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws: Rethrows any exception thrown during the execution of the
    `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread. This can be used for
    future/promise parallelism. An explicit priority may be given
    to the `Task`. If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

// Calls `fpOrDelegate` with `args`. This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a `Task` on the GC heap that calls an alias. This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism). A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns: A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation: 176 milliseconds.
// Equivalent serial implementation: 280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
on the stack that have scoped destruction, see `scopedTask`, which
takes a scope delegate.
*/
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code. Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1. `fun` must be @safe or @trusted.

2. `F` must not have any unshared aliasing as defined by
   $(REF hasUnsharedAliasing, std,traits). This means it
   may not be an unshared delegate or a non-shared class or struct
   with overloaded `opCall`. This also precludes accepting template
   alias parameters.

3. `Args` must not have unshared aliasing.

4. `fun` must not return by reference.

5. The return type must not have unshared aliasing unless `fun` is
   `pure` or the `Task` is executed via `executeInNewThread` instead
   of using a `TaskPool`.

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}
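
// A minimal sketch of the @safe overload, assuming a @safe function
// pointer whose arguments and return type have no unshared aliasing.
@safe unittest
{
    static int square(int x) @safe { return x * x; }

    auto t = task(&square, 5);
    t.executeInNewThread();
    assert(t.yieldForce == 25);
}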

/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap. The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1. When a `Task` that calls a delegate is being created and a closure
   cannot be allocated due to objects on the stack that have scoped
   destruction. The delegate overload of `scopedTask` takes a `scope`
   delegate.

2. As a micro-optimization, to avoid the heap allocation associated with
   `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes: `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}
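
// A minimal sketch: a scope delegate avoids allocating a closure, and
// the Task value itself lives in the caller's stack frame.
@system unittest
{
    int result;
    auto t = scopedTask({ result = 40 + 2; });
    t.executeInNewThread();
    t.yieldForce;
    assert(result == 42);
}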

/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);
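
@safe unittest
{
    // A minimal sanity sketch: the OS should report at least one core.
    assert(totalCPUs >= 1);
}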

uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS: Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.stdc.string : memset;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         * see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         * The hardcoded number also comes from ruby's source code.
         * see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            size_t size = CPU_ALLOC_SIZE(count);
            if (size >= 0x400)
            {
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}

/*
This class serves two purposes:

1. It distinguishes std.parallelism threads from other threads so that
   the std.parallelism daemon threads can be terminated.

2. It adds a reference to the pool that the thread is a member of,
   which is also necessary to allow the daemon threads to be properly
   terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

/**
This class encapsulates a task queue and a set of worker threads. Its purpose
is to efficiently map a large number of `Task`s onto a smaller number of
threads. A task queue is a FIFO queue of `Task` objects that have been
submitted to the `TaskPool` and are awaiting execution. A worker thread is a
thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a `TaskPool`:

1. When you want `TaskPool` instances with multiple priorities, for example
   a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization
   primitive (for example a mutex), and you want to parallelize the code that
   needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until
`stop` or `finish` is called, even if the main thread
has finished already. This may lead to programs that
never end. If you do not want this behaviour, you can set `isDaemon`
to true.
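
Example:
---
// A minimal sketch: a dedicated pool with two worker threads, shut
// down cleanly once the parallel work is done.
import std.parallelism;

void main()
{
    auto pool = new TaskPool(2);
    scope(exit) pool.finish();

    auto squares = new int[1000];
    foreach (i, ref elem; pool.parallel(squares))
    {
        elem = cast(int) (i * i);
    }
}
---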
*/
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool. A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization. Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread. It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate. It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        notifyAll();
    }

    void tryDeleteExecute(AbstractTask* toExecute)
    {
        if (isSingleTask) return;

        if ( !deleteItem(toExecute) )
        {
            return;
        }

        try
        {
            toExecute.job();
        }
        catch (Exception e)
        {
            toExecute.exception = e;
        }

        atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
    }

    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }

    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if (item.next !is null)
        {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item);  // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    void queueLock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.lock();
    }

    void queueUnlock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.unlock();
    }

    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }

    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }

    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }

    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }

    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }

    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }

    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }

    // Private constructor for creating dummy pools that only have one thread,
    // only execute one Task, and then terminate. This is used for
    // Task.executeInNewThread().
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }

public:
    // This is used in parallel_algorithm but is too unstable to document
    // as public API.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return rangeLen;
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }
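
    @safe unittest
    {
        // A minimal sketch of the heuristic above: aim for roughly four
        // work units per thread (including the submitting thread), with
        // at least one element per work unit.
        auto p = new TaskPool(1);
        scope(exit) p.finish();
        assert(p.defaultWorkUnitSize(100) == 13); // 100 / (4 * 2), rounded up
    }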

    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads. The minus 1 is included because the
    main thread will also be available to do work.

    Note: On single-core machines, the primitives provided by `TaskPool`
    operate transparently in single-threaded mode.
    */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for a custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it. The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }

    /**
    Implements a parallel foreach loop over a range. This works by implicitly
    creating and submitting one `Task` to the `TaskPool` for each worker
    thread. A work unit is a set of consecutive elements of `range` to
    be processed by a worker thread between communication with any other
    thread. The number of elements processed per work unit is controlled by the
    `workUnitSize` parameter. Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit. Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    `workUnitSize` should be. For very expensive loop bodies,
    `workUnitSize` should be 1. An overload that chooses a default work
    unit size is also available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable. It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach: 388 milliseconds
    // Regular foreach:  619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in `range.length`.

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    `ParallelForeachError`.

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size `workUnitSize` before executing the parallel portion
    of the loop. The exception is that, if a parallel foreach is executed
    over a range returned by `asyncBuf` or `map`, the copying is elided
    and the buffers are simply swapped. In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional `Task` objects is terminated as soon as
    possible, in a non-deterministic manner. All executing or
    enqueued work units are allowed to complete. Then, all exceptions that
    were thrown by any work unit are chained using `Throwable.next` and
    rethrown. The order of the exception chaining is non-deterministic.
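
    A minimal sketch of observing the chained exceptions:
    ---
    try
    {
        foreach (i; taskPool.parallel(iota(10)))
        {
            throw new Exception("something went wrong");
        }
    }
    catch (Exception e)
    {
        // e.next may hold exceptions thrown by other work units.
    }
    ---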
1632 */
1633 ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
1634 {
1635 import std.exception : enforce;
1636 enforce(workUnitSize > 0, "workUnitSize must be > 0.");
1637 alias RetType = ParallelForeach!R;
1638 return RetType(this, range, workUnitSize);
1639 }
1640
1641
1642 /// Ditto
1643 ParallelForeach!R parallel(R)(R range)
1644 {
1645 static if (hasLength!R)
1646 {
1647 // Default work unit size is such that we would use 4x as many
1648 // slots as are in this thread pool.
1649 size_t workUnitSize = defaultWorkUnitSize(range.length);
1650 return parallel(range, workUnitSize);
1651 }
1652 else
1653 {
1654 // Just use a really, really dumb guess if the user is too lazy to
1655 // specify.
1656 return parallel(range, 512);
1657 }
1658 }
1659
1660 ///
amap(functions...)1661 template amap(functions...)
1662 {
1663 /**
1664 Eager parallel map. The eagerness of this function means it has less
1665 overhead than the lazily evaluated `TaskPool.map` and should be
1666 preferred where the memory requirements of eagerness are acceptable.
1667 `functions` are the functions to be evaluated, passed as template
1668 alias parameters in a style similar to
1669 $(REF map, std,algorithm,iteration).
1670 The first argument must be a random access range. For performance
1671 reasons, amap will assume the range elements have not yet been
1672 initialized. Elements will be overwritten without calling a destructor
1673 nor doing an assignment. As such, the range must not contain meaningful
1674 data$(DDOC_COMMENT not a section): either un-initialized objects, or
1675 objects in their `.init` state.
1676
1677 ---
1678 auto numbers = iota(100_000_000.0);
1679
1680 // Find the square roots of numbers.
1681 //
1682 // Timings on an Athlon 64 X2 dual core machine:
1683 //
1684 // Parallel eager map: 0.802 s
1685 // Equivalent serial implementation: 1.768 s
1686 auto squareRoots = taskPool.amap!sqrt(numbers);
1687 ---
1688
1689 Immediately after the range argument, an optional work unit size argument
1690 may be provided. Work units as used by `amap` are identical to those
1691 defined for parallel foreach. If no work unit size is provided, the
1692 default work unit size is used.
1693
1694 ---
1695 // Same thing, but make work unit size 100.
1696 auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1697 ---
1698
1699 An output range for returning the results may be provided as the last
1700 argument. If one is not provided, an array of the proper type will be
1701 allocated on the garbage collected heap. If one is provided, it must be a
1702 random access range with assignable elements, must have reference
1703 semantics with respect to assignment to its elements, and must have the
1704 same length as the input range. Writing to adjacent elements from
1705 different threads must be safe.
1706
1707 ---
1708 // Same thing, but explicitly allocate an array
1709 // to return the results in. The element type
1710 // of the array may be either the exact type
1711 // returned by functions or an implicit conversion
1712 // target.
1713 auto squareRoots = new float[numbers.length];
1714 taskPool.amap!sqrt(numbers, squareRoots);
1715
1716 // Multiple functions, explicit output range, and
1717 // explicit work unit size.
1718 auto results = new Tuple!(float, real)[numbers.length];
1719 taskPool.amap!(sqrt, log)(numbers, 100, results);
1720 ---
1721
1722 Note:
1723
1724 A memory barrier is guaranteed to be executed after all results are written
1725 but before returning so that results produced by all threads are visible
1726 in the calling thread.
1727
1728 Tips:
1729
1730 To perform the mapping operation in place, provide the same range for the
1731 input and output range.
1732
1733 To parallelize the copying of a range with expensive to evaluate elements
1734 to an array, pass an identity function (a function that just returns
1735 whatever argument is provided to it) to `amap`.
1736
1737 $(B Exception Handling):
1738
1739 When at least one exception is thrown from inside the map functions,
1740 the submission of additional `Task` objects is terminated as soon as
1741 possible, in a non-deterministic manner. All currently executing or
1742 enqueued work units are allowed to complete. Then, all exceptions that
1743 were thrown from any work unit are chained using `Throwable.next` and
1744 rethrown. The order of the exception chaining is non-deterministic.
1745 */
1746 auto amap(Args...)(Args args)
1747 if (isRandomAccessRange!(Args[0]))
1748 {
1749 import core.internal.lifetime : emplaceRef;
1750
1751 alias fun = adjoin!(staticMap!(unaryFun, functions));
1752
1753 alias range = args[0];
1754 immutable len = range.length;
1755
1756 static if (
1757 Args.length > 1 &&
1758 randAssignable!(Args[$ - 1]) &&
1759 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1760 )
1761 {
1762 import std.conv : text;
1763 import std.exception : enforce;
1764
1765 alias buf = args[$ - 1];
1766 alias args2 = args[0..$ - 1];
1767 alias Args2 = Args[0..$ - 1];
1768 enforce(buf.length == len,
1769 text("Can't use a user supplied buffer that's the wrong ",
1770 "size. (Expected :", len, " Got: ", buf.length));
1771 }
1772 else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1773 {
1774 static assert(0, "Wrong buffer type.");
1775 }
1776 else
1777 {
1778 import std.array : uninitializedArray;
1779
1780 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1781 alias args2 = args;
1782 alias Args2 = Args;
1783 }
1784
1785 if (!len) return buf;
1786
1787 static if (isIntegral!(Args2[$ - 1]))
1788 {
1789 static assert(args2.length == 2);
1790 auto workUnitSize = cast(size_t) args2[1];
1791 }
1792 else
1793 {
1794 static assert(args2.length == 1, Args);
1795 auto workUnitSize = defaultWorkUnitSize(range.length);
1796 }
1797
1798 alias R = typeof(range);
1799
1800 if (workUnitSize > len)
1801 {
1802 workUnitSize = len;
1803 }
1804
1805 // Handle as a special case:
1806 if (size == 0)
1807 {
1808 size_t index = 0;
1809 foreach (elem; range)
1810 {
1811 emplaceRef(buf[index++], fun(elem));
1812 }
1813 return buf;
1814 }
1815
1816 // Effectively -1: chunkIndex + 1 == 0:
1817 shared size_t workUnitIndex = size_t.max;
1818 shared bool shouldContinue = true;
1819
1820 void doIt()
1821 {
1822 import std.algorithm.comparison : min;
1823
1824 scope(failure)
1825 {
1826 // If an exception is thrown, all threads should bail.
1827 atomicStore(shouldContinue, false);
1828 }
1829
1830 while (atomicLoad(shouldContinue))
1831 {
1832 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1833 immutable start = workUnitSize * myUnitIndex;
1834 if (start >= len)
1835 {
1836 atomicStore(shouldContinue, false);
1837 break;
1838 }
1839
1840 immutable end = min(len, start + workUnitSize);
1841
1842 static if (hasSlicing!R)
1843 {
1844 auto subrange = range[start .. end];
1845 foreach (i; start .. end)
1846 {
1847 emplaceRef(buf[i], fun(subrange.front));
1848 subrange.popFront();
1849 }
1850 }
1851 else
1852 {
1853 foreach (i; start .. end)
1854 {
1855 emplaceRef(buf[i], fun(range[i]));
1856 }
1857 }
1858 }
1859 }
1860
1861 submitAndExecute(this, &doIt);
1862 return buf;
1863 }
1864 }
1865
1866 ///
map(functions...)1867 template map(functions...)
1868 {
1869 /**
1870 A semi-lazy parallel map that can be used for pipelining. The map
1871 functions are evaluated for the first `bufSize` elements and stored in a
1872 buffer and made available to `popFront`. Meanwhile, in the
1873 background a second buffer of the same size is filled. When the first
1874 buffer is exhausted, it is swapped with the second buffer and filled while
1875 the values from what was originally the second buffer are read. This
1876 implementation allows for elements to be written to the buffer without
1877 the need for atomic operations or synchronization for each write, and
1878 enables the mapping function to be evaluated efficiently in parallel.
1879
1880 `map` has more overhead than the simpler procedure used by `amap`
1881 but avoids the need to keep all results in memory simultaneously and works
1882 with non-random access ranges.
1883
1884 Params:
1885
1886 source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
1887 to be mapped. If `source` is not random
1888 access it will be lazily buffered to an array of size `bufSize` before
1889 the map function is evaluated. (For an exception to this rule, see Notes.)
1890
1891 bufSize = The size of the buffer to store the evaluated elements.
1892
1893 workUnitSize = The number of elements to evaluate in a single
1894 `Task`. Must be less than or equal to `bufSize`, and
1895 should be a fraction of `bufSize` such that all worker threads can be
1896 used. If the default of size_t.max is used, workUnitSize will be set to
1897 the pool-wide default.
1898
1899 Returns: An input range representing the results of the map. This range
1900 has a length iff `source` has a length.
1901
1902 Notes:
1903
1904 If a range returned by `map` or `asyncBuf` is used as an input to
1905 `map`, then as an optimization the copying from the output buffer
1906 of the first range to the input buffer of the second range is elided, even
1907 though the ranges returned by `map` and `asyncBuf` are non-random
1908 access ranges. This means that the `bufSize` parameter passed to the
1909 current call to `map` will be ignored and the size of the buffer
1910 will be the buffer size of `source`.
1911
1912 Example:
1913 ---
1914 // Pipeline reading a file, converting each line
1915 // to a number, taking the logarithms of the numbers,
1916 // and performing the additions necessary to find
1917 // the sum of the logarithms.
1918
1919 auto lineRange = File("numberList.txt").byLine();
1920 auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1921 auto nums = taskPool.map!(to!double)(dupedLines);
1922 auto logs = taskPool.map!log10(nums);
1923
1924 double sum = 0;
1925 foreach (elem; logs)
1926 {
1927 sum += elem;
1928 }
1929 ---
1930
1931 $(B Exception Handling):
1932
1933 Any exceptions thrown while iterating over `source`
1934 or computing the map function are re-thrown on a call to `popFront` or,
1935 if thrown during construction, are simply allowed to propagate to the
1936 caller. In the case of exceptions thrown while computing the map function,
1937 the exceptions are chained as in `TaskPool.amap`.
1938 */
1939 auto
1940 map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
1941 if (isInputRange!S)
1942 {
1943 import std.exception : enforce;
1944
1945 enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
1946 "Work unit size must be smaller than buffer size.");
1947 alias fun = adjoin!(staticMap!(unaryFun, functions));
1948
1949 static final class Map
1950 {
1951 // This is a class because the task needs to be located on the
1952 // heap and in the non-random access case source needs to be on
1953 // the heap, too.
1954
1955 private:
1956 enum bufferTrick = is(typeof(source.buf1)) &&
1957 is(typeof(source.bufPos)) &&
1958 is(typeof(source.doBufSwap()));
1959
1960 alias E = MapType!(S, functions);
1961 E[] buf1, buf2;
1962 S source;
1963 TaskPool pool;
1964 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1965 size_t workUnitSize;
1966 size_t bufPos;
1967 bool lastTaskWaited;
1968
1969 static if (isRandomAccessRange!S)
1970 {
1971 alias FromType = S;
1972
1973 void popSource()
1974 {
1975 import std.algorithm.comparison : min;
1976
1977 static if (__traits(compiles, source[0 .. source.length]))
1978 {
1979 source = source[min(buf1.length, source.length)..source.length];
1980 }
1981 else static if (__traits(compiles, source[0..$]))
1982 {
1983 source = source[min(buf1.length, source.length)..$];
1984 }
1985 else
1986 {
1987 static assert(0, "S must have slicing for Map."
1988 ~ " " ~ S.stringof ~ " doesn't.");
1989 }
1990 }
1991 }
1992 else static if (bufferTrick)
1993 {
1994 // Make sure we don't have the buffer recycling overload of
1995 // asyncBuf.
1996 static if (
1997 is(typeof(source.source)) &&
1998 isRoundRobin!(typeof(source.source))
1999 )
2000 {
2001 static assert(0, "Cannot execute a parallel map on " ~
2002 "the buffer recycling overload of asyncBuf."
2003 );
2004 }
2005
2006 alias FromType = typeof(source.buf1);
2007 FromType from;
2008
2009 // Just swap our input buffer with source's output buffer.
2010 // No need to copy element by element.
2011 FromType dumpToFrom()
2012 {
2013 import std.algorithm.mutation : swap;
2014
2015 assert(source.buf1.length <= from.length);
2016 from.length = source.buf1.length;
2017 swap(source.buf1, from);
2018
2019 // Just in case this source has been popped before
2020 // being sent to map:
2021 from = from[source.bufPos..$];
2022
2023 static if (is(typeof(source._length)))
2024 {
2025 source._length -= from.length;
2026 }
2027
2028 source.doBufSwap();
2029
2030 return from;
2031 }
2032 }
2033 else
2034 {
2035 alias FromType = ElementType!S[];
2036
2037 // The temporary array that data is copied to before being
2038 // mapped.
2039 FromType from;
2040
2041 FromType dumpToFrom()
2042 {
2043 assert(from !is null);
2044
2045 size_t i;
2046 for (; !source.empty && i < from.length; source.popFront())
2047 {
2048 from[i++] = source.front;
2049 }
2050
2051 from = from[0 .. i];
2052 return from;
2053 }
2054 }
2055
2056 static if (hasLength!S)
2057 {
2058 size_t _length;
2059
2060 public @property size_t length() const @safe pure nothrow
2061 {
2062 return _length;
2063 }
2064 }
2065
2066 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
2067 {
2068 static if (bufferTrick)
2069 {
2070 bufSize = source.buf1.length;
2071 }
2072
2073 buf1.length = bufSize;
2074 buf2.length = bufSize;
2075
2076 static if (!isRandomAccessRange!S)
2077 {
2078 from.length = bufSize;
2079 }
2080
2081 this.workUnitSize = (workUnitSize == size_t.max) ?
2082 pool.defaultWorkUnitSize(bufSize) : workUnitSize;
2083 this.source = source;
2084 this.pool = pool;
2085
2086 static if (hasLength!S)
2087 {
2088 _length = source.length;
2089 }
2090
2091 buf1 = fillBuf(buf1);
2092 submitBuf2();
2093 }
2094
2095 // The from buffer is only used in the non-random access case;
2096 // in the random access case, data is taken directly from source.
2097 E[] fillBuf(E[] buf)
2098 {
2099 import std.algorithm.comparison : min;
2100
2101 static if (isRandomAccessRange!S)
2102 {
2103 import std.range : take;
2104 auto toMap = take(source, buf.length);
2105 scope(success) popSource();
2106 }
2107 else
2108 {
2109 auto toMap = dumpToFrom();
2110 }
2111
2112 buf = buf[0 .. min(buf.length, toMap.length)];
2113
2114 // Handle a size-zero pool as a special case:
2115 if (pool.size == 0)
2116 {
2117 size_t index = 0;
2118 foreach (elem; toMap)
2119 {
2120 buf[index++] = fun(elem);
2121 }
2122 return buf;
2123 }
2124
2125 pool.amap!functions(toMap, workUnitSize, buf);
2126
2127 return buf;
2128 }
2129
2130 void submitBuf2()
2131 in
2132 {
2133 assert(nextBufTask.prev is null);
2134 assert(nextBufTask.next is null);
2135 }
2136 do
2137 {
2138 // Hack to reuse the task object.
2139
2140 nextBufTask = typeof(nextBufTask).init;
2141 nextBufTask._args[0] = &fillBuf;
2142 nextBufTask._args[1] = buf2;
2143 pool.put(nextBufTask);
2144 }
2145
2146 void doBufSwap()
2147 {
2148 if (lastTaskWaited)
2149 {
2150 // Then the source is empty. Signal it here.
2151 buf1 = null;
2152 buf2 = null;
2153
2154 static if (!isRandomAccessRange!S)
2155 {
2156 from = null;
2157 }
2158
2159 return;
2160 }
2161
2162 buf2 = buf1;
2163 buf1 = nextBufTask.yieldForce;
2164 bufPos = 0;
2165
2166 if (source.empty)
2167 {
2168 lastTaskWaited = true;
2169 }
2170 else
2171 {
2172 submitBuf2();
2173 }
2174 }
2175
2176 public:
2177 @property auto front()
2178 {
2179 return buf1[bufPos];
2180 }
2181
2182 void popFront()
2183 {
2184 static if (hasLength!S)
2185 {
2186 _length--;
2187 }
2188
2189 bufPos++;
2190 if (bufPos >= buf1.length)
2191 {
2192 doBufSwap();
2193 }
2194 }
2195
2196 static if (isInfinite!S)
2197 {
2198 enum bool empty = false;
2199 }
2200 else
2201 {
2202
2203 bool empty() const @property
2204 {
2205 // popFront() sets this when source is empty
2206 return buf1.length == 0;
2207 }
2208 }
2209 }
2210 return new Map(source, bufSize, workUnitSize, this);
2211 }
2212 }
2213
2214 /**
2215 Given a `source` range that is expensive to iterate over, returns an
2216 $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
2217 asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread,
2218 while making previously buffered elements from a second buffer, also of size
2219 `bufSize`, available via the range interface of the returned
2220 object. The returned range has a length iff `hasLength!S`.
2221 `asyncBuf` is useful, for example, when performing expensive operations
2222 on the elements of ranges that represent data on a disk or network.
2223
2224 Example:
2225 ---
2226 import std.conv, std.stdio;
2227
2228 void main()
2229 {
2230 // Fetch lines of a file in a background thread
2231 // while processing previously fetched lines,
2232 // dealing with byLine's buffer recycling by
2233 // eagerly duplicating every line.
2234 auto lines = File("foo.txt").byLine();
2235 auto duped = std.algorithm.map!"a.idup"(lines);
2236
2237 // Fetch more lines in the background while we
2238 // process the lines already read into memory
2239 // into a matrix of doubles.
2240 double[][] matrix;
2241 auto asyncReader = taskPool.asyncBuf(duped);
2242
2243 foreach (line; asyncReader)
2244 {
2245 auto ls = line.split("\t");
2246 matrix ~= to!(double[])(ls);
2247 }
2248 }
2249 ---
2250
2251 $(B Exception Handling):
2252
2253 Any exceptions thrown while iterating over `source` are re-thrown on a
2254 call to `popFront` or, if thrown during construction, simply
2255 allowed to propagate to the caller.
2256 */
2257 auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
2258 {
2259 static final class AsyncBuf
2260 {
2261 // This is a class because the task and source both need to be on
2262 // the heap.
2263
2264 // The element type of S.
2265 alias E = ElementType!S; // Needs to be here b/c of forward ref bugs.
2266
2267 private:
2268 E[] buf1, buf2;
2269 S source;
2270 TaskPool pool;
2271 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
2272 size_t bufPos;
2273 bool lastTaskWaited;
2274
2275 static if (hasLength!S)
2276 {
2277 size_t _length;
2278
2279 // Available if hasLength!S.
2280 public @property size_t length() const @safe pure nothrow
2281 {
2282 return _length;
2283 }
2284 }
2285
2286 this(S source, size_t bufSize, TaskPool pool)
2287 {
2288 buf1.length = bufSize;
2289 buf2.length = bufSize;
2290
2291 this.source = source;
2292 this.pool = pool;
2293
2294 static if (hasLength!S)
2295 {
2296 _length = source.length;
2297 }
2298
2299 buf1 = fillBuf(buf1);
2300 submitBuf2();
2301 }
2302
2303 E[] fillBuf(E[] buf)
2304 {
2305 assert(buf !is null);
2306
2307 size_t i;
2308 for (; !source.empty && i < buf.length; source.popFront())
2309 {
2310 buf[i++] = source.front;
2311 }
2312
2313 buf = buf[0 .. i];
2314 return buf;
2315 }
2316
2317 void submitBuf2()
2318 in
2319 {
2320 assert(nextBufTask.prev is null);
2321 assert(nextBufTask.next is null);
2322 }
2323 do
2324 {
2325 // Hack to reuse the task object.
2326
2327 nextBufTask = typeof(nextBufTask).init;
2328 nextBufTask._args[0] = &fillBuf;
2329 nextBufTask._args[1] = buf2;
2330 pool.put(nextBufTask);
2331 }
2332
2333 void doBufSwap()
2334 {
2335 if (lastTaskWaited)
2336 {
2337 // Then source is empty. Signal it here.
2338 buf1 = null;
2339 buf2 = null;
2340 return;
2341 }
2342
2343 buf2 = buf1;
2344 buf1 = nextBufTask.yieldForce;
2345 bufPos = 0;
2346
2347 if (source.empty)
2348 {
2349 lastTaskWaited = true;
2350 }
2351 else
2352 {
2353 submitBuf2();
2354 }
2355 }
2356
2357 public:
2358 E front() @property
2359 {
2360 return buf1[bufPos];
2361 }
2362
2363 void popFront()
2364 {
2365 static if (hasLength!S)
2366 {
2367 _length--;
2368 }
2369
2370 bufPos++;
2371 if (bufPos >= buf1.length)
2372 {
2373 doBufSwap();
2374 }
2375 }
2376
2377 static if (isInfinite!S)
2378 {
2379 enum bool empty = false;
2380 }
2381
2382 else
2383 {
2384 ///
2385 bool empty() @property
2386 {
2387 // popFront() sets this when source is empty:
2388 return buf1.length == 0;
2389 }
2390 }
2391 }
2392 return new AsyncBuf(source, bufSize, this);
2393 }
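
// A small verification sketch (not from the original docs): elements come
// out of asyncBuf in their original order, one buffer swap at a time.
@system unittest
{
    import std.algorithm.comparison : equal;
    import std.range : iota;

    auto buffered = taskPool.asyncBuf(iota(1_000), 64);
    assert(buffered.equal(iota(1_000)));
}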
2394
2395 /**
2396 Given a callable object `next` that writes to a user-provided buffer and
2397 a second callable object `empty` that determines whether more data is
2398 available to write via `next`, returns an input range that
2399 asynchronously calls `next` with a set of `nBuffers` buffers
2400 and makes the results available in the order they were obtained via the
2401 input range interface of the returned object. Similarly to the
2402 input range overload of `asyncBuf`, the first half of the buffers
2403 are made available via the range interface while the second half are
2404 filled and vice-versa.
2405
2406 Params:
2407
2408 next = A callable object that takes a single argument that must be an array
2409 with mutable elements. When called, `next` writes data to
2410 the array provided by the caller.
2411
2412 empty = A callable object that takes no arguments and returns a type
2413 implicitly convertible to `bool`. This is used to signify
2414 that no more data is available to be obtained by calling `next`.
2415
2416 initialBufSize = The initial size of each buffer. If `next` takes its
2417 array by reference, it may resize the buffers.
2418
2419 nBuffers = The number of buffers to cycle through when calling `next`.
2420
2421 Example:
2422 ---
2423 // Fetch lines of a file in a background
2424 // thread while processing previously fetched
2425 // lines, without duplicating any lines.
2426 auto file = File("foo.txt");
2427
2428 void next(ref char[] buf)
2429 {
2430 file.readln(buf);
2431 }
2432
2433 // Fetch more lines in the background while we
2434 // process the lines already read into memory
2435 // into a matrix of doubles.
2436 double[][] matrix;
2437 auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2438
2439 foreach (line; asyncReader)
2440 {
2441 auto ls = line.split("\t");
2442 matrix ~= to!(double[])(ls);
2443 }
2444 ---
2445
2446 $(B Exception Handling):
2447
2448 Any exceptions thrown while obtaining data via `next` or `empty` are
2449 re-thrown on a call to `popFront`.
2450
2451 Warning:
2452
2453 Using the range returned by this function in a parallel foreach loop
2454 will not work because buffers may be overwritten while the task that
2455 processes them is in queue. This is checked for at compile time
2456 and will result in a static assertion failure.
2457 */
2458 auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2459 if (is(typeof(C2.init()) : bool) &&
2460 Parameters!C1.length == 1 &&
2461 Parameters!C2.length == 0 &&
2462 isArray!(Parameters!C1[0])
2463 ) {
2464 auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2465 return asyncBuf(roundRobin, nBuffers / 2);
2466 }
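
// A minimal sketch of the buffer-recycling overload using a synthetic
// source instead of file I/O; the names `counter` and `limit` are
// hypothetical and exist only for this illustration.
@system unittest
{
    size_t counter;
    enum limit = 10;

    // Writes one element into the caller-provided buffer per call.
    void next(ref size_t[] buf)
    {
        buf.length = 1;
        buf[0] = counter++;
    }

    bool empty()
    {
        return counter >= limit;
    }

    size_t total;
    foreach (buf; taskPool.asyncBuf(&next, &empty, 1, 4))
    {
        total += buf[0];
    }
    assert(total == 45); // 0 + 1 + ... + 9
}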
2467
2468 ///
2469 template reduce(functions...)
2470 {
2471 /**
2472 Parallel reduce on a random access range. Except as otherwise noted,
2473 usage is similar to $(REF _reduce, std,algorithm,iteration). There is
2474 also $(LREF fold) which does the same thing with a different parameter
2475 order.
2476
2477 This function works by splitting the range to be reduced into work
2478 units, which are slices to be reduced in parallel. Once the results
2479 from all work units are computed, a final serial reduction is performed
2480 on these results to compute the final answer. Therefore, care must be
2481 taken to choose the seed value appropriately.
2482
2483 Because the reduction is being performed in parallel, `functions`
2484 must be associative. For notational simplicity, let # be an
2485 infix operator representing `functions`. Then, (a # b) # c must equal
2486 a # (b # c). Floating point addition is not associative
2487 even though addition in exact arithmetic is. Summing floating
2488 point numbers using this function may give different results than summing
2489 serially. However, for many practical purposes floating point addition
2490 can be treated as associative.
2491
2492 Note that, since `functions` are assumed to be associative,
2493 additional optimizations are made to the serial portion of the reduction
2494 algorithm. These take advantage of the instruction level parallelism of
2495 modern CPUs, in addition to the thread-level parallelism that the rest
2496 of this module exploits. This can lead to better than linear speedups
2497 relative to $(REF _reduce, std,algorithm,iteration), especially for
2498 fine-grained benchmarks like dot products.
2499
2500 An explicit seed may be provided as the first argument. If
2501 provided, it is used as the seed for all work units and for the final
2502 reduction of results from all work units. Therefore, if it is not the
2503 identity value for the operation being performed, results may differ
2504 from those generated by $(REF _reduce, std,algorithm,iteration) and
2505 may vary depending on how many work units are used. The next argument
2506 must be the range to be reduced.
2507 ---
2508 // Find the sum of squares of a range in parallel, using
2509 // an explicit seed.
2510 //
2511 // Timings on an Athlon 64 X2 dual core machine:
2512 //
2513 // Parallel reduce: 72 milliseconds
2514 // Using std.algorithm.reduce instead: 181 milliseconds
2515 auto nums = iota(10_000_000.0f);
2516 auto sumSquares = taskPool.reduce!"a + b"(
2517 0.0, std.algorithm.map!"a * a"(nums)
2518 );
2519 ---
2520
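For instance, a non-identity seed is incorporated once per work unit and
once more in the final reduction. The following sketch assumes the range
is split into exactly two work units:
---
// Each work unit computes 1 + 2 = 3 and 1 + 4 = 5 respectively,
// and the final reduction computes 1 + 3 + 5 = 9, whereas
// std.algorithm.reduce!"a + b"(1, [2, 4]) would yield 7.
auto biased = taskPool.reduce!"a + b"(1, [2, 4], 1);
---
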
2521 If no explicit seed is provided, the first element of each work unit
2522 is used as a seed. For the final reduction, the result from the first
2523 work unit is used as the seed.
2524 ---
2525 // Find the sum of a range in parallel, using the first
2526 // element of each work unit as the seed.
2527 auto sum = taskPool.reduce!"a + b"(nums);
2528 ---
2529
2530 An explicit work unit size may be specified as the last argument.
2531 Specifying too small a work unit size will effectively serialize the
2532 reduction, as the final reduction of the result of each work unit will
2533 dominate computation time. If `TaskPool.size` for this instance
2534 is zero, this parameter is ignored and one work unit is used.
2535 ---
2536 // Use a work unit size of 100.
2537 auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2538
2539 // Work unit size of 100 and explicit seed.
2540 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2541 ---
2542
2543 Parallel reduce supports multiple functions, like
2544 `std.algorithm.reduce`.
2545 ---
2546 // Find both the min and max of nums.
2547 auto minMax = taskPool.reduce!(min, max)(nums);
2548 assert(minMax[0] == reduce!min(nums));
2549 assert(minMax[1] == reduce!max(nums));
2550 ---
2551
2552 $(B Exception Handling):
2553
2554 After this function is finished executing, any exceptions thrown
2555 are chained together via `Throwable.next` and rethrown. The chaining
2556 order is non-deterministic.
2557
2558 See_Also:
2559
2560 $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
2561 range parameter comes first and there is no need to use
2562 $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
2563 */
2564 auto reduce(Args...)(Args args)
2565 {
2566 import core.exception : OutOfMemoryError;
2567 import core.internal.lifetime : emplaceRef;
2568 import std.exception : enforce;
2569
2570 alias fun = reduceAdjoin!functions;
2571 alias finishFun = reduceFinish!functions;
2572
2573 static if (isIntegral!(Args[$ - 1]))
2574 {
2575 size_t workUnitSize = cast(size_t) args[$ - 1];
2576 alias args2 = args[0..$ - 1];
2577 alias Args2 = Args[0..$ - 1];
2578 }
2579 else
2580 {
2581 alias args2 = args;
2582 alias Args2 = Args;
2583 }
2584
2585 auto makeStartValue(Type)(Type e)
2586 {
2587 static if (functions.length == 1)
2588 {
2589 return e;
2590 }
2591 else
2592 {
2593 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2594 foreach (i, T; seed.Types)
2595 {
2596 emplaceRef(seed.expand[i], e);
2597 }
2598
2599 return seed;
2600 }
2601 }
2602
2603 static if (args2.length == 2)
2604 {
2605 static assert(isInputRange!(Args2[1]));
2606 alias range = args2[1];
2607 alias seed = args2[0];
2608 enum explicitSeed = true;
2609
2610 static if (!is(typeof(workUnitSize)))
2611 {
2612 size_t workUnitSize = defaultWorkUnitSize(range.length);
2613 }
2614 }
2615 else
2616 {
2617 static assert(args2.length == 1);
2618 alias range = args2[0];
2619
2620 static if (!is(typeof(workUnitSize)))
2621 {
2622 size_t workUnitSize = defaultWorkUnitSize(range.length);
2623 }
2624
2625 enforce(!range.empty,
2626 "Cannot reduce an empty range with first element as start value.");
2627
2628 auto seed = makeStartValue(range.front);
2629 enum explicitSeed = false;
2630 range.popFront();
2631 }
2632
2633 alias E = typeof(seed);
2634 alias R = typeof(range);
2635
2636 E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
2637 {
2638 // This is for exploiting instruction level parallelism by
2639 // using multiple accumulator variables within each thread,
2640 // since we're assuming functions are associative anyhow.
2641
2642 // This is so that loops can be unrolled automatically.
2643 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2644 enum nILP = ilpTuple.length;
2645 immutable subSize = (upperBound - lowerBound) / nILP;
2646
2647 if (subSize <= 1)
2648 {
2649 // Too few elements for ILP; handle as a special case.
2650 static if (explicitSeed)
2651 {
2652 E result = seed;
2653 }
2654 else
2655 {
2656 E result = makeStartValue(range[lowerBound]);
2657 lowerBound++;
2658 }
2659
2660 foreach (i; lowerBound .. upperBound)
2661 {
2662 result = fun(result, range[i]);
2663 }
2664
2665 return result;
2666 }
2667
2668 assert(subSize > 1);
2669 E[nILP] results;
2670 size_t[nILP] offsets;
2671
2672 foreach (i; ilpTuple)
2673 {
2674 offsets[i] = lowerBound + subSize * i;
2675
2676 static if (explicitSeed)
2677 {
2678 results[i] = seed;
2679 }
2680 else
2681 {
2682 results[i] = makeStartValue(range[offsets[i]]);
2683 offsets[i]++;
2684 }
2685 }
2686
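// When no explicit seed was given, each accumulator already consumed
// its first element as its seed, hence one fewer loop iteration.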
2687 immutable nLoop = subSize - (!explicitSeed);
2688 foreach (i; 0 .. nLoop)
2689 {
2690 foreach (j; ilpTuple)
2691 {
2692 results[j] = fun(results[j], range[offsets[j]]);
2693 offsets[j]++;
2694 }
2695 }
2696
2697 // Finish the remainder.
2698 foreach (i; nILP * subSize + lowerBound .. upperBound)
2699 {
2700 results[$ - 1] = fun(results[$ - 1], range[i]);
2701 }
2702
2703 foreach (i; ilpTuple[1..$])
2704 {
2705 results[0] = finishFun(results[0], results[i]);
2706 }
2707
2708 return results[0];
2709 }
2710
2711 immutable len = range.length;
2712 if (len == 0)
2713 {
2714 return seed;
2715 }
2716
2717 if (this.size == 0)
2718 {
2719 return finishFun(seed, reduceOnRange(range, 0, len));
2720 }
2721
2722 // Unlike the rest of the functions here, I can't use the Task object
2723 // recycling trick here because this has to work on non-commutative
2724 // operations. After all the tasks are done executing, fun() has to
2725 // be applied on the results of these to get a final result, but
2726 // it can't be evaluated out of order.
2727
2728 if (workUnitSize > len)
2729 {
2730 workUnitSize = len;
2731 }
2732
2733 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2734 assert(nWorkUnits * workUnitSize >= len);
2735
2736 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2737 RTask[] tasks;
2738
2739 // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
2740 // Use a fixed-size stack buffer, falling back to malloc().
2741 enum maxStack = 2_048;
2742 byte[maxStack] buf = void;
2743 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2744
2745 import core.stdc.stdlib : malloc, free;
2746 if (nBytesNeeded <= maxStack)
2747 {
2748 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2749 }
2750 else
2751 {
2752 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2753 if (!ptr)
2754 {
2755 throw new OutOfMemoryError(
2756 "Out of memory in std.parallelism."
2757 );
2758 }
2759
2760 tasks = ptr[0 .. nWorkUnits];
2761 }
2762
2763 scope(exit)
2764 {
2765 if (nBytesNeeded > maxStack)
2766 {
2767 free(tasks.ptr);
2768 }
2769 }
2770
2771 // Hack to take the address of a nested function w/o
2772 // making a closure.
2773 static auto scopedAddress(D)(scope D del) @system
2774 {
2775 auto tmp = del;
2776 return tmp;
2777 }
2778
2779 size_t curPos = 0;
2780 void useTask(ref RTask task)
2781 {
2782 import std.algorithm.comparison : min;
2783 import core.lifetime : emplace;
2784
2785 // The constructor is private, so we can't feed its arguments
2786 // directly to emplace.
2787 emplace(&task, RTask
2788 (
2789 scopedAddress(&reduceOnRange),
2790 range,
2791 curPos, // lower bound.
2792 cast() min(len, curPos + workUnitSize) // upper bound.
2793 ));
2794
2795 task.pool = this;
2796
2797 curPos += workUnitSize;
2798 }
2799
2800 foreach (ref task; tasks)
2801 {
2802 useTask(task);
2803 }
2804
2805 foreach (i; 1 .. tasks.length - 1)
2806 {
2807 tasks[i].next = tasks[i + 1].basePtr;
2808 tasks[i + 1].prev = tasks[i].basePtr;
2809 }
2810
2811 if (tasks.length > 1)
2812 {
2813 queueLock();
2814 scope(exit) queueUnlock();
2815
2816 abstractPutGroupNoSync(
2817 tasks[1].basePtr,
2818 tasks[$ - 1].basePtr
2819 );
2820 }
2821
2822 if (tasks.length > 0)
2823 {
2824 try
2825 {
2826 tasks[0].job();
2827 }
2828 catch (Throwable e)
2829 {
2830 tasks[0].exception = e;
2831 }
2832 tasks[0].taskStatus = TaskStatus.done;
2833
2834 // Try to execute each of these in the current thread
2835 foreach (ref task; tasks[1..$])
2836 {
2837 tryDeleteExecute(task.basePtr);
2838 }
2839 }
2840
2841 // Now that we've tried to execute every task, they're all either
2842 // done or in progress. Force all of them.
2843 E result = seed;
2844
2845 Throwable firstException;
2846
2847 foreach (ref task; tasks)
2848 {
2849 try
2850 {
2851 task.yieldForce;
2852 }
2853 catch (Throwable e)
2854 {
2855 /* Chain e to front because order doesn't matter and because
2856 * e is not likely to be a chain itself (so fewer traversals)
2857 */
2858 firstException = Throwable.chainTogether(e, firstException);
2859 continue;
2860 }
2861
2862 if (!firstException) result = finishFun(result, task.returnVal);
2863 }
2864
2865 if (firstException) throw firstException;
2866
2867 return result;
2868 }
2869 }
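
// A short sketch verifying a multi-function parallel reduce; the result
// is a std.typecons.Tuple with one field per function.
@system unittest
{
    import std.algorithm.comparison : max, min;
    import std.range : iota;

    auto mm = taskPool.reduce!(min, max)(iota(100));
    assert(mm[0] == 0 && mm[1] == 99);
}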
2870
2871 ///
2872 template fold(functions...)
2873 {
2874 /** Implements the homonym function (also known as `accumulate`, `compress`,
2875 `inject`, or `foldl`) present in various programming languages of
2876 functional flavor.
2877
2878 `fold` is functionally equivalent to $(LREF reduce) except the range
2879 parameter comes first and there is no need to use $(REF_ALTTEXT
2880 `tuple`,tuple,std,typecons) for multiple seeds.
2881
2882 There may be one or more callable entities (`functions` argument) to
2883 apply.
2884
2885 Params:
2886 args = Just the range to _fold over; or the range and one seed
2887 per function; or the range, one seed per function, and
2888 the work unit size
2889
2890 Returns:
2891 The accumulated result as a single value for single function and
2892 as a tuple of values for multiple functions
2893
2894 See_Also:
2895 Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).
2896
2897 Example:
2898 ---
2899 static int adder(int a, int b)
2900 {
2901 return a + b;
2902 }
2903 static int multiplier(int a, int b)
2904 {
2905 return a * b;
2906 }
2907
2908 // Just the range
2909 auto x = taskPool.fold!adder([1, 2, 3, 4]);
2910 assert(x == 10);
2911
2912 // The range and the seeds (0 and 1 below; also note multiple
2913 // functions in this example)
2914 auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
2915 assert(y[0] == 10);
2916 assert(y[1] == 24);
2917
2918 // The range, the seed (0), and the work unit size (20)
2919 auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
2920 assert(z == 10);
2921 ---
2922 */
2923 auto fold(Args...)(Args args)
2924 {
2925 static assert(isInputRange!(Args[0]), "First argument must be an InputRange");
2926
2927 alias range = args[0];
2928
2929 static if (Args.length == 1)
2930 {
2931 // Just the range
2932 return reduce!functions(range);
2933 }
2934 else static if (Args.length == 1 + functions.length ||
2935 Args.length == 1 + functions.length + 1)
2936 {
2937 static if (functions.length == 1)
2938 {
2939 alias seeds = args[1];
2940 }
2941 else
2942 {
2943 auto seeds()
2944 {
2945 import std.typecons : tuple;
2946 return tuple(args[1 .. functions.length+1]);
2947 }
2948 }
2949
2950 static if (Args.length == 1 + functions.length)
2951 {
2952 // The range and the seeds
2953 return reduce!functions(seeds, range);
2954 }
2955 else static if (Args.length == 1 + functions.length + 1)
2956 {
2957 // The range, the seeds, and the work unit size
2958 static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
2959 return reduce!functions(seeds, range, args[$-1]);
2960 }
2961 }
2962 else
2963 {
2964 import std.conv : text;
2965 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
2966 ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
2967 }
2968 }
2969 }
2970
2971 // This test is not included in the documentation because even though these
2972 // examples are for the inner fold() template, with their current location,
2973 // they would appear under the outer one. (We can't move this inside the
2974 // outer fold() template because then dmd runs out of memory possibly due to
2975 // recursive template instantiation, which is surprisingly not caught.)
2976 @system unittest
2977 {
2978 // Just the range
2979 auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
2980 assert(x == 10);
2981
2982 // The range and the seeds (0 and 1 below; also note multiple
2983 // functions in this example)
2984 auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
2985 assert(y[0] == 10);
2986 assert(y[1] == 24);
2987
2988 // The range, the seed (0), and the work unit size (20)
2989 auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
2990 assert(z == 10);
2991 }
2992
2993 /**
2994 Gets the index of the current thread relative to this `TaskPool`. Any
2995 thread not in this pool will receive an index of 0. The worker threads in
2996 this pool receive unique indices of 1 through `this.size`.
2997
2998 This function is useful for maintaining worker-local resources.
2999
3000 Example:
3001 ---
3002 // Execute a loop that computes the greatest common
3003 // divisor of every number from 0 through 999 with
3004 // 42 in parallel. Write the results out to
3005 // a set of files, one for each thread. This allows
3006 // results to be written out without any synchronization.
3007
3008 import std.conv, std.range, std.numeric, std.stdio;
3009
3010 void main()
3011 {
3012 auto fileHandles = new File[taskPool.size + 1];
3013 scope(exit) {
3014 foreach (ref handle; fileHandles)
3015 {
3016 handle.close();
3017 }
3018 }
3019
3020 foreach (i, ref handle; fileHandles)
3021 {
3022 handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
3023 }
3024
3025 foreach (num; parallel(iota(1_000)))
3026 {
3027 auto outHandle = fileHandles[taskPool.workerIndex];
3028 outHandle.writeln(num, '\t', gcd(num, 42));
3029 }
3030 }
3031 ---
3032 */
3033 size_t workerIndex() @property @safe const nothrow
3034 {
3035 immutable rawInd = threadIndex;
3036 return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
3037 (rawInd - instanceStartIndex + 1) : 0;
3038 }
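
// A tiny sanity sketch: the thread running this unittest is not a worker
// thread of the global pool, so it sees index 0.
@system unittest
{
    assert(taskPool.workerIndex == 0);
}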
3039
3040 /**
3041 Struct for creating worker-local storage. Worker-local storage is
3042 thread-local storage that exists only for worker threads in a given
3043 `TaskPool` plus a single thread outside the pool. It is allocated on the
3044 garbage collected heap in a way that avoids _false sharing, and doesn't
3045 necessarily have global scope within any thread. It can be accessed from
3046 any worker thread in the `TaskPool` that created it, and one thread
3047 outside this `TaskPool`. All threads outside the pool that created a
3048 given instance of worker-local storage share a single slot.
3049
3050 Since the underlying data for this struct is heap-allocated, this struct
3051 has reference semantics when passed between functions.
3052
3053 The main use cases for `WorkerLocalStorage` are:
3054
3055 1. Performing parallel reductions with an imperative, as opposed to
3056 functional, programming style. In this case, it's useful to treat
3057 `WorkerLocalStorage` as local to each thread for only the parallel
3058 portion of an algorithm.
3059
3060 2. Recycling temporary buffers across iterations of a parallel foreach loop.
3061
3062 Example:
3063 ---
3064 // Calculate pi as in our synopsis example, but
3065 // use an imperative instead of a functional style.
3066 immutable n = 1_000_000_000;
3067 immutable delta = 1.0L / n;
3068
3069 auto sums = taskPool.workerLocalStorage(0.0L);
3070 foreach (i; parallel(iota(n)))
3071 {
3072 immutable x = ( i - 0.5L ) * delta;
3073 immutable toAdd = delta / ( 1.0 + x * x );
3074 sums.get += toAdd;
3075 }
3076
3077 // Add up the results from each worker thread.
3078 real pi = 0;
3079 foreach (threadResult; sums.toRange)
3080 {
3081 pi += 4.0L * threadResult;
3082 }
3083 ---
3084 */
3085 static struct WorkerLocalStorage(T)
3086 {
3087 private:
3088 TaskPool pool;
3089 size_t size;
3090
3091 size_t elemSize;
3092 bool* stillThreadLocal;
3093
3094 static size_t roundToLine(size_t num) pure nothrow
3095 {
3096 if (num % cacheLineSize == 0)
3097 {
3098 return num;
3099 }
3100 else
3101 {
3102 return ((num / cacheLineSize) + 1) * cacheLineSize;
3103 }
3104 }
3105
3106 void* data;
3107
3108 void initialize(TaskPool pool)
3109 {
3110 this.pool = pool;
3111 size = pool.size + 1;
3112 stillThreadLocal = new bool;
3113 *stillThreadLocal = true;
3114
3115 // Determines whether the GC should scan the array.
3116 auto blkInfo = (typeid(T).flags & 1) ?
3117 cast(GC.BlkAttr) 0 :
3118 GC.BlkAttr.NO_SCAN;
3119
3120 immutable nElem = pool.size + 1;
3121 elemSize = roundToLine(T.sizeof);
3122
3123 // The + 3 is to pad one full cache line worth of space on either side
3124 // of the data structure to make sure false sharing with completely
3125 // unrelated heap data is prevented, and to provide enough padding to
3126 // make sure that data is cache line-aligned.
3127 data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
3128
3129 // Cache line align data ptr.
3130 data = cast(void*) roundToLine(cast(size_t) data);
3131
3132 foreach (i; 0 .. nElem)
3133 {
3134 this.opIndex(i) = T.init;
3135 }
3136 }
3137
3138 ref opIndex(this Qualified)(size_t index)
3139 {
3140 import std.conv : text;
3141 assert(index < size, text(index, '\t', size));
3142 return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
3143 }
3144
3145 void opIndexAssign(T val, size_t index)
3146 {
3147 assert(index < size);
3148 *(cast(T*) (data + elemSize * index)) = val;
3149 }
3150
3151 public:
3152 /**
3153 Get the current thread's instance. Returns by ref.
3154 Note that calling `get` from any thread
3155 outside the `TaskPool` that created this instance will return the
3156 same reference, so an instance of worker-local storage should only be
3157 accessed from one thread outside the pool that created it. If this
3158 rule is violated, undefined behavior will result.
3159
3160 If assertions are enabled and `toRange` has been called, then this
3161 WorkerLocalStorage instance is no longer worker-local and an assertion
3162 failure will result when calling this method. This is not checked
3163 when assertions are disabled for performance reasons.
3164 */
3165 ref get(this Qualified)() @property
3166 {
3167 assert(*stillThreadLocal,
3168 "Cannot call get() on this instance of WorkerLocalStorage " ~
3169 "because it is no longer worker-local."
3170 );
3171 return opIndex(pool.workerIndex);
3172 }
3173
3174 /**
3175 Assign a value to the current thread's instance. This function has
3176 the same caveats as its overload.
3177 */
3178 void get(T val) @property
3179 {
3180 assert(*stillThreadLocal,
3181 "Cannot call get() on this instance of WorkerLocalStorage " ~
3182 "because it is no longer worker-local."
3183 );
3184
3185 opIndexAssign(val, pool.workerIndex);
3186 }
3187
3188 /**
3189 Returns a range view of the values for all threads, which can be used
3190 to further process the results of each thread after running the parallel
3191 part of your algorithm. Do not use this method in the parallel portion
3192 of your algorithm.
3193
3194 Calling this function sets a flag indicating that this struct is no
3195 longer worker-local, and attempting to use the `get` method again
3196 will result in an assertion failure if assertions are enabled.
3197 */
3198 WorkerLocalStorageRange!T toRange() @property
3199 {
3200 if (*stillThreadLocal)
3201 {
3202 *stillThreadLocal = false;
3203
3204 // Make absolutely sure results are visible to all threads.
3205 // This is probably not necessary since some other
3206 // synchronization primitive will be used to signal that the
3207 // parallel part of the algorithm is done, but the
3208 // performance impact should be negligible, so it's better
3209 // to be safe.
3210 ubyte barrierDummy;
3211 atomicSetUbyte(barrierDummy, 1);
3212 }
3213
3214 return WorkerLocalStorageRange!T(this);
3215 }
3216 }
3217
3218 /**
3219 Range primitives for worker-local storage. The purpose of this is to
3220 access results produced by each worker thread from a single thread once you
3221 are no longer using the worker-local storage from multiple threads.
3222 Do not use this struct in the parallel portion of your algorithm.
3223
3224 The proper way to instantiate this object is to call
3225 `WorkerLocalStorage.toRange`. Once instantiated, this object behaves
3226 as a finite random-access range with assignable, lvalue elements and
3227 a length equal to the number of worker threads in the `TaskPool` that
3228 created it plus 1.
3229 */
3230 static struct WorkerLocalStorageRange(T)
3231 {
3232 private:
3233 WorkerLocalStorage!T workerLocalStorage;
3234
3235 size_t _length;
3236 size_t beginOffset;
3237
3238 this(WorkerLocalStorage!T wl)
3239 {
3240 this.workerLocalStorage = wl;
3241 _length = wl.size;
3242 }
3243
3244 public:
3245 ref front(this Qualified)() @property
3246 {
3247 return this[0];
3248 }
3249
3250 ref back(this Qualified)() @property
3251 {
3252 return this[_length - 1];
3253 }
3254
3255 void popFront()
3256 {
3257 if (_length > 0)
3258 {
3259 beginOffset++;
3260 _length--;
3261 }
3262 }
3263
3264 void popBack()
3265 {
3266 if (_length > 0)
3267 {
3268 _length--;
3269 }
3270 }
3271
3272 typeof(this) save() @property
3273 {
3274 return this;
3275 }
3276
3277 ref opIndex(this Qualified)(size_t index)
3278 {
3279 assert(index < _length);
3280 return workerLocalStorage[index + beginOffset];
3281 }
3282
3283 void opIndexAssign(T val, size_t index)
3284 {
3285 assert(index < _length);
3286 workerLocalStorage[index + beginOffset] = val;
3287 }
3288
3289 typeof(this) opSlice(size_t lower, size_t upper)
3290 {
3291 assert(upper <= _length);
3292 auto newWl = this.workerLocalStorage;
3293 newWl.data += lower * newWl.elemSize;
3294 newWl.size = upper - lower;
3295 return typeof(this)(newWl);
3296 }
3297
3298 bool empty() const @property
3299 {
3300 return length == 0;
3301 }
3302
3303 size_t length() const @property
3304 {
3305 return _length;
3306 }
3307 }
3308
3309 /**
3310 Creates an instance of worker-local storage, initialized with a given
3311 value. The value is `lazy` so that you can, for example, easily
3312 create one instance of a class for each worker. For usage example,
3313 see the `WorkerLocalStorage` struct.
3314 */
3315 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3316 {
3317 WorkerLocalStorage!T ret;
3318 ret.initialize(this);
3319 foreach (i; 0 .. size + 1)
3320 {
3321 ret[i] = initialVal;
3322 }
3323
3324 // Memory barrier to make absolutely sure that what we wrote is
3325 // visible to worker threads.
3326 ubyte barrierDummy;
3327 atomicSetUbyte(barrierDummy, 0);
3328
3329 return ret;
3330 }
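
// A minimal sketch of use case 2 above: recycling a per-worker temporary
// buffer across iterations of a parallel foreach loop (the buffer length
// of 64 is arbitrary).
@system unittest
{
    import std.range : iota;

    auto scratch = taskPool.workerLocalStorage(new int[](64));

    foreach (i; taskPool.parallel(iota(100)))
    {
        auto buf = scratch.get; // reused by this worker on every iteration
        buf[] = i;              // stand-in for real work using the buffer
    }
}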
3331
3332 /**
3333 Signals to all worker threads to terminate as soon as they are finished
3334 with their current `Task`, or immediately if they are not executing a
3335 `Task`. `Task`s that were in queue will not be executed unless
3336 a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
3337 causes them to be executed.
3338
3339 Use only if you have waited on every `Task` and therefore know the
3340 queue is empty, or if you speculatively executed some tasks and no longer
3341 need the results.
3342 */
3343 void stop() @trusted
3344 {
3345 queueLock();
3346 scope(exit) queueUnlock();
3347 atomicSetUbyte(status, PoolState.stopNow);
3348 notifyAll();
3349 }
3350
3351 /**
3352 Signals worker threads to terminate when the queue becomes empty.
3353
3354 If the blocking argument is true, wait for all worker threads to terminate
3355 before returning. This option might be used in applications where
3356 task results are never consumed -- e.g. when `TaskPool` is employed as a
3357 rudimentary scheduler for tasks which communicate by means other than
3358 return values.
3359
3360 Warning: Calling this function with $(D blocking = true) from a worker
3361 thread that is a member of the same `TaskPool` that
3362 `finish` is being called on will result in a deadlock.
3363 */
3364 void finish(bool blocking = false) @trusted
3365 {
3366 {
3367 queueLock();
3368 scope(exit) queueUnlock();
3369 atomicCasUbyte(status, PoolState.running, PoolState.finishing);
3370 notifyAll();
3371 }
3372 if (blocking)
3373 {
3374 // Use this thread as a worker until everything is finished.
3375 executeWorkLoop();
3376
3377 foreach (t; pool)
3378 {
3379 // Maybe there should be something here to prevent a thread
3380 // from calling join() on itself if this function is called
3381 // from a worker thread in the same pool, but:
3382 //
3383 // 1. Using an if statement to skip join() would result in
3384 // finish() returning without all tasks being finished.
3385 //
3386 // 2. If an exception were thrown, it would bubble up to the
3387 // Task from which finish() was called and likely be
3388 // swallowed.
3389 t.join();
3390 }
3391 }
3392 }
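
// A brief sketch of the documented shutdown sequence for a manually
// created pool (the pool size of 2 here is arbitrary): finish(true)
// drains the queue and then joins the worker threads.
@system unittest
{
    import core.atomic : atomicOp;

    shared int sum;
    auto p = new TaskPool(2);

    foreach (i; 1 .. 5)
    {
        auto t = task!((shared(int)* s, int val) => atomicOp!"+="(*s, val))(&sum, i);
        p.put(t);
    }

    p.finish(true);
    assert(sum == 10);
}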
3393
3394 /// Returns the number of worker threads in the pool.
3395 @property size_t size() @safe const pure nothrow
3396 {
3397 return pool.length;
3398 }
3399
3400 /**
3401 Put a `Task` object on the back of the task queue. The `Task`
3402 object may be passed by pointer or reference.
3403
3404 Example:
3405 ---
3406 import std.file;
3407
3408 // Create a task.
3409 auto t = task!read("foo.txt");
3410
3411 // Add it to the queue to be executed.
3412 taskPool.put(t);
3413 ---
3414
3415 Notes:
3416
3417 @trusted overloads of this function are called for `Task`s if
3418 $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
3419 return type or the function the `Task` executes is `pure`.
3420 `Task` objects that meet all other requirements specified in the
3421 `@trusted` overloads of `task` and `scopedTask` may be created
3422 and executed from `@safe` code via `Task.executeInNewThread` but
3423 not via `TaskPool`.
3424
3425 While this function takes the address of variables that may
3426 be on the stack, some overloads are marked as @trusted.
3427 `Task` includes a destructor that waits for the task to complete
3428 before destroying the stack frame it is allocated on. Therefore,
3429 it is impossible for the stack frame to be destroyed before the task is
3430 complete and no longer referenced by a `TaskPool`.
3431 */
3432 void put(alias fun, Args...)(ref Task!(fun, Args) task)
3433 if (!isSafeReturn!(typeof(task)))
3434 {
3435 task.pool = this;
3436 abstractPut(task.basePtr);
3437 }
3438
3439 /// Ditto
3440 void put(alias fun, Args...)(Task!(fun, Args)* task)
3441 if (!isSafeReturn!(typeof(*task)))
3442 {
3443 import std.exception : enforce;
3444 enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3445 put(*task);
3446 }
3447
3448 @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
3449 if (isSafeReturn!(typeof(task)))
3450 {
3451 task.pool = this;
3452 abstractPut(task.basePtr);
3453 }
3454
3455 @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
3456 if (isSafeReturn!(typeof(*task)))
3457 {
3458 import std.exception : enforce;
3459 enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3460 put(*task);
3461 }
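
// A minimal sketch of the put/force cycle: submit a task to the pool,
// then block until its result is ready.
@system unittest
{
    static int triple(int x) { return 3 * x; }

    auto t = task!triple(7);
    taskPool.put(t);
    assert(t.yieldForce == 21);
}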
3462
3463 /**
3464 These properties control whether the worker threads are daemon threads.
3465 A daemon thread is automatically terminated when all non-daemon threads
3466 have terminated. A non-daemon thread will prevent a program from
3467 terminating as long as it has not terminated.
3468
3469 If any `TaskPool` with non-daemon threads is active, either `stop`
3470 or `finish` must be called on it before the program can terminate.
3471
3472 The worker threads in the `TaskPool` instance returned by the
3473 `taskPool` property are daemon by default. The worker threads of
3474 manually instantiated task pools are non-daemon by default.
3475
3476 Note: For a size zero pool, the getter arbitrarily returns true and the
3477 setter has no effect.
3478 */
3479 bool isDaemon() @property @trusted
3480 {
3481 queueLock();
3482 scope(exit) queueUnlock();
3483 return (size == 0) ? true : pool[0].isDaemon;
3484 }
3485
3486 /// Ditto
3487 void isDaemon(bool newVal) @property @trusted
3488 {
3489 queueLock();
3490 scope(exit) queueUnlock();
3491 foreach (thread; pool)
3492 {
3493 thread.isDaemon = newVal;
3494 }
3495 }
3496
3497 /**
3498 These functions allow getting and setting the OS scheduling priority of
3499 the worker threads in this `TaskPool`. They forward to
3500 `core.thread.Thread.priority`, so a given priority value here means the
3501 same thing as an identical priority value in `core.thread`.
3502
3503 Note: For a size zero pool, the getter arbitrarily returns
3504 `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
3505 */
3506 int priority() @property @trusted
3507 {
3508 return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
3509 pool[0].priority;
3510 }
3511
3512 /// Ditto
3513 void priority(int newPriority) @property @trusted
3514 {
3515 if (size > 0)
3516 {
3517 foreach (t; pool)
3518 {
3519 t.priority = newPriority;
3520 }
3521 }
3522 }
3523 }
3524
3525 @system unittest
3526 {
3527 import std.algorithm.iteration : sum;
3528 import std.range : iota;
3529 import std.typecons : tuple;
3530
3531 enum N = 100;
3532 auto r = iota(1, N + 1);
3533 const expected = r.sum();
3534
3535 // Just the range
3536 assert(taskPool.fold!"a + b"(r) == expected);
3537
3538 // Range and seeds
3539 assert(taskPool.fold!"a + b"(r, 0) == expected);
3540 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));
3541
3542 // Range, seeds, and work unit size
3543 assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
3544 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
3545 }
3546
3547 // Issue 16705
3548 @system unittest
3549 {
3550 struct MyIota
3551 {
3552 size_t front;
3553 void popFront()(){front++;}
3554 auto empty(){return front >= 25;}
3555 auto opIndex(size_t i){return front+i;}
3556 auto length(){return 25-front;}
3557 }
3558
3559 auto mySum = taskPool.reduce!"a + b"(MyIota());
3560 }
3561
3562 /**
3563 Returns a lazily initialized global instantiation of `TaskPool`.
3564 This function can safely be called concurrently from multiple non-worker
3565 threads. The worker threads in this pool are daemon threads, meaning that it
3566 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
3567 terminating the main thread.
3568 */
3569 @property TaskPool taskPool() @trusted
3570 {
3571 import std.concurrency : initOnce;
3572 __gshared TaskPool pool;
3573 return initOnce!pool({
3574 auto p = new TaskPool(defaultPoolThreads);
3575 p.isDaemon = true;
3576 return p;
3577 }());
3578 }
3579
3580 private shared uint _defaultPoolThreads = uint.max;
3581
3582 /**
3583 These properties get and set the number of worker threads in the `TaskPool`
3584 instance returned by `taskPool`. The default value is `totalCPUs` - 1.
3585 Calling the setter after the first call to `taskPool` does not change the
3586 number of worker threads in the instance returned by `taskPool`.
3587 */
3588 @property uint defaultPoolThreads() @trusted
3589 {
3590 const local = atomicLoad(_defaultPoolThreads);
3591 return local < uint.max ? local : totalCPUs - 1;
3592 }
3593
3594 /// Ditto
3595 @property void defaultPoolThreads(uint newVal) @trusted
3596 {
3597 atomicStore(_defaultPoolThreads, newVal);
3598 }
3599
3600 /**
3601 Convenience functions that forward to `taskPool.parallel`. The
3602 purpose of these is to make parallel foreach less verbose and more
3603 readable.
3604
3605 Example:
3606 ---
3607 // Find the logarithm of every number from
3608 // 1 to 1_000_000 in parallel, using the
3609 // default TaskPool instance.
3610 auto logs = new double[1_000_000];
3611
3612 foreach (i, ref elem; parallel(logs))
3613 {
3614 elem = log(i + 1.0);
3615 }
3616 ---
3617
3618 */
3619 ParallelForeach!R parallel(R)(R range)
3620 {
3621 return taskPool.parallel(range);
3622 }
3623
3624 /// Ditto
3625 ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
3626 {
3627 return taskPool.parallel(range, workUnitSize);
3628 }
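
// A short sketch of the workUnitSize overload; smaller work units can
// balance loops whose iterations vary widely in cost, at the price of
// more submission overhead.
@system unittest
{
    import std.range : iota;

    auto squares = new long[100];
    foreach (i; parallel(iota(100), 10)) // 10 iterations per work unit
    {
        squares[i] = cast(long) i * i;
    }
    assert(squares[99] == 99L * 99);
}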
3629
3630 // `each` should be usable with parallel
3631 // https://issues.dlang.org/show_bug.cgi?id=17019
3632 @system unittest
3633 {
3634 import std.algorithm.iteration : each, sum;
3635 import std.range : iota;
3636
3637 // check behavior with parallel
3638 auto arr = new int[10];
3639 parallel(arr).each!((ref e) => e += 1);
3640 assert(arr.sum == 10);
3641
3642 auto arrIndex = new int[10];
3643 parallel(arrIndex).each!((i, ref e) => e += i);
3644 assert(arrIndex.sum == 10.iota.sum);
3645 }
3646
3647 // Thrown when a parallel foreach loop is broken from.
3648 class ParallelForeachError : Error
3649 {
3650 this()
3651 {
3652 super("Cannot break from a parallel foreach loop using break, return, "
3653 ~ "labeled break/continue or goto statements.");
3654 }
3655 }
3656
3657 /*------Structs that implement opApply for parallel foreach.------------------*/
3658 private template randLen(R)
3659 {
3660 enum randLen = isRandomAccessRange!R && hasLength!R;
3661 }
3662
3663 private void submitAndExecute(
3664 TaskPool pool,
3665 scope void delegate() doIt
3666 )
3667 {
3668 import core.exception : OutOfMemoryError;
3669 immutable nThreads = pool.size + 1;
3670
3671 alias PTask = typeof(scopedTask(doIt));
3672 import core.stdc.stdlib : malloc, free;
3673 import core.stdc.string : memcpy;
3674
3675 // The logical thing to do would be to just use alloca() here, but that
3676 // causes problems on Windows for reasons that I don't understand
3677 // (tentatively a compiler bug) and definitely doesn't work on Posix due
3678 // to https://issues.dlang.org/show_bug.cgi?id=3753.
3679 // Therefore, allocate a fixed buffer and fall back to `malloc()` if
3680 // someone's using a ridiculous number of threads.
3681 // Also, using a byte array instead of a PTask array as the fixed buffer
3682 // prevents d'tors from being called on uninitialized excess PTask
3683 // instances.
3684 enum nBuf = 64;
3685 byte[nBuf * PTask.sizeof] buf = void;
3686 PTask[] tasks;
3687 if (nThreads <= nBuf)
3688 {
3689 tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
3690 }
3691 else
3692 {
3693 auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
3694 if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
3695 tasks = ptr[0 .. nThreads];
3696 }
3697
3698 scope(exit)
3699 {
3700 if (nThreads > nBuf)
3701 {
3702 free(tasks.ptr);
3703 }
3704 }
3705
3706 foreach (ref t; tasks)
3707 {
3708 import core.stdc.string : memcpy;
3709
3710 // This silly looking code is necessary to prevent d'tors from being
3711 // called on uninitialized objects.
3712 auto temp = scopedTask(doIt);
3713 memcpy(&t, &temp, PTask.sizeof);
3714
3715 // This has to be done to t after copying, not temp before copying.
3716 // Otherwise, temp's destructor will sit here and wait for the
3717 // task to finish.
3718 t.pool = pool;
3719 }
3720
3721 foreach (i; 1 .. tasks.length - 1)
3722 {
3723 tasks[i].next = tasks[i + 1].basePtr;
3724 tasks[i + 1].prev = tasks[i].basePtr;
3725 }
3726
3727 if (tasks.length > 1)
3728 {
3729 pool.queueLock();
3730 scope(exit) pool.queueUnlock();
3731
3732 pool.abstractPutGroupNoSync(
3733 tasks[1].basePtr,
3734 tasks[$ - 1].basePtr
3735 );
3736 }
3737
3738 if (tasks.length > 0)
3739 {
3740 try
3741 {
3742 tasks[0].job();
3743 }
3744 catch (Throwable e)
3745 {
3746 tasks[0].exception = e; // nocoverage
3747 }
3748 tasks[0].taskStatus = TaskStatus.done;
3749
3750 // Try to execute each of these in the current thread
3751 foreach (ref task; tasks[1..$])
3752 {
3753 pool.tryDeleteExecute(task.basePtr);
3754 }
3755 }
3756
3757 Throwable firstException;
3758
3759 foreach (i, ref task; tasks)
3760 {
3761 try
3762 {
3763 task.yieldForce;
3764 }
3765 catch (Throwable e)
3766 {
3767 /* Chain e to front because order doesn't matter and because
3768 * e is not likely to be a chain itself (so fewer traversals)
3769 */
3770 firstException = Throwable.chainTogether(e, firstException);
3771 continue;
3772 }
3773 }
3774
3775 if (firstException) throw firstException;
3776 }
3777
3778 void foreachErr()
3779 {
3780 throw new ParallelForeachError();
3781 }
3782
3783 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
3784 {
3785 with(p)
3786 {
3787 int res = 0;
3788 size_t index = 0;
3789
3790 // The explicit ElementType!R in the foreach loops is necessary for
3791 // correct behavior when iterating over strings.
3792 static if (hasLvalueElements!R)
3793 {
3794 foreach (ref ElementType!R elem; range)
3795 {
3796 static if (Parameters!dg.length == 2)
3797 {
3798 res = dg(index, elem);
3799 }
3800 else
3801 {
3802 res = dg(elem);
3803 }
3804 if (res) break;
3805 index++;
3806 }
3807 }
3808 else
3809 {
3810 foreach (ElementType!R elem; range)
3811 {
3812 static if (Parameters!dg.length == 2)
3813 {
3814 res = dg(index, elem);
3815 }
3816 else
3817 {
3818 res = dg(elem);
3819 }
3820 if (res) break;
3821 index++;
3822 }
3823 }
3824 if (res) foreachErr;
3825 return res;
3826 }
3827 }
3828
3829 private enum string parallelApplyMixinRandomAccess = q{
3830 // Handle empty thread pool as special case.
3831 if (pool.size == 0)
3832 {
3833 return doSizeZeroCase(this, dg);
3834 }
3835
3836 // Whether iteration is with or without an index variable.
3837 enum withIndex = Parameters!(typeof(dg)).length == 2;
3838
3839 shared size_t workUnitIndex = size_t.max; // Effectively -1: workUnitIndex + 1 == 0
3840 immutable len = range.length;
3841 if (!len) return 0;
3842
3843 shared bool shouldContinue = true;
3844
3845 void doIt()
3846 {
3847 import std.algorithm.comparison : min;
3848
3849 scope(failure)
3850 {
3851 // If an exception is thrown, all threads should bail.
3852 atomicStore(shouldContinue, false);
3853 }
3854
3855 while (atomicLoad(shouldContinue))
3856 {
3857 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
3858 immutable start = workUnitSize * myUnitIndex;
3859 if (start >= len)
3860 {
3861 atomicStore(shouldContinue, false);
3862 break;
3863 }
3864
3865 immutable end = min(len, start + workUnitSize);
3866
3867 foreach (i; start .. end)
3868 {
3869 static if (withIndex)
3870 {
3871 if (dg(i, range[i])) foreachErr();
3872 }
3873 else
3874 {
3875 if (dg(range[i])) foreachErr();
3876 }
3877 }
3878 }
3879 }
3880
3881 submitAndExecute(pool, &doIt);
3882
3883 return 0;
3884 };
3885
3886 private enum string parallelApplyMixinInputRange = q{
3887 // Handle empty thread pool as special case.
3888 if (pool.size == 0)
3889 {
3890 return doSizeZeroCase(this, dg);
3891 }
3892
3893 // Whether iteration is with or without an index variable.
3894 enum withIndex = Parameters!(typeof(dg)).length == 2;
3895
3896 // This protects the range while copying it.
3897 auto rangeMutex = new Mutex();
3898
3899 shared bool shouldContinue = true;
3900
3901 // The total number of elements that have been popped off range.
3902 // This is updated only while protected by rangeMutex.
3903 size_t nPopped = 0;
3904
    static if (
        is(typeof(range.buf1)) &&
        is(typeof(range.bufPos)) &&
        is(typeof(range.doBufSwap()))
    )
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (
            is(typeof(range.source)) &&
            isRoundRobin!(typeof(range.source))
        )
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
                "the buffer recycling overload of asyncBuf.");
        }

        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            alias Temp = ElementType!R*[];
            Temp temp;

            // Returns: The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

        }
        else
        {

            alias Temp = ElementType!R[];
            Temp temp;

            // Returns: The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;
                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                static if (is(typeof(range._length)))
                {
                    range._length -= (temp.length - range.bufPos);
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
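                // Advance the overall element index only if this iteration
                // completes normally.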
                scope(success) overallIndex++;

                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};
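
// A minimal usage sketch: parallel foreach over a range without random access
// (here, a filtered range) goes through the mixin above, which claims work
// units of workUnitSize elements under rangeMutex and then processes each
// unit without holding the lock.
@system unittest
{
    import std.algorithm.iteration : filter;
    import std.range : iota;

    shared int evens;
    foreach (n; taskPool.parallel(filter!"a % 2 == 0"(iota(100)), 8))
    {
        atomicOp!"+="(evens, 1); // core.atomic is imported at module scope
    }
    assert(evens == 50);
}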


private struct ParallelForeach(R)
{
    TaskPool pool;
    R range;
    size_t workUnitSize;
    alias E = ElementType!R;

    static if (hasLvalueElements!R)
    {
        alias NoIndexDg = int delegate(ref E);
        alias IndexDg = int delegate(size_t, ref E);
    }
    else
    {
        alias NoIndexDg = int delegate(E);
        alias IndexDg = int delegate(size_t, E);
    }

    int opApply(scope NoIndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }

    int opApply(scope IndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }
}
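
// A minimal sketch of the dispatch above: iterating with an index variable
// selects the IndexDg overload, and `ref` binds directly to the elements of
// a range with lvalue elements.
@system unittest
{
    auto data = [10, 20, 30];
    foreach (i, ref e; taskPool.parallel(data))
    {
        e += cast(int) i;
    }
    assert(data == [10, 21, 32]);
}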

/*
This struct adapts a callable that writes its output into a user-supplied
buffer, cycling that output through a fixed-size set of such buffers that
are accessed via an input range interface. It is used internally in the
buffer-recycling overload of TaskPool.asyncBuf, which creates an instance
and forwards it to the input range overload of asyncBuf.
*/
private struct RoundRobinBuffer(C1, C2)
{
    // No need for constraints because they're already checked for in asyncBuf.

    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;
    size_t index;  // The buffer currently exposed as front.
    C1 nextDel;
    C2 emptyDel;
    bool _empty;
    bool primed;   // Whether nextDel has already filled bufs[index].

    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;

        foreach (ref buf; bufs)
        {
            buf.length = initialBufSize;
        }
    }

    void prime()
    in
    {
        assert(!empty);
    }
    do
    {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }


    T[] front() @property
    in
    {
        assert(!empty);
    }
    do
    {
        if (!primed) prime();
        return bufs[index];
    }

    void popFront()
    {
        if (empty || emptyDel())
        {
            _empty = true;
            return;
        }

        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}
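
// A minimal usage sketch: the buffer-recycling overload of TaskPool.asyncBuf
// wraps a (next, empty) delegate pair in a RoundRobinBuffer like the one
// above, so each buffer is refilled in place rather than reallocated.
@system unittest
{
    int counter;

    // Fills the supplied buffer in place, as RoundRobinBuffer.prime expects.
    void next(ref int[] buf)
    {
        foreach (ref x; buf) x = counter++;
    }

    bool done() { return counter >= 30; }

    int expected;
    foreach (buf; taskPool.asyncBuf(&next, &done, 10))
    {
        foreach (x; buf) assert(x == expected++);
    }
    assert(expected == 30);
}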

version (StdUnittest)
{
    // This was the only way I could get nested maps to work.
    private __gshared TaskPool poolInstance;
}

// These test basic functionality but don't stress test for threading bugs.
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
    import std.algorithm.comparison : equal, min, max;
    import std.algorithm.iteration : filter, map, reduce;
    import std.array : split;
    import std.conv : text;
    import std.exception : assertThrown;
    import std.math.operations : isClose;
    import std.math.algebraic : sqrt, abs;
    import std.math.exponential : log;
    import std.range : indexed, iota, join;
    import std.typecons : Tuple, tuple;
    import std.stdio;

    poolInstance = new TaskPool(2);
    scope(exit) poolInstance.stop();

    // The only way this can be verified is manually.
    debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);

    auto oldPriority = poolInstance.priority;
    poolInstance.priority = Thread.PRIORITY_MAX;
    assert(poolInstance.priority == Thread.PRIORITY_MAX);

    poolInstance.priority = Thread.PRIORITY_MIN;
    assert(poolInstance.priority == Thread.PRIORITY_MIN);

    poolInstance.priority = oldPriority;
    assert(poolInstance.priority == oldPriority);

    static void refFun(ref uint num)
    {
        num++;
    }

    uint x;

    // Test task().
    auto t = task!refFun(x);
    poolInstance.put(t);
    t.yieldForce;
    assert(t.args[0] == 1);

    auto t2 = task(&refFun, x);
    poolInstance.put(t2);
    t2.yieldForce;
    assert(t2.args[0] == 1);

    // Test scopedTask().
    auto st = scopedTask!refFun(x);
    poolInstance.put(st);
    st.yieldForce;
    assert(st.args[0] == 1);

    auto st2 = scopedTask(&refFun, x);
    poolInstance.put(st2);
    st2.yieldForce;
    assert(st2.args[0] == 1);

    // Test executeInNewThread().
    auto ct = scopedTask!refFun(x);
    ct.executeInNewThread(Thread.PRIORITY_MAX);
    ct.yieldForce;
    assert(ct.args[0] == 1);

    // Test ref return.
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    static void testSafe() @safe {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        safePool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        safePool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }
    testSafe();

    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with non-random access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(isClose(elem, cast(double) log(i + 1)));
    }

    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
        [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
        tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish()
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
        poolInstance.map!"a * a"(iota(3_000_001), 10_000),
        map!"a * a"(iota(3_000_001))
    ));

    // The filter is to kill random access and test the non-random access
    // branch.
    assert(equal(
        poolInstance.map!"a * a"(
            filter!"a == a"(iota(3_000_001)), 10_000, 1000),
        map!"a * a"(iota(3_000_001))
    ));

    assert(
        reduce!"a + b"(0UL,
            poolInstance.map!"a * a"(iota(300_001), 10_000)
        ) ==
        reduce!"a + b"(0UL,
            map!"a * a"(iota(300_001))
        )
    );

    assert(equal(
        iota(1_000_002),
        poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
    ));

    {
        import std.conv : to;
        import std.file : deleteme, remove;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test Map/AsyncBuf chaining.

    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
        abuf, 100, 5
    );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        if (!isClose(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that worker-local storage from one pool receives an index of 0
    // when the index is queried with respect to another pool. The only way
    // to test this is non-deterministic.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}

//version = parallelismStressTest;

// These are more like stress tests than real unit tests. They print out
// tons of stuff and should not be run every time make unittest is run.
version (parallelismStressTest)
{
    @system unittest
    {
        import std.stdio : stderr, writeln, readln;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;

        size_t attempt;
        for (; attempt < 10; attempt++)
        foreach (poolSize; [0, 4])
        {

            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel(iota(0, numbers.length)))
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");


            auto myNumbers = filter!"a % 7 > 0"(iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            auto sumFuture = task!(reduce!"a + b")(numbers);
            poolInstance.put(sumFuture);

            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner = iota(0, 10, 2);

            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, " ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing and not so much
    // as examples.
    @system unittest
    {
        import std.stdio : stderr;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;
        import std.math.algebraic : sqrt;
        import std.math.operations : isClose;
        import std.math.traits : isNaN;
        import std.conv : text;

        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
                1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads. This test is
            // non-deterministic and is more of a sanity check than something that
            // has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel(iota(0, 1_000_000), 100))
            {
                atomicOp!"+="(nJobsByThread[cast(void*) Thread.getThis()], 1);
            }

            stderr.writeln("\nCurrent thread is: ",
                cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution: ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel(iota(0, matrix.length)))
            {
                foreach (j; poolInstance.parallel(iota(0, matrix[0].length)))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do w/ sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt(cast(real) i * j);
                    assert(isClose(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap, reduce: Find the sum of the square roots of
            // matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                poolInstance.amap!parallelSum(
                    sqrtMatrix
                )
            );

            assert(isClose(sumSqrt, 4.437e8, 1e-2));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            /+ // This part is buggy and needs to be fixed...
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }
            +/

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums: ", nums);

            poolInstance.stop();
        }
    }
}

@system unittest
{
    static struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u) { n = u; }
        void opAssign(__S_12733 s) { this.n = s.n; }
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
    immutable ulong[] data = [2UL^^59 - 1, 2UL^^59 - 1, 2UL^^59 - 1, 112_272_537_195_293UL];

    auto result = taskPool.amap!__genPair_12733(data);
}

@safe unittest
{
    import std.range : iota;

    // this test was in std.range, but caused cycles.
    assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;
    static assert(is(typeof({
        arr.parallel.each!"a++";
    })));
}

// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;
    // ensure compilation
    try foreach (rnd; rndGen.parallel) break;
    catch (ParallelForeachError e) {}
}
