xref: /netbsd-src/external/gpl3/gcc/dist/libphobos/libdruntime/core/sync/condition.d (revision 0a3071956a3a9fdebdbf7f338cf2d439b45fc728)
1 /**
2  * The condition module provides a primitive for synchronized condition
3  * checking.
4  *
5  * Copyright: Copyright Sean Kelly 2005 - 2009.
6  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7  * Authors:   Sean Kelly
8  * Source:    $(DRUNTIMESRC core/sync/_condition.d)
9  */
10 
11 /*          Copyright Sean Kelly 2005 - 2009.
12  * Distributed under the Boost Software License, Version 1.0.
13  *    (See accompanying file LICENSE or copy at
14  *          http://www.boost.org/LICENSE_1_0.txt)
15  */
16 module core.sync.condition;
17 
18 
19 public import core.sync.exception;
20 public import core.sync.mutex;
21 public import core.time;
22 
version(Windows)23 version (Windows)
24 {
25     import core.sync.semaphore;
26     import core.sys.windows.basetsd /+: HANDLE+/;
27     import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
28         DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
29         LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
30     import core.sys.windows.windef /+: BOOL, DWORD+/;
31     import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
32 }
version(Posix)33 else version (Posix)
34 {
35     import core.sync.config;
36     import core.stdc.errno;
37     import core.sys.posix.pthread;
38     import core.sys.posix.time;
39 }
40 else
41 {
42     static assert(false, "Platform not supported");
43 }
44 
45 
46 ////////////////////////////////////////////////////////////////////////////////
47 // Condition
48 //
49 // void wait();
50 // void notify();
51 // void notifyAll();
52 ////////////////////////////////////////////////////////////////////////////////
53 
54 
55 /**
56  * This class represents a condition variable as conceived by C.A.R. Hoare.  As
57  * per Mesa type monitors however, "signal" has been replaced with "notify" to
58  * indicate that control is not transferred to the waiter when a notification
59  * is sent.
60  */
61 class Condition
62 {
63     ////////////////////////////////////////////////////////////////////////////
64     // Initialization
65     ////////////////////////////////////////////////////////////////////////////
66 
/**
 * Creates a condition variable that is bound to the supplied mutex.
 *
 * Params:
 *  m = The mutex with which this condition will be associated.
 *
 * Throws:
 *  SyncError on error.
 */
this( Mutex m ) nothrow @safe
{
    this( m, true );
}

/// ditto
this( shared Mutex m ) shared nothrow @safe
{
    this( m, true );
}

// Common implementation behind both public constructors.  `Q` is the
// (possibly shared) type of the object being constructed and `M` the matching
// mutex type.  The bool argument is never read; presumably it only exists to
// keep this overload distinct from the public constructors.
private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted
    if ((is(Q == Condition) && is(M == Mutex)) ||
        (is(Q == shared Condition) && is(M == shared Mutex)))
{
    version (Windows)
    {
        // The handle fields carry the same sharedness as the object itself.
        static if (is(Q == shared Condition))
            alias SemHandle = shared(void*);
        else
            alias SemHandle = void*;

        // Gate semaphore, created with one available slot.
        m_blockLock = cast(SemHandle) CreateSemaphoreA( null, 1, 1, null );
        if ( m_blockLock == m_blockLock.init )
            throw new SyncError( "Unable to initialize condition" );
        scope(failure) CloseHandle( cast(void*) m_blockLock );

        // Queue semaphore, created with no available slots.
        m_blockQueue = cast(SemHandle) CreateSemaphoreA( null, 0, int.max, null );
        if ( m_blockQueue == m_blockQueue.init )
            throw new SyncError( "Unable to initialize condition" );
        scope(failure) CloseHandle( cast(void*) m_blockQueue );

        InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
        m_assocMutex = m;
    }
    else version (Posix)
    {
        m_assocMutex = m;
        auto cond = cast(pthread_cond_t*) &m_hndl;

        static if ( is( typeof( pthread_condattr_setclock ) ) )
        {
            // Where the platform allows it, configure the condition to use
            // CLOCK_MONOTONIC before creating it.
            pthread_condattr_t attr = void;

            if ( pthread_condattr_init( &attr ) )
                throw new SyncError( "Unable to initialize condition" );
            if ( pthread_condattr_setclock( &attr, CLOCK_MONOTONIC ) )
                throw new SyncError( "Unable to initialize condition" );
            if ( pthread_cond_init( cond, &attr ) )
                throw new SyncError( "Unable to initialize condition" );
            if ( pthread_condattr_destroy( &attr ) )
                throw new SyncError( "Unable to initialize condition" );
        }
        else
        {
            if ( pthread_cond_init( cond, null ) )
                throw new SyncError( "Unable to initialize condition" );
        }
    }
}
146 
147 
// Releases the operating system objects owned by this condition.  A failure
// to release one of them is only reported through an assertion.
~this()
{
    version (Windows)
    {
        BOOL lockClosed = CloseHandle( m_blockLock );
        assert( lockClosed, "Unable to destroy condition" );
        BOOL queueClosed = CloseHandle( m_blockQueue );
        assert( queueClosed, "Unable to destroy condition" );
        DeleteCriticalSection( &m_unblockLock );
    }
    else version (Posix)
    {
        int status = pthread_cond_destroy( &m_hndl );
        assert( status == 0, "Unable to destroy condition" );
    }
}
164 
165 
166     ////////////////////////////////////////////////////////////////////////////
167     // General Properties
168     ////////////////////////////////////////////////////////////////////////////
169 
170 
/**
 * The mutex this condition was constructed with.
 *
 * Returns:
 *  The mutex associated with this condition.
 */
@property Mutex mutex()
{
    return m_assocMutex;
}

/// ditto
@property shared(Mutex) mutex() shared
{
    return m_assocMutex;
}
187 
// Undocumented, for internal use: same result as `mutex`, but final and
// usable from pure nothrow @safe @nogc code.
final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
{
    return m_assocMutex;
}

// ditto
final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
{
    return m_assocMutex;
}
199 
200     ////////////////////////////////////////////////////////////////////////////
201     // General Actions
202     ////////////////////////////////////////////////////////////////////////////
203 
204 
/**
 * Blocks the calling thread until this condition is notified.
 *
 * Throws:
 *  SyncError on error.
 */
void wait()
{
    wait!(typeof(this))(true);
}

/// ditto
void wait() shared
{
    wait!(typeof(this))(true);
}

/// ditto
void wait(this Q)( bool _unused_ )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    version (Windows)
    {
        // An untimed wait is a timed wait with an infinite timeout.
        timedWait( INFINITE );
    }
    else version (Posix)
    {
        auto cond  = cast(pthread_cond_t*) &m_hndl;
        auto owner = (cast(Mutex) m_assocMutex).handleAddr();

        if ( pthread_cond_wait( cond, owner ) != 0 )
            throw new SyncError( "Unable to wait for condition" );
    }
}
237 
/**
 * Blocks the calling thread until this condition is notified or the given
 * amount of time has passed, whichever happens first.
 *
 * Params:
 *  val = The time to wait.
 *
 * In:
 *  val must be non-negative.
 *
 * Throws:
 *  SyncError on error.
 *
 * Returns:
 *  true if notified before the timeout and false if not.
 */
