xref: /netbsd-src/external/gpl3/gcc/dist/libphobos/src/std/concurrency.d (revision b1e838363e3c6fc78a55519254d99869742dd33c)
1 /**
2  * $(SCRIPT inhibitQuickIndex = 1;)
3  * $(DIVC quickindex,
4  * $(BOOKTABLE,
5  * $(TR $(TH Category) $(TH Symbols))
6  * $(TR $(TD Tid) $(TD
7  *     $(MYREF locate)
8  *     $(MYREF ownerTid)
9  *     $(MYREF register)
10  *     $(MYREF spawn)
11  *     $(MYREF spawnLinked)
12  *     $(MYREF thisTid)
13  *     $(MYREF Tid)
14  *     $(MYREF TidMissingException)
15  *     $(MYREF unregister)
16  * ))
17  * $(TR $(TD Message passing) $(TD
18  *     $(MYREF prioritySend)
19  *     $(MYREF receive)
20  *     $(MYREF receiveOnly)
21  *     $(MYREF receiveTimeout)
22  *     $(MYREF send)
23  *     $(MYREF setMaxMailboxSize)
24  * ))
25  * $(TR $(TD Message-related types) $(TD
26  *     $(MYREF LinkTerminated)
27  *     $(MYREF MailboxFull)
28  *     $(MYREF MessageMismatch)
29  *     $(MYREF OnCrowding)
30  *     $(MYREF OwnerTerminated)
31  *     $(MYREF PriorityMessageException)
32  * ))
33  * $(TR $(TD Scheduler) $(TD
34  *     $(MYREF FiberScheduler)
35  *     $(MYREF Generator)
36  *     $(MYREF Scheduler)
37  *     $(MYREF scheduler)
38  *     $(MYREF ThreadInfo)
39  *     $(MYREF ThreadScheduler)
40  *     $(MYREF yield)
41  * ))
42  * $(TR $(TD Misc) $(TD
43  *     $(MYREF initOnce)
44  * ))
45  * ))
46  *
47  * This is a low-level messaging API upon which more structured or restrictive
48  * APIs may be built.  The general idea is that every messageable entity is
49  * represented by a common handle type called a Tid, which allows messages to
50  * be sent to logical threads that are executing in both the current process
51  * and in external processes using the same interface.  This is an important
52  * aspect of scalability because it allows the components of a program to be
53  * spread across available resources with few to no changes to the actual
54  * implementation.
55  *
56  * A logical thread is an execution context that has its own stack and which
57  * runs asynchronously to other logical threads.  These may be preemptively
58  * scheduled kernel threads, fibers (cooperative user-space threads), or some
59  * other concept with similar behavior.
60  *
61  * The type of concurrency used when logical threads are created is determined
62  * by the Scheduler selected at initialization time.  The default behavior is
63  * currently to create a new kernel thread per call to spawn, but other
64  * schedulers are available that multiplex fibers across the main thread or
65  * use some combination of the two approaches.
66  *
67  * Copyright: Copyright Sean Kelly 2009 - 2014.
68  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
69  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
70  * Source:    $(PHOBOSSRC std/concurrency.d)
71  */
72 /*          Copyright Sean Kelly 2009 - 2014.
73  * Distributed under the Boost Software License, Version 1.0.
74  *    (See accompanying file LICENSE_1_0.txt or copy at
75  *          http://www.boost.org/LICENSE_1_0.txt)
76  */
77 module std.concurrency;
78 
79 public import std.variant;
80 
81 import core.atomic;
82 import core.sync.condition;
83 import core.sync.mutex;
84 import core.thread;
85 import std.range.primitives;
86 import std.range.interfaces : InputRange;
87 import std.traits;
88 
89 ///
90 @system unittest
91 {
92     __gshared string received;
spawnedFunc(Tid ownerTid)93     static void spawnedFunc(Tid ownerTid)
94     {
95         import std.conv : text;
96         // Receive a message from the owner thread.
97         receive((int i){
98             received = text("Received the number ", i);
99 
100             // Send a message back to the owner thread
101             // indicating success.
102             send(ownerTid, true);
103         });
104     }
105 
106     // Start spawnedFunc in a new thread.
107     auto childTid = spawn(&spawnedFunc, thisTid);
108 
109     // Send the number 42 to this new thread.
110     send(childTid, 42);
111 
112     // Receive the result code.
113     auto wasSuccessful = receiveOnly!(bool);
114     assert(wasSuccessful);
115     assert(received == "Received the number 42");
116 }
117 
118 private
119 {
hasLocalAliasing(Types...)120     bool hasLocalAliasing(Types...)()
121     {
122         import std.typecons : Rebindable;
123 
124         // Works around "statement is not reachable"
125         bool doesIt = false;
126         static foreach (T; Types)
127         {
128             static if (is(T == Tid))
129             { /* Allowed */ }
130             else static if (is(T : Rebindable!R, R))
131                 doesIt |= hasLocalAliasing!R;
132             else static if (is(T == struct))
133                 doesIt |= hasLocalAliasing!(typeof(T.tupleof));
134             else
135                 doesIt |= std.traits.hasUnsharedAliasing!(T);
136         }
137         return doesIt;
138     }
139 
140     @safe unittest
141     {
142         static struct Container { Tid t; }
143         static assert(!hasLocalAliasing!(Tid, Container, int));
144     }
145 
146     // https://issues.dlang.org/show_bug.cgi?id=20097
147     @safe unittest
148     {
149         import std.datetime.systime : SysTime;
150         static struct Container { SysTime time; }
151         static assert(!hasLocalAliasing!(SysTime, Container));
152     }
153 
154     enum MsgType
155     {
156         standard,
157         priority,
158         linkDead,
159     }
160 
161     struct Message
162     {
163         MsgType type;
164         Variant data;
165 
166         this(T...)(MsgType t, T vals) if (T.length > 0)
167         {
168             static if (T.length == 1)
169             {
170                 type = t;
171                 data = vals[0];
172             }
173             else
174             {
175                 import std.typecons : Tuple;
176 
177                 type = t;
178                 data = Tuple!(T)(vals);
179             }
180         }
181 
convertsTo(T...)182         @property auto convertsTo(T...)()
183         {
184             static if (T.length == 1)
185             {
186                 return is(T[0] == Variant) || data.convertsTo!(T);
187             }
188             else
189             {
190                 import std.typecons : Tuple;
191                 return data.convertsTo!(Tuple!(T));
192             }
193         }
194 
get(T...)195         @property auto get(T...)()
196         {
197             static if (T.length == 1)
198             {
199                 static if (is(T[0] == Variant))
200                     return data;
201                 else
202                     return data.get!(T);
203             }
204             else
205             {
206                 import std.typecons : Tuple;
207                 return data.get!(Tuple!(T));
208             }
209         }
210 
map(Op)211         auto map(Op)(Op op)
212         {
213             alias Args = Parameters!(Op);
214 
215             static if (Args.length == 1)
216             {
217                 static if (is(Args[0] == Variant))
218                     return op(data);
219                 else
220                     return op(data.get!(Args));
221             }
222             else
223             {
224                 import std.typecons : Tuple;
225                 return op(data.get!(Tuple!(Args)).expand);
226             }
227         }
228     }
229 
checkops(T...)230     void checkops(T...)(T ops)
231     {
232         import std.format : format;
233 
234         foreach (i, t1; T)
235         {
236             static assert(isFunctionPointer!t1 || isDelegate!t1,
237                     format!"T %d is not a function pointer or delegates"(i));
238             alias a1 = Parameters!(t1);
239             alias r1 = ReturnType!(t1);
240 
241             static if (i < T.length - 1 && is(r1 == void))
242             {
243                 static assert(a1.length != 1 || !is(a1[0] == Variant),
244                               "function with arguments " ~ a1.stringof ~
245                               " occludes successive function");
246 
247                 foreach (t2; T[i + 1 .. $])
248                 {
249                     alias a2 = Parameters!(t2);
250 
251                     static assert(!is(a1 == a2),
252                         "function with arguments " ~ a1.stringof ~ " occludes successive function");
253                 }
254             }
255         }
256     }
257 
thisInfo()258     @property ref ThreadInfo thisInfo() nothrow
259     {
260         if (scheduler is null)
261             return ThreadInfo.thisInfo;
262         return scheduler.thisInfo;
263     }
264 }
265 
~this()266 static ~this()
267 {
268     thisInfo.cleanup();
269 }
270 
271 // Exceptions
272 
273 /**
274  * Thrown on calls to `receiveOnly` if a message other than the type
275  * the receiving thread expected is sent.
276  */
277 class MessageMismatch : Exception
278 {
279     ///
280     this(string msg = "Unexpected message type") @safe pure nothrow @nogc
281     {
282         super(msg);
283     }
284 }
285 
286 /**
287  * Thrown on calls to `receive` if the thread that spawned the receiving
288  * thread has terminated and no more messages exist.
289  */
290 class OwnerTerminated : Exception
291 {
292     ///
293     this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
294     {
295         super(msg);
296         tid = t;
297     }
298 
299     Tid tid;
300 }
301 
302 /**
303  * Thrown if a linked thread has terminated.
304  */
305 class LinkTerminated : Exception
306 {
307     ///
308     this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
309     {
310         super(msg);
311         tid = t;
312     }
313 
314     Tid tid;
315 }
316 
317 /**
318  * Thrown if a message was sent to a thread via
319  * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
320  * for a message of this type.
321  */
322 class PriorityMessageException : Exception
323 {
324     ///
this(Variant vals)325     this(Variant vals)
326     {
327         super("Priority message");
328         message = vals;
329     }
330 
331     /**
332      * The message that was sent.
333      */
334     Variant message;
335 }
336 
337 /**
338  * Thrown on mailbox crowding if the mailbox is configured with
339  * `OnCrowding.throwException`.
340  */
341 class MailboxFull : Exception
342 {
343     ///
344     this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
345     {
346         super(msg);
347         tid = t;
348     }
349 
350     Tid tid;
351 }
352 
353 /**
354  * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't
355  * find an owner thread.
356  */
357 class TidMissingException : Exception
358 {
359     import std.exception : basicExceptionCtors;
360     ///
361     mixin basicExceptionCtors;
362 }
363 
364 
365 // Thread ID
366 
367 
368 /**
369  * An opaque type used to represent a logical thread.
370  */
371 struct Tid
372 {
373 private:
thisTid374     this(MessageBox m) @safe pure nothrow @nogc
375     {
376         mbox = m;
377     }
378 
379     MessageBox mbox;
380 
381 public:
382 
383     /**
384      * Generate a convenient string for identifying this Tid.  This is only
385      * useful to see if Tid's that are currently executing are the same or
386      * different, e.g. for logging and debugging.  It is potentially possible
387      * that a Tid executed in the future will have the same toString() output
388      * as another Tid that has already terminated.
389      */
toStringTid390     void toString(W)(ref W w) const
391     {
392         import std.format.write : formattedWrite;
393         auto p = () @trusted { return cast(void*) mbox; }();
394         formattedWrite(w, "Tid(%x)", p);
395     }
396 
397 }
398 
399 @safe unittest
400 {
401     import std.conv : text;
402     Tid tid;
403     assert(text(tid) == "Tid(0)");
404     auto tid2 = thisTid;
405     assert(text(tid2) != "Tid(0)");
406     auto tid3 = tid2;
407     assert(text(tid2) == text(tid3));
408 }
409 
410 // https://issues.dlang.org/show_bug.cgi?id=21512
411 @system unittest
412 {
413     import std.format : format;
414 
415     const(Tid) b = spawn(() {});
416     assert(format!"%s"(b)[0 .. 4] == "Tid(");
417 }
418 
419 /**
420  * Returns: The $(LREF Tid) of the caller's thread.
421  */
thisTid()422 @property Tid thisTid() @safe
423 {
424     // TODO: remove when concurrency is safe
425     static auto trus() @trusted
426     {
427         if (thisInfo.ident != Tid.init)
428             return thisInfo.ident;
429         thisInfo.ident = Tid(new MessageBox);
430         return thisInfo.ident;
431     }
432 
433     return trus();
434 }
435 
436 /**
437  * Return the Tid of the thread which spawned the caller's thread.
438  *
439  * Throws: A `TidMissingException` exception if
440  * there is no owner thread.
441  */
ownerTid()442 @property Tid ownerTid()
443 {
444     import std.exception : enforce;
445 
446     enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread.");
447     return thisInfo.owner;
448 }
449 
450 @system unittest
451 {
452     import std.exception : assertThrown;
453 
fun()454     static void fun()
455     {
456         string res = receiveOnly!string();
457         assert(res == "Main calling");
458         ownerTid.send("Child responding");
459     }
460 
461     assertThrown!TidMissingException(ownerTid);
462     auto child = spawn(&fun);
463     child.send("Main calling");
464     string res = receiveOnly!string();
465     assert(res == "Child responding");
466 }
467 
468 // Thread Creation
469 
isSpawnable(F,T...)470 private template isSpawnable(F, T...)
471 {
472     template isParamsImplicitlyConvertible(F1, F2, int i = 0)
473     {
474         alias param1 = Parameters!F1;
475         alias param2 = Parameters!F2;
476         static if (param1.length != param2.length)
477             enum isParamsImplicitlyConvertible = false;
478         else static if (param1.length == i)
479             enum isParamsImplicitlyConvertible = true;
480         else static if (isImplicitlyConvertible!(param2[i], param1[i]))
481             enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
482                     F2, i + 1);
483         else
484             enum isParamsImplicitlyConvertible = false;
485     }
486 
487     enum isSpawnable = isCallable!F && is(ReturnType!F : void)
488             && isParamsImplicitlyConvertible!(F, void function(T))
489             && (isFunctionPointer!F || !hasUnsharedAliasing!F);
490 }
491 
492 /**
493  * Starts fn(args) in a new logical thread.
494  *
495  * Executes the supplied function in a new logical thread represented by
496  * `Tid`.  The calling thread is designated as the owner of the new thread.
497  * When the owner thread terminates an `OwnerTerminated` message will be
498  * sent to the new thread, causing an `OwnerTerminated` exception to be
499  * thrown on `receive()`.
500  *
501  * Params:
502  *  fn   = The function to execute.
503  *  args = Arguments to the function.
504  *
505  * Returns:
506  *  A Tid representing the new logical thread.
507  *
508  * Notes:
509  *  `args` must not have unshared aliasing.  In other words, all arguments
510  *  to `fn` must either be `shared` or `immutable` or have no
511  *  pointer indirection.  This is necessary for enforcing isolation among
512  *  threads.
513  *
514  * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
515  * `fn` must be either `shared` or `immutable`. */
516 Tid spawn(F, T...)(F fn, T args)
517 if (isSpawnable!(F, T))
518 {
519     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
520     return _spawn(false, fn, args);
521 }
522 
523 ///
524 @system unittest
525 {
f(string msg)526     static void f(string msg)
527     {
528         assert(msg == "Hello World");
529     }
530 
531     auto tid = spawn(&f, "Hello World");
532 }
533 
534 /// Fails: char[] has mutable aliasing.
535 @system unittest
536 {
537     string msg = "Hello, World!";
538 
f1(string msg)539     static void f1(string msg) {}
540     static assert(!__traits(compiles, spawn(&f1, msg.dup)));
541     static assert( __traits(compiles, spawn(&f1, msg.idup)));
542 
f2(char[]msg)543     static void f2(char[] msg) {}
544     static assert(!__traits(compiles, spawn(&f2, msg.dup)));
545     static assert(!__traits(compiles, spawn(&f2, msg.idup)));
546 }
547 
548 /// New thread with anonymous function
549 @system unittest
550 {
551     spawn({
552         ownerTid.send("This is so great!");
553     });
554     assert(receiveOnly!string == "This is so great!");
555 }
556 
557 @system unittest
558 {
559     import core.thread : thread_joinAll;
560 
561     __gshared string receivedMessage;
f1(string msg)562     static void f1(string msg)
563     {
564         receivedMessage = msg;
565     }
566 
567     auto tid1 = spawn(&f1, "Hello World");
568     thread_joinAll;
569     assert(receivedMessage == "Hello World");
570 }
571 
572 /**
573  * Starts fn(args) in a logical thread and will receive a LinkTerminated
574  * message when the operation terminates.
575  *
576  * Executes the supplied function in a new logical thread represented by
577  * Tid.  This new thread is linked to the calling thread so that if either
578  * it or the calling thread terminates a LinkTerminated message will be sent
579  * to the other, causing a LinkTerminated exception to be thrown on receive().
580  * The owner relationship from spawn() is preserved as well, so if the link
581  * between threads is broken, owner termination will still result in an
582  * OwnerTerminated exception to be thrown on receive().
583  *
584  * Params:
585  *  fn   = The function to execute.
586  *  args = Arguments to the function.
587  *
588  * Returns:
589  *  A Tid representing the new thread.
590  */
591 Tid spawnLinked(F, T...)(F fn, T args)
592 if (isSpawnable!(F, T))
593 {
594     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
595     return _spawn(true, fn, args);
596 }
597 
598 /*
599  *
600  */
601 private Tid _spawn(F, T...)(bool linked, F fn, T args)
602 if (isSpawnable!(F, T))
603 {
604     // TODO: MessageList and &exec should be shared.
605     auto spawnTid = Tid(new MessageBox);
606     auto ownerTid = thisTid;
607 
exec()608     void exec()
609     {
610         thisInfo.ident = spawnTid;
611         thisInfo.owner = ownerTid;
612         fn(args);
613     }
614 
615     // TODO: MessageList and &exec should be shared.
616     if (scheduler !is null)
617         scheduler.spawn(&exec);
618     else
619     {
620         auto t = new Thread(&exec);
621         t.start();
622     }
623     thisInfo.links[spawnTid] = linked;
624     return spawnTid;
625 }
626 
627 @system unittest
628 {
629     void function() fn1;
630     void function(int) fn2;
631     static assert(__traits(compiles, spawn(fn1)));
632     static assert(__traits(compiles, spawn(fn2, 2)));
633     static assert(!__traits(compiles, spawn(fn1, 1)));
634     static assert(!__traits(compiles, spawn(fn2)));
635 
636     void delegate(int) shared dg1;
637     shared(void delegate(int)) dg2;
638     shared(void delegate(long) shared) dg3;
639     shared(void delegate(real, int, long) shared) dg4;
640     void delegate(int) immutable dg5;
641     void delegate(int) dg6;
642     static assert(__traits(compiles, spawn(dg1, 1)));
643     static assert(__traits(compiles, spawn(dg2, 2)));
644     static assert(__traits(compiles, spawn(dg3, 3)));
645     static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
646     static assert(__traits(compiles, spawn(dg5, 5)));
647     static assert(!__traits(compiles, spawn(dg6, 6)));
648 
opCall(int)649     auto callable1  = new class{ void opCall(int) shared {} };
cast(shared)650     auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
opCall(int)651     auto callable3  = new class{ void opCall(int) immutable {} };
cast(immutable)652     auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
opCall(int)653     auto callable5  = new class{ void opCall(int) {} };
cast(shared)654     auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
cast(immutable)655     auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
cast(shared)656     auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
cast(const shared)657     auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
cast(const shared)658     auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
cast(immutable)659     auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
660     static assert(!__traits(compiles, spawn(callable1,  1)));
661     static assert( __traits(compiles, spawn(callable2,  2)));
662     static assert(!__traits(compiles, spawn(callable3,  3)));
663     static assert( __traits(compiles, spawn(callable4,  4)));
664     static assert(!__traits(compiles, spawn(callable5,  5)));
665     static assert(!__traits(compiles, spawn(callable6,  6)));
666     static assert(!__traits(compiles, spawn(callable7,  7)));
667     static assert( __traits(compiles, spawn(callable8,  8)));
668     static assert(!__traits(compiles, spawn(callable9,  9)));
669     static assert( __traits(compiles, spawn(callable10, 10)));
670     static assert( __traits(compiles, spawn(callable11, 11)));
671 }
672 
673 /**
674  * Places the values as a message at the back of tid's message queue.
675  *
676  * Sends the supplied value to the thread represented by tid.  As with
677  * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
678  */
send(T...)679 void send(T...)(Tid tid, T vals)
680 in (tid.mbox !is null)
681 {
682     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
683     _send(tid, vals);
684 }
685 
686 /**
687  * Places the values as a message on the front of tid's message queue.
688  *
689  * Send a message to `tid` but place it at the front of `tid`'s message
690  * queue instead of at the back.  This function is typically used for
691  * out-of-band communication, to signal exceptional conditions, etc.
692  */
prioritySend(T...)693 void prioritySend(T...)(Tid tid, T vals)
694 in (tid.mbox !is null)
695 {
696     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
697     _send(MsgType.priority, tid, vals);
698 }
699 
700 /*
701  * ditto
702  */
_send(T...)703 private void _send(T...)(Tid tid, T vals)
704 in (tid.mbox !is null)
705 {
706     _send(MsgType.standard, tid, vals);
707 }
708 
709 /*
710  * Implementation of send.  This allows parameter checking to be different for
711  * both Tid.send() and .send().
712  */
_send(T...)713 private void _send(T...)(MsgType type, Tid tid, T vals)
714 in (tid.mbox !is null)
715 {
716     auto msg = Message(type, vals);
717     tid.mbox.put(msg);
718 }
719 
720 /**
721  * Receives a message from another thread.
722  *
723  * Receive a message from another thread, or block if no messages of the
724  * specified types are available.  This function works by pattern matching
725  * a message against a set of delegates and executing the first match found.
726  *
727  * If a delegate that accepts a $(REF Variant, std,variant) is included as
728  * the last argument to `receive`, it will match any message that was not
729  * matched by an earlier delegate.  If more than one argument is sent,
730  * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
731  * sent.
732  *
733  * Params:
734  *     ops = Variadic list of function pointers and delegates. Entries
735  *           in this list must not occlude later entries.
736  *
737  * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
738  */
receive(T...)739 void receive(T...)( T ops )
740 in
741 {
742     assert(thisInfo.ident.mbox !is null,
743            "Cannot receive a message until a thread was spawned "
744            ~ "or thisTid was passed to a running thread.");
745 }
746 do
747 {
748     checkops( ops );
749 
750     thisInfo.ident.mbox.get( ops );
751 }
752 
753 ///
754 @system unittest
755 {
756     import std.variant : Variant;
757 
758     auto process = ()
759     {
760         receive(
761             (int i) { ownerTid.send(1); },
762             (double f) { ownerTid.send(2); },
763             (Variant v) { ownerTid.send(3); }
764         );
765     };
766 
767     {
768         auto tid = spawn(process);
769         send(tid, 42);
770         assert(receiveOnly!int == 1);
771     }
772 
773     {
774         auto tid = spawn(process);
775         send(tid, 3.14);
776         assert(receiveOnly!int == 2);
777     }
778 
779     {
780         auto tid = spawn(process);
781         send(tid, "something else");
782         assert(receiveOnly!int == 3);
783     }
784 }
785 
786 @safe unittest
787 {
788     static assert( __traits( compiles,
789                       {
790                           receive( (Variant x) {} );
791                           receive( (int x) {}, (Variant x) {} );
792                       } ) );
793 
794     static assert( !__traits( compiles,
795                        {
796                            receive( (Variant x) {}, (int x) {} );
797                        } ) );
798 
799     static assert( !__traits( compiles,
800                        {
801                            receive( (int x) {}, (int x) {} );
802                        } ) );
803 }
804 
805 // Make sure receive() works with free functions as well.
version(StdUnittest)806 version (StdUnittest)
807 {
808     private void receiveFunction(int x) {}
809 }
810 @safe unittest
811 {
812     static assert( __traits( compiles,
813                       {
814                           receive( &receiveFunction );
815                           receive( &receiveFunction, (Variant x) {} );
816                       } ) );
817 }
818 
819 
receiveOnlyRet(T...)820 private template receiveOnlyRet(T...)
821 {
822     static if ( T.length == 1 )
823     {
824         alias receiveOnlyRet = T[0];
825     }
826     else
827     {
828         import std.typecons : Tuple;
829         alias receiveOnlyRet = Tuple!(T);
830     }
831 }
832 
833 /**
834  * Receives only messages with arguments of the specified types.
835  *
836  * Params:
837  *     T = Variadic list of types to be received.
838  *
839  * Returns: The received message.  If `T` has more than one entry,
840  *          the message will be packed into a $(REF Tuple, std,typecons).
841  *
842  * Throws: $(LREF MessageMismatch) if a message of types other than `T`
843  *         is received,
844  *         $(LREF OwnerTerminated) when the sending thread was terminated.
845  */
846 receiveOnlyRet!(T) receiveOnly(T...)()
847 in
848 {
849     assert(thisInfo.ident.mbox !is null,
850         "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
851 }
852 do
853 {
854     import std.format : format;
855     import std.meta : allSatisfy;
856     import std.typecons : Tuple;
857 
858     Tuple!(T) ret;
859 
860     thisInfo.ident.mbox.get((T val) {
861         static if (T.length)
862         {
863             static if (allSatisfy!(isAssignable, T))
864             {
865                 ret.field = val;
866             }
867             else
868             {
869                 import core.lifetime : emplace;
870                 emplace(&ret, val);
871             }
872         }
873     },
874     (LinkTerminated e) { throw e; },
875     (OwnerTerminated e) { throw e; },
876     (Variant val) {
877         static if (T.length > 1)
878             string exp = T.stringof;
879         else
880             string exp = T[0].stringof;
881 
882         throw new MessageMismatch(
883             format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString()));
884     });
885     static if (T.length == 1)
886         return ret[0];
887     else
888         return ret;
889 }
890 
891 ///
892 @system unittest
893 {
894     auto tid = spawn(
895     {
896         assert(receiveOnly!int == 42);
897     });
898     send(tid, 42);
899 }
900 
901 ///
902 @system unittest
903 {
904     auto tid = spawn(
905     {
906         assert(receiveOnly!string == "text");
907     });
908     send(tid, "text");
909 }
910 
911 ///
912 @system unittest
913 {
914     struct Record { string name; int age; }
915 
916     auto tid = spawn(
917     {
918         auto msg = receiveOnly!(double, Record);
919         assert(msg[0] == 0.5);
920         assert(msg[1].name == "Alice");
921         assert(msg[1].age == 31);
922     });
923 
924     send(tid, 0.5, Record("Alice", 31));
925 }
926 
927 @system unittest
928 {
t1(Tid mainTid)929     static void t1(Tid mainTid)
930     {
931         try
932         {
933             receiveOnly!string();
934             mainTid.send("");
935         }
936         catch (Throwable th)
937         {
938             mainTid.send(th.msg);
939         }
940     }
941 
942     auto tid = spawn(&t1, thisTid);
943     tid.send(1);
944     string result = receiveOnly!string();
945     assert(result == "Unexpected message type: expected 'string', got 'int'");
946 }
947 
948 // https://issues.dlang.org/show_bug.cgi?id=21663
949 @safe unittest
950 {
951     alias test = receiveOnly!(string, bool, bool);
952 }
953 
954 /**
955  * Receives a message from another thread and gives up if no match
956  * arrives within a specified duration.
957  *
958  * Receive a message from another thread, or block until `duration` exceeds,
959  * if no messages of the specified types are available. This function works
960  * by pattern matching a message against a set of delegates and executing
961  * the first match found.
962  *
963  * If a delegate that accepts a $(REF Variant, std,variant) is included as
964  * the last argument, it will match any message that was not
965  * matched by an earlier delegate.  If more than one argument is sent,
966  * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
967  * sent.
968  *
969  * Params:
970  *     duration = Duration, how long to wait. If `duration` is negative,
971  *                won't wait at all.
972  *     ops = Variadic list of function pointers and delegates. Entries
973  *           in this list must not occlude later entries.
974  *
975  * Returns: `true` if it received a message and `false` if it timed out waiting
976  *          for one.
977  *
978  * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
979  */
receiveTimeout(T...)980 bool receiveTimeout(T...)(Duration duration, T ops)
981 in
982 {
983     assert(thisInfo.ident.mbox !is null,
984         "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
985 }
986 do
987 {
988     checkops(ops);
989 
990     return thisInfo.ident.mbox.get(duration, ops);
991 }
992 
993 @safe unittest
994 {
995     static assert(__traits(compiles, {
996         receiveTimeout(msecs(0), (Variant x) {});
997         receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
998     }));
999 
1000     static assert(!__traits(compiles, {
1001         receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
1002     }));
1003 
1004     static assert(!__traits(compiles, {
1005         receiveTimeout(msecs(0), (int x) {}, (int x) {});
1006     }));
1007 
1008     static assert(__traits(compiles, {
1009         receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
1010     }));
1011 }
1012 
1013 // MessageBox Limits
1014 
1015 /**
1016  * These behaviors may be specified when a mailbox is full.
1017  */
1018 enum OnCrowding
1019 {
1020     block, /// Wait until room is available.
1021     throwException, /// Throw a MailboxFull exception.
1022     ignore /// Abort the send and return.
1023 }
1024 
1025 private
1026 {
onCrowdingBlock(Tid tid)1027     bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
1028     {
1029         return true;
1030     }
1031 
onCrowdingThrow(Tid tid)1032     bool onCrowdingThrow(Tid tid) @safe pure
1033     {
1034         throw new MailboxFull(tid);
1035     }
1036 
onCrowdingIgnore(Tid tid)1037     bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
1038     {
1039         return false;
1040     }
1041 }
1042 
1043 /**
1044  * Sets a maximum mailbox size.
1045  *
1046  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1047  * If this limit is reached, the caller attempting to add a new message will
1048  * execute the behavior specified by doThis.  If messages is zero, the mailbox
1049  * is unbounded.
1050  *
1051  * Params:
1052  *  tid      = The Tid of the thread for which this limit should be set.
1053  *  messages = The maximum number of messages or zero if no limit.
1054  *  doThis   = The behavior executed when a message is sent to a full
1055  *             mailbox.
1056  */
setMaxMailboxSize(Tid tid,size_t messages,OnCrowding doThis)1057 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
1058 in (tid.mbox !is null)
1059 {
1060     final switch (doThis)
1061     {
1062     case OnCrowding.block:
1063         return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
1064     case OnCrowding.throwException:
1065         return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
1066     case OnCrowding.ignore:
1067         return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
1068     }
1069 }
1070 
1071 /**
1072  * Sets a maximum mailbox size.
1073  *
1074  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1075  * If this limit is reached, the caller attempting to add a new message will
1076  * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
1077  *
1078  * Params:
1079  *  tid      = The Tid of the thread for which this limit should be set.
1080  *  messages = The maximum number of messages or zero if no limit.
1081  *  onCrowdingDoThis = The routine called when a message is sent to a full
1082  *                     mailbox.
1083  */
setMaxMailboxSize(Tid tid,size_t messages,bool function (Tid)onCrowdingDoThis)1084 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
1085 in (tid.mbox !is null)
1086 {
1087     tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
1088 }
1089 
1090 private
1091 {
1092     __gshared Tid[string] tidByName;
1093     __gshared string[][Tid] namesByTid;
1094 }
1095 
registryLock()1096 private @property Mutex registryLock()
1097 {
1098     __gshared Mutex impl;
1099     initOnce!impl(new Mutex);
1100     return impl;
1101 }
1102 
unregisterMe(ref ThreadInfo me)1103 private void unregisterMe(ref ThreadInfo me)
1104 {
1105     if (me.ident != Tid.init)
1106     {
1107         synchronized (registryLock)
1108         {
1109             if (auto allNames = me.ident in namesByTid)
1110             {
1111                 foreach (name; *allNames)
1112                     tidByName.remove(name);
1113                 namesByTid.remove(me.ident);
1114             }
1115         }
1116     }
1117 }
1118 
1119 /**
1120  * Associates name with tid.
1121  *
1122  * Associates name with tid in a process-local map.  When the thread
1123  * represented by tid terminates, any names associated with it will be
1124  * automatically unregistered.
1125  *
1126  * Params:
1127  *  name = The name to associate with tid.
1128  *  tid  = The tid register by name.
1129  *
1130  * Returns:
1131  *  true if the name is available and tid is not known to represent a
1132  *  defunct thread.
1133  */
1134 bool register(string name, Tid tid)
1135 in (tid.mbox !is null)
1136 {
synchronized(registryLock)1137     synchronized (registryLock)
1138     {
1139         if (name in tidByName)
1140             return false;
1141         if (tid.mbox.isClosed)
1142             return false;
1143         namesByTid[tid] ~= name;
1144         tidByName[name] = tid;
1145         return true;
1146     }
1147 }
1148 
1149 /**
1150  * Removes the registered name associated with a tid.
1151  *
1152  * Params:
1153  *  name = The name to unregister.
1154  *
1155  * Returns:
1156  *  true if the name is registered, false if not.
1157  */
unregister(string name)1158 bool unregister(string name)
1159 {
1160     import std.algorithm.mutation : remove, SwapStrategy;
1161     import std.algorithm.searching : countUntil;
1162 
1163     synchronized (registryLock)
1164     {
1165         if (auto tid = name in tidByName)
1166         {
1167             auto allNames = *tid in namesByTid;
1168             auto pos = countUntil(*allNames, name);
1169             remove!(SwapStrategy.unstable)(*allNames, pos);
1170             tidByName.remove(name);
1171             return true;
1172         }
1173         return false;
1174     }
1175 }
1176 
1177 /**
1178  * Gets the Tid associated with name.
1179  *
1180  * Params:
1181  *  name = The name to locate within the registry.
1182  *
1183  * Returns:
1184  *  The associated Tid or Tid.init if name is not registered.
1185  */
locate(string name)1186 Tid locate(string name)
1187 {
1188     synchronized (registryLock)
1189     {
1190         if (auto tid = name in tidByName)
1191             return *tid;
1192         return Tid.init;
1193     }
1194 }
1195 
1196 /**
1197  * Encapsulates all implementation-level data needed for scheduling.
1198  *
1199  * When defining a Scheduler, an instance of this struct must be associated
1200  * with each logical thread.  It contains all implementation-level information
1201  * needed by the internal API.
1202  */
1203 struct ThreadInfo
1204 {
1205     Tid ident;
1206     bool[Tid] links;
1207     Tid owner;
1208 
1209     /**
1210      * Gets a thread-local instance of ThreadInfo.
1211      *
1212      * Gets a thread-local instance of ThreadInfo, which should be used as the
1213      * default instance when info is requested for a thread not created by the
1214      * Scheduler.
1215      */
thisInfoThreadInfo1216     static @property ref thisInfo() nothrow
1217     {
1218         static ThreadInfo val;
1219         return val;
1220     }
1221 
1222     /**
1223      * Cleans up this ThreadInfo.
1224      *
1225      * This must be called when a scheduled thread terminates.  It tears down
1226      * the messaging system for the thread and notifies interested parties of
1227      * the thread's termination.
1228      */
cleanupThreadInfo1229     void cleanup()
1230     {
1231         if (ident.mbox !is null)
1232             ident.mbox.close();
1233         foreach (tid; links.keys)
1234             _send(MsgType.linkDead, tid, ident);
1235         if (owner != Tid.init)
1236             _send(MsgType.linkDead, owner, ident);
1237         unregisterMe(this); // clean up registry entries
1238     }
1239 
1240     // https://issues.dlang.org/show_bug.cgi?id=20160
1241     @system unittest
1242     {
1243         register("main_thread", thisTid());
1244 
1245         ThreadInfo t;
1246         t.cleanup();
1247 
1248         assert(locate("main_thread") == thisTid());
1249     }
1250 }
1251 
1252 /**
1253  * A Scheduler controls how threading is performed by spawn.
1254  *
1255  * Implementing a Scheduler allows the concurrency mechanism used by this
1256  * module to be customized according to different needs.  By default, a call
1257  * to spawn will create a new kernel thread that executes the supplied routine
1258  * and terminates when finished.  But it is possible to create Schedulers that
1259  * reuse threads, that multiplex Fibers (coroutines) across a single thread,
1260  * or any number of other approaches.  By making the choice of Scheduler a
1261  * user-level option, std.concurrency may be used for far more types of
1262  * application than if this behavior were predefined.
1263  *
1264  * Example:
1265  * ---
1266  * import std.concurrency;
1267  * import std.stdio;
1268  *
1269  * void main()
1270  * {
1271  *     scheduler = new FiberScheduler;
1272  *     scheduler.start(
1273  *     {
1274  *         writeln("the rest of main goes here");
1275  *     });
1276  * }
1277  * ---
1278  *
1279  * Some schedulers have a dispatching loop that must run if they are to work
1280  * properly, so for the sake of consistency, when using a scheduler, start()
1281  * must be called within main().  This yields control to the scheduler and
1282  * will ensure that any spawned threads are executed in an expected manner.
1283  */
1284 interface Scheduler
1285 {
1286     /**
1287      * Spawns the supplied op and starts the Scheduler.
1288      *
1289      * This is intended to be called at the start of the program to yield all
1290      * scheduling to the active Scheduler instance.  This is necessary for
1291      * schedulers that explicitly dispatch threads rather than simply relying
1292      * on the operating system to do so, and so start should always be called
1293      * within main() to begin normal program execution.
1294      *
1295      * Params:
1296      *  op = A wrapper for whatever the main thread would have done in the
1297      *       absence of a custom scheduler.  It will be automatically executed
1298      *       via a call to spawn by the Scheduler.
1299      */
1300     void start(void delegate() op);
1301 
1302     /**
1303      * Assigns a logical thread to execute the supplied op.
1304      *
1305      * This routine is called by spawn.  It is expected to instantiate a new
1306      * logical thread and run the supplied operation.  This thread must call
1307      * thisInfo.cleanup() when the thread terminates if the scheduled thread
1308      * is not a kernel thread--all kernel threads will have their ThreadInfo
1309      * cleaned up automatically by a thread-local destructor.
1310      *
1311      * Params:
1312      *  op = The function to execute.  This may be the actual function passed
1313      *       by the user to spawn itself, or may be a wrapper function.
1314      */
1315     void spawn(void delegate() op);
1316 
1317     /**
1318      * Yields execution to another logical thread.
1319      *
1320      * This routine is called at various points within concurrency-aware APIs
1321      * to provide a scheduler a chance to yield execution when using some sort
1322      * of cooperative multithreading model.  If this is not appropriate, such
1323      * as when each logical thread is backed by a dedicated kernel thread,
1324      * this routine may be a no-op.
1325      */
1326     void yield() nothrow;
1327 
1328     /**
1329      * Returns an appropriate ThreadInfo instance.
1330      *
1331      * Returns an instance of ThreadInfo specific to the logical thread that
1332      * is calling this routine or, if the calling thread was not create by
1333      * this scheduler, returns ThreadInfo.thisInfo instead.
1334      */
1335     @property ref ThreadInfo thisInfo() nothrow;
1336 
1337     /**
1338      * Creates a Condition variable analog for signaling.
1339      *
1340      * Creates a new Condition variable analog which is used to check for and
1341      * to signal the addition of messages to a thread's message queue.  Like
1342      * yield, some schedulers may need to define custom behavior so that calls
1343      * to Condition.wait() yield to another thread when no new messages are
1344      * available instead of blocking.
1345      *
1346      * Params:
1347      *  m = The Mutex that will be associated with this condition.  It will be
1348      *      locked prior to any operation on the condition, and so in some
1349      *      cases a Scheduler may need to hold this reference and unlock the
1350      *      mutex before yielding execution to another logical thread.
1351      */
1352     Condition newCondition(Mutex m) nothrow;
1353 }
1354 
1355 /**
1356  * An example Scheduler using kernel threads.
1357  *
1358  * This is an example Scheduler that mirrors the default scheduling behavior
1359  * of creating one kernel thread per call to spawn.  It is fully functional
1360  * and may be instantiated and used, but is not a necessary part of the
1361  * default functioning of this module.
1362  */
1363 class ThreadScheduler : Scheduler
1364 {
1365     /**
1366      * This simply runs op directly, since no real scheduling is needed by
1367      * this approach.
1368      */
start(void delegate ()op)1369     void start(void delegate() op)
1370     {
1371         op();
1372     }
1373 
1374     /**
1375      * Creates a new kernel thread and assigns it to run the supplied op.
1376      */
spawn(void delegate ()op)1377     void spawn(void delegate() op)
1378     {
1379         auto t = new Thread(op);
1380         t.start();
1381     }
1382 
1383     /**
1384      * This scheduler does no explicit multiplexing, so this is a no-op.
1385      */
yield()1386     void yield() nothrow
1387     {
1388         // no explicit yield needed
1389     }
1390 
1391     /**
1392      * Returns ThreadInfo.thisInfo, since it is a thread-local instance of
1393      * ThreadInfo, which is the correct behavior for this scheduler.
1394      */
thisInfo()1395     @property ref ThreadInfo thisInfo() nothrow
1396     {
1397         return ThreadInfo.thisInfo;
1398     }
1399 
1400     /**
1401      * Creates a new Condition variable.  No custom behavior is needed here.
1402      */
newCondition(Mutex m)1403     Condition newCondition(Mutex m) nothrow
1404     {
1405         return new Condition(m);
1406     }
1407 }
1408 
1409 /**
1410  * An example Scheduler using Fibers.
1411  *
1412  * This is an example scheduler that creates a new Fiber per call to spawn
1413  * and multiplexes the execution of all fibers within the main thread.
1414  */
1415 class FiberScheduler : Scheduler
1416 {
1417     /**
1418      * This creates a new Fiber for the supplied op and then starts the
1419      * dispatcher.
1420      */
start(void delegate ()op)1421     void start(void delegate() op)
1422     {
1423         create(op);
1424         dispatch();
1425     }
1426 
1427     /**
1428      * This created a new Fiber for the supplied op and adds it to the
1429      * dispatch list.
1430      */
spawn(void delegate ()op)1431     void spawn(void delegate() op) nothrow
1432     {
1433         create(op);
1434         yield();
1435     }
1436 
1437     /**
1438      * If the caller is a scheduled Fiber, this yields execution to another
1439      * scheduled Fiber.
1440      */
yield()1441     void yield() nothrow
1442     {
1443         // NOTE: It's possible that we should test whether the calling Fiber
1444         //       is an InfoFiber before yielding, but I think it's reasonable
1445         //       that any (non-Generator) fiber should yield here.
1446         if (Fiber.getThis())
1447             Fiber.yield();
1448     }
1449 
1450     /**
1451      * Returns an appropriate ThreadInfo instance.
1452      *
1453      * Returns a ThreadInfo instance specific to the calling Fiber if the
1454      * Fiber was created by this dispatcher, otherwise it returns
1455      * ThreadInfo.thisInfo.
1456      */
thisInfo()1457     @property ref ThreadInfo thisInfo() nothrow
1458     {
1459         auto f = cast(InfoFiber) Fiber.getThis();
1460 
1461         if (f !is null)
1462             return f.info;
1463         return ThreadInfo.thisInfo;
1464     }
1465 
1466     /**
1467      * Returns a Condition analog that yields when wait or notify is called.
1468      *
1469      * Bug:
1470      * For the default implementation, `notifyAll`will behave like `notify`.
1471      *
1472      * Params:
1473      *   m = A `Mutex` to use for locking if the condition needs to be waited on
1474      *       or notified from multiple `Thread`s.
1475      *       If `null`, no `Mutex` will be used and it is assumed that the
1476      *       `Condition` is only waited on/notified from one `Thread`.
1477      */
newCondition(Mutex m)1478     Condition newCondition(Mutex m) nothrow
1479     {
1480         return new FiberCondition(m);
1481     }
1482 
1483 protected:
1484     /**
1485      * Creates a new Fiber which calls the given delegate.
1486      *
1487      * Params:
1488      *   op = The delegate the fiber should call
1489      */
create(void delegate ()op)1490     void create(void delegate() op) nothrow
1491     {
1492         void wrap()
1493         {
1494             scope (exit)
1495             {
1496                 thisInfo.cleanup();
1497             }
1498             op();
1499         }
1500 
1501         m_fibers ~= new InfoFiber(&wrap);
1502     }
1503 
1504     /**
1505      * Fiber which embeds a ThreadInfo
1506      */
1507     static class InfoFiber : Fiber
1508     {
1509         ThreadInfo info;
1510 
this(void delegate ()op)1511         this(void delegate() op) nothrow
1512         {
1513             super(op);
1514         }
1515 
this(void delegate ()op,size_t sz)1516         this(void delegate() op, size_t sz) nothrow
1517         {
1518             super(op, sz);
1519         }
1520     }
1521 
1522 private:
1523     class FiberCondition : Condition
1524     {
this(Mutex m)1525         this(Mutex m) nothrow
1526         {
1527             super(m);
1528             notified = false;
1529         }
1530 
wait()1531         override void wait() nothrow
1532         {
1533             scope (exit) notified = false;
1534 
1535             while (!notified)
1536                 switchContext();
1537         }
1538 
wait(Duration period)1539         override bool wait(Duration period) nothrow
1540         {
1541             import core.time : MonoTime;
1542 
1543             scope (exit) notified = false;
1544 
1545             for (auto limit = MonoTime.currTime + period;
1546                  !notified && !period.isNegative;
1547                  period = limit - MonoTime.currTime)
1548             {
1549                 this.outer.yield();
1550             }
1551             return notified;
1552         }
1553 
notify()1554         override void notify() nothrow
1555         {
1556             notified = true;
1557             switchContext();
1558         }
1559 
notifyAll()1560         override void notifyAll() nothrow
1561         {
1562             notified = true;
1563             switchContext();
1564         }
1565 
1566     private:
switchContext()1567         void switchContext() nothrow
1568         {
1569             if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
1570             scope (exit)
1571                 if (mutex_nothrow)
1572                     mutex_nothrow.lock_nothrow();
1573             this.outer.yield();
1574         }
1575 
1576         bool notified;
1577     }
1578 
dispatch()1579     void dispatch()
1580     {
1581         import std.algorithm.mutation : remove;
1582 
1583         while (m_fibers.length > 0)
1584         {
1585             auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
1586             if (t !is null && !(cast(OwnerTerminated) t))
1587             {
1588                 throw t;
1589             }
1590             if (m_fibers[m_pos].state == Fiber.State.TERM)
1591             {
1592                 if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
1593                     m_pos = 0;
1594             }
1595             else if (m_pos++ >= m_fibers.length - 1)
1596             {
1597                 m_pos = 0;
1598             }
1599         }
1600     }
1601 
1602     Fiber[] m_fibers;
1603     size_t m_pos;
1604 }
1605 
1606 @system unittest
1607 {
receive(Condition cond,ref size_t received)1608     static void receive(Condition cond, ref size_t received)
1609     {
1610         while (true)
1611         {
1612             synchronized (cond.mutex)
1613             {
1614                 cond.wait();
1615                 ++received;
1616             }
1617         }
1618     }
1619 
send(Condition cond,ref size_t sent)1620     static void send(Condition cond, ref size_t sent)
1621     {
1622         while (true)
1623         {
1624             synchronized (cond.mutex)
1625             {
1626                 ++sent;
1627                 cond.notify();
1628             }
1629         }
1630     }
1631 
1632     auto fs = new FiberScheduler;
1633     auto mtx = new Mutex;
1634     auto cond = fs.newCondition(mtx);
1635 
1636     size_t received, sent;
1637     auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
1638     waiter.call();
1639     assert(received == 0);
1640     notifier.call();
1641     assert(sent == 1);
1642     assert(received == 0);
1643     waiter.call();
1644     assert(received == 1);
1645     waiter.call();
1646     assert(received == 1);
1647 }
1648 
1649 /**
1650  * Sets the Scheduler behavior within the program.
1651  *
1652  * This variable sets the Scheduler behavior within this program.  Typically,
1653  * when setting a Scheduler, scheduler.start() should be called in main.  This
1654  * routine will not return until program execution is complete.
1655  */
1656 __gshared Scheduler scheduler;
1657 
1658 // Generator
1659 
1660 /**
1661  * If the caller is a Fiber and is not a Generator, this function will call
1662  * scheduler.yield() or Fiber.yield(), as appropriate.
1663  */
yield()1664 void yield() nothrow
1665 {
1666     auto fiber = Fiber.getThis();
1667     if (!(cast(IsGenerator) fiber))
1668     {
1669         if (scheduler is null)
1670         {
1671             if (fiber)
1672                 return Fiber.yield();
1673         }
1674         else
1675             scheduler.yield();
1676     }
1677 }
1678 
1679 /// Used to determine whether a Generator is running.
1680 private interface IsGenerator {}
1681 
1682 
1683 /**
1684  * A Generator is a Fiber that periodically returns values of type T to the
1685  * caller via yield.  This is represented as an InputRange.
1686  */
Generator(T)1687 class Generator(T) :
1688     Fiber, IsGenerator, InputRange!T
1689 {
1690     /**
1691      * Initializes a generator object which is associated with a static
1692      * D function.  The function will be called once to prepare the range
1693      * for iteration.
1694      *
1695      * Params:
1696      *  fn = The fiber function.
1697      *
1698      * In:
1699      *  fn must not be null.
1700      */
1701     this(void function() fn)
1702     {
1703         super(fn);
1704         call();
1705     }
1706 
1707     /**
1708      * Initializes a generator object which is associated with a static
1709      * D function.  The function will be called once to prepare the range
1710      * for iteration.
1711      *
1712      * Params:
1713      *  fn = The fiber function.
1714      *  sz = The stack size for this fiber.
1715      *
1716      * In:
1717      *  fn must not be null.
1718      */
1719     this(void function() fn, size_t sz)
1720     {
1721         super(fn, sz);
1722         call();
1723     }
1724 
1725     /**
1726      * Initializes a generator object which is associated with a static
1727      * D function.  The function will be called once to prepare the range
1728      * for iteration.
1729      *
1730      * Params:
1731      *  fn = The fiber function.
1732      *  sz = The stack size for this fiber.
1733      *  guardPageSize = size of the guard page to trap fiber's stack
1734      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1735      *                  documentation for more details.
1736      *
1737      * In:
1738      *  fn must not be null.
1739      */
1740     this(void function() fn, size_t sz, size_t guardPageSize)
1741     {
1742         super(fn, sz, guardPageSize);
1743         call();
1744     }
1745 
1746     /**
1747      * Initializes a generator object which is associated with a dynamic
1748      * D function.  The function will be called once to prepare the range
1749      * for iteration.
1750      *
1751      * Params:
1752      *  dg = The fiber function.
1753      *
1754      * In:
1755      *  dg must not be null.
1756      */
1757     this(void delegate() dg)
1758     {
1759         super(dg);
1760         call();
1761     }
1762 
1763     /**
1764      * Initializes a generator object which is associated with a dynamic
1765      * D function.  The function will be called once to prepare the range
1766      * for iteration.
1767      *
1768      * Params:
1769      *  dg = The fiber function.
1770      *  sz = The stack size for this fiber.
1771      *
1772      * In:
1773      *  dg must not be null.
1774      */
1775     this(void delegate() dg, size_t sz)
1776     {
1777         super(dg, sz);
1778         call();
1779     }
1780 
1781     /**
1782      * Initializes a generator object which is associated with a dynamic
1783      * D function.  The function will be called once to prepare the range
1784      * for iteration.
1785      *
1786      * Params:
1787      *  dg = The fiber function.
1788      *  sz = The stack size for this fiber.
1789      *  guardPageSize = size of the guard page to trap fiber's stack
1790      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1791      *                  documentation for more details.
1792      *
1793      * In:
1794      *  dg must not be null.
1795      */
1796     this(void delegate() dg, size_t sz, size_t guardPageSize)
1797     {
1798         super(dg, sz, guardPageSize);
1799         call();
1800     }
1801 
1802     /**
1803      * Returns true if the generator is empty.
1804      */
1805     final bool empty() @property
1806     {
1807         return m_value is null || state == State.TERM;
1808     }
1809 
1810     /**
1811      * Obtains the next value from the underlying function.
1812      */
1813     final void popFront()
1814     {
1815         call();
1816     }
1817 
1818     /**
1819      * Returns the most recently generated value by shallow copy.
1820      */
1821     final T front() @property
1822     {
1823         return *m_value;
1824     }
1825 
1826     /**
1827      * Returns the most recently generated value without executing a
1828      * copy contructor. Will not compile for element types defining a
1829      * postblit, because Generator does not return by reference.
1830      */
1831     final T moveFront()
1832     {
1833         static if (!hasElaborateCopyConstructor!T)
1834         {
1835             return front;
1836         }
1837         else
1838         {
1839             static assert(0,
1840                     "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
1841         }
1842     }
1843 
1844     final int opApply(scope int delegate(T) loopBody)
1845     {
1846         int broken;
1847         for (; !empty; popFront())
1848         {
1849             broken = loopBody(front);
1850             if (broken) break;
1851         }
1852         return broken;
1853     }
1854 
1855     final int opApply(scope int delegate(size_t, T) loopBody)
1856     {
1857         int broken;
1858         for (size_t i; !empty; ++i, popFront())
1859         {
1860             broken = loopBody(i, front);
1861             if (broken) break;
1862         }
1863         return broken;
1864     }
1865 private:
1866     T* m_value;
1867 }
1868 
1869 ///
1870 @system unittest
1871 {
1872     auto tid = spawn({
1873         int i;
1874         while (i < 9)
1875             i = receiveOnly!int;
1876 
1877         ownerTid.send(i * 2);
1878     });
1879 
1880     auto r = new Generator!int({
1881         foreach (i; 1 .. 10)
1882             yield(i);
1883     });
1884 
1885     foreach (e; r)
1886         tid.send(e);
1887 
1888     assert(receiveOnly!int == 18);
1889 }
1890 
1891 /**
1892  * Yields a value of type T to the caller of the currently executing
1893  * generator.
1894  *
1895  * Params:
1896  *  value = The value to yield.
1897  */
yield(T)1898 void yield(T)(ref T value)
1899 {
1900     Generator!T cur = cast(Generator!T) Fiber.getThis();
1901     if (cur !is null && cur.state == Fiber.State.EXEC)
1902     {
1903         cur.m_value = &value;
1904         return Fiber.yield();
1905     }
1906     throw new Exception("yield(T) called with no active generator for the supplied type");
1907 }
1908 
1909 /// ditto
yield(T)1910 void yield(T)(T value)
1911 {
1912     yield(value);
1913 }
1914 
1915 @system unittest
1916 {
1917     import core.exception;
1918     import std.exception;
1919 
1920     auto mainTid = thisTid;
1921     alias testdg = () {
1922         auto tid = spawn(
1923         (Tid mainTid) {
1924             int i;
1925             scope (failure) mainTid.send(false);
1926             try
1927             {
1928                 for (i = 1; i < 10; i++)
1929                 {
1930                     if (receiveOnly!int() != i)
1931                     {
1932                         mainTid.send(false);
1933                         break;
1934                     }
1935                 }
1936             }
1937             catch (OwnerTerminated e)
1938             {
1939                 // i will advance 1 past the last value expected
1940                 mainTid.send(i == 4);
1941             }
1942         }, mainTid);
1943         auto r = new Generator!int(
1944         {
1945             assertThrown!Exception(yield(2.0));
1946             yield(); // ensure this is a no-op
1947             yield(1);
1948             yield(); // also once something has been yielded
1949             yield(2);
1950             yield(3);
1951         });
1952 
foreach(e;r)1953         foreach (e; r)
1954         {
1955             tid.send(e);
1956         }
1957     };
1958 
1959     scheduler = new ThreadScheduler;
1960     scheduler.spawn(testdg);
1961     assert(receiveOnly!bool());
1962 
1963     scheduler = new FiberScheduler;
1964     scheduler.start(testdg);
1965     assert(receiveOnly!bool());
1966     scheduler = null;
1967 }
1968 ///
1969 @system unittest
1970 {
1971     import std.range;
1972 
1973     InputRange!int myIota = iota(10).inputRangeObject;
1974 
1975     myIota.popFront();
1976     myIota.popFront();
1977     assert(myIota.moveFront == 2);
1978     assert(myIota.front == 2);
1979     myIota.popFront();
1980     assert(myIota.front == 3);
1981 
1982     //can be assigned to std.range.interfaces.InputRange directly
1983     myIota = new Generator!int(
1984     {
1985         foreach (i; 0 .. 10) yield(i);
1986     });
1987 
1988     myIota.popFront();
1989     myIota.popFront();
1990     assert(myIota.moveFront == 2);
1991     assert(myIota.front == 2);
1992     myIota.popFront();
1993     assert(myIota.front == 3);
1994 
1995     size_t[2] counter = [0, 0];
1996     foreach (i, unused; myIota) counter[] += [1, i];
1997 
1998     assert(myIota.empty);
1999     assert(counter == [7, 21]);
2000 }
2001 
2002 private
2003 {
2004     /*
2005      * A MessageBox is a message queue for one thread.  Other threads may send
2006      * messages to this owner by calling put(), and the owner receives them by
2007      * calling get().  The put() call is therefore effectively shared and the
2008      * get() call is effectively local.  setMaxMsgs may be used by any thread
2009      * to limit the size of the message queue.
2010      */
2011     class MessageBox
2012     {
this()2013         this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
2014         {
2015             m_lock = new Mutex;
2016             m_closed = false;
2017 
2018             if (scheduler is null)
2019             {
2020                 m_putMsg = new Condition(m_lock);
2021                 m_notFull = new Condition(m_lock);
2022             }
2023             else
2024             {
2025                 m_putMsg = scheduler.newCondition(m_lock);
2026                 m_notFull = scheduler.newCondition(m_lock);
2027             }
2028         }
2029 
2030         ///
isClosed()2031         final @property bool isClosed() @safe @nogc pure
2032         {
2033             synchronized (m_lock)
2034             {
2035                 return m_closed;
2036             }
2037         }
2038 
2039         /*
2040          * Sets a limit on the maximum number of user messages allowed in the
2041          * mailbox.  If this limit is reached, the caller attempting to add
2042          * a new message will execute call.  If num is zero, there is no limit
2043          * on the message queue.
2044          *
2045          * Params:
2046          *  num  = The maximum size of the queue or zero if the queue is
2047          *         unbounded.
2048          *  call = The routine to call when the queue is full.
2049          */
setMaxMsgs(size_t num,bool function (Tid)call)2050         final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
2051         {
2052             synchronized (m_lock)
2053             {
2054                 m_maxMsgs = num;
2055                 m_onMaxMsgs = call;
2056             }
2057         }
2058 
2059         /*
2060          * If maxMsgs is not set, the message is added to the queue and the
2061          * owner is notified.  If the queue is full, the message will still be
2062          * accepted if it is a control message, otherwise onCrowdingDoThis is
2063          * called.  If the routine returns true, this call will block until
2064          * the owner has made space available in the queue.  If it returns
2065          * false, this call will abort.
2066          *
2067          * Params:
2068          *  msg = The message to put in the queue.
2069          *
2070          * Throws:
2071          *  An exception if the queue is full and onCrowdingDoThis throws.
2072          */
put(ref Message msg)2073         final void put(ref Message msg)
2074         {
2075             synchronized (m_lock)
2076             {
2077                 // TODO: Generate an error here if m_closed is true, or maybe
2078                 //       put a message in the caller's queue?
2079                 if (!m_closed)
2080                 {
2081                     while (true)
2082                     {
2083                         if (isPriorityMsg(msg))
2084                         {
2085                             m_sharedPty.put(msg);
2086                             m_putMsg.notify();
2087                             return;
2088                         }
2089                         if (!mboxFull() || isControlMsg(msg))
2090                         {
2091                             m_sharedBox.put(msg);
2092                             m_putMsg.notify();
2093                             return;
2094                         }
2095                         if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
2096                         {
2097                             return;
2098                         }
2099                         m_putQueue++;
2100                         m_notFull.wait();
2101                         m_putQueue--;
2102                     }
2103                 }
2104             }
2105         }
2106 
2107         /*
2108          * Matches ops against each message in turn until a match is found.
2109          *
2110          * Params:
2111          *  ops = The operations to match.  Each may return a bool to indicate
2112          *        whether a message with a matching type is truly a match.
2113          *
2114          * Returns:
2115          *  true if a message was retrieved and false if not (such as if a
2116          *  timeout occurred).
2117          *
2118          * Throws:
2119          *  LinkTerminated if a linked thread terminated, or OwnerTerminated
2120          * if the owner thread terminates and no existing messages match the
2121          * supplied ops.
2122          */
get(T...)2123         bool get(T...)(scope T vals)
2124         {
2125             import std.meta : AliasSeq;
2126 
2127             static assert(T.length, "T must not be empty");
2128 
2129             static if (isImplicitlyConvertible!(T[0], Duration))
2130             {
2131                 alias Ops = AliasSeq!(T[1 .. $]);
2132                 alias ops = vals[1 .. $];
2133                 enum timedWait = true;
2134                 Duration period = vals[0];
2135             }
2136             else
2137             {
2138                 alias Ops = AliasSeq!(T);
2139                 alias ops = vals[0 .. $];
2140                 enum timedWait = false;
2141             }
2142 
2143             bool onStandardMsg(ref Message msg)
2144             {
2145                 foreach (i, t; Ops)
2146                 {
2147                     alias Args = Parameters!(t);
2148                     auto op = ops[i];
2149 
2150                     if (msg.convertsTo!(Args))
2151                     {
2152                         alias RT = ReturnType!(t);
2153                         static if (is(RT == bool))
2154                         {
2155                             return msg.map(op);
2156                         }
2157                         else
2158                         {
2159                             msg.map(op);
2160                             static if (!is(immutable RT == immutable noreturn))
2161                                 return true;
2162                         }
2163                     }
2164                 }
2165                 return false;
2166             }
2167 
2168             bool onLinkDeadMsg(ref Message msg)
2169             {
2170                 assert(msg.convertsTo!(Tid),
2171                         "Message could be converted to Tid");
2172                 auto tid = msg.get!(Tid);
2173 
2174                 if (bool* pDepends = tid in thisInfo.links)
2175                 {
2176                     auto depends = *pDepends;
2177                     thisInfo.links.remove(tid);
2178                     // Give the owner relationship precedence.
2179                     if (depends && tid != thisInfo.owner)
2180                     {
2181                         auto e = new LinkTerminated(tid);
2182                         auto m = Message(MsgType.standard, e);
2183                         if (onStandardMsg(m))
2184                             return true;
2185                         throw e;
2186                     }
2187                 }
2188                 if (tid == thisInfo.owner)
2189                 {
2190                     thisInfo.owner = Tid.init;
2191                     auto e = new OwnerTerminated(tid);
2192                     auto m = Message(MsgType.standard, e);
2193                     if (onStandardMsg(m))
2194                         return true;
2195                     throw e;
2196                 }
2197                 return false;
2198             }
2199 
2200             bool onControlMsg(ref Message msg)
2201             {
2202                 switch (msg.type)
2203                 {
2204                 case MsgType.linkDead:
2205                     return onLinkDeadMsg(msg);
2206                 default:
2207                     return false;
2208                 }
2209             }
2210 
2211             bool scan(ref ListT list)
2212             {
2213                 for (auto range = list[]; !range.empty;)
2214                 {
2215                     // Only the message handler will throw, so if this occurs
2216                     // we can be certain that the message was handled.
2217                     scope (failure)
2218                         list.removeAt(range);
2219 
2220                     if (isControlMsg(range.front))
2221                     {
2222                         if (onControlMsg(range.front))
2223                         {
2224                             // Although the linkDead message is a control message,
2225                             // it can be handled by the user.  Since the linkDead
2226                             // message throws if not handled, if we get here then
2227                             // it has been handled and we can return from receive.
2228                             // This is a weird special case that will have to be
2229                             // handled in a more general way if more are added.
2230                             if (!isLinkDeadMsg(range.front))
2231                             {
2232                                 list.removeAt(range);
2233                                 continue;
2234                             }
2235                             list.removeAt(range);
2236                             return true;
2237                         }
2238                         range.popFront();
2239                         continue;
2240                     }
2241                     else
2242                     {
2243                         if (onStandardMsg(range.front))
2244                         {
2245                             list.removeAt(range);
2246                             return true;
2247                         }
2248                         range.popFront();
2249                         continue;
2250                     }
2251                 }
2252                 return false;
2253             }
2254 
2255             bool pty(ref ListT list)
2256             {
2257                 if (!list.empty)
2258                 {
2259                     auto range = list[];
2260 
2261                     if (onStandardMsg(range.front))
2262                     {
2263                         list.removeAt(range);
2264                         return true;
2265                     }
2266                     if (range.front.convertsTo!(Throwable))
2267                         throw range.front.get!(Throwable);
2268                     else if (range.front.convertsTo!(shared(Throwable)))
2269                         throw range.front.get!(shared(Throwable));
2270                     else
2271                         throw new PriorityMessageException(range.front.data);
2272                 }
2273                 return false;
2274             }
2275 
2276             static if (timedWait)
2277             {
2278                 import core.time : MonoTime;
2279                 auto limit = MonoTime.currTime + period;
2280             }
2281 
2282             while (true)
2283             {
2284                 ListT arrived;
2285 
2286                 if (pty(m_localPty) || scan(m_localBox))
2287                 {
2288                     return true;
2289                 }
2290                 yield();
2291                 synchronized (m_lock)
2292                 {
2293                     updateMsgCount();
2294                     while (m_sharedPty.empty && m_sharedBox.empty)
2295                     {
2296                         // NOTE: We're notifying all waiters here instead of just
2297                         //       a few because the onCrowding behavior may have
2298                         //       changed and we don't want to block sender threads
2299                         //       unnecessarily if the new behavior is not to block.
2300                         //       This will admittedly result in spurious wakeups
2301                         //       in other situations, but what can you do?
2302                         if (m_putQueue && !mboxFull())
2303                             m_notFull.notifyAll();
2304                         static if (timedWait)
2305                         {
2306                             if (period <= Duration.zero || !m_putMsg.wait(period))
2307                                 return false;
2308                         }
2309                         else
2310                         {
2311                             m_putMsg.wait();
2312                         }
2313                     }
2314                     m_localPty.put(m_sharedPty);
2315                     arrived.put(m_sharedBox);
2316                 }
2317                 if (m_localPty.empty)
2318                 {
2319                     scope (exit) m_localBox.put(arrived);
2320                     if (scan(arrived))
2321                     {
2322                         return true;
2323                     }
2324                     else
2325                     {
2326                         static if (timedWait)
2327                         {
2328                             period = limit - MonoTime.currTime;
2329                         }
2330                         continue;
2331                     }
2332                 }
2333                 m_localBox.put(arrived);
2334                 pty(m_localPty);
2335                 return true;
2336             }
2337         }
2338 
2339         /*
2340          * Called on thread termination.  This routine processes any remaining
2341          * control messages, clears out message queues, and sets a flag to
2342          * reject any future messages.
2343          */
close()2344         final void close()
2345         {
2346             static void onLinkDeadMsg(ref Message msg)
2347             {
2348                 assert(msg.convertsTo!(Tid),
2349                         "Message could be converted to Tid");
2350                 auto tid = msg.get!(Tid);
2351 
2352                 thisInfo.links.remove(tid);
2353                 if (tid == thisInfo.owner)
2354                     thisInfo.owner = Tid.init;
2355             }
2356 
2357             static void sweep(ref ListT list)
2358             {
2359                 for (auto range = list[]; !range.empty; range.popFront())
2360                 {
2361                     if (range.front.type == MsgType.linkDead)
2362                         onLinkDeadMsg(range.front);
2363                 }
2364             }
2365 
2366             ListT arrived;
2367 
2368             sweep(m_localBox);
2369             synchronized (m_lock)
2370             {
2371                 arrived.put(m_sharedBox);
2372                 m_closed = true;
2373             }
2374             m_localBox.clear();
2375             sweep(arrived);
2376         }
2377 
2378     private:
2379         // Routines involving local data only, no lock needed.
2380 
mboxFull()2381         bool mboxFull() @safe @nogc pure nothrow
2382         {
2383             return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2384         }
2385 
updateMsgCount()2386         void updateMsgCount() @safe @nogc pure nothrow
2387         {
2388             m_localMsgs = m_localBox.length;
2389         }
2390 
isControlMsg(ref Message msg)2391         bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2392         {
2393             return msg.type != MsgType.standard && msg.type != MsgType.priority;
2394         }
2395 
isPriorityMsg(ref Message msg)2396         bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2397         {
2398             return msg.type == MsgType.priority;
2399         }
2400 
isLinkDeadMsg(ref Message msg)2401         bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2402         {
2403             return msg.type == MsgType.linkDead;
2404         }
2405 
2406         alias OnMaxFn = bool function(Tid);
2407         alias ListT = List!(Message);
2408 
2409         ListT m_localBox;
2410         ListT m_localPty;
2411 
2412         Mutex m_lock;
2413         Condition m_putMsg;
2414         Condition m_notFull;
2415         size_t m_putQueue;
2416         ListT m_sharedBox;
2417         ListT m_sharedPty;
2418         OnMaxFn m_onMaxMsgs;
2419         size_t m_localMsgs;
2420         size_t m_maxMsgs;
2421         bool m_closed;
2422     }
2423 
2424     /*
2425      *
2426      */
List(T)2427     struct List(T)
2428     {
2429         struct Range
2430         {
2431             import std.exception : enforce;
2432 
2433             @property bool empty() const
2434             {
2435                 return !m_prev.next;
2436             }
2437 
2438             @property ref T front()
2439             {
2440                 enforce(m_prev.next, "invalid list node");
2441                 return m_prev.next.val;
2442             }
2443 
2444             @property void front(T val)
2445             {
2446                 enforce(m_prev.next, "invalid list node");
2447                 m_prev.next.val = val;
2448             }
2449 
2450             void popFront()
2451             {
2452                 enforce(m_prev.next, "invalid list node");
2453                 m_prev = m_prev.next;
2454             }
2455 
2456             private this(Node* p)
2457             {
2458                 m_prev = p;
2459             }
2460 
2461             private Node* m_prev;
2462         }
2463 
2464         void put(T val)
2465         {
2466             put(newNode(val));
2467         }
2468 
2469         void put(ref List!(T) rhs)
2470         {
2471             if (!rhs.empty)
2472             {
2473                 put(rhs.m_first);
2474                 while (m_last.next !is null)
2475                 {
2476                     m_last = m_last.next;
2477                     m_count++;
2478                 }
2479                 rhs.m_first = null;
2480                 rhs.m_last = null;
2481                 rhs.m_count = 0;
2482             }
2483         }
2484 
2485         Range opSlice()
2486         {
2487             return Range(cast(Node*)&m_first);
2488         }
2489 
2490         void removeAt(Range r)
2491         {
2492             import std.exception : enforce;
2493 
2494             assert(m_count, "Can not remove from empty Range");
2495             Node* n = r.m_prev;
2496             enforce(n && n.next, "attempting to remove invalid list node");
2497 
2498             if (m_last is m_first)
2499                 m_last = null;
2500             else if (m_last is n.next)
2501                 m_last = n; // nocoverage
2502             Node* to_free = n.next;
2503             n.next = n.next.next;
2504             freeNode(to_free);
2505             m_count--;
2506         }
2507 
2508         @property size_t length()
2509         {
2510             return m_count;
2511         }
2512 
2513         void clear()
2514         {
2515             m_first = m_last = null;
2516             m_count = 0;
2517         }
2518 
2519         @property bool empty()
2520         {
2521             return m_first is null;
2522         }
2523 
2524     private:
2525         struct Node
2526         {
2527             Node* next;
2528             T val;
2529 
2530             this(T v)
2531             {
2532                 val = v;
2533             }
2534         }
2535 
2536         static shared struct SpinLock
2537         {
2538             void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
2539             void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
2540             bool locked;
2541         }
2542 
2543         static shared SpinLock sm_lock;
2544         static shared Node* sm_head;
2545 
2546         Node* newNode(T v)
2547         {
2548             Node* n;
2549             {
2550                 sm_lock.lock();
2551                 scope (exit) sm_lock.unlock();
2552 
2553                 if (sm_head)
2554                 {
2555                     n = cast(Node*) sm_head;
2556                     sm_head = sm_head.next;
2557                 }
2558             }
2559             if (n)
2560             {
2561                 import core.lifetime : emplace;
2562                 emplace!Node(n, v);
2563             }
2564             else
2565             {
2566                 n = new Node(v);
2567             }
2568             return n;
2569         }
2570 
2571         void freeNode(Node* n)
2572         {
2573             // destroy val to free any owned GC memory
2574             destroy(n.val);
2575 
2576             sm_lock.lock();
2577             scope (exit) sm_lock.unlock();
2578 
2579             auto sn = cast(shared(Node)*) n;
2580             sn.next = sm_head;
2581             sm_head = sn;
2582         }
2583 
2584         void put(Node* n)
2585         {
2586             m_count++;
2587             if (!empty)
2588             {
2589                 m_last.next = n;
2590                 m_last = n;
2591                 return;
2592             }
2593             m_first = n;
2594             m_last = n;
2595         }
2596 
2597         Node* m_first;
2598         Node* m_last;
2599         size_t m_count;
2600     }
2601 }
2602 
2603 @system unittest
2604 {
2605     import std.typecons : tuple, Tuple;
2606 
testfn(Tid tid)2607     static void testfn(Tid tid)
2608     {
2609         receive((float val) { assert(0); }, (int val, int val2) {
2610             assert(val == 42 && val2 == 86);
2611         });
2612         receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
2613         receive((Variant val) {  });
2614         receive((string val) {
2615             if ("the quick brown fox" != val)
2616                 return false;
2617             return true;
2618         }, (string val) { assert(false); });
2619         prioritySend(tid, "done");
2620     }
2621 
runTest(Tid tid)2622     static void runTest(Tid tid)
2623     {
2624         send(tid, 42, 86);
2625         send(tid, tuple(42, 86));
2626         send(tid, "hello", "there");
2627         send(tid, "the quick brown fox");
2628         receive((string val) { assert(val == "done"); });
2629     }
2630 
simpleTest()2631     static void simpleTest()
2632     {
2633         auto tid = spawn(&testfn, thisTid);
2634         runTest(tid);
2635 
2636         // Run the test again with a limited mailbox size.
2637         tid = spawn(&testfn, thisTid);
2638         setMaxMailboxSize(tid, 2, OnCrowding.block);
2639         runTest(tid);
2640     }
2641 
2642     simpleTest();
2643 
2644     scheduler = new ThreadScheduler;
2645     simpleTest();
2646     scheduler = null;
2647 }
2648 
shared(Mutex)2649 private @property shared(Mutex) initOnceLock()
2650 {
2651     static shared Mutex lock;
2652     if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock))
2653         return mtx;
2654     auto mtx = new shared Mutex;
2655     if (cas(&lock, cast(shared) null, mtx))
2656         return mtx;
2657     return atomicLoad!(MemoryOrder.acq)(lock);
2658 }
2659 
2660 /**
2661  * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
2662  * thread-safe manner.
2663  *
2664  * The implementation guarantees that all threads simultaneously calling
2665  * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
2666  * fully initialized. All side-effects of $(D_PARAM init) are globally visible
2667  * afterwards.
2668  *
2669  * Params:
2670  *   var = The variable to initialize
2671  *   init = The lazy initializer value
2672  *
2673  * Returns:
2674  *   A reference to the initialized variable
2675  */
initOnce(alias var)2676 auto ref initOnce(alias var)(lazy typeof(var) init)
2677 {
2678     return initOnce!var(init, initOnceLock);
2679 }
2680 
2681 /// A typical use-case is to perform lazy but thread-safe initialization.
2682 @system unittest
2683 {
2684     static class MySingleton
2685     {
instance()2686         static MySingleton instance()
2687         {
2688             __gshared MySingleton inst;
2689             return initOnce!inst(new MySingleton);
2690         }
2691     }
2692 
2693     assert(MySingleton.instance !is null);
2694 }
2695 
2696 @system unittest
2697 {
2698     static class MySingleton
2699     {
instance()2700         static MySingleton instance()
2701         {
2702             __gshared MySingleton inst;
2703             return initOnce!inst(new MySingleton);
2704         }
2705 
2706     private:
this()2707         this() { val = ++cnt; }
2708         size_t val;
2709         __gshared size_t cnt;
2710     }
2711 
2712     foreach (_; 0 .. 10)
2713         spawn({ ownerTid.send(MySingleton.instance.val); });
2714     foreach (_; 0 .. 10)
2715         assert(receiveOnly!size_t == MySingleton.instance.val);
2716     assert(MySingleton.cnt == 1);
2717 }
2718 
2719 /**
2720  * Same as above, but takes a separate mutex instead of sharing one among
2721  * all initOnce instances.
2722  *
2723  * This should be used to avoid dead-locks when the $(D_PARAM init)
2724  * expression waits for the result of another thread that might also
2725  * call initOnce. Use with care.
2726  *
2727  * Params:
2728  *   var = The variable to initialize
2729  *   init = The lazy initializer value
2730  *   mutex = A mutex to prevent race conditions
2731  *
2732  * Returns:
2733  *   A reference to the initialized variable
2734  */
initOnce(alias var)2735 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
2736 {
2737     // check that var is global, can't take address of a TLS variable
2738     static assert(is(typeof({ __gshared p = &var; })),
2739         "var must be 'static shared' or '__gshared'.");
2740     import core.atomic : atomicLoad, MemoryOrder, atomicStore;
2741 
2742     static shared bool flag;
2743     if (!atomicLoad!(MemoryOrder.acq)(flag))
2744     {
2745         synchronized (mutex)
2746         {
2747             if (!atomicLoad!(MemoryOrder.raw)(flag))
2748             {
2749                 var = init;
2750                 static if (!is(immutable typeof(var) == immutable noreturn))
2751                     atomicStore!(MemoryOrder.rel)(flag, true);
2752             }
2753         }
2754     }
2755     return var;
2756 }
2757 
2758 /// ditto
initOnce(alias var)2759 auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
2760 {
2761     return initOnce!var(init, cast(shared) mutex);
2762 }
2763 
2764 /// Use a separate mutex when init blocks on another thread that might also call initOnce.
2765 @system unittest
2766 {
2767     import core.sync.mutex : Mutex;
2768 
2769     static shared bool varA, varB;
2770     static shared Mutex m;
2771     m = new shared Mutex;
2772 
2773     spawn({
2774         // use a different mutex for varB to avoid a dead-lock
2775         initOnce!varB(true, m);
2776         ownerTid.send(true);
2777     });
2778     // init depends on the result of the spawned thread
2779     initOnce!varA(receiveOnly!bool);
2780     assert(varA == true);
2781     assert(varB == true);
2782 }
2783 
2784 @system unittest
2785 {
2786     static shared bool a;
2787     __gshared bool b;
2788     static bool c;
2789     bool d;
2790     initOnce!a(true);
2791     initOnce!b(true);
2792     static assert(!__traits(compiles, initOnce!c(true))); // TLS
2793     static assert(!__traits(compiles, initOnce!d(true))); // local variable
2794 }
2795 
2796 // test ability to send shared arrays
2797 @system unittest
2798 {
2799     static shared int[] x = new shared(int)[1];
2800     auto tid = spawn({
2801         auto arr = receiveOnly!(shared(int)[]);
2802         arr[0] = 5;
2803         ownerTid.send(true);
2804     });
2805     tid.send(x);
2806     receiveOnly!(bool);
2807     assert(x[0] == 5);
2808 }
2809 
2810 // https://issues.dlang.org/show_bug.cgi?id=13930
2811 @system unittest
2812 {
2813     immutable aa = ["0":0];
2814     thisTid.send(aa);
2815     receiveOnly!(immutable int[string]); // compile error
2816 }
2817 
2818 // https://issues.dlang.org/show_bug.cgi?id=19345
2819 @system unittest
2820 {
2821     static struct Aggregate { const int a; const int[5] b; }
t1(Tid mainTid)2822     static void t1(Tid mainTid)
2823     {
2824         const sendMe = Aggregate(42, [1, 2, 3, 4, 5]);
2825         mainTid.send(sendMe);
2826     }
2827 
2828     spawn(&t1, thisTid);
2829     auto result1 = receiveOnly!(const Aggregate)();
2830     immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
2831     assert(result1 == expected);
2832 }
2833 
2834 // Noreturn support
2835 @system unittest
2836 {
foo(int)2837     static noreturn foo(int) { throw new Exception(""); }
2838 
2839     if (false) spawn(&foo, 1);
2840     if (false) spawnLinked(&foo, 1);
2841 
2842     if (false) receive(&foo);
2843     if (false) receiveTimeout(Duration.init, &foo);
2844 
2845     // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
2846     static assert(__traits(compiles, receiveOnly!noreturn()                 ));
2847     static assert(__traits(compiles, send(Tid.init, noreturn.init)          ));
2848     static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)  ));
2849     static assert(__traits(compiles, yield(noreturn.init)                   ));
2850 
2851     static assert(__traits(compiles, {
2852         __gshared noreturn n;
2853         initOnce!n(noreturn.init);
2854     }));
2855 }
2856