1 /**
2 * $(SCRIPT inhibitQuickIndex = 1;)
3 * $(DIVC quickindex,
4 * $(BOOKTABLE,
5 * $(TR $(TH Category) $(TH Symbols))
6 * $(TR $(TD Tid) $(TD
7 * $(MYREF locate)
8 * $(MYREF ownerTid)
9 * $(MYREF register)
10 * $(MYREF spawn)
11 * $(MYREF spawnLinked)
12 * $(MYREF thisTid)
13 * $(MYREF Tid)
14 * $(MYREF TidMissingException)
15 * $(MYREF unregister)
16 * ))
17 * $(TR $(TD Message passing) $(TD
18 * $(MYREF prioritySend)
19 * $(MYREF receive)
20 * $(MYREF receiveOnly)
21 * $(MYREF receiveTimeout)
22 * $(MYREF send)
23 * $(MYREF setMaxMailboxSize)
24 * ))
25 * $(TR $(TD Message-related types) $(TD
26 * $(MYREF LinkTerminated)
27 * $(MYREF MailboxFull)
28 * $(MYREF MessageMismatch)
29 * $(MYREF OnCrowding)
30 * $(MYREF OwnerTerminated)
31 * $(MYREF PriorityMessageException)
32 * ))
33 * $(TR $(TD Scheduler) $(TD
34 * $(MYREF FiberScheduler)
35 * $(MYREF Generator)
36 * $(MYREF Scheduler)
37 * $(MYREF scheduler)
38 * $(MYREF ThreadInfo)
39 * $(MYREF ThreadScheduler)
40 * $(MYREF yield)
41 * ))
42 * $(TR $(TD Misc) $(TD
43 * $(MYREF initOnce)
44 * ))
45 * ))
46 *
47 * This is a low-level messaging API upon which more structured or restrictive
48 * APIs may be built. The general idea is that every messageable entity is
49 * represented by a common handle type called a Tid, which allows messages to
50 * be sent to logical threads that are executing in both the current process
51 * and in external processes using the same interface. This is an important
52 * aspect of scalability because it allows the components of a program to be
53 * spread across available resources with few to no changes to the actual
54 * implementation.
55 *
56 * A logical thread is an execution context that has its own stack and which
57 * runs asynchronously to other logical threads. These may be preemptively
58 * scheduled kernel threads, fibers (cooperative user-space threads), or some
59 * other concept with similar behavior.
60 *
61 * The type of concurrency used when logical threads are created is determined
62 * by the Scheduler selected at initialization time. The default behavior is
63 * currently to create a new kernel thread per call to spawn, but other
64 * schedulers are available that multiplex fibers across the main thread or
65 * use some combination of the two approaches.
66 *
67 * Copyright: Copyright Sean Kelly 2009 - 2014.
68 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
69 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak
70 * Source: $(PHOBOSSRC std/concurrency.d)
71 */
72 /* Copyright Sean Kelly 2009 - 2014.
73 * Distributed under the Boost Software License, Version 1.0.
74 * (See accompanying file LICENSE_1_0.txt or copy at
75 * http://www.boost.org/LICENSE_1_0.txt)
76 */
77 module std.concurrency;
78
79 public import std.variant;
80
81 import core.atomic;
82 import core.sync.condition;
83 import core.sync.mutex;
84 import core.thread;
85 import std.range.primitives;
86 import std.range.interfaces : InputRange;
87 import std.traits;
88
///
@system unittest
{
    __gshared string received;

    // Worker: waits for a single int, records it, then acknowledges.
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;

        // Block until the owner thread sends us a number.
        receive((int number) {
            received = text("Received the number ", number);

            // Report success back to the owner thread.
            ownerTid.send(true);
        });
    }

    // Run spawnedFunc in a new thread, handing it our own Tid.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Deliver the number 42 to the new thread.
    childTid.send(42);

    // Wait for the result code.
    immutable wasSuccessful = receiveOnly!bool;
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}
117
118 private
119 {
// Reports whether any of `Types` aliases mutable thread-local data.
// `Tid` is always allowed, `Rebindable!R` is judged by `R`, structs are
// judged member by member, and every other type is handed to
// `std.traits.hasUnsharedAliasing`.
bool hasLocalAliasing(Types...)()
{
    import std.typecons : Rebindable;

    // Accumulating into a variable (instead of returning from inside the
    // loop) works around "statement is not reachable".
    bool aliased = false;
    static foreach (Candidate; Types)
    {
        static if (is(Candidate == Tid))
        {
            // Allowed: a Tid may always be passed between threads.
        }
        else static if (is(Candidate : Rebindable!Inner, Inner))
        {
            aliased |= hasLocalAliasing!Inner;
        }
        else static if (is(Candidate == struct))
        {
            aliased |= hasLocalAliasing!(typeof(Candidate.tupleof));
        }
        else
        {
            aliased |= std.traits.hasUnsharedAliasing!(Candidate);
        }
    }
    return aliased;
}
139
@safe unittest
{
    // A struct that only wraps a Tid is as sendable as the Tid itself.
    static struct TidHolder { Tid t; }
    static assert(!hasLocalAliasing!(Tid, TidHolder, int));
}
145
// https://issues.dlang.org/show_bug.cgi?id=20097
@safe unittest
{
    import std.datetime.systime : SysTime;

    // SysTime, alone or as a struct member, must not count as local aliasing.
    static struct Stamped { SysTime time; }
    static assert(!hasLocalAliasing!(SysTime, Stamped));
}
153
// Kind of a queued Message.
enum MsgType
{
    standard = 0, // used by the plain _send overload
    priority = 1, // used by prioritySend
    linkDead = 2, // used by ThreadInfo.cleanup when notifying other threads
}
160
// A typed payload travelling between threads. A single value is stored
// directly in the Variant; several values are packed into a Tuple first.
struct Message
{
    MsgType type;
    Variant data;

    // Builds a message of kind `t` carrying `vals`.
    this(T...)(MsgType t, T vals) if (T.length > 0)
    {
        type = t;
        static if (T.length > 1)
        {
            import std.typecons : Tuple;

            data = Tuple!(T)(vals);
        }
        else
        {
            data = vals[0];
        }
    }

    // True if the payload can be fetched as `T` (or as `Tuple!T` when
    // several types are given). A lone `Variant` always matches.
    @property auto convertsTo(T...)()
    {
        static if (T.length != 1)
        {
            import std.typecons : Tuple;

            return data.convertsTo!(Tuple!(T));
        }
        else
        {
            return is(T[0] == Variant) || data.convertsTo!(T);
        }
    }

    // Fetches the payload as `T`, as the raw `Variant`, or as `Tuple!T`
    // when several types are given.
    @property auto get(T...)()
    {
        static if (T.length != 1)
        {
            import std.typecons : Tuple;

            return data.get!(Tuple!(T));
        }
        else static if (is(T[0] == Variant))
        {
            return data;
        }
        else
        {
            return data.get!(T);
        }
    }

    // Invokes `op` with the payload, unpacked according to the parameter
    // list of `op`, and returns whatever `op` returns.
    auto map(Op)(Op op)
    {
        alias Args = Parameters!(Op);

        static if (Args.length != 1)
        {
            import std.typecons : Tuple;

            return op(data.get!(Tuple!(Args)).expand);
        }
        else static if (is(Args[0] == Variant))
        {
            return op(data);
        }
        else
        {
            return op(data.get!(Args));
        }
    }
}
229
// Compile-time validation of a handler list passed to receive():
// every entry must be a function pointer or delegate, and a void-returning
// handler must not hide a handler that comes after it.
void checkops(T...)(T ops)
{
    import std.format : format;

    foreach (idx, Handler; T)
    {
        static assert(isFunctionPointer!Handler || isDelegate!Handler,
                format!"T %d is not a function pointer or delegates"(idx));
        alias handlerArgs = Parameters!(Handler);
        alias HandlerRet = ReturnType!(Handler);

        // Only void handlers that are not last in the list can occlude.
        static if (is(HandlerRet == void) && idx < T.length - 1)
        {
            // A catch-all Variant handler would swallow everything after it.
            static assert(!(handlerArgs.length == 1 && is(handlerArgs[0] == Variant)),
                    "function with arguments " ~ handlerArgs.stringof ~
                    " occludes successive function");

            // Two handlers with the same parameter list: the later one is dead.
            foreach (Later; T[idx + 1 .. $])
            {
                alias laterArgs = Parameters!(Later);

                static assert(!is(handlerArgs == laterArgs),
                        "function with arguments " ~ handlerArgs.stringof ~ " occludes successive function");
            }
        }
    }
}
257
// Returns the ThreadInfo of the calling logical thread: the scheduler's
// when a scheduler is installed, otherwise the default thread-local one.
@property ref ThreadInfo thisInfo() nothrow
{
    if (scheduler !is null)
        return scheduler.thisInfo;
    return ThreadInfo.thisInfo;
}
264 }
265
// Module thread destructor: tears down the messaging state of the
// terminating thread via ThreadInfo.cleanup.
static ~this()
{
    thisInfo.cleanup();
}
270
271 // Exceptions
272
/**
 * Thrown by `receiveOnly` when the message that arrives is not of the
 * type the receiving thread asked for.
 */
class MessageMismatch : Exception
{
    ///
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}
285
/**
 * Thrown by `receive` when the thread that spawned the receiving thread
 * has terminated and no more messages exist.
 */