bool wait( Duration val )
{
    return wait!(typeof(this))(val, true);
}

/// ditto
bool wait( Duration val ) shared
{
    return wait!(typeof(this))(val, true);
}

/// ditto
bool wait(this Q)( Duration val, bool _unused_ )
    if (is(Q == Condition) || is(Q == shared Condition))
in
{
    assert( !val.isNegative );
}
do
{
    version (Windows)
    {
        // timedWait takes a 32-bit millisecond count, so longer waits are
        // performed in chunks of at most uint.max - 1 milliseconds.
        auto chunk = dur!("msecs")( uint.max - 1 );

        for ( ; val > chunk; val -= chunk )
        {
            if ( timedWait( cast(uint) chunk.total!"msecs" ) )
                return true;
        }
        return timedWait( cast(uint) val.total!"msecs" );
    }
    else version (Posix)
    {
        timespec abstime = void;
        mktspec( abstime, val );

        auto cond  = cast(pthread_cond_t*) &m_hndl;
        auto owner = (cast(Mutex) m_assocMutex).handleAddr();

        int rc = pthread_cond_timedwait( cond, owner, &abstime );
        if ( rc == ETIMEDOUT )
            return false;
        if ( rc != 0 )
            throw new SyncError( "Unable to wait for condition" );
        return true;
    }
}
302 
/**
 * Wakes a single thread waiting on this condition.
 *
 * Throws:
 *  SyncError on error.
 */
