1 /** 2 $(D std._parallelism) implements high-level primitives for SMP _parallelism. 3 These include parallel foreach, parallel reduce, parallel eager map, pipelining 4 and future/promise _parallelism. $(D std._parallelism) is recommended when the 5 same operation is to be executed in parallel on different data, or when a 6 function is to be executed in a background thread and its result returned to a 7 well-defined main thread. For communication between arbitrary threads, see 8 $(D std.concurrency). 9 10 $(D std._parallelism) is based on the concept of a $(D Task). A $(D Task) is an 11 object that represents the fundamental unit of work in this library and may be 12 executed in parallel with any other $(D Task). Using $(D Task) 13 directly allows programming with a future/promise paradigm. All other 14 supported _parallelism paradigms (parallel foreach, map, reduce, pipelining) 15 represent an additional level of abstraction over $(D Task). They 16 automatically create one or more $(D Task) objects, or closely related types 17 that are conceptually identical but not part of the public API. 18 19 After creation, a $(D Task) may be executed in a new thread, or submitted 20 to a $(D TaskPool) for execution. A $(D TaskPool) encapsulates a task queue 21 and its worker threads. Its purpose is to efficiently map a large 22 number of $(D Task)s onto a smaller number of threads. A task queue is a 23 FIFO queue of $(D Task) objects that have been submitted to the 24 $(D TaskPool) and are awaiting execution. A worker thread is a thread that 25 is associated with exactly one task queue. It executes the $(D Task) at the 26 front of its queue when the queue has work available, or sleeps when 27 no work is available. Each task queue is associated with zero or 28 more worker threads. If the result of a $(D Task) is needed before execution 29 by a worker thread has begun, the $(D Task) can be removed from the task queue 30 and executed immediately in the thread where the result is needed. 31 32 Warning: Unless marked as $(D @trusted) or $(D @safe), artifacts in 33 this module allow implicit data sharing between threads and cannot 34 guarantee that client code is free from low level data races. 35 36 Source: $(PHOBOSSRC std/_parallelism.d) 37 Author: David Simcha 38 Copyright: Copyright (c) 2009-2011, David Simcha. 39 License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0) 40 */ 41 module std.parallelism; 42 43 /// 44 @system unittest 45 { 46 import std.algorithm.iteration : map; 47 import std.math : approxEqual; 48 import std.parallelism : taskPool; 49 import std.range : iota; 50 51 // Parallel reduce can be combined with 52 // std.algorithm.iteration.map to interesting effect. 53 // The following example (thanks to Russel Winder) 54 // calculates pi by quadrature using 55 // std.algorithm.map and TaskPool.reduce. 56 // getTerm is evaluated in parallel as needed by 57 // TaskPool.reduce. 58 // 59 // Timings on an Intel i5-3450 quad core machine 60 // for n = 1_000_000_000: 61 // 62 // TaskPool.reduce: 1.067 s 63 // std.algorithm.reduce: 4.011 s 64 65 enum n = 1_000_000; 66 enum delta = 1.0 / n; 67 68 alias getTerm = (int i) 69 { 70 immutable x = ( i - 0.5 ) * delta; 71 return delta / ( 1.0 + x * x ) ; 72 }; 73 74 immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm); 75 76 assert(pi.approxEqual(3.1415926)); 77 } 78 79 import core.atomic; 80 import core.memory; 81 import core.sync.condition; 82 import core.thread; 83 84 import std.functional; 85 import std.meta; 86 import std.range.primitives; 87 import std.traits; 88 89 version (OSX) 90 { 91 version = useSysctlbyname; 92 } 93 else version (FreeBSD) 94 { 95 version = useSysctlbyname; 96 } 97 else version (DragonFlyBSD) 98 { 99 version = useSysctlbyname; 100 } 101 else version (NetBSD) 102 { 103 version = useSysctlbyname; 104 } 105 106 107 version (Windows) 108 { 109 // BUGS: Only works on Windows 2000 and above. 110 shared static this() 111 { 112 import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo; 113 import std.algorithm.comparison : max; 114 115 SYSTEM_INFO si; 116 GetSystemInfo(&si); 117 totalCPUs = max(1, cast(uint) si.dwNumberOfProcessors); 118 } 119 120 } 121 else version (linux) 122 { 123 shared static this() 124 { 125 import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; 126 totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); 127 } 128 } 129 else version (Solaris) 130 { 131 shared static this() 132 { 133 import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; 134 totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); 135 } 136 } 137 else version (useSysctlbyname) 138 { 139 extern(C) int sysctlbyname( 140 const char *, void *, size_t *, void *, size_t 141 ); 142 143 shared static this() 144 { 145 version (OSX) 146 { 147 auto nameStr = "machdep.cpu.core_count\0".ptr; 148 } 149 else version (FreeBSD) 150 { 151 auto nameStr = "hw.ncpu\0".ptr; 152 } 153 else version (DragonFlyBSD) 154 { 155 auto nameStr = "hw.ncpu\0".ptr; 156 } 157 else version (NetBSD) 158 { 159 auto nameStr = "hw.ncpu\0".ptr; 160 } 161 162 uint ans; 163 size_t len = uint.sizeof; 164 sysctlbyname(nameStr, &ans, &len, null, 0); 165 totalCPUs = ans; 166 } 167 168 } 169 else 170 { 171 static assert(0, "Don't know how to get N CPUs on this OS."); 172 } 173 174 immutable size_t cacheLineSize; 175 shared static this() 176 { 177 import core.cpuid : datacache; 178 size_t lineSize = 0; 179 foreach (cachelevel; datacache) 180 { 181 if (cachelevel.lineSize > lineSize && cachelevel.lineSize < uint.max) 182 { 183 lineSize = cachelevel.lineSize; 184 } 185 } 186 187 cacheLineSize = lineSize; 188 } 189 190 191 /* Atomics code. These forward to core.atomic, but are written like this 192 for two reasons: 193 194 1. They used to actually contain ASM code and I don' want to have to change 195 to directly calling core.atomic in a zillion different places. 196 197 2. core.atomic has some misc. issues that make my use cases difficult 198 without wrapping it. If I didn't wrap it, casts would be required 199 basically everywhere. 200 */ 201 private void atomicSetUbyte(T)(ref T stuff, T newVal) 202 if (__traits(isIntegral, T) && is(T : ubyte)) 203 { 204 //core.atomic.cas(cast(shared) &stuff, stuff, newVal); 205 atomicStore(*(cast(shared) &stuff), newVal); 206 } 207 208 private ubyte atomicReadUbyte(T)(ref T val) 209 if (__traits(isIntegral, T) && is(T : ubyte)) 210 { 211 return atomicLoad(*(cast(shared) &val)); 212 } 213 214 // This gets rid of the need for a lot of annoying casts in other parts of the 215 // code, when enums are involved. 216 private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal) 217 if (__traits(isIntegral, T) && is(T : ubyte)) 218 { 219 return core.atomic.cas(cast(shared) &stuff, testVal, newVal); 220 } 221 222 /*--------------------- Generic helper functions, etc.------------------------*/ 223 private template MapType(R, functions...) 224 { 225 static assert(functions.length); 226 227 ElementType!R e = void; 228 alias MapType = 229 typeof(adjoin!(staticMap!(unaryFun, functions))(e)); 230 } 231 232 private template ReduceType(alias fun, R, E) 233 { 234 alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init)); 235 } 236 237 private template noUnsharedAliasing(T) 238 { 239 enum bool noUnsharedAliasing = !hasUnsharedAliasing!T; 240 } 241 242 // This template tests whether a function may be executed in parallel from 243 // @safe code via Task.executeInNewThread(). There is an additional 244 // requirement for executing it via a TaskPool. (See isSafeReturn). 245 private template isSafeTask(F) 246 { 247 enum bool isSafeTask = 248 (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 && 249 (functionAttributes!F & FunctionAttribute.ref_) == 0 && 250 (isFunctionPointer!F || !hasUnsharedAliasing!F) && 251 allSatisfy!(noUnsharedAliasing, Parameters!F); 252 } 253 254 @safe unittest 255 { 256 alias F1 = void function() @safe; 257 alias F2 = void function(); 258 alias F3 = void function(uint, string) @trusted; 259 alias F4 = void function(uint, char[]); 260 261 static assert( isSafeTask!F1); 262 static assert(!isSafeTask!F2); 263 static assert( isSafeTask!F3); 264 static assert(!isSafeTask!F4); 265 266 alias F5 = uint[] function(uint, string) pure @trusted; 267 static assert( isSafeTask!F5); 268 } 269 270 // This function decides whether Tasks that meet all of the other requirements 271 // for being executed from @safe code can be executed on a TaskPool. 272 // When executing via TaskPool, it's theoretically possible 273 // to return a value that is also pointed to by a worker thread's thread local 274 // storage. When executing from executeInNewThread(), the thread that executed 275 // the Task is terminated by the time the return value is visible in the calling 276 // thread, so this is a non-issue. It's also a non-issue for pure functions 277 // since they can't read global state. 278 private template isSafeReturn(T) 279 { 280 static if (!hasUnsharedAliasing!(T.ReturnType)) 281 { 282 enum isSafeReturn = true; 283 } 284 else static if (T.isPure) 285 { 286 enum isSafeReturn = true; 287 } 288 else 289 { 290 enum isSafeReturn = false; 291 } 292 } 293 294 private template randAssignable(R) 295 { 296 enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R; 297 } 298 299 private enum TaskStatus : ubyte 300 { 301 notStarted, 302 inProgress, 303 done 304 } 305 306 private template AliasReturn(alias fun, T...) 307 { 308 alias AliasReturn = typeof({ T args; return fun(args); }); 309 } 310 311 // Should be private, but std.algorithm.reduce is used in the zero-thread case 312 // and won't work w/ private. 313 template reduceAdjoin(functions...) 314 { 315 static if (functions.length == 1) 316 { 317 alias reduceAdjoin = binaryFun!(functions[0]); 318 } 319 else 320 { 321 T reduceAdjoin(T, U)(T lhs, U rhs) 322 { 323 alias funs = staticMap!(binaryFun, functions); 324 325 foreach (i, Unused; typeof(lhs.expand)) 326 { 327 lhs.expand[i] = funs[i](lhs.expand[i], rhs); 328 } 329 330 return lhs; 331 } 332 } 333 } 334 335 private template reduceFinish(functions...) 336 { 337 static if (functions.length == 1) 338 { 339 alias reduceFinish = binaryFun!(functions[0]); 340 } 341 else 342 { 343 T reduceFinish(T)(T lhs, T rhs) 344 { 345 alias funs = staticMap!(binaryFun, functions); 346 347 foreach (i, Unused; typeof(lhs.expand)) 348 { 349 lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]); 350 } 351 352 return lhs; 353 } 354 } 355 } 356 357 private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2) 358 { 359 enum isRoundRobin = true; 360 } 361 362 private template isRoundRobin(T) 363 { 364 enum isRoundRobin = false; 365 } 366 367 @safe unittest 368 { 369 static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate()))); 370 static assert(!isRoundRobin!(uint)); 371 } 372 373 // This is the base "class" for all of the other tasks. Using C-style 374 // polymorphism to allow more direct control over memory allocation, etc. 375 private struct AbstractTask 376 { 377 AbstractTask* prev; 378 AbstractTask* next; 379 380 // Pointer to a function that executes this task. 381 void function(void*) runTask; 382 383 Throwable exception; 384 ubyte taskStatus = TaskStatus.notStarted; 385 386 bool done() @property 387 { 388 if (atomicReadUbyte(taskStatus) == TaskStatus.done) 389 { 390 if (exception) 391 { 392 throw exception; 393 } 394 395 return true; 396 } 397 398 return false; 399 } 400 401 void job() 402 { 403 runTask(&this); 404 } 405 } 406 407 /** 408 $(D Task) represents the fundamental unit of work. A $(D Task) may be 409 executed in parallel with any other $(D Task). Using this struct directly 410 allows future/promise _parallelism. In this paradigm, a function (or delegate 411 or other callable) is executed in a thread other than the one it was called 412 from. The calling thread does not block while the function is being executed. 413 A call to $(D workForce), $(D yieldForce), or $(D spinForce) is used to 414 ensure that the $(D Task) has finished executing and to obtain the return 415 value, if any. These functions and $(D done) also act as full memory barriers, 416 meaning that any memory writes made in the thread that executed the $(D Task) 417 are guaranteed to be visible in the calling thread after one of these functions 418 returns. 419 420 The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can 421 be used to create an instance of this struct. See $(D task) for usage examples. 422 423 Function results are returned from $(D yieldForce), $(D spinForce) and 424 $(D workForce) by ref. If $(D fun) returns by ref, the reference will point 425 to the returned reference of $(D fun). Otherwise it will point to a 426 field in this struct. 427 428 Copying of this struct is disabled, since it would provide no useful semantics. 429 If you want to pass this struct around, you should do so by reference or 430 pointer. 431 432 Bugs: Changes to $(D ref) and $(D out) arguments are not propagated to the 433 call site, only to $(D args) in this struct. 434 */ 435 struct Task(alias fun, Args...) 436 { 437 AbstractTask base = {runTask : &impl}; 438 alias base this; 439 440 private @property AbstractTask* basePtr() 441 { 442 return &base; 443 } 444 445 private static void impl(void* myTask) 446 { 447 import std.algorithm.internal : addressOf; 448 449 Task* myCastedTask = cast(typeof(this)*) myTask; 450 static if (is(ReturnType == void)) 451 { 452 fun(myCastedTask._args); 453 } 454 else static if (is(typeof(addressOf(fun(myCastedTask._args))))) 455 { 456 myCastedTask.returnVal = addressOf(fun(myCastedTask._args)); 457 } 458 else 459 { 460 myCastedTask.returnVal = fun(myCastedTask._args); 461 } 462 } 463 464 private TaskPool pool; 465 private bool isScoped; // True if created with scopedTask. 466 467 Args _args; 468 469 /** 470 The arguments the function was called with. Changes to $(D out) and 471 $(D ref) arguments will be visible here. 472 */ 473 static if (__traits(isSame, fun, run)) 474 { 475 alias args = _args[1..$]; 476 } 477 else 478 { 479 alias args = _args; 480 } 481 482 483 // The purpose of this code is to decide whether functions whose 484 // return values have unshared aliasing can be executed via 485 // TaskPool from @safe code. See isSafeReturn. 486 static if (__traits(isSame, fun, run)) 487 { 488 static if (isFunctionPointer!(_args[0])) 489 { 490 private enum bool isPure = 491 functionAttributes!(Args[0]) & FunctionAttribute.pure_; 492 } 493 else 494 { 495 // BUG: Should check this for delegates too, but std.traits 496 // apparently doesn't allow this. isPure is irrelevant 497 // for delegates, at least for now since shared delegates 498 // don't work. 499 private enum bool isPure = false; 500 } 501 502 } 503 else 504 { 505 // We already know that we can't execute aliases in @safe code, so 506 // just put a dummy value here. 507 private enum bool isPure = false; 508 } 509 510 511 /** 512 The return type of the function called by this $(D Task). This can be 513 $(D void). 514 */ 515 alias ReturnType = typeof(fun(_args)); 516 517 static if (!is(ReturnType == void)) 518 { 519 static if (is(typeof(&fun(_args)))) 520 { 521 // Ref return. 522 ReturnType* returnVal; 523 524 ref ReturnType fixRef(ReturnType* val) 525 { 526 return *val; 527 } 528 529 } 530 else 531 { 532 ReturnType returnVal; 533 534 ref ReturnType fixRef(ref ReturnType val) 535 { 536 return val; 537 } 538 } 539 } 540 541 private void enforcePool() 542 { 543 import std.exception : enforce; 544 enforce(this.pool !is null, "Job not submitted yet."); 545 } 546 547 static if (Args.length > 0) 548 { 549 private this(Args args) 550 { 551 _args = args; 552 } 553 } 554 555 // Work around DMD bug 6588, allow immutable elements. 556 static if (allSatisfy!(isAssignable, Args)) 557 { 558 typeof(this) opAssign(typeof(this) rhs) 559 { 560 foreach (i, Type; typeof(this.tupleof)) 561 { 562 this.tupleof[i] = rhs.tupleof[i]; 563 } 564 return this; 565 } 566 } 567 else 568 { 569 @disable typeof(this) opAssign(typeof(this) rhs) 570 { 571 assert(0); 572 } 573 } 574 575 /** 576 If the $(D Task) isn't started yet, execute it in the current thread. 577 If it's done, return its return value, if any. If it's in progress, 578 busy spin until it's done, then return the return value. If it threw 579 an exception, rethrow that exception. 580 581 This function should be used when you expect the result of the 582 $(D Task) to be available on a timescale shorter than that of an OS 583 context switch. 584 */ 585 @property ref ReturnType spinForce() @trusted 586 { 587 enforcePool(); 588 589 this.pool.tryDeleteExecute(basePtr); 590 591 while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {} 592 593 if (exception) 594 { 595 throw exception; 596 } 597 598 static if (!is(ReturnType == void)) 599 { 600 return fixRef(this.returnVal); 601 } 602 } 603 604 /** 605 If the $(D Task) isn't started yet, execute it in the current thread. 606 If it's done, return its return value, if any. If it's in progress, 607 wait on a condition variable. If it threw an exception, rethrow that 608 exception. 609 610 This function should be used for expensive functions, as waiting on a 611 condition variable introduces latency, but avoids wasted CPU cycles. 612 */ 613 @property ref ReturnType yieldForce() @trusted 614 { 615 enforcePool(); 616 this.pool.tryDeleteExecute(basePtr); 617 618 if (done) 619 { 620 static if (is(ReturnType == void)) 621 { 622 return; 623 } 624 else 625 { 626 return fixRef(this.returnVal); 627 } 628 } 629 630 pool.waiterLock(); 631 scope(exit) pool.waiterUnlock(); 632 633 while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) 634 { 635 pool.waitUntilCompletion(); 636 } 637 638 if (exception) 639 { 640 throw exception; // nocoverage 641 } 642 643 static if (!is(ReturnType == void)) 644 { 645 return fixRef(this.returnVal); 646 } 647 } 648 649 /** 650 If this $(D Task) was not started yet, execute it in the current 651 thread. If it is finished, return its result. If it is in progress, 652 execute any other $(D Task) from the $(D TaskPool) instance that 653 this $(D Task) was submitted to until this one 654 is finished. If it threw an exception, rethrow that exception. 655 If no other tasks are available or this $(D Task) was executed using 656 $(D executeInNewThread), wait on a condition variable. 657 */ 658 @property ref ReturnType workForce() @trusted 659 { 660 enforcePool(); 661 this.pool.tryDeleteExecute(basePtr); 662 663 while (true) 664 { 665 if (done) // done() implicitly checks for exceptions. 666 { 667 static if (is(ReturnType == void)) 668 { 669 return; 670 } 671 else 672 { 673 return fixRef(this.returnVal); 674 } 675 } 676 677 AbstractTask* job; 678 { 679 // Locking explicitly and calling popNoSync() because 680 // pop() waits on a condition variable if there are no Tasks 681 // in the queue. 682 683 pool.queueLock(); 684 scope(exit) pool.queueUnlock(); 685 job = pool.popNoSync(); 686 } 687 688 689 if (job !is null) 690 { 691 692 version (verboseUnittest) 693 { 694 stderr.writeln("Doing workForce work."); 695 } 696 697 pool.doJob(job); 698 699 if (done) 700 { 701 static if (is(ReturnType == void)) 702 { 703 return; 704 } 705 else 706 { 707 return fixRef(this.returnVal); 708 } 709 } 710 } 711 else 712 { 713 version (verboseUnittest) 714 { 715 stderr.writeln("Yield from workForce."); 716 } 717 718 return yieldForce; 719 } 720 } 721 } 722 723 /** 724 Returns $(D true) if the $(D Task) is finished executing. 725 726 Throws: Rethrows any exception thrown during the execution of the 727 $(D Task). 728 */ 729 @property bool done() @trusted 730 { 731 // Explicitly forwarded for documentation purposes. 732 return base.done; 733 } 734 735 /** 736 Create a new thread for executing this $(D Task), execute it in the 737 newly created thread, then terminate the thread. This can be used for 738 future/promise parallelism. An explicit priority may be given 739 to the $(D Task). If one is provided, its value is forwarded to 740 $(D core.thread.Thread.priority). See $(REF task, std,parallelism) for 741 usage example. 742 */ 743 void executeInNewThread() @trusted 744 { 745 pool = new TaskPool(basePtr); 746 } 747 748 /// Ditto 749 void executeInNewThread(int priority) @trusted 750 { 751 pool = new TaskPool(basePtr, priority); 752 } 753 754 @safe ~this() 755 { 756 if (isScoped && pool !is null && taskStatus != TaskStatus.done) 757 { 758 yieldForce; 759 } 760 } 761 762 // When this is uncommented, it somehow gets called on returning from 763 // scopedTask even though the struct shouldn't be getting copied. 764 //@disable this(this) {} 765 } 766 767 // Calls $(D fpOrDelegate) with $(D args). This is an 768 // adapter that makes $(D Task) work with delegates, function pointers and 769 // functors instead of just aliases. 770 ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args) 771 { 772 return fpOrDelegate(args); 773 } 774 775 /** 776 Creates a $(D Task) on the GC heap that calls an alias. This may be executed 777 via $(D Task.executeInNewThread) or by submitting to a 778 $(REF TaskPool, std,parallelism). A globally accessible instance of 779 $(D TaskPool) is provided by $(REF taskPool, std,parallelism). 780 781 Returns: A pointer to the $(D Task). 782 783 Example: 784 --- 785 // Read two files into memory at the same time. 786 import std.file; 787 788 void main() 789 { 790 // Create and execute a Task for reading 791 // foo.txt. 792 auto file1Task = task!read("foo.txt"); 793 file1Task.executeInNewThread(); 794 795 // Read bar.txt in parallel. 796 auto file2Data = read("bar.txt"); 797 798 // Get the results of reading foo.txt. 799 auto file1Data = file1Task.yieldForce; 800 } 801 --- 802 803 --- 804 // Sorts an array using a parallel quick sort algorithm. 805 // The first partition is done serially. Both recursion 806 // branches are then executed in parallel. 807 // 808 // Timings for sorting an array of 1,000,000 doubles on 809 // an Athlon 64 X2 dual core machine: 810 // 811 // This implementation: 176 milliseconds. 812 // Equivalent serial implementation: 280 milliseconds 813 void parallelSort(T)(T[] data) 814 { 815 // Sort small subarrays serially. 816 if (data.length < 100) 817 { 818 std.algorithm.sort(data); 819 return; 820 } 821 822 // Partition the array. 823 swap(data[$ / 2], data[$ - 1]); 824 auto pivot = data[$ - 1]; 825 bool lessThanPivot(T elem) { return elem < pivot; } 826 827 auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]); 828 swap(data[$ - greaterEqual.length - 1], data[$ - 1]); 829 830 auto less = data[0..$ - greaterEqual.length - 1]; 831 greaterEqual = data[$ - greaterEqual.length..$]; 832 833 // Execute both recursion branches in parallel. 834 auto recurseTask = task!parallelSort(greaterEqual); 835 taskPool.put(recurseTask); 836 parallelSort(less); 837 recurseTask.yieldForce; 838 } 839 --- 840 */ 841 auto task(alias fun, Args...)(Args args) 842 { 843 return new Task!(fun, Args)(args); 844 } 845 846 /** 847 Creates a $(D Task) on the GC heap that calls a function pointer, delegate, or 848 class/struct with overloaded opCall. 849 850 Example: 851 --- 852 // Read two files in at the same time again, 853 // but this time use a function pointer instead 854 // of an alias to represent std.file.read. 855 import std.file; 856 857 void main() 858 { 859 // Create and execute a Task for reading 860 // foo.txt. 861 auto file1Task = task(&read, "foo.txt"); 862 file1Task.executeInNewThread(); 863 864 // Read bar.txt in parallel. 865 auto file2Data = read("bar.txt"); 866 867 // Get the results of reading foo.txt. 868 auto file1Data = file1Task.yieldForce; 869 } 870 --- 871 872 Notes: This function takes a non-scope delegate, meaning it can be 873 used with closures. If you can't allocate a closure due to objects 874 on the stack that have scoped destruction, see $(D scopedTask), which 875 takes a scope delegate. 876 */ 877 auto task(F, Args...)(F delegateOrFp, Args args) 878 if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) 879 { 880 return new Task!(run, F, Args)(delegateOrFp, args); 881 } 882 883 /** 884 Version of $(D task) usable from $(D @safe) code. Usage mechanics are 885 identical to the non-@safe case, but safety introduces some restrictions: 886 887 1. $(D fun) must be @safe or @trusted. 888 889 2. $(D F) must not have any unshared aliasing as defined by 890 $(REF hasUnsharedAliasing, std,traits). This means it 891 may not be an unshared delegate or a non-shared class or struct 892 with overloaded $(D opCall). This also precludes accepting template 893 alias parameters. 894 895 3. $(D Args) must not have unshared aliasing. 896 897 4. $(D fun) must not return by reference. 898 899 5. The return type must not have unshared aliasing unless $(D fun) is 900 $(D pure) or the $(D Task) is executed via $(D executeInNewThread) instead 901 of using a $(D TaskPool). 902 903 */ 904 @trusted auto task(F, Args...)(F fun, Args args) 905 if (is(typeof(fun(args))) && isSafeTask!F) 906 { 907 return new Task!(run, F, Args)(fun, args); 908 } 909 910 /** 911 These functions allow the creation of $(D Task) objects on the stack rather 912 than the GC heap. The lifetime of a $(D Task) created by $(D scopedTask) 913 cannot exceed the lifetime of the scope it was created in. 914 915 $(D scopedTask) might be preferred over $(D task): 916 917 1. When a $(D Task) that calls a delegate is being created and a closure 918 cannot be allocated due to objects on the stack that have scoped 919 destruction. The delegate overload of $(D scopedTask) takes a $(D scope) 920 delegate. 921 922 2. As a micro-optimization, to avoid the heap allocation associated with 923 $(D task) or with the creation of a closure. 924 925 Usage is otherwise identical to $(D task). 926 927 Notes: $(D Task) objects created using $(D scopedTask) will automatically 928 call $(D Task.yieldForce) in their destructor if necessary to ensure 929 the $(D Task) is complete before the stack frame they reside on is destroyed. 930 */ 931 auto scopedTask(alias fun, Args...)(Args args) 932 { 933 auto ret = Task!(fun, Args)(args); 934 ret.isScoped = true; 935 return ret; 936 } 937 938 /// Ditto 939 auto scopedTask(F, Args...)(scope F delegateOrFp, Args args) 940 if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) 941 { 942 auto ret = Task!(run, F, Args)(delegateOrFp, args); 943 ret.isScoped = true; 944 return ret; 945 } 946 947 /// Ditto 948 @trusted auto scopedTask(F, Args...)(F fun, Args args) 949 if (is(typeof(fun(args))) && isSafeTask!F) 950 { 951 auto ret = Task!(run, F, Args)(fun, args); 952 ret.isScoped = true; 953 return ret; 954 } 955 956 /** 957 The total number of CPU cores available on the current machine, as reported by 958 the operating system. 959 */ 960 immutable uint totalCPUs; 961 962 /* 963 This class serves two purposes: 964 965 1. It distinguishes std.parallelism threads from other threads so that 966 the std.parallelism daemon threads can be terminated. 967 968 2. It adds a reference to the pool that the thread is a member of, 969 which is also necessary to allow the daemon threads to be properly 970 terminated. 971 */ 972 private final class ParallelismThread : Thread 973 { 974 this(void delegate() dg) 975 { 976 super(dg); 977 } 978 979 TaskPool pool; 980 } 981 982 // Kill daemon threads. 983 shared static ~this() 984 { 985 foreach (ref thread; Thread) 986 { 987 auto pthread = cast(ParallelismThread) thread; 988 if (pthread is null) continue; 989 auto pool = pthread.pool; 990 if (!pool.isDaemon) continue; 991 pool.stop(); 992 pthread.join(); 993 } 994 } 995 996 /** 997 This class encapsulates a task queue and a set of worker threads. Its purpose 998 is to efficiently map a large number of $(D Task)s onto a smaller number of 999 threads. A task queue is a FIFO queue of $(D Task) objects that have been 1000 submitted to the $(D TaskPool) and are awaiting execution. A worker thread is a 1001 thread that executes the $(D Task) at the front of the queue when one is 1002 available and sleeps when the queue is empty. 1003 1004 This class should usually be used via the global instantiation 1005 available via the $(REF taskPool, std,parallelism) property. 1006 Occasionally it is useful to explicitly instantiate a $(D TaskPool): 1007 1008 1. When you want $(D TaskPool) instances with multiple priorities, for example 1009 a low priority pool and a high priority pool. 1010 1011 2. When the threads in the global task pool are waiting on a synchronization 1012 primitive (for example a mutex), and you want to parallelize the code that 1013 needs to run before these threads can be resumed. 1014 */ 1015 final class TaskPool 1016 { 1017 private: 1018 1019 // A pool can either be a regular pool or a single-task pool. A 1020 // single-task pool is a dummy pool that's fired up for 1021 // Task.executeInNewThread(). 1022 bool isSingleTask; 1023 1024 ParallelismThread[] pool; 1025 Thread singleTaskThread; 1026 1027 AbstractTask* head; 1028 AbstractTask* tail; 1029 PoolState status = PoolState.running; 1030 Condition workerCondition; 1031 Condition waiterCondition; 1032 Mutex queueMutex; 1033 Mutex waiterMutex; // For waiterCondition 1034 1035 // The instanceStartIndex of the next instance that will be created. 1036 __gshared static size_t nextInstanceIndex = 1; 1037 1038 // The index of the current thread. 1039 static size_t threadIndex; 1040 1041 // The index of the first thread in this instance. 1042 immutable size_t instanceStartIndex; 1043 1044 // The index that the next thread to be initialized in this pool will have. 1045 size_t nextThreadIndex; 1046 1047 enum PoolState : ubyte 1048 { 1049 running, 1050 finishing, 1051 stopNow 1052 } 1053 1054 void doJob(AbstractTask* job) 1055 { 1056 assert(job.taskStatus == TaskStatus.inProgress); 1057 assert(job.next is null); 1058 assert(job.prev is null); 1059 1060 scope(exit) 1061 { 1062 if (!isSingleTask) 1063 { 1064 waiterLock(); 1065 scope(exit) waiterUnlock(); 1066 notifyWaiters(); 1067 } 1068 } 1069 1070 try 1071 { 1072 job.job(); 1073 } 1074 catch (Throwable e) 1075 { 1076 job.exception = e; 1077 } 1078 1079 atomicSetUbyte(job.taskStatus, TaskStatus.done); 1080 } 1081 1082 // This function is used for dummy pools created by Task.executeInNewThread(). 1083 void doSingleTask() 1084 { 1085 // No synchronization. Pool is guaranteed to only have one thread, 1086 // and the queue is submitted to before this thread is created. 1087 assert(head); 1088 auto t = head; 1089 t.next = t.prev = head = null; 1090 doJob(t); 1091 } 1092 1093 // This function performs initialization for each thread that affects 1094 // thread local storage and therefore must be done from within the 1095 // worker thread. It then calls executeWorkLoop(). 1096 void startWorkLoop() 1097 { 1098 // Initialize thread index. 1099 { 1100 queueLock(); 1101 scope(exit) queueUnlock(); 1102 threadIndex = nextThreadIndex; 1103 nextThreadIndex++; 1104 } 1105 1106 executeWorkLoop(); 1107 } 1108 1109 // This is the main work loop that worker threads spend their time in 1110 // until they terminate. It's also entered by non-worker threads when 1111 // finish() is called with the blocking variable set to true. 1112 void executeWorkLoop() 1113 { 1114 while (atomicReadUbyte(status) != PoolState.stopNow) 1115 { 1116 AbstractTask* task = pop(); 1117 if (task is null) 1118 { 1119 if (atomicReadUbyte(status) == PoolState.finishing) 1120 { 1121 atomicSetUbyte(status, PoolState.stopNow); 1122 return; 1123 } 1124 } 1125 else 1126 { 1127 doJob(task); 1128 } 1129 } 1130 } 1131 1132 // Pop a task off the queue. 1133 AbstractTask* pop() 1134 { 1135 queueLock(); 1136 scope(exit) queueUnlock(); 1137 auto ret = popNoSync(); 1138 while (ret is null && status == PoolState.running) 1139 { 1140 wait(); 1141 ret = popNoSync(); 1142 } 1143 return ret; 1144 } 1145 1146 AbstractTask* popNoSync() 1147 out(returned) 1148 { 1149 /* If task.prev and task.next aren't null, then another thread 1150 * can try to delete this task from the pool after it's 1151 * alreadly been deleted/popped. 1152 */ 1153 if (returned !is null) 1154 { 1155 assert(returned.next is null); 1156 assert(returned.prev is null); 1157 } 1158 } 1159 body 1160 { 1161 if (isSingleTask) return null; 1162 1163 AbstractTask* returned = head; 1164 if (head !is null) 1165 { 1166 head = head.next; 1167 returned.prev = null; 1168 returned.next = null; 1169 returned.taskStatus = TaskStatus.inProgress; 1170 } 1171 if (head !is null) 1172 { 1173 head.prev = null; 1174 } 1175 1176 return returned; 1177 } 1178 1179 // Push a task onto the queue. 1180 void abstractPut(AbstractTask* task) 1181 { 1182 queueLock(); 1183 scope(exit) queueUnlock(); 1184 abstractPutNoSync(task); 1185 } 1186 1187 void abstractPutNoSync(AbstractTask* task) 1188 in 1189 { 1190 assert(task); 1191 } 1192 out 1193 { 1194 import std.conv : text; 1195 1196 assert(tail.prev !is tail); 1197 assert(tail.next is null, text(tail.prev, '\t', tail.next)); 1198 if (tail.prev !is null) 1199 { 1200 assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next)); 1201 } 1202 } 1203 body 1204 { 1205 // Not using enforce() to save on function call overhead since this 1206 // is a performance critical function. 1207 if (status != PoolState.running) 1208 { 1209 throw new Error( 1210 "Cannot submit a new task to a pool after calling " ~ 1211 "finish() or stop()." 1212 ); 1213 } 1214 1215 task.next = null; 1216 if (head is null) //Queue is empty. 1217 { 1218 head = task; 1219 tail = task; 1220 tail.prev = null; 1221 } 1222 else 1223 { 1224 assert(tail); 1225 task.prev = tail; 1226 tail.next = task; 1227 tail = task; 1228 } 1229 notify(); 1230 } 1231 1232 void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t) 1233 { 1234 if (status != PoolState.running) 1235 { 1236 throw new Error( 1237 "Cannot submit a new task to a pool after calling " ~ 1238 "finish() or stop()." 1239 ); 1240 } 1241 1242 if (head is null) 1243 { 1244 head = h; 1245 tail = t; 1246 } 1247 else 1248 { 1249 h.prev = tail; 1250 tail.next = h; 1251 tail = t; 1252 } 1253 1254 notifyAll(); 1255 } 1256 1257 void tryDeleteExecute(AbstractTask* toExecute) 1258 { 1259 if (isSingleTask) return; 1260 1261 if ( !deleteItem(toExecute) ) 1262 { 1263 return; 1264 } 1265 1266 try 1267 { 1268 toExecute.job(); 1269 } 1270 catch (Exception e) 1271 { 1272 toExecute.exception = e; 1273 } 1274 1275 atomicSetUbyte(toExecute.taskStatus, TaskStatus.done); 1276 } 1277 1278 bool deleteItem(AbstractTask* item) 1279 { 1280 queueLock(); 1281 scope(exit) queueUnlock(); 1282 return deleteItemNoSync(item); 1283 } 1284 1285 bool deleteItemNoSync(AbstractTask* item) 1286 { 1287 if (item.taskStatus != TaskStatus.notStarted) 1288 { 1289 return false; 1290 } 1291 item.taskStatus = TaskStatus.inProgress; 1292 1293 if (item is head) 1294 { 1295 // Make sure head gets set properly. 1296 popNoSync(); 1297 return true; 1298 } 1299 if (item is tail) 1300 { 1301 tail = tail.prev; 1302 if (tail !is null) 1303 { 1304 tail.next = null; 1305 } 1306 item.next = null; 1307 item.prev = null; 1308 return true; 1309 } 1310 if (item.next !is null) 1311 { 1312 assert(item.next.prev is item); // Check queue consistency. 1313 item.next.prev = item.prev; 1314 } 1315 if (item.prev !is null) 1316 { 1317 assert(item.prev.next is item); // Check queue consistency. 1318 item.prev.next = item.next; 1319 } 1320 item.next = null; 1321 item.prev = null; 1322 return true; 1323 } 1324 1325 void queueLock() 1326 { 1327 assert(queueMutex); 1328 if (!isSingleTask) queueMutex.lock(); 1329 } 1330 1331 void queueUnlock() 1332 { 1333 assert(queueMutex); 1334 if (!isSingleTask) queueMutex.unlock(); 1335 } 1336 1337 void waiterLock() 1338 { 1339 if (!isSingleTask) waiterMutex.lock(); 1340 } 1341 1342 void waiterUnlock() 1343 { 1344 if (!isSingleTask) waiterMutex.unlock(); 1345 } 1346 1347 void wait() 1348 { 1349 if (!isSingleTask) workerCondition.wait(); 1350 } 1351 1352 void notify() 1353 { 1354 if (!isSingleTask) workerCondition.notify(); 1355 } 1356 1357 void notifyAll() 1358 { 1359 if (!isSingleTask) workerCondition.notifyAll(); 1360 } 1361 1362 void waitUntilCompletion() 1363 { 1364 if (isSingleTask) 1365 { 1366 singleTaskThread.join(); 1367 } 1368 else 1369 { 1370 waiterCondition.wait(); 1371 } 1372 } 1373 1374 void notifyWaiters() 1375 { 1376 if (!isSingleTask) waiterCondition.notifyAll(); 1377 } 1378 1379 // Private constructor for creating dummy pools that only have one thread, 1380 // only execute one Task, and then terminate. This is used for 1381 // Task.executeInNewThread(). 1382 this(AbstractTask* task, int priority = int.max) 1383 { 1384 assert(task); 1385 1386 // Dummy value, not used. 1387 instanceStartIndex = 0; 1388 1389 this.isSingleTask = true; 1390 task.taskStatus = TaskStatus.inProgress; 1391 this.head = task; 1392 singleTaskThread = new Thread(&doSingleTask); 1393 singleTaskThread.start(); 1394 1395 // Disabled until writing code to support 1396 // running thread with specified priority 1397 // See https://d.puremagic.com/issues/show_bug.cgi?id=8960 1398 1399 /*if (priority != int.max) 1400 { 1401 singleTaskThread.priority = priority; 1402 }*/ 1403 } 1404 1405 public: 1406 // This is used in parallel_algorithm but is too unstable to document 1407 // as public API. 1408 size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow 1409 { 1410 import std.algorithm.comparison : max; 1411 1412 if (this.size == 0) 1413 { 1414 return rangeLen; 1415 } 1416 1417 immutable size_t eightSize = 4 * (this.size + 1); 1418 auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1); 1419 return max(ret, 1); 1420 } 1421 1422 /** 1423 Default constructor that initializes a $(D TaskPool) with 1424 $(D totalCPUs) - 1 worker threads. The minus 1 is included because the 1425 main thread will also be available to do work. 1426 1427 Note: On single-core machines, the primitives provided by $(D TaskPool) 1428 operate transparently in single-threaded mode. 1429 */ 1430 this() @trusted 1431 { 1432 this(totalCPUs - 1); 1433 } 1434 1435 /** 1436 Allows for custom number of worker threads. 1437 */ 1438 this(size_t nWorkers) @trusted 1439 { 1440 synchronized(typeid(TaskPool)) 1441 { 1442 instanceStartIndex = nextInstanceIndex; 1443 1444 // The first worker thread to be initialized will have this index, 1445 // and will increment it. The second worker to be initialized will 1446 // have this index plus 1. 1447 nextThreadIndex = instanceStartIndex; 1448 nextInstanceIndex += nWorkers; 1449 } 1450 1451 queueMutex = new Mutex(this); 1452 waiterMutex = new Mutex(); 1453 workerCondition = new Condition(queueMutex); 1454 waiterCondition = new Condition(waiterMutex); 1455 1456 pool = new ParallelismThread[nWorkers]; 1457 foreach (ref poolThread; pool) 1458 { 1459 poolThread = new ParallelismThread(&startWorkLoop); 1460 poolThread.pool = this; 1461 poolThread.start(); 1462 } 1463 } 1464 1465 /** 1466 Implements a parallel foreach loop over a range. This works by implicitly 1467 creating and submitting one $(D Task) to the $(D TaskPool) for each worker 1468 thread. A work unit is a set of consecutive elements of $(D range) to 1469 be processed by a worker thread between communication with any other 1470 thread. The number of elements processed per work unit is controlled by the 1471 $(D workUnitSize) parameter. Smaller work units provide better load 1472 balancing, but larger work units avoid the overhead of communicating 1473 with other threads frequently to fetch the next work unit. Large work 1474 units also avoid false sharing in cases where the range is being modified. 1475 The less time a single iteration of the loop takes, the larger 1476 $(D workUnitSize) should be. For very expensive loop bodies, 1477 $(D workUnitSize) should be 1. An overload that chooses a default work 1478 unit size is also available. 1479 1480 Example: 1481 --- 1482 // Find the logarithm of every number from 1 to 1483 // 10_000_000 in parallel. 1484 auto logs = new double[10_000_000]; 1485 1486 // Parallel foreach works with or without an index 1487 // variable. It can be iterate by ref if range.front 1488 // returns by ref. 1489 1490 // Iterate over logs using work units of size 100. 1491 foreach (i, ref elem; taskPool.parallel(logs, 100)) 1492 { 1493 elem = log(i + 1.0); 1494 } 1495 1496 // Same thing, but use the default work unit size. 1497 // 1498 // Timings on an Athlon 64 X2 dual core machine: 1499 // 1500 // Parallel foreach: 388 milliseconds 1501 // Regular foreach: 619 milliseconds 1502 foreach (i, ref elem; taskPool.parallel(logs)) 1503 { 1504 elem = log(i + 1.0); 1505 } 1506 --- 1507 1508 Notes: 1509 1510 The memory usage of this implementation is guaranteed to be constant 1511 in $(D range.length). 1512 1513 Breaking from a parallel foreach loop via a break, labeled break, 1514 labeled continue, return or goto statement throws a 1515 $(D ParallelForeachError). 1516 1517 In the case of non-random access ranges, parallel foreach buffers lazily 1518 to an array of size $(D workUnitSize) before executing the parallel portion 1519 of the loop. The exception is that, if a parallel foreach is executed 1520 over a range returned by $(D asyncBuf) or $(D map), the copying is elided 1521 and the buffers are simply swapped. In this case $(D workUnitSize) is 1522 ignored and the work unit size is set to the buffer size of $(D range). 1523 1524 A memory barrier is guaranteed to be executed on exit from the loop, 1525 so that results produced by all threads are visible in the calling thread. 1526 1527 $(B Exception Handling): 1528 1529 When at least one exception is thrown from inside a parallel foreach loop, 1530 the submission of additional $(D Task) objects is terminated as soon as 1531 possible, in a non-deterministic manner. All executing or 1532 enqueued work units are allowed to complete. Then, all exceptions that 1533 were thrown by any work unit are chained using $(D Throwable.next) and 1534 rethrown. The order of the exception chaining is non-deterministic. 1535 */ 1536 ParallelForeach!R parallel(R)(R range, size_t workUnitSize) 1537 { 1538 import std.exception : enforce; 1539 enforce(workUnitSize > 0, "workUnitSize must be > 0."); 1540 alias RetType = ParallelForeach!R; 1541 return RetType(this, range, workUnitSize); 1542 } 1543 1544 1545 /// Ditto 1546 ParallelForeach!R parallel(R)(R range) 1547 { 1548 static if (hasLength!R) 1549 { 1550 // Default work unit size is such that we would use 4x as many 1551 // slots as are in this thread pool. 1552 size_t workUnitSize = defaultWorkUnitSize(range.length); 1553 return parallel(range, workUnitSize); 1554 } 1555 else 1556 { 1557 // Just use a really, really dumb guess if the user is too lazy to 1558 // specify. 1559 return parallel(range, 512); 1560 } 1561 } 1562 1563 /// 1564 template amap(functions...) 1565 { 1566 /** 1567 Eager parallel map. The eagerness of this function means it has less 1568 overhead than the lazily evaluated $(D TaskPool.map) and should be 1569 preferred where the memory requirements of eagerness are acceptable. 1570 $(D functions) are the functions to be evaluated, passed as template 1571 alias parameters in a style similar to 1572 $(REF map, std,algorithm,iteration). 1573 The first argument must be a random access range. For performance 1574 reasons, amap will assume the range elements have not yet been 1575 initialized. Elements will be overwritten without calling a destructor 1576 nor doing an assignment. As such, the range must not contain meaningful 1577 data$(DDOC_COMMENT not a section): either un-initialized objects, or 1578 objects in their $(D .init) state. 1579 1580 --- 1581 auto numbers = iota(100_000_000.0); 1582 1583 // Find the square roots of numbers. 1584 // 1585 // Timings on an Athlon 64 X2 dual core machine: 1586 // 1587 // Parallel eager map: 0.802 s 1588 // Equivalent serial implementation: 1.768 s 1589 auto squareRoots = taskPool.amap!sqrt(numbers); 1590 --- 1591 1592 Immediately after the range argument, an optional work unit size argument 1593 may be provided. Work units as used by $(D amap) are identical to those 1594 defined for parallel foreach. If no work unit size is provided, the 1595 default work unit size is used. 1596 1597 --- 1598 // Same thing, but make work unit size 100. 1599 auto squareRoots = taskPool.amap!sqrt(numbers, 100); 1600 --- 1601 1602 An output range for returning the results may be provided as the last 1603 argument. If one is not provided, an array of the proper type will be 1604 allocated on the garbage collected heap. If one is provided, it must be a 1605 random access range with assignable elements, must have reference 1606 semantics with respect to assignment to its elements, and must have the 1607 same length as the input range. Writing to adjacent elements from 1608 different threads must be safe. 1609 1610 --- 1611 // Same thing, but explicitly allocate an array 1612 // to return the results in. The element type 1613 // of the array may be either the exact type 1614 // returned by functions or an implicit conversion 1615 // target. 1616 auto squareRoots = new float[numbers.length]; 1617 taskPool.amap!sqrt(numbers, squareRoots); 1618 1619 // Multiple functions, explicit output range, and 1620 // explicit work unit size. 1621 auto results = new Tuple!(float, real)[numbers.length]; 1622 taskPool.amap!(sqrt, log)(numbers, 100, results); 1623 --- 1624 1625 Note: 1626 1627 A memory barrier is guaranteed to be executed after all results are written 1628 but before returning so that results produced by all threads are visible 1629 in the calling thread. 1630 1631 Tips: 1632 1633 To perform the mapping operation in place, provide the same range for the 1634 input and output range. 1635 1636 To parallelize the copying of a range with expensive to evaluate elements 1637 to an array, pass an identity function (a function that just returns 1638 whatever argument is provided to it) to $(D amap). 1639 1640 $(B Exception Handling): 1641 1642 When at least one exception is thrown from inside the map functions, 1643 the submission of additional $(D Task) objects is terminated as soon as 1644 possible, in a non-deterministic manner. All currently executing or 1645 enqueued work units are allowed to complete. Then, all exceptions that 1646 were thrown from any work unit are chained using $(D Throwable.next) and 1647 rethrown. The order of the exception chaining is non-deterministic. 1648 */ 1649 auto amap(Args...)(Args args) 1650 if (isRandomAccessRange!(Args[0])) 1651 { 1652 import std.conv : emplaceRef; 1653 1654 alias fun = adjoin!(staticMap!(unaryFun, functions)); 1655 1656 alias range = args[0]; 1657 immutable len = range.length; 1658 1659 static if ( 1660 Args.length > 1 && 1661 randAssignable!(Args[$ - 1]) && 1662 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1])) 1663 ) 1664 { 1665 import std.conv : text; 1666 import std.exception : enforce; 1667 1668 alias buf = args[$ - 1]; 1669 alias args2 = args[0..$ - 1]; 1670 alias Args2 = Args[0..$ - 1]; 1671 enforce(buf.length == len, 1672 text("Can't use a user supplied buffer that's the wrong ", 1673 "size. (Expected :", len, " Got: ", buf.length)); 1674 } 1675 else static if (randAssignable!(Args[$ - 1]) && Args.length > 1) 1676 { 1677 static assert(0, "Wrong buffer type."); 1678 } 1679 else 1680 { 1681 import std.array : uninitializedArray; 1682 1683 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len); 1684 alias args2 = args; 1685 alias Args2 = Args; 1686 } 1687 1688 if (!len) return buf; 1689 1690 static if (isIntegral!(Args2[$ - 1])) 1691 { 1692 static assert(args2.length == 2); 1693 auto workUnitSize = cast(size_t) args2[1]; 1694 } 1695 else 1696 { 1697 static assert(args2.length == 1, Args); 1698 auto workUnitSize = defaultWorkUnitSize(range.length); 1699 } 1700 1701 alias R = typeof(range); 1702 1703 if (workUnitSize > len) 1704 { 1705 workUnitSize = len; 1706 } 1707 1708 // Handle as a special case: 1709 if (size == 0) 1710 { 1711 size_t index = 0; 1712 foreach (elem; range) 1713 { 1714 emplaceRef(buf[index++], fun(elem)); 1715 } 1716 return buf; 1717 } 1718 1719 // Effectively -1: chunkIndex + 1 == 0: 1720 shared size_t workUnitIndex = size_t.max; 1721 shared bool shouldContinue = true; 1722 1723 void doIt() 1724 { 1725 import std.algorithm.comparison : min; 1726 1727 scope(failure) 1728 { 1729 // If an exception is thrown, all threads should bail. 1730 atomicStore(shouldContinue, false); 1731 } 1732 1733 while (atomicLoad(shouldContinue)) 1734 { 1735 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); 1736 immutable start = workUnitSize * myUnitIndex; 1737 if (start >= len) 1738 { 1739 atomicStore(shouldContinue, false); 1740 break; 1741 } 1742 1743 immutable end = min(len, start + workUnitSize); 1744 1745 static if (hasSlicing!R) 1746 { 1747 auto subrange = range[start .. end]; 1748 foreach (i; start .. end) 1749 { 1750 emplaceRef(buf[i], fun(subrange.front)); 1751 subrange.popFront(); 1752 } 1753 } 1754 else 1755 { 1756 foreach (i; start .. end) 1757 { 1758 emplaceRef(buf[i], fun(range[i])); 1759 } 1760 } 1761 } 1762 } 1763 1764 submitAndExecute(this, &doIt); 1765 return buf; 1766 } 1767 } 1768 1769 /// 1770 template map(functions...) 1771 { 1772 /** 1773 A semi-lazy parallel map that can be used for pipelining. The map 1774 functions are evaluated for the first $(D bufSize) elements and stored in a 1775 buffer and made available to $(D popFront). Meanwhile, in the 1776 background a second buffer of the same size is filled. When the first 1777 buffer is exhausted, it is swapped with the second buffer and filled while 1778 the values from what was originally the second buffer are read. This 1779 implementation allows for elements to be written to the buffer without 1780 the need for atomic operations or synchronization for each write, and 1781 enables the mapping function to be evaluated efficiently in parallel. 1782 1783 $(D map) has more overhead than the simpler procedure used by $(D amap) 1784 but avoids the need to keep all results in memory simultaneously and works 1785 with non-random access ranges. 1786 1787 Params: 1788 1789 source = The input range to be mapped. If $(D source) is not random 1790 access it will be lazily buffered to an array of size $(D bufSize) before 1791 the map function is evaluated. (For an exception to this rule, see Notes.) 1792 1793 bufSize = The size of the buffer to store the evaluated elements. 1794 1795 workUnitSize = The number of elements to evaluate in a single 1796 $(D Task). Must be less than or equal to $(D bufSize), and 1797 should be a fraction of $(D bufSize) such that all worker threads can be 1798 used. If the default of size_t.max is used, workUnitSize will be set to 1799 the pool-wide default. 1800 1801 Returns: An input range representing the results of the map. This range 1802 has a length iff $(D source) has a length. 1803 1804 Notes: 1805 1806 If a range returned by $(D map) or $(D asyncBuf) is used as an input to 1807 $(D map), then as an optimization the copying from the output buffer 1808 of the first range to the input buffer of the second range is elided, even 1809 though the ranges returned by $(D map) and $(D asyncBuf) are non-random 1810 access ranges. This means that the $(D bufSize) parameter passed to the 1811 current call to $(D map) will be ignored and the size of the buffer 1812 will be the buffer size of $(D source). 1813 1814 Example: 1815 --- 1816 // Pipeline reading a file, converting each line 1817 // to a number, taking the logarithms of the numbers, 1818 // and performing the additions necessary to find 1819 // the sum of the logarithms. 1820 1821 auto lineRange = File("numberList.txt").byLine(); 1822 auto dupedLines = std.algorithm.map!"a.idup"(lineRange); 1823 auto nums = taskPool.map!(to!double)(dupedLines); 1824 auto logs = taskPool.map!log10(nums); 1825 1826 double sum = 0; 1827 foreach (elem; logs) 1828 { 1829 sum += elem; 1830 } 1831 --- 1832 1833 $(B Exception Handling): 1834 1835 Any exceptions thrown while iterating over $(D source) 1836 or computing the map function are re-thrown on a call to $(D popFront) or, 1837 if thrown during construction, are simply allowed to propagate to the 1838 caller. In the case of exceptions thrown while computing the map function, 1839 the exceptions are chained as in $(D TaskPool.amap). 1840 */ 1841 auto 1842 map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max) 1843 if (isInputRange!S) 1844 { 1845 import std.exception : enforce; 1846 1847 enforce(workUnitSize == size_t.max || workUnitSize <= bufSize, 1848 "Work unit size must be smaller than buffer size."); 1849 alias fun = adjoin!(staticMap!(unaryFun, functions)); 1850 1851 static final class Map 1852 { 1853 // This is a class because the task needs to be located on the 1854 // heap and in the non-random access case source needs to be on 1855 // the heap, too. 1856 1857 private: 1858 enum bufferTrick = is(typeof(source.buf1)) && 1859 is(typeof(source.bufPos)) && 1860 is(typeof(source.doBufSwap())); 1861 1862 alias E = MapType!(S, functions); 1863 E[] buf1, buf2; 1864 S source; 1865 TaskPool pool; 1866 Task!(run, E[] delegate(E[]), E[]) nextBufTask; 1867 size_t workUnitSize; 1868 size_t bufPos; 1869 bool lastTaskWaited; 1870 1871 static if (isRandomAccessRange!S) 1872 { 1873 alias FromType = S; 1874 1875 void popSource() 1876 { 1877 import std.algorithm.comparison : min; 1878 1879 static if (__traits(compiles, source[0 .. source.length])) 1880 { 1881 source = source[min(buf1.length, source.length)..source.length]; 1882 } 1883 else static if (__traits(compiles, source[0..$])) 1884 { 1885 source = source[min(buf1.length, source.length)..$]; 1886 } 1887 else 1888 { 1889 static assert(0, "S must have slicing for Map." 1890 ~ " " ~ S.stringof ~ " doesn't."); 1891 } 1892 } 1893 } 1894 else static if (bufferTrick) 1895 { 1896 // Make sure we don't have the buffer recycling overload of 1897 // asyncBuf. 1898 static if ( 1899 is(typeof(source.source)) && 1900 isRoundRobin!(typeof(source.source)) 1901 ) 1902 { 1903 static assert(0, "Cannot execute a parallel map on " ~ 1904 "the buffer recycling overload of asyncBuf." 1905 ); 1906 } 1907 1908 alias FromType = typeof(source.buf1); 1909 FromType from; 1910 1911 // Just swap our input buffer with source's output buffer. 1912 // No need to copy element by element. 1913 FromType dumpToFrom() 1914 { 1915 import std.algorithm.mutation : swap; 1916 1917 assert(source.buf1.length <= from.length); 1918 from.length = source.buf1.length; 1919 swap(source.buf1, from); 1920 1921 // Just in case this source has been popped before 1922 // being sent to map: 1923 from = from[source.bufPos..$]; 1924 1925 static if (is(typeof(source._length))) 1926 { 1927 source._length -= (from.length - source.bufPos); 1928 } 1929 1930 source.doBufSwap(); 1931 1932 return from; 1933 } 1934 } 1935 else 1936 { 1937 alias FromType = ElementType!S[]; 1938 1939 // The temporary array that data is copied to before being 1940 // mapped. 1941 FromType from; 1942 1943 FromType dumpToFrom() 1944 { 1945 assert(from !is null); 1946 1947 size_t i; 1948 for (; !source.empty && i < from.length; source.popFront()) 1949 { 1950 from[i++] = source.front; 1951 } 1952 1953 from = from[0 .. i]; 1954 return from; 1955 } 1956 } 1957 1958 static if (hasLength!S) 1959 { 1960 size_t _length; 1961 1962 public @property size_t length() const @safe pure nothrow 1963 { 1964 return _length; 1965 } 1966 } 1967 1968 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool) 1969 { 1970 static if (bufferTrick) 1971 { 1972 bufSize = source.buf1.length; 1973 } 1974 1975 buf1.length = bufSize; 1976 buf2.length = bufSize; 1977 1978 static if (!isRandomAccessRange!S) 1979 { 1980 from.length = bufSize; 1981 } 1982 1983 this.workUnitSize = (workUnitSize == size_t.max) ? 1984 pool.defaultWorkUnitSize(bufSize) : workUnitSize; 1985 this.source = source; 1986 this.pool = pool; 1987 1988 static if (hasLength!S) 1989 { 1990 _length = source.length; 1991 } 1992 1993 buf1 = fillBuf(buf1); 1994 submitBuf2(); 1995 } 1996 1997 // The from parameter is a dummy and ignored in the random access 1998 // case. 1999 E[] fillBuf(E[] buf) 2000 { 2001 import std.algorithm.comparison : min; 2002 2003 static if (isRandomAccessRange!S) 2004 { 2005 import std.range : take; 2006 auto toMap = take(source, buf.length); 2007 scope(success) popSource(); 2008 } 2009 else 2010 { 2011 auto toMap = dumpToFrom(); 2012 } 2013 2014 buf = buf[0 .. min(buf.length, toMap.length)]; 2015 2016 // Handle as a special case: 2017 if (pool.size == 0) 2018 { 2019 size_t index = 0; 2020 foreach (elem; toMap) 2021 { 2022 buf[index++] = fun(elem); 2023 } 2024 return buf; 2025 } 2026 2027 pool.amap!functions(toMap, workUnitSize, buf); 2028 2029 return buf; 2030 } 2031 2032 void submitBuf2() 2033 in 2034 { 2035 assert(nextBufTask.prev is null); 2036 assert(nextBufTask.next is null); 2037 } body 2038 { 2039 // Hack to reuse the task object. 2040 2041 nextBufTask = typeof(nextBufTask).init; 2042 nextBufTask._args[0] = &fillBuf; 2043 nextBufTask._args[1] = buf2; 2044 pool.put(nextBufTask); 2045 } 2046 2047 void doBufSwap() 2048 { 2049 if (lastTaskWaited) 2050 { 2051 // Then the source is empty. Signal it here. 2052 buf1 = null; 2053 buf2 = null; 2054 2055 static if (!isRandomAccessRange!S) 2056 { 2057 from = null; 2058 } 2059 2060 return; 2061 } 2062 2063 buf2 = buf1; 2064 buf1 = nextBufTask.yieldForce; 2065 bufPos = 0; 2066 2067 if (source.empty) 2068 { 2069 lastTaskWaited = true; 2070 } 2071 else 2072 { 2073 submitBuf2(); 2074 } 2075 } 2076 2077 public: 2078 @property auto front() 2079 { 2080 return buf1[bufPos]; 2081 } 2082 2083 void popFront() 2084 { 2085 static if (hasLength!S) 2086 { 2087 _length--; 2088 } 2089 2090 bufPos++; 2091 if (bufPos >= buf1.length) 2092 { 2093 doBufSwap(); 2094 } 2095 } 2096 2097 static if (isInfinite!S) 2098 { 2099 enum bool empty = false; 2100 } 2101 else 2102 { 2103 2104 bool empty() const @property 2105 { 2106 // popFront() sets this when source is empty 2107 return buf1.length == 0; 2108 } 2109 } 2110 } 2111 return new Map(source, bufSize, workUnitSize, this); 2112 } 2113 } 2114 2115 /** 2116 Given a $(D source) range that is expensive to iterate over, returns an 2117 input range that asynchronously buffers the contents of 2118 $(D source) into a buffer of $(D bufSize) elements in a worker thread, 2119 while making previously buffered elements from a second buffer, also of size 2120 $(D bufSize), available via the range interface of the returned 2121 object. The returned range has a length iff $(D hasLength!S). 2122 $(D asyncBuf) is useful, for example, when performing expensive operations 2123 on the elements of ranges that represent data on a disk or network. 2124 2125 Example: 2126 --- 2127 import std.conv, std.stdio; 2128 2129 void main() 2130 { 2131 // Fetch lines of a file in a background thread 2132 // while processing previously fetched lines, 2133 // dealing with byLine's buffer recycling by 2134 // eagerly duplicating every line. 2135 auto lines = File("foo.txt").byLine(); 2136 auto duped = std.algorithm.map!"a.idup"(lines); 2137 2138 // Fetch more lines in the background while we 2139 // process the lines already read into memory 2140 // into a matrix of doubles. 2141 double[][] matrix; 2142 auto asyncReader = taskPool.asyncBuf(duped); 2143 2144 foreach (line; asyncReader) 2145 { 2146 auto ls = line.split("\t"); 2147 matrix ~= to!(double[])(ls); 2148 } 2149 } 2150 --- 2151 2152 $(B Exception Handling): 2153 2154 Any exceptions thrown while iterating over $(D source) are re-thrown on a 2155 call to $(D popFront) or, if thrown during construction, simply 2156 allowed to propagate to the caller. 2157 */ 2158 auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S) 2159 { 2160 static final class AsyncBuf 2161 { 2162 // This is a class because the task and source both need to be on 2163 // the heap. 2164 2165 // The element type of S. 2166 alias E = ElementType!S; // Needs to be here b/c of forward ref bugs. 2167 2168 private: 2169 E[] buf1, buf2; 2170 S source; 2171 TaskPool pool; 2172 Task!(run, E[] delegate(E[]), E[]) nextBufTask; 2173 size_t bufPos; 2174 bool lastTaskWaited; 2175 2176 static if (hasLength!S) 2177 { 2178 size_t _length; 2179 2180 // Available if hasLength!S. 2181 public @property size_t length() const @safe pure nothrow 2182 { 2183 return _length; 2184 } 2185 } 2186 2187 this(S source, size_t bufSize, TaskPool pool) 2188 { 2189 buf1.length = bufSize; 2190 buf2.length = bufSize; 2191 2192 this.source = source; 2193 this.pool = pool; 2194 2195 static if (hasLength!S) 2196 { 2197 _length = source.length; 2198 } 2199 2200 buf1 = fillBuf(buf1); 2201 submitBuf2(); 2202 } 2203 2204 E[] fillBuf(E[] buf) 2205 { 2206 assert(buf !is null); 2207 2208 size_t i; 2209 for (; !source.empty && i < buf.length; source.popFront()) 2210 { 2211 buf[i++] = source.front; 2212 } 2213 2214 buf = buf[0 .. i]; 2215 return buf; 2216 } 2217 2218 void submitBuf2() 2219 in 2220 { 2221 assert(nextBufTask.prev is null); 2222 assert(nextBufTask.next is null); 2223 } body 2224 { 2225 // Hack to reuse the task object. 2226 2227 nextBufTask = typeof(nextBufTask).init; 2228 nextBufTask._args[0] = &fillBuf; 2229 nextBufTask._args[1] = buf2; 2230 pool.put(nextBufTask); 2231 } 2232 2233 void doBufSwap() 2234 { 2235 if (lastTaskWaited) 2236 { 2237 // Then source is empty. Signal it here. 2238 buf1 = null; 2239 buf2 = null; 2240 return; 2241 } 2242 2243 buf2 = buf1; 2244 buf1 = nextBufTask.yieldForce; 2245 bufPos = 0; 2246 2247 if (source.empty) 2248 { 2249 lastTaskWaited = true; 2250 } 2251 else 2252 { 2253 submitBuf2(); 2254 } 2255 } 2256 2257 public: 2258 E front() @property 2259 { 2260 return buf1[bufPos]; 2261 } 2262 2263 void popFront() 2264 { 2265 static if (hasLength!S) 2266 { 2267 _length--; 2268 } 2269 2270 bufPos++; 2271 if (bufPos >= buf1.length) 2272 { 2273 doBufSwap(); 2274 } 2275 } 2276 2277 static if (isInfinite!S) 2278 { 2279 enum bool empty = false; 2280 } 2281 2282 else 2283 { 2284 /// 2285 bool empty() @property 2286 { 2287 // popFront() sets this when source is empty: 2288 return buf1.length == 0; 2289 } 2290 } 2291 } 2292 return new AsyncBuf(source, bufSize, this); 2293 } 2294 2295 /** 2296 Given a callable object $(D next) that writes to a user-provided buffer and 2297 a second callable object $(D empty) that determines whether more data is 2298 available to write via $(D next), returns an input range that 2299 asynchronously calls $(D next) with a set of size $(D nBuffers) of buffers 2300 and makes the results available in the order they were obtained via the 2301 input range interface of the returned object. Similarly to the 2302 input range overload of $(D asyncBuf), the first half of the buffers 2303 are made available via the range interface while the second half are 2304 filled and vice-versa. 2305 2306 Params: 2307 2308 next = A callable object that takes a single argument that must be an array 2309 with mutable elements. When called, $(D next) writes data to 2310 the array provided by the caller. 2311 2312 empty = A callable object that takes no arguments and returns a type 2313 implicitly convertible to $(D bool). This is used to signify 2314 that no more data is available to be obtained by calling $(D next). 2315 2316 initialBufSize = The initial size of each buffer. If $(D next) takes its 2317 array by reference, it may resize the buffers. 2318 2319 nBuffers = The number of buffers to cycle through when calling $(D next). 2320 2321 Example: 2322 --- 2323 // Fetch lines of a file in a background 2324 // thread while processing previously fetched 2325 // lines, without duplicating any lines. 2326 auto file = File("foo.txt"); 2327 2328 void next(ref char[] buf) 2329 { 2330 file.readln(buf); 2331 } 2332 2333 // Fetch more lines in the background while we 2334 // process the lines already read into memory 2335 // into a matrix of doubles. 2336 double[][] matrix; 2337 auto asyncReader = taskPool.asyncBuf(&next, &file.eof); 2338 2339 foreach (line; asyncReader) 2340 { 2341 auto ls = line.split("\t"); 2342 matrix ~= to!(double[])(ls); 2343 } 2344 --- 2345 2346 $(B Exception Handling): 2347 2348 Any exceptions thrown while iterating over $(D range) are re-thrown on a 2349 call to $(D popFront). 2350 2351 Warning: 2352 2353 Using the range returned by this function in a parallel foreach loop 2354 will not work because buffers may be overwritten while the task that 2355 processes them is in queue. This is checked for at compile time 2356 and will result in a static assertion failure. 2357 */ 2358 auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) 2359 if (is(typeof(C2.init()) : bool) && 2360 Parameters!C1.length == 1 && 2361 Parameters!C2.length == 0 && 2362 isArray!(Parameters!C1[0]) 2363 ) { 2364 auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers); 2365 return asyncBuf(roundRobin, nBuffers / 2); 2366 } 2367 2368 /// 2369 template reduce(functions...) 2370 { 2371 /** 2372 Parallel reduce on a random access range. Except as otherwise noted, 2373 usage is similar to $(REF _reduce, std,algorithm,iteration). This 2374 function works by splitting the range to be reduced into work units, 2375 which are slices to be reduced in parallel. Once the results from all 2376 work units are computed, a final serial reduction is performed on these 2377 results to compute the final answer. Therefore, care must be taken to 2378 choose the seed value appropriately. 2379 2380 Because the reduction is being performed in parallel, $(D functions) 2381 must be associative. For notational simplicity, let # be an 2382 infix operator representing $(D functions). Then, (a # b) # c must equal 2383 a # (b # c). Floating point addition is not associative 2384 even though addition in exact arithmetic is. Summing floating 2385 point numbers using this function may give different results than summing 2386 serially. However, for many practical purposes floating point addition 2387 can be treated as associative. 2388 2389 Note that, since $(D functions) are assumed to be associative, 2390 additional optimizations are made to the serial portion of the reduction 2391 algorithm. These take advantage of the instruction level parallelism of 2392 modern CPUs, in addition to the thread-level parallelism that the rest 2393 of this module exploits. This can lead to better than linear speedups 2394 relative to $(REF _reduce, std,algorithm,iteration), especially for 2395 fine-grained benchmarks like dot products. 2396 2397 An explicit seed may be provided as the first argument. If 2398 provided, it is used as the seed for all work units and for the final 2399 reduction of results from all work units. Therefore, if it is not the 2400 identity value for the operation being performed, results may differ 2401 from those generated by $(REF _reduce, std,algorithm,iteration) or 2402 depending on how many work units are used. The next argument must be 2403 the range to be reduced. 2404 --- 2405 // Find the sum of squares of a range in parallel, using 2406 // an explicit seed. 2407 // 2408 // Timings on an Athlon 64 X2 dual core machine: 2409 // 2410 // Parallel reduce: 72 milliseconds 2411 // Using std.algorithm.reduce instead: 181 milliseconds 2412 auto nums = iota(10_000_000.0f); 2413 auto sumSquares = taskPool.reduce!"a + b"( 2414 0.0, std.algorithm.map!"a * a"(nums) 2415 ); 2416 --- 2417 2418 If no explicit seed is provided, the first element of each work unit 2419 is used as a seed. For the final reduction, the result from the first 2420 work unit is used as the seed. 2421 --- 2422 // Find the sum of a range in parallel, using the first 2423 // element of each work unit as the seed. 2424 auto sum = taskPool.reduce!"a + b"(nums); 2425 --- 2426 2427 An explicit work unit size may be specified as the last argument. 2428 Specifying too small a work unit size will effectively serialize the 2429 reduction, as the final reduction of the result of each work unit will 2430 dominate computation time. If $(D TaskPool.size) for this instance 2431 is zero, this parameter is ignored and one work unit is used. 2432 --- 2433 // Use a work unit size of 100. 2434 auto sum2 = taskPool.reduce!"a + b"(nums, 100); 2435 2436 // Work unit size of 100 and explicit seed. 2437 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100); 2438 --- 2439 2440 Parallel reduce supports multiple functions, like 2441 $(D std.algorithm.reduce). 2442 --- 2443 // Find both the min and max of nums. 2444 auto minMax = taskPool.reduce!(min, max)(nums); 2445 assert(minMax[0] == reduce!min(nums)); 2446 assert(minMax[1] == reduce!max(nums)); 2447 --- 2448 2449 $(B Exception Handling): 2450 2451 After this function is finished executing, any exceptions thrown 2452 are chained together via $(D Throwable.next) and rethrown. The chaining 2453 order is non-deterministic. 2454 */ 2455 auto reduce(Args...)(Args args) 2456 { 2457 import core.exception : OutOfMemoryError; 2458 import std.conv : emplaceRef; 2459 import std.exception : enforce; 2460 2461 alias fun = reduceAdjoin!functions; 2462 alias finishFun = reduceFinish!functions; 2463 2464 static if (isIntegral!(Args[$ - 1])) 2465 { 2466 size_t workUnitSize = cast(size_t) args[$ - 1]; 2467 alias args2 = args[0..$ - 1]; 2468 alias Args2 = Args[0..$ - 1]; 2469 } 2470 else 2471 { 2472 alias args2 = args; 2473 alias Args2 = Args; 2474 } 2475 2476 auto makeStartValue(Type)(Type e) 2477 { 2478 static if (functions.length == 1) 2479 { 2480 return e; 2481 } 2482 else 2483 { 2484 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void; 2485 foreach (i, T; seed.Types) 2486 { 2487 emplaceRef(seed.expand[i], e); 2488 } 2489 2490 return seed; 2491 } 2492 } 2493 2494 static if (args2.length == 2) 2495 { 2496 static assert(isInputRange!(Args2[1])); 2497 alias range = args2[1]; 2498 alias seed = args2[0]; 2499 enum explicitSeed = true; 2500 2501 static if (!is(typeof(workUnitSize))) 2502 { 2503 size_t workUnitSize = defaultWorkUnitSize(range.length); 2504 } 2505 } 2506 else 2507 { 2508 static assert(args2.length == 1); 2509 alias range = args2[0]; 2510 2511 static if (!is(typeof(workUnitSize))) 2512 { 2513 size_t workUnitSize = defaultWorkUnitSize(range.length); 2514 } 2515 2516 enforce(!range.empty, 2517 "Cannot reduce an empty range with first element as start value."); 2518 2519 auto seed = makeStartValue(range.front); 2520 enum explicitSeed = false; 2521 range.popFront(); 2522 } 2523 2524 alias E = typeof(seed); 2525 alias R = typeof(range); 2526 2527 E reduceOnRange(R range, size_t lowerBound, size_t upperBound) 2528 { 2529 // This is for exploiting instruction level parallelism by 2530 // using multiple accumulator variables within each thread, 2531 // since we're assuming functions are associative anyhow. 2532 2533 // This is so that loops can be unrolled automatically. 2534 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); 2535 enum nILP = ilpTuple.length; 2536 immutable subSize = (upperBound - lowerBound) / nILP; 2537 2538 if (subSize <= 1) 2539 { 2540 // Handle as a special case. 2541 static if (explicitSeed) 2542 { 2543 E result = seed; 2544 } 2545 else 2546 { 2547 E result = makeStartValue(range[lowerBound]); 2548 lowerBound++; 2549 } 2550 2551 foreach (i; lowerBound .. upperBound) 2552 { 2553 result = fun(result, range[i]); 2554 } 2555 2556 return result; 2557 } 2558 2559 assert(subSize > 1); 2560 E[nILP] results; 2561 size_t[nILP] offsets; 2562 2563 foreach (i; ilpTuple) 2564 { 2565 offsets[i] = lowerBound + subSize * i; 2566 2567 static if (explicitSeed) 2568 { 2569 results[i] = seed; 2570 } 2571 else 2572 { 2573 results[i] = makeStartValue(range[offsets[i]]); 2574 offsets[i]++; 2575 } 2576 } 2577 2578 immutable nLoop = subSize - (!explicitSeed); 2579 foreach (i; 0 .. nLoop) 2580 { 2581 foreach (j; ilpTuple) 2582 { 2583 results[j] = fun(results[j], range[offsets[j]]); 2584 offsets[j]++; 2585 } 2586 } 2587 2588 // Finish the remainder. 2589 foreach (i; nILP * subSize + lowerBound .. upperBound) 2590 { 2591 results[$ - 1] = fun(results[$ - 1], range[i]); 2592 } 2593 2594 foreach (i; ilpTuple[1..$]) 2595 { 2596 results[0] = finishFun(results[0], results[i]); 2597 } 2598 2599 return results[0]; 2600 } 2601 2602 immutable len = range.length; 2603 if (len == 0) 2604 { 2605 return seed; 2606 } 2607 2608 if (this.size == 0) 2609 { 2610 return finishFun(seed, reduceOnRange(range, 0, len)); 2611 } 2612 2613 // Unlike the rest of the functions here, I can't use the Task object 2614 // recycling trick here because this has to work on non-commutative 2615 // operations. After all the tasks are done executing, fun() has to 2616 // be applied on the results of these to get a final result, but 2617 // it can't be evaluated out of order. 2618 2619 if (workUnitSize > len) 2620 { 2621 workUnitSize = len; 2622 } 2623 2624 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1); 2625 assert(nWorkUnits * workUnitSize >= len); 2626 2627 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t); 2628 RTask[] tasks; 2629 2630 // Can't use alloca() due to Bug 3753. Use a fixed buffer 2631 // backed by malloc(). 2632 enum maxStack = 2_048; 2633 byte[maxStack] buf = void; 2634 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof; 2635 2636 import core.stdc.stdlib : malloc, free; 2637 if (nBytesNeeded < maxStack) 2638 { 2639 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits]; 2640 } 2641 else 2642 { 2643 auto ptr = cast(RTask*) malloc(nBytesNeeded); 2644 if (!ptr) 2645 { 2646 throw new OutOfMemoryError( 2647 "Out of memory in std.parallelism." 2648 ); 2649 } 2650 2651 tasks = ptr[0 .. nWorkUnits]; 2652 } 2653 2654 scope(exit) 2655 { 2656 if (nBytesNeeded > maxStack) 2657 { 2658 free(tasks.ptr); 2659 } 2660 } 2661 2662 foreach (ref t; tasks[]) 2663 emplaceRef(t, RTask()); 2664 2665 // Hack to take the address of a nested function w/o 2666 // making a closure. 2667 static auto scopedAddress(D)(scope D del) @system 2668 { 2669 auto tmp = del; 2670 return tmp; 2671 } 2672 2673 size_t curPos = 0; 2674 void useTask(ref RTask task) 2675 { 2676 import std.algorithm.comparison : min; 2677 2678 task.pool = this; 2679 task._args[0] = scopedAddress(&reduceOnRange); 2680 task._args[3] = min(len, curPos + workUnitSize); // upper bound. 2681 task._args[1] = range; // range 2682 task._args[2] = curPos; // lower bound. 2683 2684 curPos += workUnitSize; 2685 } 2686 2687 foreach (ref task; tasks) 2688 { 2689 useTask(task); 2690 } 2691 2692 foreach (i; 1 .. tasks.length - 1) 2693 { 2694 tasks[i].next = tasks[i + 1].basePtr; 2695 tasks[i + 1].prev = tasks[i].basePtr; 2696 } 2697 2698 if (tasks.length > 1) 2699 { 2700 queueLock(); 2701 scope(exit) queueUnlock(); 2702 2703 abstractPutGroupNoSync( 2704 tasks[1].basePtr, 2705 tasks[$ - 1].basePtr 2706 ); 2707 } 2708 2709 if (tasks.length > 0) 2710 { 2711 try 2712 { 2713 tasks[0].job(); 2714 } 2715 catch (Throwable e) 2716 { 2717 tasks[0].exception = e; 2718 } 2719 tasks[0].taskStatus = TaskStatus.done; 2720 2721 // Try to execute each of these in the current thread 2722 foreach (ref task; tasks[1..$]) 2723 { 2724 tryDeleteExecute(task.basePtr); 2725 } 2726 } 2727 2728 // Now that we've tried to execute every task, they're all either 2729 // done or in progress. Force all of them. 2730 E result = seed; 2731 2732 Throwable firstException, lastException; 2733 2734 foreach (ref task; tasks) 2735 { 2736 try 2737 { 2738 task.yieldForce; 2739 } 2740 catch (Throwable e) 2741 { 2742 addToChain(e, firstException, lastException); 2743 continue; 2744 } 2745 2746 if (!firstException) result = finishFun(result, task.returnVal); 2747 } 2748 2749 if (firstException) throw firstException; 2750 2751 return result; 2752 } 2753 } 2754 2755 /** 2756 Gets the index of the current thread relative to this $(D TaskPool). Any 2757 thread not in this pool will receive an index of 0. The worker threads in 2758 this pool receive unique indices of 1 through $(D this.size). 2759 2760 This function is useful for maintaining worker-local resources. 2761 2762 Example: 2763 --- 2764 // Execute a loop that computes the greatest common 2765 // divisor of every number from 0 through 999 with 2766 // 42 in parallel. Write the results out to 2767 // a set of files, one for each thread. This allows 2768 // results to be written out without any synchronization. 2769 2770 import std.conv, std.range, std.numeric, std.stdio; 2771 2772 void main() 2773 { 2774 auto filesHandles = new File[taskPool.size + 1]; 2775 scope(exit) { 2776 foreach (ref handle; fileHandles) 2777 { 2778 handle.close(); 2779 } 2780 } 2781 2782 foreach (i, ref handle; fileHandles) 2783 { 2784 handle = File("workerResults" ~ to!string(i) ~ ".txt"); 2785 } 2786 2787 foreach (num; parallel(iota(1_000))) 2788 { 2789 auto outHandle = fileHandles[taskPool.workerIndex]; 2790 outHandle.writeln(num, '\t', gcd(num, 42)); 2791 } 2792 } 2793 --- 2794 */ 2795 size_t workerIndex() @property @safe const nothrow 2796 { 2797 immutable rawInd = threadIndex; 2798 return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ? 2799 (rawInd - instanceStartIndex + 1) : 0; 2800 } 2801 2802 /** 2803 Struct for creating worker-local storage. Worker-local storage is 2804 thread-local storage that exists only for worker threads in a given 2805 $(D TaskPool) plus a single thread outside the pool. It is allocated on the 2806 garbage collected heap in a way that avoids _false sharing, and doesn't 2807 necessarily have global scope within any thread. It can be accessed from 2808 any worker thread in the $(D TaskPool) that created it, and one thread 2809 outside this $(D TaskPool). All threads outside the pool that created a 2810 given instance of worker-local storage share a single slot. 2811 2812 Since the underlying data for this struct is heap-allocated, this struct 2813 has reference semantics when passed between functions. 2814 2815 The main uses cases for $(D WorkerLocalStorageStorage) are: 2816 2817 1. Performing parallel reductions with an imperative, as opposed to 2818 functional, programming style. In this case, it's useful to treat 2819 $(D WorkerLocalStorageStorage) as local to each thread for only the parallel 2820 portion of an algorithm. 2821 2822 2. Recycling temporary buffers across iterations of a parallel foreach loop. 2823 2824 Example: 2825 --- 2826 // Calculate pi as in our synopsis example, but 2827 // use an imperative instead of a functional style. 2828 immutable n = 1_000_000_000; 2829 immutable delta = 1.0L / n; 2830 2831 auto sums = taskPool.workerLocalStorage(0.0L); 2832 foreach (i; parallel(iota(n))) 2833 { 2834 immutable x = ( i - 0.5L ) * delta; 2835 immutable toAdd = delta / ( 1.0 + x * x ); 2836 sums.get += toAdd; 2837 } 2838 2839 // Add up the results from each worker thread. 2840 real pi = 0; 2841 foreach (threadResult; sums.toRange) 2842 { 2843 pi += 4.0L * threadResult; 2844 } 2845 --- 2846 */ 2847 static struct WorkerLocalStorage(T) 2848 { 2849 private: 2850 TaskPool pool; 2851 size_t size; 2852 2853 size_t elemSize; 2854 bool* stillThreadLocal; 2855 2856 static size_t roundToLine(size_t num) pure nothrow 2857 { 2858 if (num % cacheLineSize == 0) 2859 { 2860 return num; 2861 } 2862 else 2863 { 2864 return ((num / cacheLineSize) + 1) * cacheLineSize; 2865 } 2866 } 2867 2868 void* data; 2869 2870 void initialize(TaskPool pool) 2871 { 2872 this.pool = pool; 2873 size = pool.size + 1; 2874 stillThreadLocal = new bool; 2875 *stillThreadLocal = true; 2876 2877 // Determines whether the GC should scan the array. 2878 auto blkInfo = (typeid(T).flags & 1) ? 2879 cast(GC.BlkAttr) 0 : 2880 GC.BlkAttr.NO_SCAN; 2881 2882 immutable nElem = pool.size + 1; 2883 elemSize = roundToLine(T.sizeof); 2884 2885 // The + 3 is to pad one full cache line worth of space on either side 2886 // of the data structure to make sure false sharing with completely 2887 // unrelated heap data is prevented, and to provide enough padding to 2888 // make sure that data is cache line-aligned. 2889 data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize; 2890 2891 // Cache line align data ptr. 2892 data = cast(void*) roundToLine(cast(size_t) data); 2893 2894 foreach (i; 0 .. nElem) 2895 { 2896 this.opIndex(i) = T.init; 2897 } 2898 } 2899 2900 ref opIndex(this Qualified)(size_t index) 2901 { 2902 import std.conv : text; 2903 assert(index < size, text(index, '\t', uint.max)); 2904 return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index)); 2905 } 2906 2907 void opIndexAssign(T val, size_t index) 2908 { 2909 assert(index < size); 2910 *(cast(T*) (data + elemSize * index)) = val; 2911 } 2912 2913 public: 2914 /** 2915 Get the current thread's instance. Returns by ref. 2916 Note that calling $(D get) from any thread 2917 outside the $(D TaskPool) that created this instance will return the 2918 same reference, so an instance of worker-local storage should only be 2919 accessed from one thread outside the pool that created it. If this 2920 rule is violated, undefined behavior will result. 2921 2922 If assertions are enabled and $(D toRange) has been called, then this 2923 WorkerLocalStorage instance is no longer worker-local and an assertion 2924 failure will result when calling this method. This is not checked 2925 when assertions are disabled for performance reasons. 2926 */ 2927 ref get(this Qualified)() @property 2928 { 2929 assert(*stillThreadLocal, 2930 "Cannot call get() on this instance of WorkerLocalStorage " ~ 2931 "because it is no longer worker-local." 2932 ); 2933 return opIndex(pool.workerIndex); 2934 } 2935 2936 /** 2937 Assign a value to the current thread's instance. This function has 2938 the same caveats as its overload. 2939 */ 2940 void get(T val) @property 2941 { 2942 assert(*stillThreadLocal, 2943 "Cannot call get() on this instance of WorkerLocalStorage " ~ 2944 "because it is no longer worker-local." 2945 ); 2946 2947 opIndexAssign(val, pool.workerIndex); 2948 } 2949 2950 /** 2951 Returns a range view of the values for all threads, which can be used 2952 to further process the results of each thread after running the parallel 2953 part of your algorithm. Do not use this method in the parallel portion 2954 of your algorithm. 2955 2956 Calling this function sets a flag indicating that this struct is no 2957 longer worker-local, and attempting to use the $(D get) method again 2958 will result in an assertion failure if assertions are enabled. 2959 */ 2960 WorkerLocalStorageRange!T toRange() @property 2961 { 2962 if (*stillThreadLocal) 2963 { 2964 *stillThreadLocal = false; 2965 2966 // Make absolutely sure results are visible to all threads. 2967 // This is probably not necessary since some other 2968 // synchronization primitive will be used to signal that the 2969 // parallel part of the algorithm is done, but the 2970 // performance impact should be negligible, so it's better 2971 // to be safe. 2972 ubyte barrierDummy; 2973 atomicSetUbyte(barrierDummy, 1); 2974 } 2975 2976 return WorkerLocalStorageRange!T(this); 2977 } 2978 } 2979 2980 /** 2981 Range primitives for worker-local storage. The purpose of this is to 2982 access results produced by each worker thread from a single thread once you 2983 are no longer using the worker-local storage from multiple threads. 2984 Do not use this struct in the parallel portion of your algorithm. 2985 2986 The proper way to instantiate this object is to call 2987 $(D WorkerLocalStorage.toRange). Once instantiated, this object behaves 2988 as a finite random-access range with assignable, lvalue elements and 2989 a length equal to the number of worker threads in the $(D TaskPool) that 2990 created it plus 1. 2991 */ 2992 static struct WorkerLocalStorageRange(T) 2993 { 2994 private: 2995 WorkerLocalStorage!T workerLocalStorage; 2996 2997 size_t _length; 2998 size_t beginOffset; 2999 3000 this(WorkerLocalStorage!T wl) 3001 { 3002 this.workerLocalStorage = wl; 3003 _length = wl.size; 3004 } 3005 3006 public: 3007 ref front(this Qualified)() @property 3008 { 3009 return this[0]; 3010 } 3011 3012 ref back(this Qualified)() @property 3013 { 3014 return this[_length - 1]; 3015 } 3016 3017 void popFront() 3018 { 3019 if (_length > 0) 3020 { 3021 beginOffset++; 3022 _length--; 3023 } 3024 } 3025 3026 void popBack() 3027 { 3028 if (_length > 0) 3029 { 3030 _length--; 3031 } 3032 } 3033 3034 typeof(this) save() @property 3035 { 3036 return this; 3037 } 3038 3039 ref opIndex(this Qualified)(size_t index) 3040 { 3041 assert(index < _length); 3042 return workerLocalStorage[index + beginOffset]; 3043 } 3044 3045 void opIndexAssign(T val, size_t index) 3046 { 3047 assert(index < _length); 3048 workerLocalStorage[index] = val; 3049 } 3050 3051 typeof(this) opSlice(size_t lower, size_t upper) 3052 { 3053 assert(upper <= _length); 3054 auto newWl = this.workerLocalStorage; 3055 newWl.data += lower * newWl.elemSize; 3056 newWl.size = upper - lower; 3057 return typeof(this)(newWl); 3058 } 3059 3060 bool empty() const @property 3061 { 3062 return length == 0; 3063 } 3064 3065 size_t length() const @property 3066 { 3067 return _length; 3068 } 3069 } 3070 3071 /** 3072 Creates an instance of worker-local storage, initialized with a given 3073 value. The value is $(D lazy) so that you can, for example, easily 3074 create one instance of a class for each worker. For usage example, 3075 see the $(D WorkerLocalStorage) struct. 3076 */ 3077 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init) 3078 { 3079 WorkerLocalStorage!T ret; 3080 ret.initialize(this); 3081 foreach (i; 0 .. size + 1) 3082 { 3083 ret[i] = initialVal; 3084 } 3085 3086 // Memory barrier to make absolutely sure that what we wrote is 3087 // visible to worker threads. 3088 ubyte barrierDummy; 3089 atomicSetUbyte(barrierDummy, 0); 3090 3091 return ret; 3092 } 3093 3094 /** 3095 Signals to all worker threads to terminate as soon as they are finished 3096 with their current $(D Task), or immediately if they are not executing a 3097 $(D Task). $(D Task)s that were in queue will not be executed unless 3098 a call to $(D Task.workForce), $(D Task.yieldForce) or $(D Task.spinForce) 3099 causes them to be executed. 3100 3101 Use only if you have waited on every $(D Task) and therefore know the 3102 queue is empty, or if you speculatively executed some tasks and no longer 3103 need the results. 3104 */ 3105 void stop() @trusted 3106 { 3107 queueLock(); 3108 scope(exit) queueUnlock(); 3109 atomicSetUbyte(status, PoolState.stopNow); 3110 notifyAll(); 3111 } 3112 3113 /** 3114 Signals worker threads to terminate when the queue becomes empty. 3115 3116 If blocking argument is true, wait for all worker threads to terminate 3117 before returning. This option might be used in applications where 3118 task results are never consumed-- e.g. when $(D TaskPool) is employed as a 3119 rudimentary scheduler for tasks which communicate by means other than 3120 return values. 3121 3122 Warning: Calling this function with $(D blocking = true) from a worker 3123 thread that is a member of the same $(D TaskPool) that 3124 $(D finish) is being called on will result in a deadlock. 3125 */ 3126 void finish(bool blocking = false) @trusted 3127 { 3128 { 3129 queueLock(); 3130 scope(exit) queueUnlock(); 3131 atomicCasUbyte(status, PoolState.running, PoolState.finishing); 3132 notifyAll(); 3133 } 3134 if (blocking) 3135 { 3136 // Use this thread as a worker until everything is finished. 3137 executeWorkLoop(); 3138 3139 foreach (t; pool) 3140 { 3141 // Maybe there should be something here to prevent a thread 3142 // from calling join() on itself if this function is called 3143 // from a worker thread in the same pool, but: 3144 // 3145 // 1. Using an if statement to skip join() would result in 3146 // finish() returning without all tasks being finished. 3147 // 3148 // 2. If an exception were thrown, it would bubble up to the 3149 // Task from which finish() was called and likely be 3150 // swallowed. 3151 t.join(); 3152 } 3153 } 3154 } 3155 3156 /// Returns the number of worker threads in the pool. 3157 @property size_t size() @safe const pure nothrow 3158 { 3159 return pool.length; 3160 } 3161 3162 /** 3163 Put a $(D Task) object on the back of the task queue. The $(D Task) 3164 object may be passed by pointer or reference. 3165 3166 Example: 3167 --- 3168 import std.file; 3169 3170 // Create a task. 3171 auto t = task!read("foo.txt"); 3172 3173 // Add it to the queue to be executed. 3174 taskPool.put(t); 3175 --- 3176 3177 Notes: 3178 3179 @trusted overloads of this function are called for $(D Task)s if 3180 $(REF hasUnsharedAliasing, std,traits) is false for the $(D Task)'s 3181 return type or the function the $(D Task) executes is $(D pure). 3182 $(D Task) objects that meet all other requirements specified in the 3183 $(D @trusted) overloads of $(D task) and $(D scopedTask) may be created 3184 and executed from $(D @safe) code via $(D Task.executeInNewThread) but 3185 not via $(D TaskPool). 3186 3187 While this function takes the address of variables that may 3188 be on the stack, some overloads are marked as @trusted. 3189 $(D Task) includes a destructor that waits for the task to complete 3190 before destroying the stack frame it is allocated on. Therefore, 3191 it is impossible for the stack frame to be destroyed before the task is 3192 complete and no longer referenced by a $(D TaskPool). 3193 */ 3194 void put(alias fun, Args...)(ref Task!(fun, Args) task) 3195 if (!isSafeReturn!(typeof(task))) 3196 { 3197 task.pool = this; 3198 abstractPut(task.basePtr); 3199 } 3200 3201 /// Ditto 3202 void put(alias fun, Args...)(Task!(fun, Args)* task) 3203 if (!isSafeReturn!(typeof(*task))) 3204 { 3205 import std.exception : enforce; 3206 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3207 put(*task); 3208 } 3209 3210 @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task) 3211 if (isSafeReturn!(typeof(task))) 3212 { 3213 task.pool = this; 3214 abstractPut(task.basePtr); 3215 } 3216 3217 @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) 3218 if (isSafeReturn!(typeof(*task))) 3219 { 3220 import std.exception : enforce; 3221 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3222 put(*task); 3223 } 3224 3225 /** 3226 These properties control whether the worker threads are daemon threads. 3227 A daemon thread is automatically terminated when all non-daemon threads 3228 have terminated. A non-daemon thread will prevent a program from 3229 terminating as long as it has not terminated. 3230 3231 If any $(D TaskPool) with non-daemon threads is active, either $(D stop) 3232 or $(D finish) must be called on it before the program can terminate. 3233 3234 The worker treads in the $(D TaskPool) instance returned by the 3235 $(D taskPool) property are daemon by default. The worker threads of 3236 manually instantiated task pools are non-daemon by default. 3237 3238 Note: For a size zero pool, the getter arbitrarily returns true and the 3239 setter has no effect. 3240 */ 3241 bool isDaemon() @property @trusted 3242 { 3243 queueLock(); 3244 scope(exit) queueUnlock(); 3245 return (size == 0) ? true : pool[0].isDaemon; 3246 } 3247 3248 /// Ditto 3249 void isDaemon(bool newVal) @property @trusted 3250 { 3251 queueLock(); 3252 scope(exit) queueUnlock(); 3253 foreach (thread; pool) 3254 { 3255 thread.isDaemon = newVal; 3256 } 3257 } 3258 3259 /** 3260 These functions allow getting and setting the OS scheduling priority of 3261 the worker threads in this $(D TaskPool). They forward to 3262 $(D core.thread.Thread.priority), so a given priority value here means the 3263 same thing as an identical priority value in $(D core.thread). 3264 3265 Note: For a size zero pool, the getter arbitrarily returns 3266 $(D core.thread.Thread.PRIORITY_MIN) and the setter has no effect. 3267 */ 3268 int priority() @property @trusted 3269 { 3270 return (size == 0) ? core.thread.Thread.PRIORITY_MIN : 3271 pool[0].priority; 3272 } 3273 3274 /// Ditto 3275 void priority(int newPriority) @property @trusted 3276 { 3277 if (size > 0) 3278 { 3279 foreach (t; pool) 3280 { 3281 t.priority = newPriority; 3282 } 3283 } 3284 } 3285 } 3286 3287 /** 3288 Returns a lazily initialized global instantiation of $(D TaskPool). 3289 This function can safely be called concurrently from multiple non-worker 3290 threads. The worker threads in this pool are daemon threads, meaning that it 3291 is not necessary to call $(D TaskPool.stop) or $(D TaskPool.finish) before 3292 terminating the main thread. 3293 */ 3294 @property TaskPool taskPool() @trusted 3295 { 3296 import std.concurrency : initOnce; 3297 __gshared TaskPool pool; 3298 return initOnce!pool({ 3299 auto p = new TaskPool(defaultPoolThreads); 3300 p.isDaemon = true; 3301 return p; 3302 }()); 3303 } 3304 3305 private shared uint _defaultPoolThreads; 3306 shared static this() 3307 { 3308 atomicStore(_defaultPoolThreads, totalCPUs - 1); 3309 } 3310 3311 /** 3312 These properties get and set the number of worker threads in the $(D TaskPool) 3313 instance returned by $(D taskPool). The default value is $(D totalCPUs) - 1. 3314 Calling the setter after the first call to $(D taskPool) does not changes 3315 number of worker threads in the instance returned by $(D taskPool). 3316 */ 3317 @property uint defaultPoolThreads() @trusted 3318 { 3319 return atomicLoad(_defaultPoolThreads); 3320 } 3321 3322 /// Ditto 3323 @property void defaultPoolThreads(uint newVal) @trusted 3324 { 3325 atomicStore(_defaultPoolThreads, newVal); 3326 } 3327 3328 /** 3329 Convenience functions that forwards to $(D taskPool.parallel). The 3330 purpose of these is to make parallel foreach less verbose and more 3331 readable. 3332 3333 Example: 3334 --- 3335 // Find the logarithm of every number from 3336 // 1 to 1_000_000 in parallel, using the 3337 // default TaskPool instance. 3338 auto logs = new double[1_000_000]; 3339 3340 foreach (i, ref elem; parallel(logs)) 3341 { 3342 elem = log(i + 1.0); 3343 } 3344 --- 3345 3346 */ 3347 ParallelForeach!R parallel(R)(R range) 3348 { 3349 return taskPool.parallel(range); 3350 } 3351 3352 /// Ditto 3353 ParallelForeach!R parallel(R)(R range, size_t workUnitSize) 3354 { 3355 return taskPool.parallel(range, workUnitSize); 3356 } 3357 3358 // Thrown when a parallel foreach loop is broken from. 3359 class ParallelForeachError : Error 3360 { 3361 this() 3362 { 3363 super("Cannot break from a parallel foreach loop using break, return, " 3364 ~ "labeled break/continue or goto statements."); 3365 } 3366 } 3367 3368 /*------Structs that implement opApply for parallel foreach.------------------*/ 3369 private template randLen(R) 3370 { 3371 enum randLen = isRandomAccessRange!R && hasLength!R; 3372 } 3373 3374 private void submitAndExecute( 3375 TaskPool pool, 3376 scope void delegate() doIt 3377 ) 3378 { 3379 import core.exception : OutOfMemoryError; 3380 immutable nThreads = pool.size + 1; 3381 3382 alias PTask = typeof(scopedTask(doIt)); 3383 import core.stdc.stdlib : malloc, free; 3384 import core.stdc.string : memcpy; 3385 3386 // The logical thing to do would be to just use alloca() here, but that 3387 // causes problems on Windows for reasons that I don't understand 3388 // (tentatively a compiler bug) and definitely doesn't work on Posix due 3389 // to Bug 3753. Therefore, allocate a fixed buffer and fall back to 3390 // malloc() if someone's using a ridiculous amount of threads. Also, 3391 // the using a byte array instead of a PTask array as the fixed buffer 3392 // is to prevent d'tors from being called on uninitialized excess PTask 3393 // instances. 3394 enum nBuf = 64; 3395 byte[nBuf * PTask.sizeof] buf = void; 3396 PTask[] tasks; 3397 if (nThreads <= nBuf) 3398 { 3399 tasks = (cast(PTask*) buf.ptr)[0 .. nThreads]; 3400 } 3401 else 3402 { 3403 auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof); 3404 if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism."); 3405 tasks = ptr[0 .. nThreads]; 3406 } 3407 3408 scope(exit) 3409 { 3410 if (nThreads > nBuf) 3411 { 3412 free(tasks.ptr); 3413 } 3414 } 3415 3416 foreach (ref t; tasks) 3417 { 3418 import core.stdc.string : memcpy; 3419 3420 // This silly looking code is necessary to prevent d'tors from being 3421 // called on uninitialized objects. 3422 auto temp = scopedTask(doIt); 3423 memcpy(&t, &temp, PTask.sizeof); 3424 3425 // This has to be done to t after copying, not temp before copying. 3426 // Otherwise, temp's destructor will sit here and wait for the 3427 // task to finish. 3428 t.pool = pool; 3429 } 3430 3431 foreach (i; 1 .. tasks.length - 1) 3432 { 3433 tasks[i].next = tasks[i + 1].basePtr; 3434 tasks[i + 1].prev = tasks[i].basePtr; 3435 } 3436 3437 if (tasks.length > 1) 3438 { 3439 pool.queueLock(); 3440 scope(exit) pool.queueUnlock(); 3441 3442 pool.abstractPutGroupNoSync( 3443 tasks[1].basePtr, 3444 tasks[$ - 1].basePtr 3445 ); 3446 } 3447 3448 if (tasks.length > 0) 3449 { 3450 try 3451 { 3452 tasks[0].job(); 3453 } 3454 catch (Throwable e) 3455 { 3456 tasks[0].exception = e; // nocoverage 3457 } 3458 tasks[0].taskStatus = TaskStatus.done; 3459 3460 // Try to execute each of these in the current thread 3461 foreach (ref task; tasks[1..$]) 3462 { 3463 pool.tryDeleteExecute(task.basePtr); 3464 } 3465 } 3466 3467 Throwable firstException, lastException; 3468 3469 foreach (i, ref task; tasks) 3470 { 3471 try 3472 { 3473 task.yieldForce; 3474 } 3475 catch (Throwable e) 3476 { 3477 addToChain(e, firstException, lastException); 3478 continue; 3479 } 3480 } 3481 3482 if (firstException) throw firstException; 3483 } 3484 3485 void foreachErr() 3486 { 3487 throw new ParallelForeachError(); 3488 } 3489 3490 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) 3491 { 3492 with(p) 3493 { 3494 int res = 0; 3495 size_t index = 0; 3496 3497 // The explicit ElementType!R in the foreach loops is necessary for 3498 // correct behavior when iterating over strings. 3499 static if (hasLvalueElements!R) 3500 { 3501 foreach (ref ElementType!R elem; range) 3502 { 3503 static if (Parameters!dg.length == 2) 3504 { 3505 res = dg(index, elem); 3506 } 3507 else 3508 { 3509 res = dg(elem); 3510 } 3511 if (res) break; 3512 index++; 3513 } 3514 } 3515 else 3516 { 3517 foreach (ElementType!R elem; range) 3518 { 3519 static if (Parameters!dg.length == 2) 3520 { 3521 res = dg(index, elem); 3522 } 3523 else 3524 { 3525 res = dg(elem); 3526 } 3527 if (res) break; 3528 index++; 3529 } 3530 } 3531 if (res) foreachErr; 3532 return res; 3533 } 3534 } 3535 3536 private enum string parallelApplyMixinRandomAccess = q{ 3537 // Handle empty thread pool as special case. 3538 if (pool.size == 0) 3539 { 3540 return doSizeZeroCase(this, dg); 3541 } 3542 3543 // Whether iteration is with or without an index variable. 3544 enum withIndex = Parameters!(typeof(dg)).length == 2; 3545 3546 shared size_t workUnitIndex = size_t.max; // Effectively -1: chunkIndex + 1 == 0 3547 immutable len = range.length; 3548 if (!len) return 0; 3549 3550 shared bool shouldContinue = true; 3551 3552 void doIt() 3553 { 3554 import std.algorithm.comparison : min; 3555 3556 scope(failure) 3557 { 3558 // If an exception is thrown, all threads should bail. 3559 atomicStore(shouldContinue, false); 3560 } 3561 3562 while (atomicLoad(shouldContinue)) 3563 { 3564 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); 3565 immutable start = workUnitSize * myUnitIndex; 3566 if (start >= len) 3567 { 3568 atomicStore(shouldContinue, false); 3569 break; 3570 } 3571 3572 immutable end = min(len, start + workUnitSize); 3573 3574 foreach (i; start .. end) 3575 { 3576 static if (withIndex) 3577 { 3578 if (dg(i, range[i])) foreachErr(); 3579 } 3580 else 3581 { 3582 if (dg(range[i])) foreachErr(); 3583 } 3584 } 3585 } 3586 } 3587 3588 submitAndExecute(pool, &doIt); 3589 3590 return 0; 3591 }; 3592 3593 enum string parallelApplyMixinInputRange = q{ 3594 // Handle empty thread pool as special case. 3595 if (pool.size == 0) 3596 { 3597 return doSizeZeroCase(this, dg); 3598 } 3599 3600 // Whether iteration is with or without an index variable. 3601 enum withIndex = Parameters!(typeof(dg)).length == 2; 3602 3603 // This protects the range while copying it. 3604 auto rangeMutex = new Mutex(); 3605 3606 shared bool shouldContinue = true; 3607 3608 // The total number of elements that have been popped off range. 3609 // This is updated only while protected by rangeMutex; 3610 size_t nPopped = 0; 3611 3612 static if ( 3613 is(typeof(range.buf1)) && 3614 is(typeof(range.bufPos)) && 3615 is(typeof(range.doBufSwap())) 3616 ) 3617 { 3618 // Make sure we don't have the buffer recycling overload of 3619 // asyncBuf. 3620 static if ( 3621 is(typeof(range.source)) && 3622 isRoundRobin!(typeof(range.source)) 3623 ) 3624 { 3625 static assert(0, "Cannot execute a parallel foreach loop on " ~ 3626 "the buffer recycling overload of asyncBuf."); 3627 } 3628 3629 enum bool bufferTrick = true; 3630 } 3631 else 3632 { 3633 enum bool bufferTrick = false; 3634 } 3635 3636 void doIt() 3637 { 3638 scope(failure) 3639 { 3640 // If an exception is thrown, all threads should bail. 3641 atomicStore(shouldContinue, false); 3642 } 3643 3644 static if (hasLvalueElements!R) 3645 { 3646 alias Temp = ElementType!R*[]; 3647 Temp temp; 3648 3649 // Returns: The previous value of nPopped. 3650 size_t makeTemp() 3651 { 3652 import std.algorithm.internal : addressOf; 3653 import std.array : uninitializedArray; 3654 3655 if (temp is null) 3656 { 3657 temp = uninitializedArray!Temp(workUnitSize); 3658 } 3659 3660 rangeMutex.lock(); 3661 scope(exit) rangeMutex.unlock(); 3662 3663 size_t i = 0; 3664 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3665 { 3666 temp[i] = addressOf(range.front); 3667 } 3668 3669 temp = temp[0 .. i]; 3670 auto ret = nPopped; 3671 nPopped += temp.length; 3672 return ret; 3673 } 3674 3675 } 3676 else 3677 { 3678 3679 alias Temp = ElementType!R[]; 3680 Temp temp; 3681 3682 // Returns: The previous value of nPopped. 3683 static if (!bufferTrick) size_t makeTemp() 3684 { 3685 import std.array : uninitializedArray; 3686 3687 if (temp is null) 3688 { 3689 temp = uninitializedArray!Temp(workUnitSize); 3690 } 3691 3692 rangeMutex.lock(); 3693 scope(exit) rangeMutex.unlock(); 3694 3695 size_t i = 0; 3696 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3697 { 3698 temp[i] = range.front; 3699 } 3700 3701 temp = temp[0 .. i]; 3702 auto ret = nPopped; 3703 nPopped += temp.length; 3704 return ret; 3705 } 3706 3707 static if (bufferTrick) size_t makeTemp() 3708 { 3709 import std.algorithm.mutation : swap; 3710 rangeMutex.lock(); 3711 scope(exit) rangeMutex.unlock(); 3712 3713 // Elide copying by just swapping buffers. 3714 temp.length = range.buf1.length; 3715 swap(range.buf1, temp); 3716 3717 // This is necessary in case popFront() has been called on 3718 // range before entering the parallel foreach loop. 3719 temp = temp[range.bufPos..$]; 3720 3721 static if (is(typeof(range._length))) 3722 { 3723 range._length -= (temp.length - range.bufPos); 3724 } 3725 3726 range.doBufSwap(); 3727 auto ret = nPopped; 3728 nPopped += temp.length; 3729 return ret; 3730 } 3731 } 3732 3733 while (atomicLoad(shouldContinue)) 3734 { 3735 auto overallIndex = makeTemp(); 3736 if (temp.empty) 3737 { 3738 atomicStore(shouldContinue, false); 3739 break; 3740 } 3741 3742 foreach (i; 0 .. temp.length) 3743 { 3744 scope(success) overallIndex++; 3745 3746 static if (hasLvalueElements!R) 3747 { 3748 static if (withIndex) 3749 { 3750 if (dg(overallIndex, *temp[i])) foreachErr(); 3751 } 3752 else 3753 { 3754 if (dg(*temp[i])) foreachErr(); 3755 } 3756 } 3757 else 3758 { 3759 static if (withIndex) 3760 { 3761 if (dg(overallIndex, temp[i])) foreachErr(); 3762 } 3763 else 3764 { 3765 if (dg(temp[i])) foreachErr(); 3766 } 3767 } 3768 } 3769 } 3770 } 3771 3772 submitAndExecute(pool, &doIt); 3773 3774 return 0; 3775 }; 3776 3777 // Calls e.next until the end of the chain is found. 3778 private Throwable findLastException(Throwable e) pure nothrow 3779 { 3780 if (e is null) return null; 3781 3782 while (e.next) 3783 { 3784 e = e.next; 3785 } 3786 3787 return e; 3788 } 3789 3790 // Adds e to the exception chain. 3791 private void addToChain( 3792 Throwable e, 3793 ref Throwable firstException, 3794 ref Throwable lastException 3795 ) pure nothrow 3796 { 3797 if (firstException) 3798 { 3799 assert(lastException); // nocoverage 3800 lastException.next = e; // nocoverage 3801 lastException = findLastException(e); // nocoverage 3802 } 3803 else 3804 { 3805 firstException = e; 3806 lastException = findLastException(e); 3807 } 3808 } 3809 3810 private struct ParallelForeach(R) 3811 { 3812 TaskPool pool; 3813 R range; 3814 size_t workUnitSize; 3815 alias E = ElementType!R; 3816 3817 static if (hasLvalueElements!R) 3818 { 3819 alias NoIndexDg = int delegate(ref E); 3820 alias IndexDg = int delegate(size_t, ref E); 3821 } 3822 else 3823 { 3824 alias NoIndexDg = int delegate(E); 3825 alias IndexDg = int delegate(size_t, E); 3826 } 3827 3828 int opApply(scope NoIndexDg dg) 3829 { 3830 static if (randLen!R) 3831 { 3832 mixin(parallelApplyMixinRandomAccess); 3833 } 3834 else 3835 { 3836 mixin(parallelApplyMixinInputRange); 3837 } 3838 } 3839 3840 int opApply(scope IndexDg dg) 3841 { 3842 static if (randLen!R) 3843 { 3844 mixin(parallelApplyMixinRandomAccess); 3845 } 3846 else 3847 { 3848 mixin(parallelApplyMixinInputRange); 3849 } 3850 } 3851 } 3852 3853 /* 3854 This struct buffers the output of a callable that outputs data into a 3855 user-supplied buffer into a set of buffers of some fixed size. It allows these 3856 buffers to be accessed with an input range interface. This is used internally 3857 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an 3858 instance and forwards it to the input range overload of asyncBuf. 3859 */ 3860 private struct RoundRobinBuffer(C1, C2) 3861 { 3862 // No need for constraints because they're already checked for in asyncBuf. 3863 3864 alias Array = Parameters!(C1.init)[0]; 3865 alias T = typeof(Array.init[0]); 3866 3867 T[][] bufs; 3868 size_t index; 3869 C1 nextDel; 3870 C2 emptyDel; 3871 bool _empty; 3872 bool primed; 3873 3874 this( 3875 C1 nextDel, 3876 C2 emptyDel, 3877 size_t initialBufSize, 3878 size_t nBuffers 3879 ) { 3880 this.nextDel = nextDel; 3881 this.emptyDel = emptyDel; 3882 bufs.length = nBuffers; 3883 3884 foreach (ref buf; bufs) 3885 { 3886 buf.length = initialBufSize; 3887 } 3888 } 3889 3890 void prime() 3891 in 3892 { 3893 assert(!empty); 3894 } 3895 body 3896 { 3897 scope(success) primed = true; 3898 nextDel(bufs[index]); 3899 } 3900 3901 3902 T[] front() @property 3903 in 3904 { 3905 assert(!empty); 3906 } 3907 body 3908 { 3909 if (!primed) prime(); 3910 return bufs[index]; 3911 } 3912 3913 void popFront() 3914 { 3915 if (empty || emptyDel()) 3916 { 3917 _empty = true; 3918 return; 3919 } 3920 3921 index = (index + 1) % bufs.length; 3922 primed = false; 3923 } 3924 3925 bool empty() @property const @safe pure nothrow 3926 { 3927 return _empty; 3928 } 3929 } 3930 3931 version (unittest) 3932 { 3933 // This was the only way I could get nested maps to work. 3934 __gshared TaskPool poolInstance; 3935 3936 import std.stdio; 3937 } 3938 3939 // These test basic functionality but don't stress test for threading bugs. 3940 // These are the tests that should be run every time Phobos is compiled. 3941 @system unittest 3942 { 3943 import std.algorithm.comparison : equal, min, max; 3944 import std.algorithm.iteration : filter, map, reduce; 3945 import std.array : split; 3946 import std.conv : text; 3947 import std.exception : assertThrown; 3948 import std.math : approxEqual, sqrt, log; 3949 import std.range : indexed, iota, join; 3950 import std.typecons : Tuple, tuple; 3951 3952 poolInstance = new TaskPool(2); 3953 scope(exit) poolInstance.stop(); 3954 3955 // The only way this can be verified is manually. 3956 debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs); 3957 3958 auto oldPriority = poolInstance.priority; 3959 poolInstance.priority = Thread.PRIORITY_MAX; 3960 assert(poolInstance.priority == Thread.PRIORITY_MAX); 3961 3962 poolInstance.priority = Thread.PRIORITY_MIN; 3963 assert(poolInstance.priority == Thread.PRIORITY_MIN); 3964 3965 poolInstance.priority = oldPriority; 3966 assert(poolInstance.priority == oldPriority); 3967 3968 static void refFun(ref uint num) 3969 { 3970 num++; 3971 } 3972 3973 uint x; 3974 3975 // Test task(). 3976 auto t = task!refFun(x); 3977 poolInstance.put(t); 3978 t.yieldForce; 3979 assert(t.args[0] == 1); 3980 3981 auto t2 = task(&refFun, x); 3982 poolInstance.put(t2); 3983 t2.yieldForce; 3984 assert(t2.args[0] == 1); 3985 3986 // Test scopedTask(). 3987 auto st = scopedTask!refFun(x); 3988 poolInstance.put(st); 3989 st.yieldForce; 3990 assert(st.args[0] == 1); 3991 3992 auto st2 = scopedTask(&refFun, x); 3993 poolInstance.put(st2); 3994 st2.yieldForce; 3995 assert(st2.args[0] == 1); 3996 3997 // Test executeInNewThread(). 3998 auto ct = scopedTask!refFun(x); 3999 ct.executeInNewThread(Thread.PRIORITY_MAX); 4000 ct.yieldForce; 4001 assert(ct.args[0] == 1); 4002 4003 // Test ref return. 4004 uint toInc = 0; 4005 static ref T makeRef(T)(ref T num) 4006 { 4007 return num; 4008 } 4009 4010 auto t3 = task!makeRef(toInc); 4011 taskPool.put(t3); 4012 assert(t3.args[0] == 0); 4013 t3.spinForce++; 4014 assert(t3.args[0] == 1); 4015 4016 static void testSafe() @safe { 4017 static int bump(int num) 4018 { 4019 return num + 1; 4020 } 4021 4022 auto safePool = new TaskPool(0); 4023 auto t = task(&bump, 1); 4024 taskPool.put(t); 4025 assert(t.yieldForce == 2); 4026 4027 auto st = scopedTask(&bump, 1); 4028 taskPool.put(st); 4029 assert(st.yieldForce == 2); 4030 safePool.stop(); 4031 } 4032 4033 auto arr = [1,2,3,4,5]; 4034 auto nums = new uint[5]; 4035 auto nums2 = new uint[5]; 4036 4037 foreach (i, ref elem; poolInstance.parallel(arr)) 4038 { 4039 elem++; 4040 nums[i] = cast(uint) i + 2; 4041 nums2[i] = elem; 4042 } 4043 4044 assert(nums == [2,3,4,5,6], text(nums)); 4045 assert(nums2 == nums, text(nums2)); 4046 assert(arr == nums, text(arr)); 4047 4048 // Test const/immutable arguments. 4049 static int add(int lhs, int rhs) 4050 { 4051 return lhs + rhs; 4052 } 4053 immutable addLhs = 1; 4054 immutable addRhs = 2; 4055 auto addTask = task(&add, addLhs, addRhs); 4056 auto addScopedTask = scopedTask(&add, addLhs, addRhs); 4057 poolInstance.put(addTask); 4058 poolInstance.put(addScopedTask); 4059 assert(addTask.yieldForce == 3); 4060 assert(addScopedTask.yieldForce == 3); 4061 4062 // Test parallel foreach with non-random access range. 4063 auto range = filter!"a != 666"([0, 1, 2, 3, 4]); 4064 4065 foreach (i, elem; poolInstance.parallel(range)) 4066 { 4067 nums[i] = cast(uint) i; 4068 } 4069 4070 assert(nums == [0,1,2,3,4]); 4071 4072 auto logs = new double[1_000_000]; 4073 foreach (i, ref elem; poolInstance.parallel(logs)) 4074 { 4075 elem = log(i + 1.0); 4076 } 4077 4078 foreach (i, elem; logs) 4079 { 4080 assert(approxEqual(elem, cast(double) log(i + 1))); 4081 } 4082 4083 assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]); 4084 assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]); 4085 assert(poolInstance.amap!("a * a", "-a")([1,2,3]) == 4086 [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); 4087 4088 auto tupleBuf = new Tuple!(int, int)[3]; 4089 poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf); 4090 assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); 4091 poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf); 4092 assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); 4093 4094 // Test amap with a non-array buffer. 4095 auto toIndex = new int[5]; 4096 auto ind = indexed(toIndex, [3, 1, 4, 0, 2]); 4097 poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind); 4098 assert(equal(ind, [2, 4, 6, 8, 10])); 4099 assert(equal(toIndex, [8, 4, 10, 2, 6])); 4100 poolInstance.amap!"a / 2"(ind, ind); 4101 assert(equal(ind, [1, 2, 3, 4, 5])); 4102 assert(equal(toIndex, [4, 2, 5, 1, 3])); 4103 4104 auto buf = new int[5]; 4105 poolInstance.amap!"a * a"([1,2,3,4,5], buf); 4106 assert(buf == [1,4,9,16,25]); 4107 poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf); 4108 assert(buf == [1,4,9,16,25]); 4109 4110 assert(poolInstance.reduce!"a + b"([1]) == 1); 4111 assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10); 4112 assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10); 4113 assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10); 4114 assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4)); 4115 assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) == 4116 tuple(10, 24)); 4117 4118 immutable serialAns = reduce!"a + b"(iota(1000)); 4119 assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns); 4120 assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns); 4121 4122 // Test worker-local storage. 4123 auto wl = poolInstance.workerLocalStorage(0); 4124 foreach (i; poolInstance.parallel(iota(1000), 1)) 4125 { 4126 wl.get = wl.get + i; 4127 } 4128 4129 auto wlRange = wl.toRange; 4130 auto parallelSum = poolInstance.reduce!"a + b"(wlRange); 4131 assert(parallelSum == 499500); 4132 assert(wlRange[0 .. 1][0] == wlRange[0]); 4133 assert(wlRange[1 .. 2][0] == wlRange[1]); 4134 4135 // Test finish() 4136 { 4137 static void slowFun() { Thread.sleep(dur!"msecs"(1)); } 4138 4139 auto pool1 = new TaskPool(); 4140 auto tSlow = task!slowFun(); 4141 pool1.put(tSlow); 4142 pool1.finish(); 4143 tSlow.yieldForce; 4144 // Can't assert that pool1.status == PoolState.stopNow because status 4145 // doesn't change until after the "done" flag is set and the waiting 4146 // thread is woken up. 4147 4148 auto pool2 = new TaskPool(); 4149 auto tSlow2 = task!slowFun(); 4150 pool2.put(tSlow2); 4151 pool2.finish(true); // blocking 4152 assert(tSlow2.done); 4153 4154 // Test fix for Bug 8582 by making pool size zero. 4155 auto pool3 = new TaskPool(0); 4156 auto tSlow3 = task!slowFun(); 4157 pool3.put(tSlow3); 4158 pool3.finish(true); // blocking 4159 assert(tSlow3.done); 4160 4161 // This is correct because no thread will terminate unless pool2.status 4162 // and pool3.status have already been set to stopNow. 4163 assert(pool2.status == TaskPool.PoolState.stopNow); 4164 assert(pool3.status == TaskPool.PoolState.stopNow); 4165 } 4166 4167 // Test default pool stuff. 4168 assert(taskPool.size == totalCPUs - 1); 4169 4170 nums = new uint[1000]; 4171 foreach (i; parallel(iota(1000))) 4172 { 4173 nums[i] = cast(uint) i; 4174 } 4175 assert(equal(nums, iota(1000))); 4176 4177 assert(equal( 4178 poolInstance.map!"a * a"(iota(30_000_001), 10_000), 4179 map!"a * a"(iota(30_000_001)) 4180 )); 4181 4182 // The filter is to kill random access and test the non-random access 4183 // branch. 4184 assert(equal( 4185 poolInstance.map!"a * a"( 4186 filter!"a == a"(iota(30_000_001) 4187 ), 10_000, 1000), 4188 map!"a * a"(iota(30_000_001)) 4189 )); 4190 4191 assert( 4192 reduce!"a + b"(0UL, 4193 poolInstance.map!"a * a"(iota(3_000_001), 10_000) 4194 ) == 4195 reduce!"a + b"(0UL, 4196 map!"a * a"(iota(3_000_001)) 4197 ) 4198 ); 4199 4200 assert(equal( 4201 iota(1_000_002), 4202 poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002))) 4203 )); 4204 4205 { 4206 import std.conv : to; 4207 import std.file : deleteme; 4208 4209 string temp_file = deleteme ~ "-tempDelMe.txt"; 4210 auto file = File(temp_file, "wb"); 4211 scope(exit) 4212 { 4213 file.close(); 4214 import std.file; 4215 remove(temp_file); 4216 } 4217 4218 auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]]; 4219 foreach (row; written) 4220 { 4221 file.writeln(join(to!(string[])(row), "\t")); 4222 } 4223 4224 file = File(temp_file); 4225 4226 void next(ref char[] buf) 4227 { 4228 file.readln(buf); 4229 import std.string : chomp; 4230 buf = chomp(buf); 4231 } 4232 4233 double[][] read; 4234 auto asyncReader = taskPool.asyncBuf(&next, &file.eof); 4235 4236 foreach (line; asyncReader) 4237 { 4238 if (line.length == 0) continue; 4239 auto ls = line.split("\t"); 4240 read ~= to!(double[])(ls); 4241 } 4242 4243 assert(read == written); 4244 file.close(); 4245 } 4246 4247 // Test Map/AsyncBuf chaining. 4248 4249 auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100); 4250 auto temp = poolInstance.map!sqrt( 4251 abuf, 100, 5 4252 ); 4253 auto lmchain = poolInstance.map!"a * a"(temp, 100, 5); 4254 lmchain.popFront(); 4255 4256 int ii; 4257 foreach ( elem; (lmchain)) 4258 { 4259 if (!approxEqual(elem, ii)) 4260 { 4261 stderr.writeln(ii, '\t', elem); 4262 } 4263 ii++; 4264 } 4265 4266 // Test buffer trick in parallel foreach. 4267 abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100); 4268 abuf.popFront(); 4269 auto bufTrickTest = new size_t[abuf.length]; 4270 foreach (i, elem; parallel(abuf)) 4271 { 4272 bufTrickTest[i] = i; 4273 } 4274 4275 assert(equal(iota(1_000_000), bufTrickTest)); 4276 4277 auto myTask = task!(std.math.abs)(-1); 4278 taskPool.put(myTask); 4279 assert(myTask.spinForce == 1); 4280 4281 // Test that worker local storage from one pool receives an index of 0 4282 // when the index is queried w.r.t. another pool. The only way to do this 4283 // is non-deterministically. 4284 foreach (i; parallel(iota(1000), 1)) 4285 { 4286 assert(poolInstance.workerIndex == 0); 4287 } 4288 4289 foreach (i; poolInstance.parallel(iota(1000), 1)) 4290 { 4291 assert(taskPool.workerIndex == 0); 4292 } 4293 4294 // Test exception handling. 4295 static void parallelForeachThrow() 4296 { 4297 foreach (elem; parallel(iota(10))) 4298 { 4299 throw new Exception(""); 4300 } 4301 } 4302 4303 assertThrown!Exception(parallelForeachThrow()); 4304 4305 static int reduceException(int a, int b) 4306 { 4307 throw new Exception(""); 4308 } 4309 4310 assertThrown!Exception( 4311 poolInstance.reduce!reduceException(iota(3)) 4312 ); 4313 4314 static int mapException(int a) 4315 { 4316 throw new Exception(""); 4317 } 4318 4319 assertThrown!Exception( 4320 poolInstance.amap!mapException(iota(3)) 4321 ); 4322 4323 static void mapThrow() 4324 { 4325 auto m = poolInstance.map!mapException(iota(3)); 4326 m.popFront(); 4327 } 4328 4329 assertThrown!Exception(mapThrow()); 4330 4331 struct ThrowingRange 4332 { 4333 @property int front() 4334 { 4335 return 1; 4336 } 4337 void popFront() 4338 { 4339 throw new Exception(""); 4340 } 4341 enum bool empty = false; 4342 } 4343 4344 assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init)); 4345 } 4346 4347 //version = parallelismStressTest; 4348 4349 // These are more like stress tests than real unit tests. They print out 4350 // tons of stuff and should not be run every time make unittest is run. 4351 version (parallelismStressTest) 4352 { 4353 @safe unittest 4354 { 4355 size_t attempt; 4356 for (; attempt < 10; attempt++) 4357 foreach (poolSize; [0, 4]) 4358 { 4359 4360 poolInstance = new TaskPool(poolSize); 4361 4362 uint[] numbers = new uint[1_000]; 4363 4364 foreach (i; poolInstance.parallel( iota(0, numbers.length)) ) 4365 { 4366 numbers[i] = cast(uint) i; 4367 } 4368 4369 // Make sure it works. 4370 foreach (i; 0 .. numbers.length) 4371 { 4372 assert(numbers[i] == i); 4373 } 4374 4375 stderr.writeln("Done creating nums."); 4376 4377 4378 auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000)); 4379 foreach (num; poolInstance.parallel(myNumbers)) 4380 { 4381 assert(num % 7 > 0 && num < 1000); 4382 } 4383 stderr.writeln("Done modulus test."); 4384 4385 uint[] squares = poolInstance.amap!"a * a"(numbers, 100); 4386 assert(squares.length == numbers.length); 4387 foreach (i, number; numbers) 4388 { 4389 assert(squares[i] == number * number); 4390 } 4391 stderr.writeln("Done squares."); 4392 4393 auto sumFuture = task!( reduce!"a + b" )(numbers); 4394 poolInstance.put(sumFuture); 4395 4396 ulong sumSquares = 0; 4397 foreach (elem; numbers) 4398 { 4399 sumSquares += elem * elem; 4400 } 4401 4402 uint mySum = sumFuture.spinForce(); 4403 assert(mySum == 999 * 1000 / 2); 4404 4405 auto mySumParallel = poolInstance.reduce!"a + b"(numbers); 4406 assert(mySum == mySumParallel); 4407 stderr.writeln("Done sums."); 4408 4409 auto myTask = task( 4410 { 4411 synchronized writeln("Our lives are parallel...Our lives are parallel."); 4412 }); 4413 poolInstance.put(myTask); 4414 4415 auto nestedOuter = "abcd"; 4416 auto nestedInner = iota(0, 10, 2); 4417 4418 foreach (i, letter; poolInstance.parallel(nestedOuter, 1)) 4419 { 4420 foreach (j, number; poolInstance.parallel(nestedInner, 1)) 4421 { 4422 synchronized writeln(i, ": ", letter, " ", j, ": ", number); 4423 } 4424 } 4425 4426 poolInstance.stop(); 4427 } 4428 4429 assert(attempt == 10); 4430 writeln("Press enter to go to next round of unittests."); 4431 readln(); 4432 } 4433 4434 // These unittests are intended more for actual testing and not so much 4435 // as examples. 4436 @safe unittest 4437 { 4438 foreach (attempt; 0 .. 10) 4439 foreach (poolSize; [0, 4]) 4440 { 4441 poolInstance = new TaskPool(poolSize); 4442 4443 // Test indexing. 4444 stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex); 4445 assert(poolInstance.workerIndex() == 0); 4446 4447 // Test worker-local storage. 4448 auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1); 4449 foreach (i; poolInstance.parallel(iota(0U, 1_000_000))) 4450 { 4451 workerLocalStorage.get++; 4452 } 4453 assert(reduce!"a + b"(workerLocalStorage.toRange) == 4454 1_000_000 + poolInstance.size + 1); 4455 4456 // Make sure work is reasonably balanced among threads. This test is 4457 // non-deterministic and is more of a sanity check than something that 4458 // has an absolute pass/fail. 4459 shared(uint)[void*] nJobsByThread; 4460 foreach (thread; poolInstance.pool) 4461 { 4462 nJobsByThread[cast(void*) thread] = 0; 4463 } 4464 nJobsByThread[ cast(void*) Thread.getThis()] = 0; 4465 4466 foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 )) 4467 { 4468 atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1); 4469 } 4470 4471 stderr.writeln("\nCurrent thread is: ", 4472 cast(void*) Thread.getThis()); 4473 stderr.writeln("Workload distribution: "); 4474 foreach (k, v; nJobsByThread) 4475 { 4476 stderr.writeln(k, '\t', v); 4477 } 4478 4479 // Test whether amap can be nested. 4480 real[][] matrix = new real[][](1000, 1000); 4481 foreach (i; poolInstance.parallel( iota(0, matrix.length) )) 4482 { 4483 foreach (j; poolInstance.parallel( iota(0, matrix[0].length) )) 4484 { 4485 matrix[i][j] = i * j; 4486 } 4487 } 4488 4489 // Get around weird bugs having to do w/ sqrt being an intrinsic: 4490 static real mySqrt(real num) 4491 { 4492 return sqrt(num); 4493 } 4494 4495 static real[] parallelSqrt(real[] nums) 4496 { 4497 return poolInstance.amap!mySqrt(nums); 4498 } 4499 4500 real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix); 4501 4502 foreach (i, row; sqrtMatrix) 4503 { 4504 foreach (j, elem; row) 4505 { 4506 real shouldBe = sqrt( cast(real) i * j); 4507 assert(approxEqual(shouldBe, elem)); 4508 sqrtMatrix[i][j] = shouldBe; 4509 } 4510 } 4511 4512 auto saySuccess = task( 4513 { 4514 stderr.writeln( 4515 "Success doing matrix stuff that involves nested pool use."); 4516 }); 4517 poolInstance.put(saySuccess); 4518 saySuccess.workForce(); 4519 4520 // A more thorough test of amap, reduce: Find the sum of the square roots of 4521 // matrix. 4522 4523 static real parallelSum(real[] input) 4524 { 4525 return poolInstance.reduce!"a + b"(input); 4526 } 4527 4528 auto sumSqrt = poolInstance.reduce!"a + b"( 4529 poolInstance.amap!parallelSum( 4530 sqrtMatrix 4531 ) 4532 ); 4533 4534 assert(approxEqual(sumSqrt, 4.437e8)); 4535 stderr.writeln("Done sum of square roots."); 4536 4537 // Test whether tasks work with function pointers. 4538 auto nanTask = task(&isNaN, 1.0L); 4539 poolInstance.put(nanTask); 4540 assert(nanTask.spinForce == false); 4541 4542 if (poolInstance.size > 0) 4543 { 4544 // Test work waiting. 4545 static void uselessFun() 4546 { 4547 foreach (i; 0 .. 1_000_000) {} 4548 } 4549 4550 auto uselessTasks = new typeof(task(&uselessFun))[1000]; 4551 foreach (ref uselessTask; uselessTasks) 4552 { 4553 uselessTask = task(&uselessFun); 4554 } 4555 foreach (ref uselessTask; uselessTasks) 4556 { 4557 poolInstance.put(uselessTask); 4558 } 4559 foreach (ref uselessTask; uselessTasks) 4560 { 4561 uselessTask.workForce(); 4562 } 4563 } 4564 4565 // Test the case of non-random access + ref returns. 4566 int[] nums = [1,2,3,4,5]; 4567 static struct RemoveRandom 4568 { 4569 int[] arr; 4570 4571 ref int front() 4572 { 4573 return arr.front; 4574 } 4575 void popFront() 4576 { 4577 arr.popFront(); 4578 } 4579 bool empty() 4580 { 4581 return arr.empty; 4582 } 4583 } 4584 4585 auto refRange = RemoveRandom(nums); 4586 foreach (ref elem; poolInstance.parallel(refRange)) 4587 { 4588 elem++; 4589 } 4590 assert(nums == [2,3,4,5,6], text(nums)); 4591 stderr.writeln("Nums: ", nums); 4592 4593 poolInstance.stop(); 4594 } 4595 } 4596 } 4597 4598 version (unittest) 4599 { 4600 struct __S_12733 4601 { 4602 invariant() { assert(checksum == 1_234_567_890); } 4603 this(ulong u){n = u;} 4604 void opAssign(__S_12733 s){this.n = s.n;} 4605 ulong n; 4606 ulong checksum = 1_234_567_890; 4607 } 4608 4609 static auto __genPair_12733(ulong n) { return __S_12733(n); } 4610 } 4611 4612 @system unittest 4613 { 4614 immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ]; 4615 4616 auto result = taskPool.amap!__genPair_12733(data); 4617 } 4618 4619 @safe unittest 4620 { 4621 import std.range : iota; 4622 4623 // this test was in std.range, but caused cycles. 4624 assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} })); 4625 } 4626 4627 @safe unittest 4628 { 4629 import std.algorithm.iteration : each; 4630 4631 long[] arr; 4632 static assert(is(typeof({ 4633 arr.parallel.each!"a++"; 4634 }))); 4635 } 4636 4637 // https://issues.dlang.org/show_bug.cgi?id=17539 4638 @system unittest 4639 { 4640 import std.random : rndGen; 4641 // ensure compilation 4642 try foreach (rnd; rndGen.parallel) break; 4643 catch (ParallelForeachError e) {} 4644 } 4645