class OwnerTerminated : Exception
{
    // Tid supplied to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
301
/**
 * Thrown when a linked thread has terminated.
 */
class LinkTerminated : Exception
{
    // Tid supplied to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
316
/**
 * Thrown when a message was sent to a thread via
 * $(REF prioritySend, std,concurrency) and the receiver has no handler
 * for a message of that type.
 */
class PriorityMessageException : Exception
{
    /**
     * The message that was sent.
     */
    Variant message;

    ///
    this(Variant vals)
    {
        super("Priority message");
        message = vals;
    }
}
336
/**
 * Thrown on mailbox crowding when the mailbox is configured with
 * `OnCrowding.throwException`.
 */
class MailboxFull : Exception
{
    // Tid supplied to the constructor.
    Tid tid;

    ///
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
352
/**
 * Thrown when a Tid is missing, for example when `ownerTid` cannot find
 * an owner thread.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;

    ///
    mixin basicExceptionCtors;
}
363
364
365 // Thread ID
366
367
/**
 * An opaque handle that represents a logical thread.
 */
struct Tid
{
private:
    // Mailbox this handle refers to; null for a default-initialized Tid.
    MessageBox mbox;

    this(MessageBox m) @safe pure nothrow @nogc
    {
        mbox = m;
    }

public:

    /**
     * Writes a short string identifying this Tid to `w`. It is only useful
     * for telling currently executing Tids apart, e.g. for logging and
     * debugging. A Tid executed in the future may produce the same
     * toString() output as a Tid that has already terminated.
     */
    void toString(W)(ref W w) const
    {
        import std.format.write : formattedWrite;

        // The text is the mailbox reference formatted as a hex pointer.
        auto address = () @trusted { return cast(void*) mbox; }();
        w.formattedWrite("Tid(%x)", address);
    }
}
398
@safe unittest
{
    import std.conv : text;

    // A default Tid prints a null address.
    Tid unset;
    assert(text(unset) == "Tid(0)");

    // The current thread's Tid does not, and copies print identically.
    auto self = thisTid;
    assert(text(self) != "Tid(0)");
    auto duplicate = self;
    assert(text(self) == text(duplicate));
}
409
// https://issues.dlang.org/show_bug.cgi?id=21512
@system unittest
{
    import std.format : format;

    // Formatting must also work through a const Tid.
    const Tid child = spawn(() {});
    immutable rendered = format!"%s"(child);
    assert(rendered[0 .. 4] == "Tid(");
}
418
/**
 * Returns: The $(LREF Tid) of the caller's thread.
 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    return () @trusted {
        // Create the caller's mailbox the first time its Tid is requested.
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }();
}
435
/**
 * Returns the Tid of the thread that spawned the caller's thread.
 *
 * Throws: A `TidMissingException` if there is no owner thread.
 */
@property Tid ownerTid()
{
    import std.exception : enforce;

    // An owner without a mailbox means this thread was never spawned by us.
    enforce!TidMissingException(thisInfo.owner.mbox !is null,
            "Error: Thread has no owner thread.");
    return thisInfo.owner;
}
449
@system unittest
{
    import std.exception : assertThrown;

    // Child: expects a greeting, then answers through its owner's Tid.
    static void fun()
    {
        assert(receiveOnly!string() == "Main calling");
        ownerTid.send("Child responding");
    }

    // The main thread has no owner.
    assertThrown!TidMissingException(ownerTid);

    auto child = spawn(&fun);
    child.send("Main calling");
    string reply = receiveOnly!string();
    assert(reply == "Child responding");
}
467
468 // Thread Creation
469
// True when `F` can be started by spawn with arguments of types `T`:
// it is callable, returns void, every `T` converts implicitly to the
// matching parameter, and it is either a function pointer or free of
// unshared aliasing.
private template isSpawnable(F, T...)
{
    // Walks both parameter lists from index `i`, checking that each
    // parameter of F2 converts implicitly to the matching one of F1.
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias wanted = Parameters!F1;
        alias given = Parameters!F2;

        static if (wanted.length != given.length)
        {
            enum isParamsImplicitlyConvertible = false;
        }
        else static if (i == wanted.length)
        {
            enum isParamsImplicitlyConvertible = true;
        }
        else
        {
            enum isParamsImplicitlyConvertible =
                isImplicitlyConvertible!(given[i], wanted[i])
                && isParamsImplicitlyConvertible!(F1, F2, i + 1);
        }
    }

    enum isSpawnable = isCallable!F
        && is(ReturnType!F : void)
        && isParamsImplicitlyConvertible!(F, void function(T))
        && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}
491
/**
 * Starts fn(args) in a new logical thread.
 *
 * Runs the supplied function in a new logical thread represented by
 * `Tid`. The calling thread becomes the owner of the new thread. When the
 * owner thread terminates, an `OwnerTerminated` message is sent to the new
 * thread, causing an `OwnerTerminated` exception to be thrown on
 * `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new logical thread.
 *
 * Notes:
 *  `args` must not have unshared aliasing. In other words, all arguments
 *  to `fn` must either be `shared` or `immutable` or have no
 *  pointer indirection. This is necessary for enforcing isolation among
 *  threads.
 *
 * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
 * `fn` must be either `shared` or `immutable`. */
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T),
            "Aliases to mutable thread-local data not allowed.");

    enum linked = false;
    return _spawn(linked, fn, args);
}
522
///
@system unittest
{
    // The spawned function receives its argument unchanged.
    static void f(string msg)
    {
        assert(msg == "Hello World");
    }

    auto tid = spawn(&f, "Hello World");
}
533
/// Fails: char[] has mutable aliasing.
@system unittest
{
    string msg = "Hello, World!";

    // An immutable parameter accepts only the immutable copy.
    static void takesString(string msg) {}
    static assert(!__traits(compiles, spawn(&takesString, msg.dup)));
    static assert( __traits(compiles, spawn(&takesString, msg.idup)));

    // A mutable parameter can never be satisfied.
    static void takesMutable(char[] msg) {}
    static assert(!__traits(compiles, spawn(&takesMutable, msg.dup)));
    static assert(!__traits(compiles, spawn(&takesMutable, msg.idup)));
}
547
/// New thread with anonymous function
@system unittest
{
    spawn({ ownerTid.send("This is so great!"); });

    immutable reply = receiveOnly!string;
    assert(reply == "This is so great!");
}
556
@system unittest
{
    import core.thread : thread_joinAll;

    __gshared string receivedMessage;

    // Stores its argument where the main thread can inspect it.
    static void f1(string msg)
    {
        receivedMessage = msg;
    }

    auto tid1 = spawn(&f1, "Hello World");

    // Wait for the spawned thread to finish before checking the result.
    thread_joinAll();
    assert(receivedMessage == "Hello World");
}
571
/**
 * Starts fn(args) in a logical thread and will receive a LinkTerminated
 * message when the operation terminates.
 *
 * Runs the supplied function in a new logical thread represented by
 * Tid. The new thread is linked to the calling thread, so that if either
 * of them terminates a LinkTerminated message is sent to the other,
 * causing a LinkTerminated exception to be thrown on receive().
 * The owner relationship from spawn() is preserved as well, so if the link
 * between the threads is broken, owner termination still results in an
 * OwnerTerminated exception being thrown on receive().
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new thread.
 */
Tid spawnLinked(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T),
            "Aliases to mutable thread-local data not allowed.");

    enum linked = true;
    return _spawn(linked, fn, args);
}
597
/*
 * Shared implementation of spawn and spawnLinked. Creates the child's
 * mailbox, starts fn(args) through the scheduler (or on a new Thread when
 * no scheduler is set) and records the child in the caller's link table
 * with the given `linked` flag.
 */