void notify()
{
    notify!(typeof(this))(true);
}

/// ditto
void notify() shared
{
    notify!(typeof(this))(true);
}

/// ditto
void notify(this Q)( bool _unused_ )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    version (Windows)
    {
        notify_( false );
    }
    else version (Posix)
    {
        // Since OS X 10.7 (Lion), pthread_cond_signal gives up with EAGAIN
        // after retrying 8192 times, so the call is repeated for as long as
        // it keeps returning EAGAIN.
        //
        // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
        // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
        // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
        // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
        // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
        // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
        // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
        // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

        auto cond = cast(pthread_cond_t*) &m_hndl;
        int  rc   = pthread_cond_signal( cond );

        while ( rc == EAGAIN )
            rc = pthread_cond_signal( cond );

        if ( rc != 0 )
            throw new SyncError( "Unable to notify condition" );
    }
}
350 
/**
 * Wakes every thread waiting on this condition.
 *
 * Throws:
 *  SyncError on error.
 */
void notifyAll()
{
    notifyAll!(typeof(this))(true);
}

/// ditto
void notifyAll() shared
{
    notifyAll!(typeof(this))(true);
}

/// ditto
void notifyAll(this Q)( bool _unused_ )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    version (Windows)
    {
        notify_( true );
    }
    else version (Posix)
    {
        // Since OS X 10.7 (Lion), pthread_cond_broadcast gives up with EAGAIN
        // after retrying 8192 times, so the call is repeated for as long as
        // it keeps returning EAGAIN.
        //
        // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
        // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
        // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
        // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
        // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
        // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
        // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
        // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

        auto cond = cast(pthread_cond_t*) &m_hndl;
        int  rc   = pthread_cond_broadcast( cond );

        while ( rc == EAGAIN )
            rc = pthread_cond_broadcast( cond );

        if ( rc != 0 )
            throw new SyncError( "Unable to notify condition" );
    }
}
398 
399 private:
version(Windows)400     version (Windows)
401     {
// Windows implementation of a (possibly timed) wait.
//
// The caller registers itself as a blocked waiter, releases the associated
// mutex, blocks on the queue semaphore for at most `timeout`, updates the
// waiter bookkeeping under m_unblockLock and finally re-acquires the
// associated mutex before returning.
//
// Params:
//  timeout = Passed straight to WaitForSingleObject on the queue semaphore;
//            callers in this class pass a millisecond count or INFINITE.
//
// Returns:
//  true if the queue semaphore was signalled, false if the wait timed out.
bool timedWait(this Q)( DWORD timeout )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    // Read-modify-write helper: a plain operator on an unshared object, an
    // atomicOp on a shared one.
    static if (is(Q == Condition))
    {
        auto op(string o, T, V1)(ref T val, V1 mod)
        {
            return mixin("val " ~ o ~ "mod");
        }
    }
    else
    {
        auto op(string o, T, V1)(ref shared T val, V1 mod)
        {
            import core.atomic: atomicOp;
            return atomicOp!o(val, mod);
        }
    }

    int   numSignalsLeft;
    int   numWaitersGone;
    DWORD rc;

    // Pass through the gate and register as a blocked waiter.
    rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
    assert( rc == WAIT_OBJECT_0 );

    op!"+="(m_numWaitersBlocked, 1);

    rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
    assert( rc );

    // The associated mutex is released for the duration of the wait and taken
    // again exactly once when this function is left, normally or not.
    m_assocMutex.unlock();
    scope(exit) m_assocMutex.lock();

    rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
    assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
    bool timedOut = (rc == WAIT_TIMEOUT);

    {
        // The guard lives in its own scope so that the critical section is
        // left exactly once, and never again by a failure further down.
        EnterCriticalSection( &m_unblockLock );
        scope(exit) LeaveCriticalSection( &m_unblockLock );

        if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
        {
            if ( timedOut )
            {
                // timeout (or canceled)
                if ( m_numWaitersBlocked != 0 )
                {
                    op!"-="(m_numWaitersBlocked, 1);
                    // do not unblock next waiter below (already unblocked)
                    numSignalsLeft = 0;
                }
                else
                {
                    // spurious wakeup pending!!
                    m_numWaitersGone = 1;
                }
            }
            if ( op!"-="(m_numWaitersToUnblock, 1) == 0 )
            {
                if ( m_numWaitersBlocked != 0 )
                {
                    // open the gate
                    rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                    assert( rc );
                    // do not open the gate below again
                    numSignalsLeft = 0;
                }
                else if ( (numWaitersGone = m_numWaitersGone) != 0 )
                {
                    m_numWaitersGone = 0;
                }
            }
        }
        else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 )
        {
            // timeout/canceled or spurious event :-)
            rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            // something is going on here - test of timeouts?
            op!"-="(m_numWaitersBlocked, m_numWaitersGone);
            rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
            // ReleaseSemaphore reports success with a nonzero value, so it
            // must not be compared against WAIT_OBJECT_0 (which is zero).
            assert( rc );
            m_numWaitersGone = 0;
        }
    }

    if ( numSignalsLeft == 1 )
    {
        // better now than spurious later (same as ResetEvent)
        for ( ; numWaitersGone > 0; --numWaitersGone )
        {
            rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
        }
        // open the gate
        rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
        assert( rc );
    }
    else if ( numSignalsLeft != 0 )
    {
        // unblock next waiter
        rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
        assert( rc );
    }

    return !timedOut;
}
511 
512 
// Windows implementation shared by notify() and notifyAll().
//
// Under m_unblockLock the waiter counters are updated to mark either one
// waiter or every blocked waiter as "to be unblocked".  If no unblocking was
// already in progress, the gate semaphore is taken and the queue semaphore is
// released once after the critical section has been left.
//
// Params:
//  all = true to unblock every blocked waiter, false to unblock a single one.
void notify_(this Q)( bool all )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    // Read-modify-write helper: a plain operator on an unshared object, an
    // atomicOp on a shared one.
    static if (is(Q == Condition))
    {
        auto op(string o, T, V1)(ref T val, V1 mod)
        {
            return mixin("val " ~ o ~ "mod");
        }
    }
    else
    {
        auto op(string o, T, V1)(ref shared T val, V1 mod)
        {
            import core.atomic: atomicOp;
            return atomicOp!o(val, mod);
        }
    }

    DWORD rc;
    bool  releaseQueue = false;

    {
        // The guard lives in its own scope so that the critical section is
        // left exactly once on every path, including a failing assertion
        // after the section has already been left.
        EnterCriticalSection( &m_unblockLock );
        scope(exit) LeaveCriticalSection( &m_unblockLock );

        if ( m_numWaitersToUnblock != 0 )
        {
            // An unblock is already in progress: only adjust the counters.
            if ( m_numWaitersBlocked == 0 )
                return;

            if ( all )
            {
                op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked);
                m_numWaitersBlocked = 0;
            }
            else
            {
                op!"+="(m_numWaitersToUnblock, 1);
                op!"-="(m_numWaitersBlocked, 1);
            }
        }
        else if ( m_numWaitersBlocked > m_numWaitersGone )
        {
            // Take the gate semaphore before touching the counters.
            rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            if ( 0 != m_numWaitersGone )
            {
                op!"-="(m_numWaitersBlocked, m_numWaitersGone);
                m_numWaitersGone = 0;
            }
            if ( all )
            {
                m_numWaitersToUnblock = m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock = 1;
                op!"-="(m_numWaitersBlocked, 1);
            }
            releaseQueue = true;
        }
        // Otherwise there is nobody to notify.
    }

    if ( releaseQueue )
    {
        // Release the queue semaphore once, outside the critical section.
        rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
        assert( rc );
    }
}
584 
585 
586         // NOTE: This implementation uses Algorithm 8c as described here:
587         //       http://groups.google.com/group/comp.programming.threads/
588         //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
589         HANDLE              m_blockLock;    // auto-reset event (now semaphore)
590         HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
591         Mutex               m_assocMutex;   // external mutex/CS
592         CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
593         int                 m_numWaitersGone        = 0;
594         int                 m_numWaitersBlocked     = 0;
595         int                 m_numWaitersToUnblock   = 0;
596     }
version(Posix)597     else version (Posix)
598     {
599         Mutex               m_assocMutex;
600         pthread_cond_t      m_hndl;
601     }
602 }
603 
604 
605 ////////////////////////////////////////////////////////////////////////////////
606 // Unit Tests
607 ////////////////////////////////////////////////////////////////////////////////
608 
// Exercises notify(), notifyAll() and the timed wait on an unshared Condition.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // numWaiters threads each consume numTries "ready" tokens, one per
    // notify() issued by the main thread; the test checks that every token
    // was consumed exactly once.
    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            foreach ( i; 0 .. numTries )
            {
                synchronized( mutex )
                {
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach ( i; 0 .. numWaiters )
            group.create( &waiter );

        foreach ( i; 0 .. numTries )
        {
            foreach ( j; 0 .. numWaiters )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // numDone is cumulative (it is never reset), so this loop only
            // really blocks during the first round.  Resetting it per round
            // is not a safe drop-in change: semDone posts are not tied to a
            // particular waiter, so a fast waiter could take several of them
            // and leave the others blocked.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            foreach ( j; 0 .. numWaiters )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters block on the condition until the main thread sets `alert`
    // and calls notifyAll(); every waiter must then finish.
    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach ( i; 0 .. numWaiters )
            group.create( &waiter );

        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait is expected to be ended by a notification, the
    // second one by its timeout.
    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}
774 
// Exercises notify(), notifyAll() and the timed wait on a shared Condition
// that is bound to a shared Mutex.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // numWaiters threads each consume numTries "ready" tokens, one per
    // notify() issued by the main thread; the test checks that every token
    // was consumed exactly once.
    void testNotify()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            foreach ( i; 0 .. numTries )
            {
                synchronized( mutex )
                {
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach ( i; 0 .. numWaiters )
            group.create( &waiter );

        foreach ( i; 0 .. numTries )
        {
            foreach ( j; 0 .. numWaiters )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // numDone is cumulative (it is never reset), so this loop only
            // really blocks during the first round.  Resetting it per round
            // is not a safe drop-in change: semDone posts are not tied to a
            // particular waiter, so a fast waiter could take several of them
            // and leave the others blocked.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            foreach ( j; 0 .. numWaiters )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters block on the condition until the main thread sets `alert`
    // and calls notifyAll(); every waiter must then finish.
    void testNotifyAll()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach ( i; 0 .. numWaiters )
            group.create( &waiter );

        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait is expected to be ended by a notification, the
    // second one by its timeout.
    void testWaitTimeout()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}
940