1 /** 2 * $(SCRIPT inhibitQuickIndex = 1;) 3 * $(DIVC quickindex, 4 * $(BOOKTABLE, 5 * $(TR $(TH Category) $(TH Symbols)) 6 * $(TR $(TD Tid) $(TD 7 * $(MYREF locate) 8 * $(MYREF ownerTid) 9 * $(MYREF register) 10 * $(MYREF spawn) 11 * $(MYREF spawnLinked) 12 * $(MYREF thisTid) 13 * $(MYREF Tid) 14 * $(MYREF TidMissingException) 15 * $(MYREF unregister) 16 * )) 17 * $(TR $(TD Message passing) $(TD 18 * $(MYREF prioritySend) 19 * $(MYREF receive) 20 * $(MYREF receiveOnly) 21 * $(MYREF receiveTimeout) 22 * $(MYREF send) 23 * $(MYREF setMaxMailboxSize) 24 * )) 25 * $(TR $(TD Message-related types) $(TD 26 * $(MYREF LinkTerminated) 27 * $(MYREF MailboxFull) 28 * $(MYREF MessageMismatch) 29 * $(MYREF OnCrowding) 30 * $(MYREF OwnerTerminated) 31 * $(MYREF PriorityMessageException) 32 * )) 33 * $(TR $(TD Scheduler) $(TD 34 * $(MYREF FiberScheduler) 35 * $(MYREF Generator) 36 * $(MYREF Scheduler) 37 * $(MYREF scheduler) 38 * $(MYREF ThreadInfo) 39 * $(MYREF ThreadScheduler) 40 * $(MYREF yield) 41 * )) 42 * $(TR $(TD Misc) $(TD 43 * $(MYREF initOnce) 44 * )) 45 * )) 46 * 47 * This is a low-level messaging API upon which more structured or restrictive 48 * APIs may be built. The general idea is that every messageable entity is 49 * represented by a common handle type called a Tid, which allows messages to 50 * be sent to logical threads that are executing in both the current process 51 * and in external processes using the same interface. This is an important 52 * aspect of scalability because it allows the components of a program to be 53 * spread across available resources with few to no changes to the actual 54 * implementation. 55 * 56 * A logical thread is an execution context that has its own stack and which 57 * runs asynchronously to other logical threads. These may be preemptively 58 * scheduled kernel threads, fibers (cooperative user-space threads), or some 59 * other concept with similar behavior. 60 * 61 * The type of concurrency used when logical threads are created is determined 62 * by the Scheduler selected at initialization time. The default behavior is 63 * currently to create a new kernel thread per call to spawn, but other 64 * schedulers are available that multiplex fibers across the main thread or 65 * use some combination of the two approaches. 66 * 67 * Copyright: Copyright Sean Kelly 2009 - 2014. 68 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>. 69 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak 70 * Source: $(PHOBOSSRC std/concurrency.d) 71 */ 72 /* Copyright Sean Kelly 2009 - 2014. 73 * Distributed under the Boost Software License, Version 1.0. 74 * (See accompanying file LICENSE_1_0.txt or copy at 75 * http://www.boost.org/LICENSE_1_0.txt) 76 */ 77 module std.concurrency; 78 79 public import std.variant; 80 81 import core.atomic; 82 import core.sync.condition; 83 import core.sync.mutex; 84 import core.thread; 85 import std.range.primitives; 86 import std.range.interfaces : InputRange; 87 import std.traits; 88 89 /// 90 @system unittest 91 { 92 __gshared string received; 93 static void spawnedFunc(Tid ownerTid) 94 { 95 import std.conv : text; 96 // Receive a message from the owner thread. 97 receive((int i){ 98 received = text("Received the number ", i); 99 100 // Send a message back to the owner thread 101 // indicating success. 102 send(ownerTid, true); 103 }); 104 } 105 106 // Start spawnedFunc in a new thread. 107 auto childTid = spawn(&spawnedFunc, thisTid); 108 109 // Send the number 42 to this new thread. 110 send(childTid, 42); 111 112 // Receive the result code. 113 auto wasSuccessful = receiveOnly!(bool); 114 assert(wasSuccessful); 115 assert(received == "Received the number 42"); 116 } 117 118 private 119 { 120 bool hasLocalAliasing(Types...)() 121 { 122 import std.typecons : Rebindable; 123 124 // Works around "statement is not reachable" 125 bool doesIt = false; 126 static foreach (T; Types) 127 { 128 static if (is(T == Tid)) 129 { /* Allowed */ } 130 else static if (is(T : Rebindable!R, R)) 131 doesIt |= hasLocalAliasing!R; 132 else static if (is(T == struct)) 133 doesIt |= hasLocalAliasing!(typeof(T.tupleof)); 134 else 135 doesIt |= std.traits.hasUnsharedAliasing!(T); 136 } 137 return doesIt; 138 } 139 140 @safe unittest 141 { 142 static struct Container { Tid t; } 143 static assert(!hasLocalAliasing!(Tid, Container, int)); 144 } 145 146 // https://issues.dlang.org/show_bug.cgi?id=20097 147 @safe unittest 148 { 149 import std.datetime.systime : SysTime; 150 static struct Container { SysTime time; } 151 static assert(!hasLocalAliasing!(SysTime, Container)); 152 } 153 154 enum MsgType 155 { 156 standard, 157 priority, 158 linkDead, 159 } 160 161 struct Message 162 { 163 MsgType type; 164 Variant data; 165 166 this(T...)(MsgType t, T vals) if (T.length > 0) 167 { 168 static if (T.length == 1) 169 { 170 type = t; 171 data = vals[0]; 172 } 173 else 174 { 175 import std.typecons : Tuple; 176 177 type = t; 178 data = Tuple!(T)(vals); 179 } 180 } 181 182 @property auto convertsTo(T...)() 183 { 184 static if (T.length == 1) 185 { 186 return is(T[0] == Variant) || data.convertsTo!(T); 187 } 188 else 189 { 190 import std.typecons : Tuple; 191 return data.convertsTo!(Tuple!(T)); 192 } 193 } 194 195 @property auto get(T...)() 196 { 197 static if (T.length == 1) 198 { 199 static if (is(T[0] == Variant)) 200 return data; 201 else 202 return data.get!(T); 203 } 204 else 205 { 206 import std.typecons : Tuple; 207 return data.get!(Tuple!(T)); 208 } 209 } 210 211 auto map(Op)(Op op) 212 { 213 alias Args = Parameters!(Op); 214 215 static if (Args.length == 1) 216 { 217 static if (is(Args[0] == Variant)) 218 return op(data); 219 else 220 return op(data.get!(Args)); 221 } 222 else 223 { 224 import std.typecons : Tuple; 225 return op(data.get!(Tuple!(Args)).expand); 226 } 227 } 228 } 229 230 void checkops(T...)(T ops) 231 { 232 import std.format : format; 233 234 foreach (i, t1; T) 235 { 236 static assert(isFunctionPointer!t1 || isDelegate!t1, 237 format!"T %d is not a function pointer or delegates"(i)); 238 alias a1 = Parameters!(t1); 239 alias r1 = ReturnType!(t1); 240 241 static if (i < T.length - 1 && is(r1 == void)) 242 { 243 static assert(a1.length != 1 || !is(a1[0] == Variant), 244 "function with arguments " ~ a1.stringof ~ 245 " occludes successive function"); 246 247 foreach (t2; T[i + 1 .. $]) 248 { 249 alias a2 = Parameters!(t2); 250 251 static assert(!is(a1 == a2), 252 "function with arguments " ~ a1.stringof ~ " occludes successive function"); 253 } 254 } 255 } 256 } 257 258 @property ref ThreadInfo thisInfo() nothrow 259 { 260 if (scheduler is null) 261 return ThreadInfo.thisInfo; 262 return scheduler.thisInfo; 263 } 264 } 265 266 static ~this() 267 { 268 thisInfo.cleanup(); 269 } 270 271 // Exceptions 272 273 /** 274 * Thrown on calls to `receiveOnly` if a message other than the type 275 * the receiving thread expected is sent. 276 */ 277 class MessageMismatch : Exception 278 { 279 /// 280 this(string msg = "Unexpected message type") @safe pure nothrow @nogc 281 { 282 super(msg); 283 } 284 } 285 286 /** 287 * Thrown on calls to `receive` if the thread that spawned the receiving 288 * thread has terminated and no more messages exist. 289 */ 290 class OwnerTerminated : Exception 291 { 292 /// 293 this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc 294 { 295 super(msg); 296 tid = t; 297 } 298 299 Tid tid; 300 } 301 302 /** 303 * Thrown if a linked thread has terminated. 304 */ 305 class LinkTerminated : Exception 306 { 307 /// 308 this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc 309 { 310 super(msg); 311 tid = t; 312 } 313 314 Tid tid; 315 } 316 317 /** 318 * Thrown if a message was sent to a thread via 319 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler 320 * for a message of this type. 321 */ 322 class PriorityMessageException : Exception 323 { 324 /// 325 this(Variant vals) 326 { 327 super("Priority message"); 328 message = vals; 329 } 330 331 /** 332 * The message that was sent. 333 */ 334 Variant message; 335 } 336 337 /** 338 * Thrown on mailbox crowding if the mailbox is configured with 339 * `OnCrowding.throwException`. 340 */ 341 class MailboxFull : Exception 342 { 343 /// 344 this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc 345 { 346 super(msg); 347 tid = t; 348 } 349 350 Tid tid; 351 } 352 353 /** 354 * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't 355 * find an owner thread. 356 */ 357 class TidMissingException : Exception 358 { 359 import std.exception : basicExceptionCtors; 360 /// 361 mixin basicExceptionCtors; 362 } 363 364 365 // Thread ID 366 367 368 /** 369 * An opaque type used to represent a logical thread. 370 */ 371 struct Tid 372 { 373 private: 374 this(MessageBox m) @safe pure nothrow @nogc 375 { 376 mbox = m; 377 } 378 379 MessageBox mbox; 380 381 public: 382 383 /** 384 * Generate a convenient string for identifying this Tid. This is only 385 * useful to see if Tid's that are currently executing are the same or 386 * different, e.g. for logging and debugging. It is potentially possible 387 * that a Tid executed in the future will have the same toString() output 388 * as another Tid that has already terminated. 389 */ 390 void toString(W)(ref W w) const 391 { 392 import std.format.write : formattedWrite; 393 auto p = () @trusted { return cast(void*) mbox; }(); 394 formattedWrite(w, "Tid(%x)", p); 395 } 396 397 } 398 399 @safe unittest 400 { 401 import std.conv : text; 402 Tid tid; 403 assert(text(tid) == "Tid(0)"); 404 auto tid2 = thisTid; 405 assert(text(tid2) != "Tid(0)"); 406 auto tid3 = tid2; 407 assert(text(tid2) == text(tid3)); 408 } 409 410 // https://issues.dlang.org/show_bug.cgi?id=21512 411 @system unittest 412 { 413 import std.format : format; 414 415 const(Tid) b = spawn(() {}); 416 assert(format!"%s"(b)[0 .. 4] == "Tid("); 417 } 418 419 /** 420 * Returns: The $(LREF Tid) of the caller's thread. 421 */ 422 @property Tid thisTid() @safe 423 { 424 // TODO: remove when concurrency is safe 425 static auto trus() @trusted 426 { 427 if (thisInfo.ident != Tid.init) 428 return thisInfo.ident; 429 thisInfo.ident = Tid(new MessageBox); 430 return thisInfo.ident; 431 } 432 433 return trus(); 434 } 435 436 /** 437 * Return the Tid of the thread which spawned the caller's thread. 438 * 439 * Throws: A `TidMissingException` exception if 440 * there is no owner thread. 441 */ 442 @property Tid ownerTid() 443 { 444 import std.exception : enforce; 445 446 enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread."); 447 return thisInfo.owner; 448 } 449 450 @system unittest 451 { 452 import std.exception : assertThrown; 453 454 static void fun() 455 { 456 string res = receiveOnly!string(); 457 assert(res == "Main calling"); 458 ownerTid.send("Child responding"); 459 } 460 461 assertThrown!TidMissingException(ownerTid); 462 auto child = spawn(&fun); 463 child.send("Main calling"); 464 string res = receiveOnly!string(); 465 assert(res == "Child responding"); 466 } 467 468 // Thread Creation 469 470 private template isSpawnable(F, T...) 471 { 472 template isParamsImplicitlyConvertible(F1, F2, int i = 0) 473 { 474 alias param1 = Parameters!F1; 475 alias param2 = Parameters!F2; 476 static if (param1.length != param2.length) 477 enum isParamsImplicitlyConvertible = false; 478 else static if (param1.length == i) 479 enum isParamsImplicitlyConvertible = true; 480 else static if (isImplicitlyConvertible!(param2[i], param1[i])) 481 enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, 482 F2, i + 1); 483 else 484 enum isParamsImplicitlyConvertible = false; 485 } 486 487 enum isSpawnable = isCallable!F && is(ReturnType!F : void) 488 && isParamsImplicitlyConvertible!(F, void function(T)) 489 && (isFunctionPointer!F || !hasUnsharedAliasing!F); 490 } 491 492 /** 493 * Starts fn(args) in a new logical thread. 494 * 495 * Executes the supplied function in a new logical thread represented by 496 * `Tid`. The calling thread is designated as the owner of the new thread. 497 * When the owner thread terminates an `OwnerTerminated` message will be 498 * sent to the new thread, causing an `OwnerTerminated` exception to be 499 * thrown on `receive()`. 500 * 501 * Params: 502 * fn = The function to execute. 503 * args = Arguments to the function. 504 * 505 * Returns: 506 * A Tid representing the new logical thread. 507 * 508 * Notes: 509 * `args` must not have unshared aliasing. In other words, all arguments 510 * to `fn` must either be `shared` or `immutable` or have no 511 * pointer indirection. This is necessary for enforcing isolation among 512 * threads. 513 * 514 * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning 515 * `fn` must be either `shared` or `immutable`. */ 516 Tid spawn(F, T...)(F fn, T args) 517 if (isSpawnable!(F, T)) 518 { 519 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 520 return _spawn(false, fn, args); 521 } 522 523 /// 524 @system unittest 525 { 526 static void f(string msg) 527 { 528 assert(msg == "Hello World"); 529 } 530 531 auto tid = spawn(&f, "Hello World"); 532 } 533 534 /// Fails: char[] has mutable aliasing. 535 @system unittest 536 { 537 string msg = "Hello, World!"; 538 539 static void f1(string msg) {} 540 static assert(!__traits(compiles, spawn(&f1, msg.dup))); 541 static assert( __traits(compiles, spawn(&f1, msg.idup))); 542 543 static void f2(char[] msg) {} 544 static assert(!__traits(compiles, spawn(&f2, msg.dup))); 545 static assert(!__traits(compiles, spawn(&f2, msg.idup))); 546 } 547 548 /// New thread with anonymous function 549 @system unittest 550 { 551 spawn({ 552 ownerTid.send("This is so great!"); 553 }); 554 assert(receiveOnly!string == "This is so great!"); 555 } 556 557 @system unittest 558 { 559 import core.thread : thread_joinAll; 560 561 __gshared string receivedMessage; 562 static void f1(string msg) 563 { 564 receivedMessage = msg; 565 } 566 567 auto tid1 = spawn(&f1, "Hello World"); 568 thread_joinAll; 569 assert(receivedMessage == "Hello World"); 570 } 571 572 /** 573 * Starts fn(args) in a logical thread and will receive a LinkTerminated 574 * message when the operation terminates. 575 * 576 * Executes the supplied function in a new logical thread represented by 577 * Tid. This new thread is linked to the calling thread so that if either 578 * it or the calling thread terminates a LinkTerminated message will be sent 579 * to the other, causing a LinkTerminated exception to be thrown on receive(). 580 * The owner relationship from spawn() is preserved as well, so if the link 581 * between threads is broken, owner termination will still result in an 582 * OwnerTerminated exception to be thrown on receive(). 583 * 584 * Params: 585 * fn = The function to execute. 586 * args = Arguments to the function. 587 * 588 * Returns: 589 * A Tid representing the new thread. 590 */ 591 Tid spawnLinked(F, T...)(F fn, T args) 592 if (isSpawnable!(F, T)) 593 { 594 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 595 return _spawn(true, fn, args); 596 } 597 598 /* 599 * 600 */ 601 private Tid _spawn(F, T...)(bool linked, F fn, T args) 602 if (isSpawnable!(F, T)) 603 { 604 // TODO: MessageList and &exec should be shared. 605 auto spawnTid = Tid(new MessageBox); 606 auto ownerTid = thisTid; 607 608 void exec() 609 { 610 thisInfo.ident = spawnTid; 611 thisInfo.owner = ownerTid; 612 fn(args); 613 } 614 615 // TODO: MessageList and &exec should be shared. 616 if (scheduler !is null) 617 scheduler.spawn(&exec); 618 else 619 { 620 auto t = new Thread(&exec); 621 t.start(); 622 } 623 thisInfo.links[spawnTid] = linked; 624 return spawnTid; 625 } 626 627 @system unittest 628 { 629 void function() fn1; 630 void function(int) fn2; 631 static assert(__traits(compiles, spawn(fn1))); 632 static assert(__traits(compiles, spawn(fn2, 2))); 633 static assert(!__traits(compiles, spawn(fn1, 1))); 634 static assert(!__traits(compiles, spawn(fn2))); 635 636 void delegate(int) shared dg1; 637 shared(void delegate(int)) dg2; 638 shared(void delegate(long) shared) dg3; 639 shared(void delegate(real, int, long) shared) dg4; 640 void delegate(int) immutable dg5; 641 void delegate(int) dg6; 642 static assert(__traits(compiles, spawn(dg1, 1))); 643 static assert(__traits(compiles, spawn(dg2, 2))); 644 static assert(__traits(compiles, spawn(dg3, 3))); 645 static assert(__traits(compiles, spawn(dg4, 4, 4, 4))); 646 static assert(__traits(compiles, spawn(dg5, 5))); 647 static assert(!__traits(compiles, spawn(dg6, 6))); 648 649 auto callable1 = new class{ void opCall(int) shared {} }; 650 auto callable2 = cast(shared) new class{ void opCall(int) shared {} }; 651 auto callable3 = new class{ void opCall(int) immutable {} }; 652 auto callable4 = cast(immutable) new class{ void opCall(int) immutable {} }; 653 auto callable5 = new class{ void opCall(int) {} }; 654 auto callable6 = cast(shared) new class{ void opCall(int) immutable {} }; 655 auto callable7 = cast(immutable) new class{ void opCall(int) shared {} }; 656 auto callable8 = cast(shared) new class{ void opCall(int) const shared {} }; 657 auto callable9 = cast(const shared) new class{ void opCall(int) shared {} }; 658 auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} }; 659 auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} }; 660 static assert(!__traits(compiles, spawn(callable1, 1))); 661 static assert( __traits(compiles, spawn(callable2, 2))); 662 static assert(!__traits(compiles, spawn(callable3, 3))); 663 static assert( __traits(compiles, spawn(callable4, 4))); 664 static assert(!__traits(compiles, spawn(callable5, 5))); 665 static assert(!__traits(compiles, spawn(callable6, 6))); 666 static assert(!__traits(compiles, spawn(callable7, 7))); 667 static assert( __traits(compiles, spawn(callable8, 8))); 668 static assert(!__traits(compiles, spawn(callable9, 9))); 669 static assert( __traits(compiles, spawn(callable10, 10))); 670 static assert( __traits(compiles, spawn(callable11, 11))); 671 } 672 673 /** 674 * Places the values as a message at the back of tid's message queue. 675 * 676 * Sends the supplied value to the thread represented by tid. As with 677 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing. 678 */ 679 void send(T...)(Tid tid, T vals) 680 in (tid.mbox !is null) 681 { 682 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 683 _send(tid, vals); 684 } 685 686 /** 687 * Places the values as a message on the front of tid's message queue. 688 * 689 * Send a message to `tid` but place it at the front of `tid`'s message 690 * queue instead of at the back. This function is typically used for 691 * out-of-band communication, to signal exceptional conditions, etc. 692 */ 693 void prioritySend(T...)(Tid tid, T vals) 694 in (tid.mbox !is null) 695 { 696 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 697 _send(MsgType.priority, tid, vals); 698 } 699 700 /* 701 * ditto 702 */ 703 private void _send(T...)(Tid tid, T vals) 704 in (tid.mbox !is null) 705 { 706 _send(MsgType.standard, tid, vals); 707 } 708 709 /* 710 * Implementation of send. This allows parameter checking to be different for 711 * both Tid.send() and .send(). 712 */ 713 private void _send(T...)(MsgType type, Tid tid, T vals) 714 in (tid.mbox !is null) 715 { 716 auto msg = Message(type, vals); 717 tid.mbox.put(msg); 718 } 719 720 /** 721 * Receives a message from another thread. 722 * 723 * Receive a message from another thread, or block if no messages of the 724 * specified types are available. This function works by pattern matching 725 * a message against a set of delegates and executing the first match found. 726 * 727 * If a delegate that accepts a $(REF Variant, std,variant) is included as 728 * the last argument to `receive`, it will match any message that was not 729 * matched by an earlier delegate. If more than one argument is sent, 730 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 731 * sent. 732 * 733 * Params: 734 * ops = Variadic list of function pointers and delegates. Entries 735 * in this list must not occlude later entries. 736 * 737 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. 738 */ 739 void receive(T...)( T ops ) 740 in 741 { 742 assert(thisInfo.ident.mbox !is null, 743 "Cannot receive a message until a thread was spawned " 744 ~ "or thisTid was passed to a running thread."); 745 } 746 do 747 { 748 checkops( ops ); 749 750 thisInfo.ident.mbox.get( ops ); 751 } 752 753 /// 754 @system unittest 755 { 756 import std.variant : Variant; 757 758 auto process = () 759 { 760 receive( 761 (int i) { ownerTid.send(1); }, 762 (double f) { ownerTid.send(2); }, 763 (Variant v) { ownerTid.send(3); } 764 ); 765 }; 766 767 { 768 auto tid = spawn(process); 769 send(tid, 42); 770 assert(receiveOnly!int == 1); 771 } 772 773 { 774 auto tid = spawn(process); 775 send(tid, 3.14); 776 assert(receiveOnly!int == 2); 777 } 778 779 { 780 auto tid = spawn(process); 781 send(tid, "something else"); 782 assert(receiveOnly!int == 3); 783 } 784 } 785 786 @safe unittest 787 { 788 static assert( __traits( compiles, 789 { 790 receive( (Variant x) {} ); 791 receive( (int x) {}, (Variant x) {} ); 792 } ) ); 793 794 static assert( !__traits( compiles, 795 { 796 receive( (Variant x) {}, (int x) {} ); 797 } ) ); 798 799 static assert( !__traits( compiles, 800 { 801 receive( (int x) {}, (int x) {} ); 802 } ) ); 803 } 804 805 // Make sure receive() works with free functions as well. 806 version (StdUnittest) 807 { 808 private void receiveFunction(int x) {} 809 } 810 @safe unittest 811 { 812 static assert( __traits( compiles, 813 { 814 receive( &receiveFunction ); 815 receive( &receiveFunction, (Variant x) {} ); 816 } ) ); 817 } 818 819 820 private template receiveOnlyRet(T...) 821 { 822 static if ( T.length == 1 ) 823 { 824 alias receiveOnlyRet = T[0]; 825 } 826 else 827 { 828 import std.typecons : Tuple; 829 alias receiveOnlyRet = Tuple!(T); 830 } 831 } 832 833 /** 834 * Receives only messages with arguments of the specified types. 835 * 836 * Params: 837 * T = Variadic list of types to be received. 838 * 839 * Returns: The received message. If `T` has more than one entry, 840 * the message will be packed into a $(REF Tuple, std,typecons). 841 * 842 * Throws: $(LREF MessageMismatch) if a message of types other than `T` 843 * is received, 844 * $(LREF OwnerTerminated) when the sending thread was terminated. 845 */ 846 receiveOnlyRet!(T) receiveOnly(T...)() 847 in 848 { 849 assert(thisInfo.ident.mbox !is null, 850 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 851 } 852 do 853 { 854 import std.format : format; 855 import std.meta : allSatisfy; 856 import std.typecons : Tuple; 857 858 Tuple!(T) ret; 859 860 thisInfo.ident.mbox.get((T val) { 861 static if (T.length) 862 { 863 static if (allSatisfy!(isAssignable, T)) 864 { 865 ret.field = val; 866 } 867 else 868 { 869 import core.lifetime : emplace; 870 emplace(&ret, val); 871 } 872 } 873 }, 874 (LinkTerminated e) { throw e; }, 875 (OwnerTerminated e) { throw e; }, 876 (Variant val) { 877 static if (T.length > 1) 878 string exp = T.stringof; 879 else 880 string exp = T[0].stringof; 881 882 throw new MessageMismatch( 883 format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString())); 884 }); 885 static if (T.length == 1) 886 return ret[0]; 887 else 888 return ret; 889 } 890 891 /// 892 @system unittest 893 { 894 auto tid = spawn( 895 { 896 assert(receiveOnly!int == 42); 897 }); 898 send(tid, 42); 899 } 900 901 /// 902 @system unittest 903 { 904 auto tid = spawn( 905 { 906 assert(receiveOnly!string == "text"); 907 }); 908 send(tid, "text"); 909 } 910 911 /// 912 @system unittest 913 { 914 struct Record { string name; int age; } 915 916 auto tid = spawn( 917 { 918 auto msg = receiveOnly!(double, Record); 919 assert(msg[0] == 0.5); 920 assert(msg[1].name == "Alice"); 921 assert(msg[1].age == 31); 922 }); 923 924 send(tid, 0.5, Record("Alice", 31)); 925 } 926 927 @system unittest 928 { 929 static void t1(Tid mainTid) 930 { 931 try 932 { 933 receiveOnly!string(); 934 mainTid.send(""); 935 } 936 catch (Throwable th) 937 { 938 mainTid.send(th.msg); 939 } 940 } 941 942 auto tid = spawn(&t1, thisTid); 943 tid.send(1); 944 string result = receiveOnly!string(); 945 assert(result == "Unexpected message type: expected 'string', got 'int'"); 946 } 947 948 // https://issues.dlang.org/show_bug.cgi?id=21663 949 @safe unittest 950 { 951 alias test = receiveOnly!(string, bool, bool); 952 } 953 954 /** 955 * Receives a message from another thread and gives up if no match 956 * arrives within a specified duration. 957 * 958 * Receive a message from another thread, or block until `duration` exceeds, 959 * if no messages of the specified types are available. This function works 960 * by pattern matching a message against a set of delegates and executing 961 * the first match found. 962 * 963 * If a delegate that accepts a $(REF Variant, std,variant) is included as 964 * the last argument, it will match any message that was not 965 * matched by an earlier delegate. If more than one argument is sent, 966 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 967 * sent. 968 * 969 * Params: 970 * duration = Duration, how long to wait. If `duration` is negative, 971 * won't wait at all. 972 * ops = Variadic list of function pointers and delegates. Entries 973 * in this list must not occlude later entries. 974 * 975 * Returns: `true` if it received a message and `false` if it timed out waiting 976 * for one. 977 * 978 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. 979 */ 980 bool receiveTimeout(T...)(Duration duration, T ops) 981 in 982 { 983 assert(thisInfo.ident.mbox !is null, 984 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 985 } 986 do 987 { 988 checkops(ops); 989 990 return thisInfo.ident.mbox.get(duration, ops); 991 } 992 993 @safe unittest 994 { 995 static assert(__traits(compiles, { 996 receiveTimeout(msecs(0), (Variant x) {}); 997 receiveTimeout(msecs(0), (int x) {}, (Variant x) {}); 998 })); 999 1000 static assert(!__traits(compiles, { 1001 receiveTimeout(msecs(0), (Variant x) {}, (int x) {}); 1002 })); 1003 1004 static assert(!__traits(compiles, { 1005 receiveTimeout(msecs(0), (int x) {}, (int x) {}); 1006 })); 1007 1008 static assert(__traits(compiles, { 1009 receiveTimeout(msecs(10), (int x) {}, (Variant x) {}); 1010 })); 1011 } 1012 1013 // MessageBox Limits 1014 1015 /** 1016 * These behaviors may be specified when a mailbox is full. 1017 */ 1018 enum OnCrowding 1019 { 1020 block, /// Wait until room is available. 1021 throwException, /// Throw a MailboxFull exception. 1022 ignore /// Abort the send and return. 1023 } 1024 1025 private 1026 { 1027 bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc 1028 { 1029 return true; 1030 } 1031 1032 bool onCrowdingThrow(Tid tid) @safe pure 1033 { 1034 throw new MailboxFull(tid); 1035 } 1036 1037 bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc 1038 { 1039 return false; 1040 } 1041 } 1042 1043 /** 1044 * Sets a maximum mailbox size. 1045 * 1046 * Sets a limit on the maximum number of user messages allowed in the mailbox. 1047 * If this limit is reached, the caller attempting to add a new message will 1048 * execute the behavior specified by doThis. If messages is zero, the mailbox 1049 * is unbounded. 1050 * 1051 * Params: 1052 * tid = The Tid of the thread for which this limit should be set. 1053 * messages = The maximum number of messages or zero if no limit. 1054 * doThis = The behavior executed when a message is sent to a full 1055 * mailbox. 1056 */ 1057 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure 1058 in (tid.mbox !is null) 1059 { 1060 final switch (doThis) 1061 { 1062 case OnCrowding.block: 1063 return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock); 1064 case OnCrowding.throwException: 1065 return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow); 1066 case OnCrowding.ignore: 1067 return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore); 1068 } 1069 } 1070 1071 /** 1072 * Sets a maximum mailbox size. 1073 * 1074 * Sets a limit on the maximum number of user messages allowed in the mailbox. 1075 * If this limit is reached, the caller attempting to add a new message will 1076 * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded. 1077 * 1078 * Params: 1079 * tid = The Tid of the thread for which this limit should be set. 1080 * messages = The maximum number of messages or zero if no limit. 1081 * onCrowdingDoThis = The routine called when a message is sent to a full 1082 * mailbox. 1083 */ 1084 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis) 1085 in (tid.mbox !is null) 1086 { 1087 tid.mbox.setMaxMsgs(messages, onCrowdingDoThis); 1088 } 1089 1090 private 1091 { 1092 __gshared Tid[string] tidByName; 1093 __gshared string[][Tid] namesByTid; 1094 } 1095 1096 private @property Mutex registryLock() 1097 { 1098 __gshared Mutex impl; 1099 initOnce!impl(new Mutex); 1100 return impl; 1101 } 1102 1103 private void unregisterMe(ref ThreadInfo me) 1104 { 1105 if (me.ident != Tid.init) 1106 { 1107 synchronized (registryLock) 1108 { 1109 if (auto allNames = me.ident in namesByTid) 1110 { 1111 foreach (name; *allNames) 1112 tidByName.remove(name); 1113 namesByTid.remove(me.ident); 1114 } 1115 } 1116 } 1117 } 1118 1119 /** 1120 * Associates name with tid. 1121 * 1122 * Associates name with tid in a process-local map. When the thread 1123 * represented by tid terminates, any names associated with it will be 1124 * automatically unregistered. 1125 * 1126 * Params: 1127 * name = The name to associate with tid. 1128 * tid = The tid register by name. 1129 * 1130 * Returns: 1131 * true if the name is available and tid is not known to represent a 1132 * defunct thread. 1133 */ 1134 bool register(string name, Tid tid) 1135 in (tid.mbox !is null) 1136 { 1137 synchronized (registryLock) 1138 { 1139 if (name in tidByName) 1140 return false; 1141 if (tid.mbox.isClosed) 1142 return false; 1143 namesByTid[tid] ~= name; 1144 tidByName[name] = tid; 1145 return true; 1146 } 1147 } 1148 1149 /** 1150 * Removes the registered name associated with a tid. 1151 * 1152 * Params: 1153 * name = The name to unregister. 1154 * 1155 * Returns: 1156 * true if the name is registered, false if not. 1157 */ 1158 bool unregister(string name) 1159 { 1160 import std.algorithm.mutation : remove, SwapStrategy; 1161 import std.algorithm.searching : countUntil; 1162 1163 synchronized (registryLock) 1164 { 1165 if (auto tid = name in tidByName) 1166 { 1167 auto allNames = *tid in namesByTid; 1168 auto pos = countUntil(*allNames, name); 1169 remove!(SwapStrategy.unstable)(*allNames, pos); 1170 tidByName.remove(name); 1171 return true; 1172 } 1173 return false; 1174 } 1175 } 1176 1177 /** 1178 * Gets the Tid associated with name. 1179 * 1180 * Params: 1181 * name = The name to locate within the registry. 1182 * 1183 * Returns: 1184 * The associated Tid or Tid.init if name is not registered. 1185 */ 1186 Tid locate(string name) 1187 { 1188 synchronized (registryLock) 1189 { 1190 if (auto tid = name in tidByName) 1191 return *tid; 1192 return Tid.init; 1193 } 1194 } 1195 1196 /** 1197 * Encapsulates all implementation-level data needed for scheduling. 1198 * 1199 * When defining a Scheduler, an instance of this struct must be associated 1200 * with each logical thread. It contains all implementation-level information 1201 * needed by the internal API. 1202 */ 1203 struct ThreadInfo 1204 { 1205 Tid ident; 1206 bool[Tid] links; 1207 Tid owner; 1208 1209 /** 1210 * Gets a thread-local instance of ThreadInfo. 1211 * 1212 * Gets a thread-local instance of ThreadInfo, which should be used as the 1213 * default instance when info is requested for a thread not created by the 1214 * Scheduler. 1215 */ 1216 static @property ref thisInfo() nothrow 1217 { 1218 static ThreadInfo val; 1219 return val; 1220 } 1221 1222 /** 1223 * Cleans up this ThreadInfo. 1224 * 1225 * This must be called when a scheduled thread terminates. It tears down 1226 * the messaging system for the thread and notifies interested parties of 1227 * the thread's termination. 1228 */ 1229 void cleanup() 1230 { 1231 if (ident.mbox !is null) 1232 ident.mbox.close(); 1233 foreach (tid; links.keys) 1234 _send(MsgType.linkDead, tid, ident); 1235 if (owner != Tid.init) 1236 _send(MsgType.linkDead, owner, ident); 1237 unregisterMe(this); // clean up registry entries 1238 } 1239 1240 // https://issues.dlang.org/show_bug.cgi?id=20160 1241 @system unittest 1242 { 1243 register("main_thread", thisTid()); 1244 1245 ThreadInfo t; 1246 t.cleanup(); 1247 1248 assert(locate("main_thread") == thisTid()); 1249 } 1250 } 1251 1252 /** 1253 * A Scheduler controls how threading is performed by spawn. 1254 * 1255 * Implementing a Scheduler allows the concurrency mechanism used by this 1256 * module to be customized according to different needs. By default, a call 1257 * to spawn will create a new kernel thread that executes the supplied routine 1258 * and terminates when finished. But it is possible to create Schedulers that 1259 * reuse threads, that multiplex Fibers (coroutines) across a single thread, 1260 * or any number of other approaches. By making the choice of Scheduler a 1261 * user-level option, std.concurrency may be used for far more types of 1262 * application than if this behavior were predefined. 1263 * 1264 * Example: 1265 * --- 1266 * import std.concurrency; 1267 * import std.stdio; 1268 * 1269 * void main() 1270 * { 1271 * scheduler = new FiberScheduler; 1272 * scheduler.start( 1273 * { 1274 * writeln("the rest of main goes here"); 1275 * }); 1276 * } 1277 * --- 1278 * 1279 * Some schedulers have a dispatching loop that must run if they are to work 1280 * properly, so for the sake of consistency, when using a scheduler, start() 1281 * must be called within main(). This yields control to the scheduler and 1282 * will ensure that any spawned threads are executed in an expected manner. 1283 */ 1284 interface Scheduler 1285 { 1286 /** 1287 * Spawns the supplied op and starts the Scheduler. 1288 * 1289 * This is intended to be called at the start of the program to yield all 1290 * scheduling to the active Scheduler instance. This is necessary for 1291 * schedulers that explicitly dispatch threads rather than simply relying 1292 * on the operating system to do so, and so start should always be called 1293 * within main() to begin normal program execution. 1294 * 1295 * Params: 1296 * op = A wrapper for whatever the main thread would have done in the 1297 * absence of a custom scheduler. It will be automatically executed 1298 * via a call to spawn by the Scheduler. 1299 */ 1300 void start(void delegate() op); 1301 1302 /** 1303 * Assigns a logical thread to execute the supplied op. 1304 * 1305 * This routine is called by spawn. It is expected to instantiate a new 1306 * logical thread and run the supplied operation. This thread must call 1307 * thisInfo.cleanup() when the thread terminates if the scheduled thread 1308 * is not a kernel thread--all kernel threads will have their ThreadInfo 1309 * cleaned up automatically by a thread-local destructor. 1310 * 1311 * Params: 1312 * op = The function to execute. This may be the actual function passed 1313 * by the user to spawn itself, or may be a wrapper function. 1314 */ 1315 void spawn(void delegate() op); 1316 1317 /** 1318 * Yields execution to another logical thread. 1319 * 1320 * This routine is called at various points within concurrency-aware APIs 1321 * to provide a scheduler a chance to yield execution when using some sort 1322 * of cooperative multithreading model. If this is not appropriate, such 1323 * as when each logical thread is backed by a dedicated kernel thread, 1324 * this routine may be a no-op. 1325 */ 1326 void yield() nothrow; 1327 1328 /** 1329 * Returns an appropriate ThreadInfo instance. 1330 * 1331 * Returns an instance of ThreadInfo specific to the logical thread that 1332 * is calling this routine or, if the calling thread was not create by 1333 * this scheduler, returns ThreadInfo.thisInfo instead. 1334 */ 1335 @property ref ThreadInfo thisInfo() nothrow; 1336 1337 /** 1338 * Creates a Condition variable analog for signaling. 1339 * 1340 * Creates a new Condition variable analog which is used to check for and 1341 * to signal the addition of messages to a thread's message queue. Like 1342 * yield, some schedulers may need to define custom behavior so that calls 1343 * to Condition.wait() yield to another thread when no new messages are 1344 * available instead of blocking. 1345 * 1346 * Params: 1347 * m = The Mutex that will be associated with this condition. It will be 1348 * locked prior to any operation on the condition, and so in some 1349 * cases a Scheduler may need to hold this reference and unlock the 1350 * mutex before yielding execution to another logical thread. 1351 */ 1352 Condition newCondition(Mutex m) nothrow; 1353 } 1354 1355 /** 1356 * An example Scheduler using kernel threads. 1357 * 1358 * This is an example Scheduler that mirrors the default scheduling behavior 1359 * of creating one kernel thread per call to spawn. It is fully functional 1360 * and may be instantiated and used, but is not a necessary part of the 1361 * default functioning of this module. 1362 */ 1363 class ThreadScheduler : Scheduler 1364 { 1365 /** 1366 * This simply runs op directly, since no real scheduling is needed by 1367 * this approach. 1368 */ 1369 void start(void delegate() op) 1370 { 1371 op(); 1372 } 1373 1374 /** 1375 * Creates a new kernel thread and assigns it to run the supplied op. 1376 */ 1377 void spawn(void delegate() op) 1378 { 1379 auto t = new Thread(op); 1380 t.start(); 1381 } 1382 1383 /** 1384 * This scheduler does no explicit multiplexing, so this is a no-op. 1385 */ 1386 void yield() nothrow 1387 { 1388 // no explicit yield needed 1389 } 1390 1391 /** 1392 * Returns ThreadInfo.thisInfo, since it is a thread-local instance of 1393 * ThreadInfo, which is the correct behavior for this scheduler. 1394 */ 1395 @property ref ThreadInfo thisInfo() nothrow 1396 { 1397 return ThreadInfo.thisInfo; 1398 } 1399 1400 /** 1401 * Creates a new Condition variable. No custom behavior is needed here. 1402 */ 1403 Condition newCondition(Mutex m) nothrow 1404 { 1405 return new Condition(m); 1406 } 1407 } 1408 1409 /** 1410 * An example Scheduler using Fibers. 1411 * 1412 * This is an example scheduler that creates a new Fiber per call to spawn 1413 * and multiplexes the execution of all fibers within the main thread. 1414 */ 1415 class FiberScheduler : Scheduler 1416 { 1417 /** 1418 * This creates a new Fiber for the supplied op and then starts the 1419 * dispatcher. 1420 */ 1421 void start(void delegate() op) 1422 { 1423 create(op); 1424 dispatch(); 1425 } 1426 1427 /** 1428 * This created a new Fiber for the supplied op and adds it to the 1429 * dispatch list. 1430 */ 1431 void spawn(void delegate() op) nothrow 1432 { 1433 create(op); 1434 yield(); 1435 } 1436 1437 /** 1438 * If the caller is a scheduled Fiber, this yields execution to another 1439 * scheduled Fiber. 1440 */ 1441 void yield() nothrow 1442 { 1443 // NOTE: It's possible that we should test whether the calling Fiber 1444 // is an InfoFiber before yielding, but I think it's reasonable 1445 // that any (non-Generator) fiber should yield here. 1446 if (Fiber.getThis()) 1447 Fiber.yield(); 1448 } 1449 1450 /** 1451 * Returns an appropriate ThreadInfo instance. 1452 * 1453 * Returns a ThreadInfo instance specific to the calling Fiber if the 1454 * Fiber was created by this dispatcher, otherwise it returns 1455 * ThreadInfo.thisInfo. 1456 */ 1457 @property ref ThreadInfo thisInfo() nothrow 1458 { 1459 auto f = cast(InfoFiber) Fiber.getThis(); 1460 1461 if (f !is null) 1462 return f.info; 1463 return ThreadInfo.thisInfo; 1464 } 1465 1466 /** 1467 * Returns a Condition analog that yields when wait or notify is called. 1468 * 1469 * Bug: 1470 * For the default implementation, `notifyAll`will behave like `notify`. 1471 * 1472 * Params: 1473 * m = A `Mutex` to use for locking if the condition needs to be waited on 1474 * or notified from multiple `Thread`s. 1475 * If `null`, no `Mutex` will be used and it is assumed that the 1476 * `Condition` is only waited on/notified from one `Thread`. 1477 */ 1478 Condition newCondition(Mutex m) nothrow 1479 { 1480 return new FiberCondition(m); 1481 } 1482 1483 protected: 1484 /** 1485 * Creates a new Fiber which calls the given delegate. 1486 * 1487 * Params: 1488 * op = The delegate the fiber should call 1489 */ 1490 void create(void delegate() op) nothrow 1491 { 1492 void wrap() 1493 { 1494 scope (exit) 1495 { 1496 thisInfo.cleanup(); 1497 } 1498 op(); 1499 } 1500 1501 m_fibers ~= new InfoFiber(&wrap); 1502 } 1503 1504 /** 1505 * Fiber which embeds a ThreadInfo 1506 */ 1507 static class InfoFiber : Fiber 1508 { 1509 ThreadInfo info; 1510 1511 this(void delegate() op) nothrow 1512 { 1513 super(op); 1514 } 1515 1516 this(void delegate() op, size_t sz) nothrow 1517 { 1518 super(op, sz); 1519 } 1520 } 1521 1522 private: 1523 class FiberCondition : Condition 1524 { 1525 this(Mutex m) nothrow 1526 { 1527 super(m); 1528 notified = false; 1529 } 1530 1531 override void wait() nothrow 1532 { 1533 scope (exit) notified = false; 1534 1535 while (!notified) 1536 switchContext(); 1537 } 1538 1539 override bool wait(Duration period) nothrow 1540 { 1541 import core.time : MonoTime; 1542 1543 scope (exit) notified = false; 1544 1545 for (auto limit = MonoTime.currTime + period; 1546 !notified && !period.isNegative; 1547 period = limit - MonoTime.currTime) 1548 { 1549 this.outer.yield(); 1550 } 1551 return notified; 1552 } 1553 1554 override void notify() nothrow 1555 { 1556 notified = true; 1557 switchContext(); 1558 } 1559 1560 override void notifyAll() nothrow 1561 { 1562 notified = true; 1563 switchContext(); 1564 } 1565 1566 private: 1567 void switchContext() nothrow 1568 { 1569 if (mutex_nothrow) mutex_nothrow.unlock_nothrow(); 1570 scope (exit) 1571 if (mutex_nothrow) 1572 mutex_nothrow.lock_nothrow(); 1573 this.outer.yield(); 1574 } 1575 1576 bool notified; 1577 } 1578 1579 void dispatch() 1580 { 1581 import std.algorithm.mutation : remove; 1582 1583 while (m_fibers.length > 0) 1584 { 1585 auto t = m_fibers[m_pos].call(Fiber.Rethrow.no); 1586 if (t !is null && !(cast(OwnerTerminated) t)) 1587 { 1588 throw t; 1589 } 1590 if (m_fibers[m_pos].state == Fiber.State.TERM) 1591 { 1592 if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length) 1593 m_pos = 0; 1594 } 1595 else if (m_pos++ >= m_fibers.length - 1) 1596 { 1597 m_pos = 0; 1598 } 1599 } 1600 } 1601 1602 Fiber[] m_fibers; 1603 size_t m_pos; 1604 } 1605 1606 @system unittest 1607 { 1608 static void receive(Condition cond, ref size_t received) 1609 { 1610 while (true) 1611 { 1612 synchronized (cond.mutex) 1613 { 1614 cond.wait(); 1615 ++received; 1616 } 1617 } 1618 } 1619 1620 static void send(Condition cond, ref size_t sent) 1621 { 1622 while (true) 1623 { 1624 synchronized (cond.mutex) 1625 { 1626 ++sent; 1627 cond.notify(); 1628 } 1629 } 1630 } 1631 1632 auto fs = new FiberScheduler; 1633 auto mtx = new Mutex; 1634 auto cond = fs.newCondition(mtx); 1635 1636 size_t received, sent; 1637 auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); }); 1638 waiter.call(); 1639 assert(received == 0); 1640 notifier.call(); 1641 assert(sent == 1); 1642 assert(received == 0); 1643 waiter.call(); 1644 assert(received == 1); 1645 waiter.call(); 1646 assert(received == 1); 1647 } 1648 1649 /** 1650 * Sets the Scheduler behavior within the program. 1651 * 1652 * This variable sets the Scheduler behavior within this program. Typically, 1653 * when setting a Scheduler, scheduler.start() should be called in main. This 1654 * routine will not return until program execution is complete. 1655 */ 1656 __gshared Scheduler scheduler; 1657 1658 // Generator 1659 1660 /** 1661 * If the caller is a Fiber and is not a Generator, this function will call 1662 * scheduler.yield() or Fiber.yield(), as appropriate. 1663 */ 1664 void yield() nothrow 1665 { 1666 auto fiber = Fiber.getThis(); 1667 if (!(cast(IsGenerator) fiber)) 1668 { 1669 if (scheduler is null) 1670 { 1671 if (fiber) 1672 return Fiber.yield(); 1673 } 1674 else 1675 scheduler.yield(); 1676 } 1677 } 1678 1679 /// Used to determine whether a Generator is running. 1680 private interface IsGenerator {} 1681 1682 1683 /** 1684 * A Generator is a Fiber that periodically returns values of type T to the 1685 * caller via yield. This is represented as an InputRange. 1686 */ 1687 class Generator(T) : 1688 Fiber, IsGenerator, InputRange!T 1689 { 1690 /** 1691 * Initializes a generator object which is associated with a static 1692 * D function. The function will be called once to prepare the range 1693 * for iteration. 1694 * 1695 * Params: 1696 * fn = The fiber function. 1697 * 1698 * In: 1699 * fn must not be null. 1700 */ 1701 this(void function() fn) 1702 { 1703 super(fn); 1704 call(); 1705 } 1706 1707 /** 1708 * Initializes a generator object which is associated with a static 1709 * D function. The function will be called once to prepare the range 1710 * for iteration. 1711 * 1712 * Params: 1713 * fn = The fiber function. 1714 * sz = The stack size for this fiber. 1715 * 1716 * In: 1717 * fn must not be null. 1718 */ 1719 this(void function() fn, size_t sz) 1720 { 1721 super(fn, sz); 1722 call(); 1723 } 1724 1725 /** 1726 * Initializes a generator object which is associated with a static 1727 * D function. The function will be called once to prepare the range 1728 * for iteration. 1729 * 1730 * Params: 1731 * fn = The fiber function. 1732 * sz = The stack size for this fiber. 1733 * guardPageSize = size of the guard page to trap fiber's stack 1734 * overflows. Refer to $(REF Fiber, core,thread)'s 1735 * documentation for more details. 1736 * 1737 * In: 1738 * fn must not be null. 1739 */ 1740 this(void function() fn, size_t sz, size_t guardPageSize) 1741 { 1742 super(fn, sz, guardPageSize); 1743 call(); 1744 } 1745 1746 /** 1747 * Initializes a generator object which is associated with a dynamic 1748 * D function. The function will be called once to prepare the range 1749 * for iteration. 1750 * 1751 * Params: 1752 * dg = The fiber function. 1753 * 1754 * In: 1755 * dg must not be null. 1756 */ 1757 this(void delegate() dg) 1758 { 1759 super(dg); 1760 call(); 1761 } 1762 1763 /** 1764 * Initializes a generator object which is associated with a dynamic 1765 * D function. The function will be called once to prepare the range 1766 * for iteration. 1767 * 1768 * Params: 1769 * dg = The fiber function. 1770 * sz = The stack size for this fiber. 1771 * 1772 * In: 1773 * dg must not be null. 1774 */ 1775 this(void delegate() dg, size_t sz) 1776 { 1777 super(dg, sz); 1778 call(); 1779 } 1780 1781 /** 1782 * Initializes a generator object which is associated with a dynamic 1783 * D function. The function will be called once to prepare the range 1784 * for iteration. 1785 * 1786 * Params: 1787 * dg = The fiber function. 1788 * sz = The stack size for this fiber. 1789 * guardPageSize = size of the guard page to trap fiber's stack 1790 * overflows. Refer to $(REF Fiber, core,thread)'s 1791 * documentation for more details. 1792 * 1793 * In: 1794 * dg must not be null. 1795 */ 1796 this(void delegate() dg, size_t sz, size_t guardPageSize) 1797 { 1798 super(dg, sz, guardPageSize); 1799 call(); 1800 } 1801 1802 /** 1803 * Returns true if the generator is empty. 1804 */ 1805 final bool empty() @property 1806 { 1807 return m_value is null || state == State.TERM; 1808 } 1809 1810 /** 1811 * Obtains the next value from the underlying function. 1812 */ 1813 final void popFront() 1814 { 1815 call(); 1816 } 1817 1818 /** 1819 * Returns the most recently generated value by shallow copy. 1820 */ 1821 final T front() @property 1822 { 1823 return *m_value; 1824 } 1825 1826 /** 1827 * Returns the most recently generated value without executing a 1828 * copy contructor. Will not compile for element types defining a 1829 * postblit, because Generator does not return by reference. 1830 */ 1831 final T moveFront() 1832 { 1833 static if (!hasElaborateCopyConstructor!T) 1834 { 1835 return front; 1836 } 1837 else 1838 { 1839 static assert(0, 1840 "Fiber front is always rvalue and thus cannot be moved since it defines a postblit."); 1841 } 1842 } 1843 1844 final int opApply(scope int delegate(T) loopBody) 1845 { 1846 int broken; 1847 for (; !empty; popFront()) 1848 { 1849 broken = loopBody(front); 1850 if (broken) break; 1851 } 1852 return broken; 1853 } 1854 1855 final int opApply(scope int delegate(size_t, T) loopBody) 1856 { 1857 int broken; 1858 for (size_t i; !empty; ++i, popFront()) 1859 { 1860 broken = loopBody(i, front); 1861 if (broken) break; 1862 } 1863 return broken; 1864 } 1865 private: 1866 T* m_value; 1867 } 1868 1869 /// 1870 @system unittest 1871 { 1872 auto tid = spawn({ 1873 int i; 1874 while (i < 9) 1875 i = receiveOnly!int; 1876 1877 ownerTid.send(i * 2); 1878 }); 1879 1880 auto r = new Generator!int({ 1881 foreach (i; 1 .. 10) 1882 yield(i); 1883 }); 1884 1885 foreach (e; r) 1886 tid.send(e); 1887 1888 assert(receiveOnly!int == 18); 1889 } 1890 1891 /** 1892 * Yields a value of type T to the caller of the currently executing 1893 * generator. 1894 * 1895 * Params: 1896 * value = The value to yield. 1897 */ 1898 void yield(T)(ref T value) 1899 { 1900 Generator!T cur = cast(Generator!T) Fiber.getThis(); 1901 if (cur !is null && cur.state == Fiber.State.EXEC) 1902 { 1903 cur.m_value = &value; 1904 return Fiber.yield(); 1905 } 1906 throw new Exception("yield(T) called with no active generator for the supplied type"); 1907 } 1908 1909 /// ditto 1910 void yield(T)(T value) 1911 { 1912 yield(value); 1913 } 1914 1915 @system unittest 1916 { 1917 import core.exception; 1918 import std.exception; 1919 1920 auto mainTid = thisTid; 1921 alias testdg = () { 1922 auto tid = spawn( 1923 (Tid mainTid) { 1924 int i; 1925 scope (failure) mainTid.send(false); 1926 try 1927 { 1928 for (i = 1; i < 10; i++) 1929 { 1930 if (receiveOnly!int() != i) 1931 { 1932 mainTid.send(false); 1933 break; 1934 } 1935 } 1936 } 1937 catch (OwnerTerminated e) 1938 { 1939 // i will advance 1 past the last value expected 1940 mainTid.send(i == 4); 1941 } 1942 }, mainTid); 1943 auto r = new Generator!int( 1944 { 1945 assertThrown!Exception(yield(2.0)); 1946 yield(); // ensure this is a no-op 1947 yield(1); 1948 yield(); // also once something has been yielded 1949 yield(2); 1950 yield(3); 1951 }); 1952 1953 foreach (e; r) 1954 { 1955 tid.send(e); 1956 } 1957 }; 1958 1959 scheduler = new ThreadScheduler; 1960 scheduler.spawn(testdg); 1961 assert(receiveOnly!bool()); 1962 1963 scheduler = new FiberScheduler; 1964 scheduler.start(testdg); 1965 assert(receiveOnly!bool()); 1966 scheduler = null; 1967 } 1968 /// 1969 @system unittest 1970 { 1971 import std.range; 1972 1973 InputRange!int myIota = iota(10).inputRangeObject; 1974 1975 myIota.popFront(); 1976 myIota.popFront(); 1977 assert(myIota.moveFront == 2); 1978 assert(myIota.front == 2); 1979 myIota.popFront(); 1980 assert(myIota.front == 3); 1981 1982 //can be assigned to std.range.interfaces.InputRange directly 1983 myIota = new Generator!int( 1984 { 1985 foreach (i; 0 .. 10) yield(i); 1986 }); 1987 1988 myIota.popFront(); 1989 myIota.popFront(); 1990 assert(myIota.moveFront == 2); 1991 assert(myIota.front == 2); 1992 myIota.popFront(); 1993 assert(myIota.front == 3); 1994 1995 size_t[2] counter = [0, 0]; 1996 foreach (i, unused; myIota) counter[] += [1, i]; 1997 1998 assert(myIota.empty); 1999 assert(counter == [7, 21]); 2000 } 2001 2002 private 2003 { 2004 /* 2005 * A MessageBox is a message queue for one thread. Other threads may send 2006 * messages to this owner by calling put(), and the owner receives them by 2007 * calling get(). The put() call is therefore effectively shared and the 2008 * get() call is effectively local. setMaxMsgs may be used by any thread 2009 * to limit the size of the message queue. 2010 */ 2011 class MessageBox 2012 { 2013 this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */ 2014 { 2015 m_lock = new Mutex; 2016 m_closed = false; 2017 2018 if (scheduler is null) 2019 { 2020 m_putMsg = new Condition(m_lock); 2021 m_notFull = new Condition(m_lock); 2022 } 2023 else 2024 { 2025 m_putMsg = scheduler.newCondition(m_lock); 2026 m_notFull = scheduler.newCondition(m_lock); 2027 } 2028 } 2029 2030 /// 2031 final @property bool isClosed() @safe @nogc pure 2032 { 2033 synchronized (m_lock) 2034 { 2035 return m_closed; 2036 } 2037 } 2038 2039 /* 2040 * Sets a limit on the maximum number of user messages allowed in the 2041 * mailbox. If this limit is reached, the caller attempting to add 2042 * a new message will execute call. If num is zero, there is no limit 2043 * on the message queue. 2044 * 2045 * Params: 2046 * num = The maximum size of the queue or zero if the queue is 2047 * unbounded. 2048 * call = The routine to call when the queue is full. 2049 */ 2050 final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure 2051 { 2052 synchronized (m_lock) 2053 { 2054 m_maxMsgs = num; 2055 m_onMaxMsgs = call; 2056 } 2057 } 2058 2059 /* 2060 * If maxMsgs is not set, the message is added to the queue and the 2061 * owner is notified. If the queue is full, the message will still be 2062 * accepted if it is a control message, otherwise onCrowdingDoThis is 2063 * called. If the routine returns true, this call will block until 2064 * the owner has made space available in the queue. If it returns 2065 * false, this call will abort. 2066 * 2067 * Params: 2068 * msg = The message to put in the queue. 2069 * 2070 * Throws: 2071 * An exception if the queue is full and onCrowdingDoThis throws. 2072 */ 2073 final void put(ref Message msg) 2074 { 2075 synchronized (m_lock) 2076 { 2077 // TODO: Generate an error here if m_closed is true, or maybe 2078 // put a message in the caller's queue? 2079 if (!m_closed) 2080 { 2081 while (true) 2082 { 2083 if (isPriorityMsg(msg)) 2084 { 2085 m_sharedPty.put(msg); 2086 m_putMsg.notify(); 2087 return; 2088 } 2089 if (!mboxFull() || isControlMsg(msg)) 2090 { 2091 m_sharedBox.put(msg); 2092 m_putMsg.notify(); 2093 return; 2094 } 2095 if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid)) 2096 { 2097 return; 2098 } 2099 m_putQueue++; 2100 m_notFull.wait(); 2101 m_putQueue--; 2102 } 2103 } 2104 } 2105 } 2106 2107 /* 2108 * Matches ops against each message in turn until a match is found. 2109 * 2110 * Params: 2111 * ops = The operations to match. Each may return a bool to indicate 2112 * whether a message with a matching type is truly a match. 2113 * 2114 * Returns: 2115 * true if a message was retrieved and false if not (such as if a 2116 * timeout occurred). 2117 * 2118 * Throws: 2119 * LinkTerminated if a linked thread terminated, or OwnerTerminated 2120 * if the owner thread terminates and no existing messages match the 2121 * supplied ops. 2122 */ 2123 bool get(T...)(scope T vals) 2124 { 2125 import std.meta : AliasSeq; 2126 2127 static assert(T.length, "T must not be empty"); 2128 2129 static if (isImplicitlyConvertible!(T[0], Duration)) 2130 { 2131 alias Ops = AliasSeq!(T[1 .. $]); 2132 alias ops = vals[1 .. $]; 2133 enum timedWait = true; 2134 Duration period = vals[0]; 2135 } 2136 else 2137 { 2138 alias Ops = AliasSeq!(T); 2139 alias ops = vals[0 .. $]; 2140 enum timedWait = false; 2141 } 2142 2143 bool onStandardMsg(ref Message msg) 2144 { 2145 foreach (i, t; Ops) 2146 { 2147 alias Args = Parameters!(t); 2148 auto op = ops[i]; 2149 2150 if (msg.convertsTo!(Args)) 2151 { 2152 alias RT = ReturnType!(t); 2153 static if (is(RT == bool)) 2154 { 2155 return msg.map(op); 2156 } 2157 else 2158 { 2159 msg.map(op); 2160 static if (!is(immutable RT == immutable noreturn)) 2161 return true; 2162 } 2163 } 2164 } 2165 return false; 2166 } 2167 2168 bool onLinkDeadMsg(ref Message msg) 2169 { 2170 assert(msg.convertsTo!(Tid), 2171 "Message could be converted to Tid"); 2172 auto tid = msg.get!(Tid); 2173 2174 if (bool* pDepends = tid in thisInfo.links) 2175 { 2176 auto depends = *pDepends; 2177 thisInfo.links.remove(tid); 2178 // Give the owner relationship precedence. 2179 if (depends && tid != thisInfo.owner) 2180 { 2181 auto e = new LinkTerminated(tid); 2182 auto m = Message(MsgType.standard, e); 2183 if (onStandardMsg(m)) 2184 return true; 2185 throw e; 2186 } 2187 } 2188 if (tid == thisInfo.owner) 2189 { 2190 thisInfo.owner = Tid.init; 2191 auto e = new OwnerTerminated(tid); 2192 auto m = Message(MsgType.standard, e); 2193 if (onStandardMsg(m)) 2194 return true; 2195 throw e; 2196 } 2197 return false; 2198 } 2199 2200 bool onControlMsg(ref Message msg) 2201 { 2202 switch (msg.type) 2203 { 2204 case MsgType.linkDead: 2205 return onLinkDeadMsg(msg); 2206 default: 2207 return false; 2208 } 2209 } 2210 2211 bool scan(ref ListT list) 2212 { 2213 for (auto range = list[]; !range.empty;) 2214 { 2215 // Only the message handler will throw, so if this occurs 2216 // we can be certain that the message was handled. 2217 scope (failure) 2218 list.removeAt(range); 2219 2220 if (isControlMsg(range.front)) 2221 { 2222 if (onControlMsg(range.front)) 2223 { 2224 // Although the linkDead message is a control message, 2225 // it can be handled by the user. Since the linkDead 2226 // message throws if not handled, if we get here then 2227 // it has been handled and we can return from receive. 2228 // This is a weird special case that will have to be 2229 // handled in a more general way if more are added. 2230 if (!isLinkDeadMsg(range.front)) 2231 { 2232 list.removeAt(range); 2233 continue; 2234 } 2235 list.removeAt(range); 2236 return true; 2237 } 2238 range.popFront(); 2239 continue; 2240 } 2241 else 2242 { 2243 if (onStandardMsg(range.front)) 2244 { 2245 list.removeAt(range); 2246 return true; 2247 } 2248 range.popFront(); 2249 continue; 2250 } 2251 } 2252 return false; 2253 } 2254 2255 bool pty(ref ListT list) 2256 { 2257 if (!list.empty) 2258 { 2259 auto range = list[]; 2260 2261 if (onStandardMsg(range.front)) 2262 { 2263 list.removeAt(range); 2264 return true; 2265 } 2266 if (range.front.convertsTo!(Throwable)) 2267 throw range.front.get!(Throwable); 2268 else if (range.front.convertsTo!(shared(Throwable))) 2269 throw range.front.get!(shared(Throwable)); 2270 else 2271 throw new PriorityMessageException(range.front.data); 2272 } 2273 return false; 2274 } 2275 2276 static if (timedWait) 2277 { 2278 import core.time : MonoTime; 2279 auto limit = MonoTime.currTime + period; 2280 } 2281 2282 while (true) 2283 { 2284 ListT arrived; 2285 2286 if (pty(m_localPty) || scan(m_localBox)) 2287 { 2288 return true; 2289 } 2290 yield(); 2291 synchronized (m_lock) 2292 { 2293 updateMsgCount(); 2294 while (m_sharedPty.empty && m_sharedBox.empty) 2295 { 2296 // NOTE: We're notifying all waiters here instead of just 2297 // a few because the onCrowding behavior may have 2298 // changed and we don't want to block sender threads 2299 // unnecessarily if the new behavior is not to block. 2300 // This will admittedly result in spurious wakeups 2301 // in other situations, but what can you do? 2302 if (m_putQueue && !mboxFull()) 2303 m_notFull.notifyAll(); 2304 static if (timedWait) 2305 { 2306 if (period <= Duration.zero || !m_putMsg.wait(period)) 2307 return false; 2308 } 2309 else 2310 { 2311 m_putMsg.wait(); 2312 } 2313 } 2314 m_localPty.put(m_sharedPty); 2315 arrived.put(m_sharedBox); 2316 } 2317 if (m_localPty.empty) 2318 { 2319 scope (exit) m_localBox.put(arrived); 2320 if (scan(arrived)) 2321 { 2322 return true; 2323 } 2324 else 2325 { 2326 static if (timedWait) 2327 { 2328 period = limit - MonoTime.currTime; 2329 } 2330 continue; 2331 } 2332 } 2333 m_localBox.put(arrived); 2334 pty(m_localPty); 2335 return true; 2336 } 2337 } 2338 2339 /* 2340 * Called on thread termination. This routine processes any remaining 2341 * control messages, clears out message queues, and sets a flag to 2342 * reject any future messages. 2343 */ 2344 final void close() 2345 { 2346 static void onLinkDeadMsg(ref Message msg) 2347 { 2348 assert(msg.convertsTo!(Tid), 2349 "Message could be converted to Tid"); 2350 auto tid = msg.get!(Tid); 2351 2352 thisInfo.links.remove(tid); 2353 if (tid == thisInfo.owner) 2354 thisInfo.owner = Tid.init; 2355 } 2356 2357 static void sweep(ref ListT list) 2358 { 2359 for (auto range = list[]; !range.empty; range.popFront()) 2360 { 2361 if (range.front.type == MsgType.linkDead) 2362 onLinkDeadMsg(range.front); 2363 } 2364 } 2365 2366 ListT arrived; 2367 2368 sweep(m_localBox); 2369 synchronized (m_lock) 2370 { 2371 arrived.put(m_sharedBox); 2372 m_closed = true; 2373 } 2374 m_localBox.clear(); 2375 sweep(arrived); 2376 } 2377 2378 private: 2379 // Routines involving local data only, no lock needed. 2380 2381 bool mboxFull() @safe @nogc pure nothrow 2382 { 2383 return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length; 2384 } 2385 2386 void updateMsgCount() @safe @nogc pure nothrow 2387 { 2388 m_localMsgs = m_localBox.length; 2389 } 2390 2391 bool isControlMsg(ref Message msg) @safe @nogc pure nothrow 2392 { 2393 return msg.type != MsgType.standard && msg.type != MsgType.priority; 2394 } 2395 2396 bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow 2397 { 2398 return msg.type == MsgType.priority; 2399 } 2400 2401 bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow 2402 { 2403 return msg.type == MsgType.linkDead; 2404 } 2405 2406 alias OnMaxFn = bool function(Tid); 2407 alias ListT = List!(Message); 2408 2409 ListT m_localBox; 2410 ListT m_localPty; 2411 2412 Mutex m_lock; 2413 Condition m_putMsg; 2414 Condition m_notFull; 2415 size_t m_putQueue; 2416 ListT m_sharedBox; 2417 ListT m_sharedPty; 2418 OnMaxFn m_onMaxMsgs; 2419 size_t m_localMsgs; 2420 size_t m_maxMsgs; 2421 bool m_closed; 2422 } 2423 2424 /* 2425 * 2426 */ 2427 struct List(T) 2428 { 2429 struct Range 2430 { 2431 import std.exception : enforce; 2432 2433 @property bool empty() const 2434 { 2435 return !m_prev.next; 2436 } 2437 2438 @property ref T front() 2439 { 2440 enforce(m_prev.next, "invalid list node"); 2441 return m_prev.next.val; 2442 } 2443 2444 @property void front(T val) 2445 { 2446 enforce(m_prev.next, "invalid list node"); 2447 m_prev.next.val = val; 2448 } 2449 2450 void popFront() 2451 { 2452 enforce(m_prev.next, "invalid list node"); 2453 m_prev = m_prev.next; 2454 } 2455 2456 private this(Node* p) 2457 { 2458 m_prev = p; 2459 } 2460 2461 private Node* m_prev; 2462 } 2463 2464 void put(T val) 2465 { 2466 put(newNode(val)); 2467 } 2468 2469 void put(ref List!(T) rhs) 2470 { 2471 if (!rhs.empty) 2472 { 2473 put(rhs.m_first); 2474 while (m_last.next !is null) 2475 { 2476 m_last = m_last.next; 2477 m_count++; 2478 } 2479 rhs.m_first = null; 2480 rhs.m_last = null; 2481 rhs.m_count = 0; 2482 } 2483 } 2484 2485 Range opSlice() 2486 { 2487 return Range(cast(Node*)&m_first); 2488 } 2489 2490 void removeAt(Range r) 2491 { 2492 import std.exception : enforce; 2493 2494 assert(m_count, "Can not remove from empty Range"); 2495 Node* n = r.m_prev; 2496 enforce(n && n.next, "attempting to remove invalid list node"); 2497 2498 if (m_last is m_first) 2499 m_last = null; 2500 else if (m_last is n.next) 2501 m_last = n; // nocoverage 2502 Node* to_free = n.next; 2503 n.next = n.next.next; 2504 freeNode(to_free); 2505 m_count--; 2506 } 2507 2508 @property size_t length() 2509 { 2510 return m_count; 2511 } 2512 2513 void clear() 2514 { 2515 m_first = m_last = null; 2516 m_count = 0; 2517 } 2518 2519 @property bool empty() 2520 { 2521 return m_first is null; 2522 } 2523 2524 private: 2525 struct Node 2526 { 2527 Node* next; 2528 T val; 2529 2530 this(T v) 2531 { 2532 val = v; 2533 } 2534 } 2535 2536 static shared struct SpinLock 2537 { 2538 void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } } 2539 void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); } 2540 bool locked; 2541 } 2542 2543 static shared SpinLock sm_lock; 2544 static shared Node* sm_head; 2545 2546 Node* newNode(T v) 2547 { 2548 Node* n; 2549 { 2550 sm_lock.lock(); 2551 scope (exit) sm_lock.unlock(); 2552 2553 if (sm_head) 2554 { 2555 n = cast(Node*) sm_head; 2556 sm_head = sm_head.next; 2557 } 2558 } 2559 if (n) 2560 { 2561 import core.lifetime : emplace; 2562 emplace!Node(n, v); 2563 } 2564 else 2565 { 2566 n = new Node(v); 2567 } 2568 return n; 2569 } 2570 2571 void freeNode(Node* n) 2572 { 2573 // destroy val to free any owned GC memory 2574 destroy(n.val); 2575 2576 sm_lock.lock(); 2577 scope (exit) sm_lock.unlock(); 2578 2579 auto sn = cast(shared(Node)*) n; 2580 sn.next = sm_head; 2581 sm_head = sn; 2582 } 2583 2584 void put(Node* n) 2585 { 2586 m_count++; 2587 if (!empty) 2588 { 2589 m_last.next = n; 2590 m_last = n; 2591 return; 2592 } 2593 m_first = n; 2594 m_last = n; 2595 } 2596 2597 Node* m_first; 2598 Node* m_last; 2599 size_t m_count; 2600 } 2601 } 2602 2603 @system unittest 2604 { 2605 import std.typecons : tuple, Tuple; 2606 2607 static void testfn(Tid tid) 2608 { 2609 receive((float val) { assert(0); }, (int val, int val2) { 2610 assert(val == 42 && val2 == 86); 2611 }); 2612 receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); }); 2613 receive((Variant val) { }); 2614 receive((string val) { 2615 if ("the quick brown fox" != val) 2616 return false; 2617 return true; 2618 }, (string val) { assert(false); }); 2619 prioritySend(tid, "done"); 2620 } 2621 2622 static void runTest(Tid tid) 2623 { 2624 send(tid, 42, 86); 2625 send(tid, tuple(42, 86)); 2626 send(tid, "hello", "there"); 2627 send(tid, "the quick brown fox"); 2628 receive((string val) { assert(val == "done"); }); 2629 } 2630 2631 static void simpleTest() 2632 { 2633 auto tid = spawn(&testfn, thisTid); 2634 runTest(tid); 2635 2636 // Run the test again with a limited mailbox size. 2637 tid = spawn(&testfn, thisTid); 2638 setMaxMailboxSize(tid, 2, OnCrowding.block); 2639 runTest(tid); 2640 } 2641 2642 simpleTest(); 2643 2644 scheduler = new ThreadScheduler; 2645 simpleTest(); 2646 scheduler = null; 2647 } 2648 2649 private @property shared(Mutex) initOnceLock() 2650 { 2651 static shared Mutex lock; 2652 if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock)) 2653 return mtx; 2654 auto mtx = new shared Mutex; 2655 if (cas(&lock, cast(shared) null, mtx)) 2656 return mtx; 2657 return atomicLoad!(MemoryOrder.acq)(lock); 2658 } 2659 2660 /** 2661 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a 2662 * thread-safe manner. 2663 * 2664 * The implementation guarantees that all threads simultaneously calling 2665 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is 2666 * fully initialized. All side-effects of $(D_PARAM init) are globally visible 2667 * afterwards. 2668 * 2669 * Params: 2670 * var = The variable to initialize 2671 * init = The lazy initializer value 2672 * 2673 * Returns: 2674 * A reference to the initialized variable 2675 */ 2676 auto ref initOnce(alias var)(lazy typeof(var) init) 2677 { 2678 return initOnce!var(init, initOnceLock); 2679 } 2680 2681 /// A typical use-case is to perform lazy but thread-safe initialization. 2682 @system unittest 2683 { 2684 static class MySingleton 2685 { 2686 static MySingleton instance() 2687 { 2688 __gshared MySingleton inst; 2689 return initOnce!inst(new MySingleton); 2690 } 2691 } 2692 2693 assert(MySingleton.instance !is null); 2694 } 2695 2696 @system unittest 2697 { 2698 static class MySingleton 2699 { 2700 static MySingleton instance() 2701 { 2702 __gshared MySingleton inst; 2703 return initOnce!inst(new MySingleton); 2704 } 2705 2706 private: 2707 this() { val = ++cnt; } 2708 size_t val; 2709 __gshared size_t cnt; 2710 } 2711 2712 foreach (_; 0 .. 10) 2713 spawn({ ownerTid.send(MySingleton.instance.val); }); 2714 foreach (_; 0 .. 10) 2715 assert(receiveOnly!size_t == MySingleton.instance.val); 2716 assert(MySingleton.cnt == 1); 2717 } 2718 2719 /** 2720 * Same as above, but takes a separate mutex instead of sharing one among 2721 * all initOnce instances. 2722 * 2723 * This should be used to avoid dead-locks when the $(D_PARAM init) 2724 * expression waits for the result of another thread that might also 2725 * call initOnce. Use with care. 2726 * 2727 * Params: 2728 * var = The variable to initialize 2729 * init = The lazy initializer value 2730 * mutex = A mutex to prevent race conditions 2731 * 2732 * Returns: 2733 * A reference to the initialized variable 2734 */ 2735 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex) 2736 { 2737 // check that var is global, can't take address of a TLS variable 2738 static assert(is(typeof({ __gshared p = &var; })), 2739 "var must be 'static shared' or '__gshared'."); 2740 import core.atomic : atomicLoad, MemoryOrder, atomicStore; 2741 2742 static shared bool flag; 2743 if (!atomicLoad!(MemoryOrder.acq)(flag)) 2744 { 2745 synchronized (mutex) 2746 { 2747 if (!atomicLoad!(MemoryOrder.raw)(flag)) 2748 { 2749 var = init; 2750 static if (!is(immutable typeof(var) == immutable noreturn)) 2751 atomicStore!(MemoryOrder.rel)(flag, true); 2752 } 2753 } 2754 } 2755 return var; 2756 } 2757 2758 /// ditto 2759 auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) 2760 { 2761 return initOnce!var(init, cast(shared) mutex); 2762 } 2763 2764 /// Use a separate mutex when init blocks on another thread that might also call initOnce. 2765 @system unittest 2766 { 2767 import core.sync.mutex : Mutex; 2768 2769 static shared bool varA, varB; 2770 static shared Mutex m; 2771 m = new shared Mutex; 2772 2773 spawn({ 2774 // use a different mutex for varB to avoid a dead-lock 2775 initOnce!varB(true, m); 2776 ownerTid.send(true); 2777 }); 2778 // init depends on the result of the spawned thread 2779 initOnce!varA(receiveOnly!bool); 2780 assert(varA == true); 2781 assert(varB == true); 2782 } 2783 2784 @system unittest 2785 { 2786 static shared bool a; 2787 __gshared bool b; 2788 static bool c; 2789 bool d; 2790 initOnce!a(true); 2791 initOnce!b(true); 2792 static assert(!__traits(compiles, initOnce!c(true))); // TLS 2793 static assert(!__traits(compiles, initOnce!d(true))); // local variable 2794 } 2795 2796 // test ability to send shared arrays 2797 @system unittest 2798 { 2799 static shared int[] x = new shared(int)[1]; 2800 auto tid = spawn({ 2801 auto arr = receiveOnly!(shared(int)[]); 2802 arr[0] = 5; 2803 ownerTid.send(true); 2804 }); 2805 tid.send(x); 2806 receiveOnly!(bool); 2807 assert(x[0] == 5); 2808 } 2809 2810 // https://issues.dlang.org/show_bug.cgi?id=13930 2811 @system unittest 2812 { 2813 immutable aa = ["0":0]; 2814 thisTid.send(aa); 2815 receiveOnly!(immutable int[string]); // compile error 2816 } 2817 2818 // https://issues.dlang.org/show_bug.cgi?id=19345 2819 @system unittest 2820 { 2821 static struct Aggregate { const int a; const int[5] b; } 2822 static void t1(Tid mainTid) 2823 { 2824 const sendMe = Aggregate(42, [1, 2, 3, 4, 5]); 2825 mainTid.send(sendMe); 2826 } 2827 2828 spawn(&t1, thisTid); 2829 auto result1 = receiveOnly!(const Aggregate)(); 2830 immutable expected = Aggregate(42, [1, 2, 3, 4, 5]); 2831 assert(result1 == expected); 2832 } 2833 2834 // Noreturn support 2835 @system unittest 2836 { 2837 static noreturn foo(int) { throw new Exception(""); } 2838 2839 if (false) spawn(&foo, 1); 2840 if (false) spawnLinked(&foo, 1); 2841 2842 if (false) receive(&foo); 2843 if (false) receiveTimeout(Duration.init, &foo); 2844 2845 // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend 2846 static assert(__traits(compiles, receiveOnly!noreturn() )); 2847 static assert(__traits(compiles, send(Tid.init, noreturn.init) )); 2848 static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init) )); 2849 static assert(__traits(compiles, yield(noreturn.init) )); 2850 2851 static assert(__traits(compiles, { 2852 __gshared noreturn n; 2853 initOnce!n(noreturn.init); 2854 })); 2855 } 2856