private Tid _spawn(F, T...)(bool linked, F fn, T args)
if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto childTid = Tid(new MessageBox);
    auto parentTid = thisTid;

    // Entry point of the new logical thread: adopt the prepared identity
    // and owner before running the user's function.
    void exec()
    {
        thisInfo.ident = childTid;
        thisInfo.owner = parentTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler is null)
    {
        auto worker = new Thread(&exec);
        worker.start();
    }
    else
    {
        scheduler.spawn(&exec);
    }

    thisInfo.links[childTid] = linked;
    return childTid;
}
626
@system unittest
{
    // Function pointers: the argument list must match the parameter list.
    void function() noArgFn;
    void function(int) intFn;
    static assert( __traits(compiles, spawn(noArgFn)));
    static assert( __traits(compiles, spawn(intFn, 2)));
    static assert(!__traits(compiles, spawn(noArgFn, 1)));
    static assert(!__traits(compiles, spawn(intFn)));

    // Delegates: accepted only when shared or immutable.
    void delegate(int) shared sharedCtxDg;
    shared(void delegate(int)) sharedDg;
    shared(void delegate(long) shared) sharedLongDg;
    shared(void delegate(real, int, long) shared) sharedMultiDg;
    void delegate(int) immutable immutableCtxDg;
    void delegate(int) plainDg;
    static assert( __traits(compiles, spawn(sharedCtxDg, 1)));
    static assert( __traits(compiles, spawn(sharedDg, 2)));
    static assert( __traits(compiles, spawn(sharedLongDg, 3)));
    static assert( __traits(compiles, spawn(sharedMultiDg, 4, 4, 4)));
    static assert( __traits(compiles, spawn(immutableCtxDg, 5)));
    static assert(!__traits(compiles, spawn(plainDg, 6)));

    // Callable objects that spawn accepts.
    auto sharedObjSharedCall = cast(shared) new class{ void opCall(int) shared {} };
    auto immutableObjImmutableCall = cast(immutable) new class{ void opCall(int) immutable {} };
    auto sharedObjConstSharedCall = cast(shared) new class{ void opCall(int) const shared {} };
    auto constSharedObjConstSharedCall = cast(const shared) new class{ void opCall(int) const shared {} };
    auto immutableObjConstSharedCall = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert( __traits(compiles, spawn(sharedObjSharedCall, 2)));
    static assert( __traits(compiles, spawn(immutableObjImmutableCall, 4)));
    static assert( __traits(compiles, spawn(sharedObjConstSharedCall, 8)));
    static assert( __traits(compiles, spawn(constSharedObjConstSharedCall, 10)));
    static assert( __traits(compiles, spawn(immutableObjConstSharedCall, 11)));

    // Callable objects that spawn rejects.
    auto plainObjSharedCall = new class{ void opCall(int) shared {} };
    auto plainObjImmutableCall = new class{ void opCall(int) immutable {} };
    auto plainObjPlainCall = new class{ void opCall(int) {} };
    auto sharedObjImmutableCall = cast(shared) new class{ void opCall(int) immutable {} };
    auto immutableObjSharedCall = cast(immutable) new class{ void opCall(int) shared {} };
    auto constSharedObjSharedCall = cast(const shared) new class{ void opCall(int) shared {} };
    static assert(!__traits(compiles, spawn(plainObjSharedCall, 1)));
    static assert(!__traits(compiles, spawn(plainObjImmutableCall, 3)));
    static assert(!__traits(compiles, spawn(plainObjPlainCall, 5)));
    static assert(!__traits(compiles, spawn(sharedObjImmutableCall, 6)));
    static assert(!__traits(compiles, spawn(immutableObjSharedCall, 7)));
    static assert(!__traits(compiles, spawn(constSharedObjSharedCall, 9)));
}
672
/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Sends the supplied values to the thread represented by tid. As with
 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
 */
void send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T),
            "Aliases to mutable thread-local data not allowed.");

    // The aliasing check is done here; _send performs no checks of its own.
    _send(tid, vals);
}
685
/**
 * Places the values as a message on the front of tid's message queue.
 *
 * Sends a message to `tid` but puts it at the front of `tid`'s message
 * queue instead of at the back. This function is typically used for
 * out-of-band communication, to signal exceptional conditions, etc.
 */
void prioritySend(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T),
            "Aliases to mutable thread-local data not allowed.");

    enum kind = MsgType.priority;
    _send(kind, tid, vals);
}
699
/*
 * Unchecked counterpart of send: forwards `vals` to `tid` as a
 * MsgType.standard message.
 */
private void _send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    enum kind = MsgType.standard;
    _send(kind, tid, vals);
}
708
/*
 * Implementation of send. Keeping it separate allows parameter checking
 * to differ between Tid.send() and .send(). Wraps `vals` in a Message of
 * the given type and puts it into the mailbox of `tid`.
 */
private void _send(T...)(MsgType type, Tid tid, T vals)
in (tid.mbox !is null)
{
    tid.mbox.put(Message(type, vals));
}
719
/**
 * Receives a message from another thread.
 *
 * Receives a message from another thread, or blocks if no messages of the
 * specified types are available. This function works by pattern matching
 * a message against a set of delegates and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument to `receive`, it will match any message that was not
 * matched by an earlier delegate. If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
 */
void receive(T...)(T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    // Reject malformed or occluding handler lists at compile time.
    checkops(ops);

    thisInfo.ident.mbox.get(ops);
}
752
///
@system unittest
{
    import std.variant : Variant;

    // Answers 1 for an int, 2 for a double and 3 for anything else.
    auto process = ()
    {
        receive(
            (int i) { ownerTid.send(1); },
            (double f) { ownerTid.send(2); },
            (Variant v) { ownerTid.send(3); }
        );
    };

    {
        auto tid = spawn(process);
        tid.send(42);
        assert(receiveOnly!int == 1);
    }

    {
        auto tid = spawn(process);
        tid.send(3.14);
        assert(receiveOnly!int == 2);
    }

    {
        auto tid = spawn(process);
        tid.send("something else");
        assert(receiveOnly!int == 3);
    }
}
785
@safe unittest
{
    // A trailing Variant catch-all is fine.
    static assert(__traits(compiles, {
        receive((Variant x) {});
        receive((int x) {}, (Variant x) {});
    }));

    // A leading Variant handler occludes the handler after it.
    static assert(!__traits(compiles, {
        receive((Variant x) {}, (int x) {});
    }));

    // Two handlers with identical parameters: the second is unreachable.
    static assert(!__traits(compiles, {
        receive((int x) {}, (int x) {});
    }));
}
804
// Make sure receive() works with free functions as well.
version (StdUnittest)
    private void receiveFunction(int x) {}
@safe unittest
{
    // A pointer to a free function is accepted alone or before a catch-all.
    static assert(__traits(compiles, {
        receive(&receiveFunction);
        receive(&receiveFunction, (Variant x) {});
    }));
}
818
819
// Return type of receiveOnly: the type itself for a single type,
// otherwise a Tuple of all the types.
private template receiveOnlyRet(T...)
{
    static if (T.length != 1)
    {
        import std.typecons : Tuple;

        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}
832
/**
 * Receives only messages with arguments of the specified types.
 *
 * Params:
 *     T = Variadic list of types to be received.
 *
 * Returns: The received message. If `T` has more than one entry,
 *          the message will be packed into a $(REF Tuple, std,typecons).
 *
 * Throws: $(LREF MessageMismatch) if a message of types other than `T`
 *         is received,
 *         $(LREF OwnerTerminated) when the sending thread was terminated.
 */
receiveOnlyRet!(T) receiveOnly(T...)()
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    import std.format : format;
    import std.meta : allSatisfy;
    import std.typecons : Tuple;

    Tuple!(T) ret;

    // Handler for the expected payload: copies it into `ret`.
    auto onMatch = (T val) {
        static if (T.length)
        {
            static if (allSatisfy!(isAssignable, T))
            {
                ret.field = val;
            }
            else
            {
                // Types that cannot be assigned are constructed in place.
                import core.lifetime : emplace;

                emplace(&ret, val);
            }
        }
    };

    // Catch-all handler: anything else is reported as a mismatch.
    auto onMismatch = (Variant val) {
        static if (T.length > 1)
            string expected = T.stringof;
        else
            string expected = T[0].stringof;

        throw new MessageMismatch(
            format("Unexpected message type: expected '%s', got '%s'", expected, val.type.toString()));
    };

    thisInfo.ident.mbox.get(
        onMatch,
        (LinkTerminated e) { throw e; },
        (OwnerTerminated e) { throw e; },
        onMismatch);

    static if (T.length == 1)
        return ret[0];
    else
        return ret;
}
890
///
@system unittest
{
    auto tid = spawn({
        immutable got = receiveOnly!int;
        assert(got == 42);
    });
    tid.send(42);
}
900
///
@system unittest
{
    auto tid = spawn({
        immutable got = receiveOnly!string;
        assert(got == "text");
    });
    tid.send("text");
}
910
///
@system unittest
{
    struct Record { string name; int age; }

    // Several types arrive packed into a tuple, in the order they were sent.
    auto tid = spawn({
        auto msg = receiveOnly!(double, Record);
        assert(msg[0] == 0.5);
        assert(msg[1].name == "Alice");
        assert(msg[1].age == 31);
    });

    tid.send(0.5, Record("Alice", 31));
}
926
@system unittest
{
    // Child: expects a string and reports either success ("") or the text
    // of whatever was thrown back to the main thread.
    static void t1(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable th)
        {
            mainTid.send(th.msg);
        }
    }

    auto tid = spawn(&t1, thisTid);

    // Sending an int where a string is expected must raise MessageMismatch.
    tid.send(1);
    immutable outcome = receiveOnly!string();
    assert(outcome == "Unexpected message type: expected 'string', got 'int'");
}
947
// https://issues.dlang.org/show_bug.cgi?id=21663
@safe unittest
{
    // Instantiating with several types, two of them equal, must compile.
    alias test = receiveOnly!(string, bool, bool);
}
953
/**
 * Receives a message from another thread and gives up if no match
 * arrives within a specified duration.
 *
 * Receives a message from another thread, or blocks until `duration` has
 * elapsed if no messages of the specified types are available. This
 * function works by pattern matching a message against a set of delegates
 * and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument, it will match any message that was not
 * matched by an earlier delegate. If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *     duration = Duration, how long to wait. If `duration` is negative,
 *                won't wait at all.
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Returns: `true` if it received a message and `false` if it timed out waiting
 *          for one.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
 */
bool receiveTimeout(T...)(Duration duration, T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    // Reject malformed or occluding handler lists at compile time.
    checkops(ops);

    immutable gotMessage = thisInfo.ident.mbox.get(duration, ops);
    return gotMessage;
}
992
@safe unittest
{
    // A trailing Variant catch-all is fine.
    static assert( __traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {});
        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
    }));

    // A leading Variant handler occludes the handler after it.
    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
    }));

    // Two handlers with identical parameters: the second is unreachable.
    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (int x) {}, (int x) {});
    }));

    // A non-zero timeout changes nothing about handler validation.
    static assert( __traits(compiles, {
        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
    }));
}
1012
1013 // MessageBox Limits
1014
/**
 * Behaviors that may be specified for sends to a full mailbox.
 */
enum OnCrowding
{
    /// Wait until room is available.
    block,
    /// Throw a MailboxFull exception.
    throwException,
    /// Abort the send and return.
    ignore
}
1024
// Handler installed by setMaxMailboxSize for OnCrowding.block.
private bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
{
    return true;
}

// Handler installed by setMaxMailboxSize for OnCrowding.throwException.
private bool onCrowdingThrow(Tid tid) @safe pure
{
    throw new MailboxFull(tid);
}

// Handler installed by setMaxMailboxSize for OnCrowding.ignore.
private bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
{
    return false;
}
1042
/**
 * Sets a maximum mailbox size.
 *
 * Limits the number of user messages allowed in the mailbox. Once the
 * limit is reached, a caller attempting to add a new message will
 * execute the behavior specified by doThis. If messages is zero, the
 * mailbox is unbounded.
 *
 * Params:
 *  tid      = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
 *  doThis   = The behavior executed when a message is sent to a full
 *             mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
in (tid.mbox !is null)
{
    // Translate the enum into the matching handler routine.
    bool function(Tid) handler;
    final switch (doThis)
    {
    case OnCrowding.block:
        handler = &onCrowdingBlock;
        break;
    case OnCrowding.throwException:
        handler = &onCrowdingThrow;
        break;
    case OnCrowding.ignore:
        handler = &onCrowdingIgnore;
        break;
    }
    tid.mbox.setMaxMsgs(messages, handler);
}
1070
/**
 * Sets a maximum mailbox size.
 *
 * Limits the number of user messages allowed in the mailbox. Once the
 * limit is reached, a caller attempting to add a new message will
 * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded.
 *
 * Params:
 *  tid              = The Tid of the thread for which this limit should be set.
 *  messages         = The maximum number of messages or zero if no limit.
 *  onCrowdingDoThis = The routine called when a message is sent to a full
 *                     mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
in (tid.mbox !is null)
{
    auto box = tid.mbox;
    box.setMaxMsgs(messages, onCrowdingDoThis);
}
1089
// Process-wide name registry. Within this file both tables are only
// touched while holding registryLock.
private __gshared Tid[string] tidByName;     // registered name -> Tid
private __gshared string[][Tid] namesByTid;  // Tid -> names registered for it
1095
// Mutex guarding the name registry; created through initOnce on first use.
private @property Mutex registryLock()
{
    __gshared Mutex lock;
    initOnce!lock(new Mutex);
    return lock;
}
1102
// Removes every registry entry that belongs to the thread described by `me`.
private void unregisterMe(ref ThreadInfo me)
{
    // A thread that never obtained a Tid cannot have registered anything.
    if (me.ident == Tid.init)
        return;

    synchronized (registryLock)
    {
        auto registered = me.ident in namesByTid;
        if (registered is null)
            return;

        foreach (entry; *registered)
            tidByName.remove(entry);
        namesByTid.remove(me.ident);
    }
}
1118
/**
 * Associates name with tid.
 *
 * Associates name with tid in a process-local map. When the thread
 * represented by tid terminates, any names associated with it will be
 * automatically unregistered.
 *
 * Params:
 *  name = The name to associate with tid.
 *  tid  = The tid to register by name.
 *
 * Returns:
 *  true if the name is available and tid is not known to represent a
 *  defunct thread.
 */
bool register(string name, Tid tid)
in (tid.mbox !is null)
{
    synchronized (registryLock)
    {
        // Refuse names that are already taken and mailboxes already closed.
        immutable taken = (name in tidByName) !is null;
        if (taken || tid.mbox.isClosed)
            return false;

        namesByTid[tid] ~= name;
        tidByName[name] = tid;
        return true;
    }
}
1148
/**
 * Removes the registered name associated with a tid.
 *
 * Params:
 *  name = The name to unregister.
 *
 * Returns:
 *  true if the name is registered, false if not.
 */
bool unregister(string name)
{
    import std.algorithm.mutation : remove, SwapStrategy;
    import std.algorithm.searching : countUntil;

    synchronized (registryLock)
    {
        auto tid = name in tidByName;
        if (tid is null)
            return false;

        if (auto allNames = *tid in namesByTid)
        {
            auto pos = countUntil(*allNames, name);
            if (pos >= 0)
            {
                // `remove` only rearranges elements and returns the shortened
                // slice; it cannot shrink the stored array itself. The result
                // has to be assigned back, otherwise the name lingers in the
                // list and unregisterMe would later delete it from tidByName
                // even if another thread has re-registered it by then.
                *allNames = remove!(SwapStrategy.unstable)(*allNames, pos);
            }

            // Do not keep an empty name list around for this Tid.
            if ((*allNames).length == 0)
                namesByTid.remove(*tid);
        }

        tidByName.remove(name);
        return true;
    }
}
1176
/**
 * Gets the Tid associated with name.
 *
 * Params:
 *  name = The name to locate within the registry.
 *
 * Returns:
 *  The associated Tid or Tid.init if name is not registered.
 */
Tid locate(string name)
{
    synchronized (registryLock)
    {
        auto found = name in tidByName;
        return found is null ? Tid.init : *found;
    }
}
1195
/**
 * Encapsulates all implementation-level data needed for scheduling.
 *
 * When defining a Scheduler, an instance of this struct must be associated
 * with each logical thread. It contains all implementation-level information
 * needed by the internal API.
 */
struct ThreadInfo
{
    Tid ident;
    bool[Tid] links;
    Tid owner;

    /**
     * Gets a thread-local instance of ThreadInfo.
     *
     * This instance should be used as the default when info is requested
     * for a thread not created by the Scheduler.
     */
    static @property ref thisInfo() nothrow
    {
        static ThreadInfo perThread;
        return perThread;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates. It closes the
     * thread's mailbox, sends a linkDead message to every linked thread and
     * to the owner, and removes the thread's names from the registry.
     */
    void cleanup()
    {
        auto box = ident.mbox;
        if (box !is null)
            box.close();

        // Iterate over a snapshot of the keys, exactly one message per link.
        foreach (linked; links.keys)
            _send(MsgType.linkDead, linked, ident);

        const hasOwner = owner != Tid.init;
        if (hasOwner)
            _send(MsgType.linkDead, owner, ident);

        unregisterMe(this); // clean up registry entries
    }

    // https://issues.dlang.org/show_bug.cgi?id=20160
    @system unittest
    {
        register("main_thread", thisTid());

        // Cleaning up an unrelated, default-initialized ThreadInfo must not
        // disturb the registration made above.
        ThreadInfo t;
        t.cleanup();

        assert(locate("main_thread") == thisTid());
    }
}
1251
/**
 * A Scheduler controls how threading is performed by spawn.
 *
 * Implementing a Scheduler allows the concurrency mechanism used by this
 * module to be customized according to different needs. By default, a call
 * to spawn will create a new kernel thread that executes the supplied routine
 * and terminates when finished. But it is possible to create Schedulers that
 * reuse threads, that multiplex Fibers (coroutines) across a single thread,
 * or any number of other approaches. By making the choice of Scheduler a
 * user-level option, std.concurrency may be used for far more types of
 * application than if this behavior were predefined.
 *
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 * void main()
 * {
 *     scheduler = new FiberScheduler;
 *     scheduler.start(
 *     {
 *         writeln("the rest of main goes here");
 *     });
 * }
 * ---
 *
 * Some schedulers have a dispatching loop that must run if they are to work
 * properly, so for the sake of consistency, when using a scheduler, start()
 * must be called within main(). This yields control to the scheduler and
 * will ensure that any spawned threads are executed in an expected manner.
 */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the Scheduler.
     *
     * This is intended to be called at the start of the program to yield all
     * scheduling to the active Scheduler instance. This is necessary for
     * schedulers that explicitly dispatch threads rather than simply relying
     * on the operating system to do so, and so start should always be called
     * within main() to begin normal program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler. It will be automatically executed
     *       via a call to spawn by the Scheduler.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * This routine is called by spawn. It is expected to instantiate a new
     * logical thread and run the supplied operation. If the scheduled thread
     * is not a kernel thread, it must call thisInfo.cleanup() when it
     * terminates; all kernel threads will have their ThreadInfo cleaned up
     * automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute. This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * This routine is called at various points within concurrency-aware APIs
     * to provide a scheduler a chance to yield execution when using some sort
     * of cooperative multithreading model. If this is not appropriate, such
     * as when each logical thread is backed by a dedicated kernel thread,
     * this routine may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns an instance of ThreadInfo specific to the logical thread that
     * is calling this routine or, if the calling thread was not created by
     * this scheduler, returns ThreadInfo.thisInfo instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a Condition variable analog for signaling.
     *
     * Creates a new Condition variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue. Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to Condition.wait() yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The Mutex that will be associated with this condition. It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a Scheduler may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     */
    Condition newCondition(Mutex m) nothrow;
}
1354
/**
 * An example Scheduler using kernel threads.
 *
 * This is an example Scheduler that mirrors the default scheduling behavior
 * of creating one kernel thread per call to spawn. It is fully functional
 * and may be instantiated and used, but is not a necessary part of the
 * default functioning of this module.
 */
class ThreadScheduler : Scheduler
{
    /**
     * Runs op directly in the calling thread; this approach needs no
     * dispatcher.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Creates a new kernel thread and starts it running the supplied op.
     */
    void spawn(void delegate() op)
    {
        auto worker = new Thread(op);
        worker.start();
    }

    /**
     * A no-op: this scheduler does no explicit multiplexing.
     */
    void yield() nothrow
    {
    }

    /**
     * Returns ThreadInfo.thisInfo, since it is a thread-local instance of
     * ThreadInfo, which is the correct behavior for this scheduler.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Creates a plain Condition bound to m. No custom behavior is needed.
     */
    Condition newCondition(Mutex m) nothrow
    {
        auto condition = new Condition(m);
        return condition;
    }
}
1408
/**
 * An example Scheduler using Fibers.
 *
 * This is an example scheduler that creates a new Fiber per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
class FiberScheduler : Scheduler
{
    /**
     * This creates a new Fiber for the supplied op and then starts the
     * dispatcher.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * This creates a new Fiber for the supplied op and adds it to the
     * dispatch list.
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        yield();
    }

    /**
     * If the caller is a scheduled Fiber, this yields execution to another
     * scheduled Fiber.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis())
            Fiber.yield();
    }

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns a ThreadInfo instance specific to the calling Fiber if the
     * Fiber was created by this dispatcher, otherwise it returns
     * ThreadInfo.thisInfo.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        auto f = cast(InfoFiber) Fiber.getThis();

        if (f !is null)
            return f.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a Condition analog that yields when wait or notify is called.
     *
     * Bug:
     * For the default implementation, `notifyAll` will behave like `notify`.
     *
     * Params:
     *   m = A `Mutex` to use for locking if the condition needs to be waited on
     *       or notified from multiple `Thread`s.
     *       If `null`, no `Mutex` will be used and it is assumed that the
     *       `Condition` is only waited on/notified from one `Thread`.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

protected:
    /**
     * Creates a new Fiber which calls the given delegate.
     *
     * Params:
     *   op = The delegate the fiber should call
     */
    void create(void delegate() op) nothrow
    {
        void wrap()
        {
            // Fibers are not kernel threads, so their ThreadInfo has to be
            // cleaned up explicitly when op finishes or throws.
            scope (exit)
            {
                thisInfo.cleanup();
            }
            op();
        }

        m_fibers ~= new InfoFiber(&wrap);
    }

    /**
     * Fiber which embeds a ThreadInfo
     */
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }

        this(void delegate() op, size_t sz) nothrow
        {
            super(op, sz);
        }
    }

private:
    // Condition analog that yields to the scheduler instead of blocking the
    // kernel thread. A single `notified` flag carries the signal.
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        // Yields until a notification has been observed, then consumes it.
        override void wait() nothrow
        {
            scope (exit) notified = false;

            while (!notified)
                switchContext();
        }

        // Yields until a notification has been observed or `period` has
        // elapsed. Returns true if notified, false on timeout.
        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            scope (exit) notified = false;

            for (auto limit = MonoTime.currTime + period;
                 !notified && !period.isNegative;
                 period = limit - MonoTime.currTime)
            {
                // Release the associated mutex while yielding, exactly as the
                // untimed wait() does. Yielding with the mutex held keeps
                // notifiers on other kernel threads from acquiring it, so the
                // notification could never be delivered before the timeout.
                switchContext();
            }
            return notified;
        }

        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        // Behaves like notify(): there is only one flag to set.
        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        // Unlocks the mutex (if any), yields to the scheduler, and re-locks
        // the mutex once this fiber is resumed.
        void switchContext() nothrow
        {
            if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
            scope (exit)
                if (mutex_nothrow)
                    mutex_nothrow.lock_nothrow();
            this.outer.yield();
        }

        bool notified;
    }

    // Runs the fibers round-robin until none are left. Any Throwable other
    // than OwnerTerminated that escapes a fiber is rethrown to the caller.
    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
            if (t !is null && !(cast(OwnerTerminated) t))
            {
                throw t;
            }
            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                // Drop the finished fiber; wrap around if it was the last slot.
                if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
                    m_pos = 0;
            }
            else if (m_pos++ >= m_fibers.length - 1)
            {
                m_pos = 0;
            }
        }
    }

    Fiber[] m_fibers;
    size_t m_pos;
}
1605
// Checks the wait/notify hand-off of the Condition returned by
// FiberScheduler.newCondition, driving two plain fibers by hand.
@system unittest
{
    // Waits on cond forever, counting each wake-up.
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    // Notifies cond forever, counting each notification.
    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    // Nothing has been notified yet, so the waiter yields inside wait().
    waiter.call();
    assert(received == 0);
    // notify() sets the flag and yields back here before the waiter runs.
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    // The waiter now sees the notification, counts it and waits again.
    waiter.call();
    assert(received == 1);
    // The notification was consumed, so resuming again changes nothing.
    waiter.call();
    assert(received == 1);
}
1648
1649 /**
1650 * Sets the Scheduler behavior within the program.
1651 *
 * This variable sets the Scheduler behavior within this program. Typically,
 * when setting a Scheduler, scheduler.start() should be called in main; that
 * call will not return until program execution is complete.
1655 */
1656 __gshared Scheduler scheduler;
1657
1658 // Generator
1659
/**
 * If the caller is a Fiber and is not a Generator, this function will call
 * scheduler.yield() or Fiber.yield(), as appropriate.
 *
 * When a scheduler is installed, scheduler.yield() is called even if the
 * caller is not a Fiber. Inside a Generator this function does nothing.
 */
void yield() nothrow
{
    auto current = Fiber.getThis();

    // Generators hand values back through yield(T); never reschedule them.
    if (cast(IsGenerator) current)
        return;

    if (scheduler !is null)
        scheduler.yield();
    else if (current)
        Fiber.yield();
}
1678
/// Marker interface used to determine whether a Generator is running:
/// yield() casts the current Fiber to it and does nothing on a match.
private interface IsGenerator {}
1681
1682
/**
 * A Generator is a Fiber that periodically returns values of type T to the
 * caller via yield. This is represented as an InputRange.
 */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Initializes a generator object which is associated with a static
     * D function. The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function. The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function. The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz, size_t guardPageSize)
    {
        super(fn, sz, guardPageSize);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function. The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function. The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function. The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz, size_t guardPageSize)
    {
        super(dg, sz, guardPageSize);
        call();
    }

    /**
     * Returns true if the generator is empty, i.e. no value has been
     * yielded or the fiber function has terminated.
     */
    final bool empty() @property
    {
        if (m_value is null)
            return true;
        return state == State.TERM;
    }

    /**
     * Obtains the next value from the underlying function by resuming it.
     */
    final void popFront()
    {
        call();
    }

    /**
     * Returns the most recently generated value by shallow copy.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently generated value without executing a
     * copy constructor. Will not compile for element types defining a
     * postblit, because Generator does not return by reference.
     */
    final T moveFront()
    {
        static if (hasElaborateCopyConstructor!T)
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
        else
        {
            return front;
        }
    }

    // foreach support: passes each remaining value to loopBody and stops
    // early, without advancing, when loopBody returns non-zero.
    final int opApply(scope int delegate(T) loopBody)
    {
        int result;
        while (!empty)
        {
            result = loopBody(front);
            if (result)
                break;
            popFront();
        }
        return result;
    }

    // Indexed foreach support: as above, with a counter starting at zero.
    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        int result;
        size_t index;
        while (!empty)
        {
            result = loopBody(index, front);
            if (result)
                break;
            ++index;
            popFront();
        }
        return result;
    }
private:
    // Address of the value most recently passed to yield(T); null until the
    // first value has been yielded.
    T* m_value;
}
1868
///
@system unittest
{
    // The spawned thread keeps receiving ints until it gets one that is at
    // least 9, then reports twice that value to its owner.
    auto tid = spawn({
        int i;
        while (i < 9)
            i = receiveOnly!int;

        ownerTid.send(i * 2);
    });

    // A generator producing the values 1 through 9.
    auto r = new Generator!int({
        foreach (i; 1 .. 10)
            yield(i);
    });

    // Forward every generated value to the spawned thread.
    foreach (e; r)
        tid.send(e);

    // The last value sent was 9, so the reply is 18.
    assert(receiveOnly!int == 18);
}
1890
/**
 * Yields a value of type T to the caller of the currently executing
 * generator.
 *
 * Params:
 *  value = The value to yield.
 *
 * Throws:
 *  Exception if the calling Fiber is not an executing Generator!T.
 */
void yield(T)(ref T value)
{
    auto generator = cast(Generator!T) Fiber.getThis();
    if (generator is null || generator.state != Fiber.State.EXEC)
        throw new Exception("yield(T) called with no active generator for the supplied type");

    // Publish the address of the value, then hand control to the consumer.
    generator.m_value = &value;
    Fiber.yield();
}

/// ditto
void yield(T)(T value)
{
    // `value` is an lvalue here, so this resolves to the ref overload.
    yield(value);
}
1914
// Runs the same Generator scenario under ThreadScheduler and FiberScheduler.
@system unittest
{
    import core.exception;
    import std.exception;

    auto mainTid = thisTid;
    alias testdg = () {
        // The child expects 1, 2, 3, ... in order and reports success to
        // mainTid once its owner terminates after the third value.
        auto tid = spawn(
            (Tid mainTid) {
                int i;
                scope (failure) mainTid.send(false);
                try
                {
                    for (i = 1; i < 10; i++)
                    {
                        if (receiveOnly!int() != i)
                        {
                            mainTid.send(false);
                            break;
                        }
                    }
                }
                catch (OwnerTerminated e)
                {
                    // i will advance 1 past the last value expected
                    mainTid.send(i == 4);
                }
            }, mainTid);
        auto r = new Generator!int(
            {
                // Yielding a double inside a Generator!int must throw.
                assertThrown!Exception(yield(2.0));
                yield(); // ensure this is a no-op
                yield(1);
                yield(); // also once something has been yielded
                yield(2);
                yield(3);
            });

        foreach (e; r)
        {
            tid.send(e);
        }
    };

    scheduler = new ThreadScheduler;
    scheduler.spawn(testdg);
    assert(receiveOnly!bool());

    scheduler = new FiberScheduler;
    scheduler.start(testdg);
    assert(receiveOnly!bool());
    // Restore the default (no custom scheduler).
    scheduler = null;
}
///
@system unittest
{
    import std.range;

    // Reference behavior with an ordinary InputRange object.
    InputRange!int myIota = iota(10).inputRangeObject;

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    //can be assigned to std.range.interfaces.InputRange directly
    myIota = new Generator!int(
    {
        foreach (i; 0 .. 10) yield(i);
    });

    // The Generator behaves exactly like the range above.
    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    // Indexed foreach: counter[0] counts elements, counter[1] sums indices.
    size_t[2] counter = [0, 0];
    foreach (i, unused; myIota) counter[] += [1, i];

    // Seven elements remained (3 .. 9); their indices 0 .. 6 sum to 21.
    assert(myIota.empty);
    assert(counter == [7, 21]);
}
2001
2002 private
2003 {
/*
 * A MessageBox is a message queue for one thread.  Other threads may send
 * messages to this owner by calling put(), and the owner receives them by
 * calling get().  The put() call is therefore effectively shared and the
 * get() call is effectively local.  setMaxMsgs may be used by any thread
 * to limit the size of the message queue.
 */
class MessageBox
{
    // Creates the lock and the two conditions. The conditions come from the
    // active scheduler when one is installed, otherwise plain Conditions
    // are used.
    this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
    {
        m_lock = new Mutex;
        m_closed = false;

        if (scheduler is null)
        {
            m_putMsg = new Condition(m_lock);
            m_notFull = new Condition(m_lock);
        }
        else
        {
            m_putMsg = scheduler.newCondition(m_lock);
            m_notFull = scheduler.newCondition(m_lock);
        }
    }

    /// Returns true once close() has been called on this mailbox.
    final @property bool isClosed() @safe @nogc pure
    {
        synchronized (m_lock)
        {
            return m_closed;
        }
    }

    /*
     * Sets a limit on the maximum number of user messages allowed in the
     * mailbox.  If this limit is reached, the caller attempting to add
     * a new message will execute call.  If num is zero, there is no limit
     * on the message queue.
     *
     * Params:
     *  num  = The maximum size of the queue or zero if the queue is
     *         unbounded.
     *  call = The routine to call when the queue is full.
     */
    final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
    {
        synchronized (m_lock)
        {
            m_maxMsgs = num;
            m_onMaxMsgs = call;
        }
    }

    /*
     * If maxMsgs is not set, the message is added to the queue and the
     * owner is notified.  If the queue is full, the message will still be
     * accepted if it is a control message, otherwise onCrowdingDoThis is
     * called.  If the routine returns true, this call will block until
     * the owner has made space available in the queue.  If it returns
     * false, this call will abort.  Messages put into a closed mailbox are
     * dropped; a sender that is blocked when the mailbox gets closed is
     * released and its message is dropped as well.
     *
     * Params:
     *  msg = The message to put in the queue.
     *
     * Throws:
     *  An exception if the queue is full and onCrowdingDoThis throws.
     */
    final void put(ref Message msg)
    {
        synchronized (m_lock)
        {
            // TODO: Generate an error here if m_closed is true, or maybe
            //       put a message in the caller's queue?
            // m_closed is re-evaluated after every wake-up so that a sender
            // released by close() gives up instead of waiting again.
            while (!m_closed)
            {
                if (isPriorityMsg(msg))
                {
                    m_sharedPty.put(msg);
                    m_putMsg.notify();
                    return;
                }
                if (!mboxFull() || isControlMsg(msg))
                {
                    m_sharedBox.put(msg);
                    m_putMsg.notify();
                    return;
                }
                if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
                {
                    return;
                }
                m_putQueue++;
                m_notFull.wait();
                m_putQueue--;
            }
        }
    }

    /*
     * Matches ops against each message in turn until a match is found.
     *
     * Params:
     *  vals = An optional leading Duration (the timeout) followed by the
     *         operations to match.  Each may return a bool to indicate
     *         whether a message with a matching type is truly a match.
     *
     * Returns:
     *  true if a message was retrieved and false if not (such as if a
     *  timeout occurred).
     *
     * Throws:
     *  LinkTerminated if a linked thread terminated, or OwnerTerminated
     * if the owner thread terminates and no existing messages match the
     * supplied ops.
     */
    bool get(T...)(scope T vals)
    {
        import std.meta : AliasSeq;

        static assert(T.length, "T must not be empty");

        // A leading Duration argument selects the timed variant.
        static if (isImplicitlyConvertible!(T[0], Duration))
        {
            alias Ops = AliasSeq!(T[1 .. $]);
            alias ops = vals[1 .. $];
            enum timedWait = true;
            Duration period = vals[0];
        }
        else
        {
            alias Ops = AliasSeq!(T);
            alias ops = vals[0 .. $];
            enum timedWait = false;
        }

        // Tries each op in order; the first whose parameters fit the
        // message handles it.
        bool onStandardMsg(ref Message msg)
        {
            foreach (i, t; Ops)
            {
                alias Args = Parameters!(t);
                auto op = ops[i];

                if (msg.convertsTo!(Args))
                {
                    alias RT = ReturnType!(t);
                    static if (is(RT == bool))
                    {
                        return msg.map(op);
                    }
                    else
                    {
                        msg.map(op);
                        static if (!is(immutable RT == immutable noreturn))
                            return true;
                    }
                }
            }
            return false;
        }

        // Handles the termination notice of a linked thread or the owner.
        // The notice is offered to the user ops first and thrown as
        // LinkTerminated / OwnerTerminated if none of them accepts it.
        bool onLinkDeadMsg(ref Message msg)
        {
            assert(msg.convertsTo!(Tid),
                    "Message could not be converted to Tid");
            auto tid = msg.get!(Tid);

            if (bool* pDepends = tid in thisInfo.links)
            {
                auto depends = *pDepends;
                thisInfo.links.remove(tid);
                // Give the owner relationship precedence.
                if (depends && tid != thisInfo.owner)
                {
                    auto e = new LinkTerminated(tid);
                    auto m = Message(MsgType.standard, e);
                    if (onStandardMsg(m))
                        return true;
                    throw e;
                }
            }
            if (tid == thisInfo.owner)
            {
                thisInfo.owner = Tid.init;
                auto e = new OwnerTerminated(tid);
                auto m = Message(MsgType.standard, e);
                if (onStandardMsg(m))
                    return true;
                throw e;
            }
            return false;
        }

        bool onControlMsg(ref Message msg)
        {
            switch (msg.type)
            {
            case MsgType.linkDead:
                return onLinkDeadMsg(msg);
            default:
                return false;
            }
        }

        // Walks list, removing and returning true on the first message
        // that gets handled; unhandled messages stay in place.
        bool scan(ref ListT list)
        {
            for (auto range = list[]; !range.empty;)
            {
                // Only the message handler will throw, so if this occurs
                // we can be certain that the message was handled.
                scope (failure)
                    list.removeAt(range);

                if (isControlMsg(range.front))
                {
                    if (onControlMsg(range.front))
                    {
                        // Although the linkDead message is a control message,
                        // it can be handled by the user.  Since the linkDead
                        // message throws if not handled, if we get here then
                        // it has been handled and we can return from receive.
                        // This is a weird special case that will have to be
                        // handled in a more general way if more are added.
                        if (!isLinkDeadMsg(range.front))
                        {
                            list.removeAt(range);
                            continue;
                        }
                        list.removeAt(range);
                        return true;
                    }
                    range.popFront();
                    continue;
                }
                else
                {
                    if (onStandardMsg(range.front))
                    {
                        list.removeAt(range);
                        return true;
                    }
                    range.popFront();
                    continue;
                }
            }
            return false;
        }

        // Processes the first priority message, if any.  A priority
        // message that no op accepts is thrown to the caller.
        bool pty(ref ListT list)
        {
            if (!list.empty)
            {
                auto range = list[];

                if (onStandardMsg(range.front))
                {
                    list.removeAt(range);
                    return true;
                }
                if (range.front.convertsTo!(Throwable))
                    throw range.front.get!(Throwable);
                else if (range.front.convertsTo!(shared(Throwable)))
                    throw range.front.get!(shared(Throwable));
                else
                    throw new PriorityMessageException(range.front.data);
            }
            return false;
        }

        static if (timedWait)
        {
            import core.time : MonoTime;
            auto limit = MonoTime.currTime + period;
        }

        while (true)
        {
            ListT arrived;

            // First look at what is already in the local queues.
            if (pty(m_localPty) || scan(m_localBox))
            {
                return true;
            }
            yield();
            synchronized (m_lock)
            {
                updateMsgCount();
                while (m_sharedPty.empty && m_sharedBox.empty)
                {
                    // NOTE: We're notifying all waiters here instead of just
                    //       a few because the onCrowding behavior may have
                    //       changed and we don't want to block sender threads
                    //       unnecessarily if the new behavior is not to block.
                    //       This will admittedly result in spurious wakeups
                    //       in other situations, but what can you do?
                    if (m_putQueue && !mboxFull())
                        m_notFull.notifyAll();
                    static if (timedWait)
                    {
                        if (period <= Duration.zero || !m_putMsg.wait(period))
                            return false;
                    }
                    else
                    {
                        m_putMsg.wait();
                    }
                }
                // Move everything that arrived out of the shared queues.
                m_localPty.put(m_sharedPty);
                arrived.put(m_sharedBox);
            }
            if (m_localPty.empty)
            {
                // Whatever scan() leaves behind is appended to the local box.
                scope (exit) m_localBox.put(arrived);
                if (scan(arrived))
                {
                    return true;
                }
                else
                {
                    static if (timedWait)
                    {
                        period = limit - MonoTime.currTime;
                    }
                    continue;
                }
            }
            m_localBox.put(arrived);
            pty(m_localPty);
            return true;
        }
    }

    /*
     * Called on thread termination.  This routine processes any remaining
     * control messages, clears out message queues, sets a flag to reject
     * any future messages, and releases senders blocked in put().
     */
    final void close()
    {
        static void onLinkDeadMsg(ref Message msg)
        {
            assert(msg.convertsTo!(Tid),
                    "Message could not be converted to Tid");
            auto tid = msg.get!(Tid);

            thisInfo.links.remove(tid);
            if (tid == thisInfo.owner)
                thisInfo.owner = Tid.init;
        }

        static void sweep(ref ListT list)
        {
            for (auto range = list[]; !range.empty; range.popFront())
            {
                if (range.front.type == MsgType.linkDead)
                    onLinkDeadMsg(range.front);
            }
        }

        ListT arrived;

        sweep(m_localBox);
        synchronized (m_lock)
        {
            arrived.put(m_sharedBox);
            m_closed = true;
            // Senders blocked in put() wait on m_notFull, which is otherwise
            // only signalled from get().  Nobody will call get() again, so
            // wake them here; they re-check m_closed and give up.
            if (m_putQueue)
                m_notFull.notifyAll();
        }
        m_localBox.clear();
        sweep(arrived);
    }

private:
    // Routines involving local data only, no lock needed.

    // True when a limit is set and the counted messages have reached it.
    bool mboxFull() @safe @nogc pure nothrow
    {
        return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
    }

    // Refreshes the cached size of the local box that mboxFull() reads.
    void updateMsgCount() @safe @nogc pure nothrow
    {
        m_localMsgs = m_localBox.length;
    }

    // Control messages are all messages that are neither standard nor
    // priority messages.
    bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
    {
        return msg.type != MsgType.standard && msg.type != MsgType.priority;
    }

    bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
    {
        return msg.type == MsgType.priority;
    }

    bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
    {
        return msg.type == MsgType.linkDead;
    }

    alias OnMaxFn = bool function(Tid);
    alias ListT = List!(Message);

    ListT m_localBox;       // messages already moved out of the shared box
    ListT m_localPty;       // priority messages already moved out

    Mutex m_lock;           // held in put(), close() and around the shared part of get()
    Condition m_putMsg;     // signalled by put() when a message was queued
    Condition m_notFull;    // senders blocked on a full mailbox wait here
    size_t m_putQueue;      // number of senders currently blocked in put()
    ListT m_sharedBox;      // messages queued by put()
    ListT m_sharedPty;      // priority messages queued by put()
    OnMaxFn m_onMaxMsgs;    // called by put() when the mailbox is full
    size_t m_localMsgs;     // cached m_localBox.length, see updateMsgCount()
    size_t m_maxMsgs;       // message limit, zero means unbounded
    bool m_closed;          // set by close(); put() drops messages afterwards
}
2423
/*
 * Singly linked list of T with a tail pointer and an element count.
 *
 * Nodes released through removeAt() are kept on a free list that is
 * shared by all instances of the same List!T and guarded by a spin lock;
 * newNode() reuses those nodes before allocating new ones.
 */
struct List(T)
{
    /*
     * Cursor over the list. It stores a pointer to the node *before* the
     * current element, so the current element is always `m_prev.next`.
     * Keeping the predecessor is what lets removeAt() unlink the current
     * node from a singly linked list.
     */
    struct Range
    {
        import std.exception : enforce;

        // True when there is no node after the stored predecessor.
        @property bool empty() const
        {
            return !m_prev.next;
        }

        // Reference to the current element; enforce() fails if there is none.
        @property ref T front()
        {
            enforce(m_prev.next, "invalid list node");
            return m_prev.next.val;
        }

        // Overwrites the current element; enforce() fails if there is none.
        @property void front(T val)
        {
            enforce(m_prev.next, "invalid list node");
            m_prev.next.val = val;
        }

        // Advances by making the current node the new predecessor.
        void popFront()
        {
            enforce(m_prev.next, "invalid list node");
            m_prev = m_prev.next;
        }

        // Only List constructs ranges (see opSlice).
        private this(Node* p)
        {
            m_prev = p;
        }

        private Node* m_prev;
    }

    // Appends a copy of `val` in a node obtained from newNode().
    void put(T val)
    {
        put(newNode(val));
    }

    // Moves every node of `rhs` to the end of this list and leaves `rhs`
    // empty. No nodes are copied or allocated.
    void put(ref List!(T) rhs)
    {
        if (!rhs.empty)
        {
            // Links the head of rhs (and thereby its whole chain) after the
            // current tail; this counts the first moved node.
            put(rhs.m_first);
            // Walk to the real tail, counting the remaining moved nodes.
            while (m_last.next !is null)
            {
                m_last = m_last.next;
                m_count++;
            }
            rhs.m_first = null;
            rhs.m_last = null;
            rhs.m_count = 0;
        }
    }

    // Returns a range positioned on the first element. The address of
    // m_first is treated as a Node whose `next` member overlays m_first
    // (`next` is the first member of Node), which gives the first element
    // a predecessor just like every other element.
    Range opSlice()
    {
        return Range(cast(Node*)&m_first);
    }

    // Unlinks the element that `r` is currently positioned on (r.front)
    // and hands its node to freeNode(). Afterwards `r` is positioned on
    // the element that followed the removed one.
    void removeAt(Range r)
    {
        import std.exception : enforce;

        assert(m_count, "Can not remove from empty Range");
        Node* n = r.m_prev;
        enforce(n && n.next, "attempting to remove invalid list node");

        // Keep the tail pointer valid: the list becomes empty when the
        // only node is removed; otherwise, if the tail itself is removed,
        // its predecessor becomes the new tail.
        if (m_last is m_first)
            m_last = null;
        else if (m_last is n.next)
            m_last = n; // nocoverage
        Node* to_free = n.next;
        n.next = n.next.next;
        freeNode(to_free);
        m_count--;
    }

    // Number of elements currently in the list.
    @property size_t length()
    {
        return m_count;
    }

    // Empties the list by dropping its node references. The nodes are not
    // passed to freeNode(), so they do not go back to the shared free list.
    void clear()
    {
        m_first = m_last = null;
        m_count = 0;
    }

    // True when the list has no first node.
    @property bool empty()
    {
        return m_first is null;
    }

private:
    // List cell. `next` must stay the first member: opSlice() relies on
    // it when it reinterprets &m_first as a Node*.
    struct Node
    {
        Node* next;
        T val;

        this(T v)
        {
            val = v;
        }
    }

    // Minimal spin lock: lock() retries a compare-and-swap on `locked`
    // and yields the thread between attempts; unlock() stores false with
    // release ordering.
    static shared struct SpinLock
    {
        void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
        void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
        bool locked;
    }

    // Free list of recycled nodes (sm_head) and the lock guarding it.
    // Both are static, i.e. shared by all lists of the same List!T.
    static shared SpinLock sm_lock;
    static shared Node* sm_head;

    // Returns a node holding `v`: a recycled node from the free list if
    // one is available, otherwise a freshly allocated one.
    Node* newNode(T v)
    {
        Node* n;
        {
            // The lock is held only while popping from the free list.
            sm_lock.lock();
            scope (exit) sm_lock.unlock();

            if (sm_head)
            {
                n = cast(Node*) sm_head;
                sm_head = sm_head.next;
            }
        }
        if (n)
        {
            // Re-initialize the recycled node in place.
            import core.lifetime : emplace;
            emplace!Node(n, v);
        }
        else
        {
            n = new Node(v);
        }
        return n;
    }

    // Destroys the node's value and pushes the node onto the free list.
    void freeNode(Node* n)
    {
        // destroy val to free any owned GC memory
        destroy(n.val);

        sm_lock.lock();
        scope (exit) sm_lock.unlock();

        auto sn = cast(shared(Node)*) n;
        sn.next = sm_head;
        sm_head = sn;
    }

    // Appends node `n` as the new tail and counts it. Note that `n.next`
    // is left untouched, which put(ref List) uses to attach a whole chain.
    void put(Node* n)
    {
        m_count++;
        if (!empty)
        {
            m_last.next = n;
            m_last = n;
            return;
        }
        m_first = n;
        m_last = n;
    }

    Node* m_first;   // head of the list, null when empty
    Node* m_last;    // tail of the list, null when empty
    size_t m_count;  // number of nodes in the list
}
2601 }
2602
@system unittest
{
    import std.typecons : tuple, Tuple;

    // Receiver side: consumes the four messages sent by runTest in order,
    // then acknowledges to `tid` with a priority message.
    static void testfn(Tid tid)
    {
        receive(
            (float unexpected) { assert(0); },
            (int first, int second) {
                assert(first == 42 && second == 86);
            }
        );
        receive((Tuple!(int, int) pair) {
            assert(pair[0] == 42 && pair[1] == 86);
        });
        receive((Variant anything) { });
        receive(
            // Returns whether the text is the expected one.
            (string text) { return text == "the quick brown fox"; },
            (string text) { assert(false); }
        );
        prioritySend(tid, "done");
    }

    // Sender side: sends the four messages and waits for the acknowledgement.
    static void runTest(Tid tid)
    {
        tid.send(42, 86);
        tid.send(tuple(42, 86));
        tid.send("hello", "there");
        tid.send("the quick brown fox");
        receive((string reply) { assert(reply == "done"); });
    }

    static void simpleTest()
    {
        // First round: receiver with the default mailbox settings.
        auto receiver = spawn(&testfn, thisTid);
        runTest(receiver);

        // Second round: receiver whose mailbox is limited to two messages,
        // with OnCrowding.block as the crowding policy.
        receiver = spawn(&testfn, thisTid);
        setMaxMailboxSize(receiver, 2, OnCrowding.block);
        runTest(receiver);
    }

    // Run once with the current scheduler setting ...
    simpleTest();

    // ... and once more with an explicit ThreadScheduler installed.
    scheduler = new ThreadScheduler;
    simpleTest();
    scheduler = null;
}
2648
// Returns the mutex used by the initOnce overload that takes no mutex.
// The mutex is created on first use; when several threads race to create
// it, the one whose compare-and-swap succeeds publishes its mutex and all
// others return that same instance.
private @property shared(Mutex) initOnceLock()
{
    static shared Mutex lock;

    // Common case: the mutex has already been published.
    auto published = atomicLoad!(MemoryOrder.acq)(lock);
    if (published !is null)
        return published;

    // Try to publish a new mutex; this only succeeds while `lock` is null.
    auto candidate = new shared Mutex;
    if (!cas(&lock, cast(shared) null, candidate))
    {
        // Another thread published first: use its mutex instead.
        return atomicLoad!(MemoryOrder.acq)(lock);
    }
    return candidate;
}
2659
2660 /**
2661 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
2662 * thread-safe manner.
2663 *
2664 * The implementation guarantees that all threads simultaneously calling
2665 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
2666 * fully initialized. All side-effects of $(D_PARAM init) are globally visible
2667 * afterwards.
2668 *
2669 * Params:
2670 * var = The variable to initialize
2671 * init = The lazy initializer value
2672 *
2673 * Returns:
2674 * A reference to the initialized variable
2675 */
auto ref initOnce(alias var)(lazy typeof(var) init)
{
    // Delegate to the mutex-taking overload using the mutex returned by
    // initOnceLock; `init` is passed on still unevaluated.
    auto commonLock = initOnceLock;
    return initOnce!var(init, commonLock);
}
2680
/// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        // Creates the instance on first call and returns it on every call.
        static MySingleton instance()
        {
            __gshared MySingleton cached;
            return initOnce!cached(new MySingleton);
        }
    }

    auto obtained = MySingleton.instance;
    assert(obtained !is null);
}
2695
// Ten threads request the singleton concurrently; the constructor must
// have run exactly once and every thread must see the same instance value.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            __gshared MySingleton cached;
            return initOnce!cached(new MySingleton);
        }

    private:
        // Counts constructions in `cnt` and records this object's number.
        this()
        {
            cnt += 1;
            val = cnt;
        }

        size_t val;
        __gshared size_t cnt;
    }

    enum workers = 10;

    foreach (_; 0 .. workers)
        spawn({ send(ownerTid, MySingleton.instance.val); });
    foreach (_; 0 .. workers)
        assert(receiveOnly!size_t == MySingleton.instance.val);
    assert(MySingleton.cnt == 1);
}
2718
2719 /**
2720 * Same as above, but takes a separate mutex instead of sharing one among
2721 * all initOnce instances.
2722 *
2723 * This should be used to avoid dead-locks when the $(D_PARAM init)
2724 * expression waits for the result of another thread that might also
2725 * call initOnce. Use with care.
2726 *
2727 * Params:
2728 * var = The variable to initialize
2729 * init = The lazy initializer value
2730 * mutex = A mutex to prevent race conditions
2731 *
2732 * Returns:
2733 * A reference to the initialized variable
2734 */
auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
{
    import core.atomic : atomicLoad, MemoryOrder, atomicStore;

    // check that var is global, can't take address of a TLS variable
    static assert(is(typeof({ __gshared p = &var; })),
        "var must be 'static shared' or '__gshared'.");

    // Records whether `var` has been initialized; being a static local of
    // a template, there is one flag per instantiation.
    static shared bool initialized;

    // Fast path: initialization already completed, no locking required.
    if (atomicLoad!(MemoryOrder.acq)(initialized))
        return var;

    synchronized (mutex)
    {
        // Check again under the mutex: another thread may have finished
        // the initialization while this one was waiting for the mutex.
        if (!atomicLoad!(MemoryOrder.raw)(initialized))
        {
            var = init;
            // For a noreturn `var` the store is not compiled in at all.
            static if (!is(immutable typeof(var) == immutable noreturn))
                atomicStore!(MemoryOrder.rel)(initialized, true);
        }
    }
    return var;
}
2757
2758 /// ditto
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    // Convenience overload for an unshared Mutex reference: cast it to
    // shared and forward to the overload that takes a shared Mutex.
    auto sharedMutex = cast(shared) mutex;
    return initOnce!var(init, sharedMutex);
}
2763
/// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA;
    static shared bool varB;
    static shared Mutex m;
    m = new shared Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, m);
        send(ownerTid, true);
    });

    // init depends on the result of the spawned thread
    initOnce!varA(receiveOnly!bool);

    assert(varA);
    assert(varB);
}
2783
// initOnce accepts `static shared` and `__gshared` variables and must
// refuse thread-local and stack variables at compile time.
@system unittest
{
    static shared bool sharedVar;
    __gshared bool gsharedVar;
    static bool tlsVar;
    bool stackVar;

    // Rejected storage classes.
    static assert(!__traits(compiles, initOnce!tlsVar(true))); // TLS
    static assert(!__traits(compiles, initOnce!stackVar(true))); // local variable

    // Accepted storage classes.
    initOnce!sharedVar(true);
    initOnce!gsharedVar(true);
}
2795
// test ability to send shared arrays
@system unittest
{
    static shared int[] cells = new shared(int)[1];

    // The worker receives the array, writes into it and reports back.
    auto worker = spawn({
        auto received = receiveOnly!(shared(int)[]);
        received[0] = 5;
        send(ownerTid, true);
    });

    send(worker, cells);
    receiveOnly!(bool);

    // The write made by the worker is visible through the original array.
    assert(cells[0] == 5);
}
2809
// https://issues.dlang.org/show_bug.cgi?id=13930
@system unittest
{
    immutable int[string] table = ["0": 0];
    send(thisTid, table);
    // The line the linked issue is about (originally annotated "compile error").
    receiveOnly!(immutable int[string]);
}
2817
// https://issues.dlang.org/show_bug.cgi?id=19345
@system unittest
{
    // Aggregate made up of const members only.
    static struct Aggregate
    {
        const int a;
        const int[5] b;
    }

    // Sends a const Aggregate to the given thread.
    static void sender(Tid mainTid)
    {
        const payload = Aggregate(42, [1, 2, 3, 4, 5]);
        send(mainTid, payload);
    }

    spawn(&sender, thisTid);

    immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
    auto received = receiveOnly!(const Aggregate)();
    assert(received == expected);
}
2833
// Noreturn support
// Checks that the messaging API accepts `noreturn` handlers and values.
@system unittest
{
    // Handler that never returns normally: it always throws.
    static noreturn foo(int) { throw new Exception(""); }

    // The `if (false)` guards keep these calls from ever executing; they
    // only have to compile.
    if (false) spawn(&foo, 1);
    if (false) spawnLinked(&foo, 1);

    if (false) receive(&foo);
    if (false) receiveTimeout(Duration.init, &foo);

    // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
    static assert(__traits(compiles, receiveOnly!noreturn()                 ));
    static assert(__traits(compiles, send(Tid.init, noreturn.init)          ));
    static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)  ));
    static assert(__traits(compiles, yield(noreturn.init)                   ));

    // initOnce must also accept a variable of type noreturn.
    static assert(__traits(compiles, {
        __gshared noreturn n;
        initOnce!n(noreturn.init);
    }));
}
2856