1 /**
2 * The condition module provides a primitive for synchronized condition
3 * checking.
4 *
5 * Copyright: Copyright Sean Kelly 2005 - 2009.
6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7 * Authors: Sean Kelly
8 * Source: $(DRUNTIMESRC core/sync/_condition.d)
9 */
10
11 /* Copyright Sean Kelly 2005 - 2009.
12 * Distributed under the Boost Software License, Version 1.0.
13 * (See accompanying file LICENSE or copy at
14 * http://www.boost.org/LICENSE_1_0.txt)
15 */
16 module core.sync.condition;
17
18
19 public import core.sync.exception;
20 public import core.sync.mutex;
21 public import core.time;
22
version(Windows)23 version (Windows)
24 {
25 import core.sync.semaphore;
26 import core.sys.windows.basetsd /+: HANDLE+/;
27 import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
28 DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
29 LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
30 import core.sys.windows.windef /+: BOOL, DWORD+/;
31 import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
32 }
version(Posix)33 else version (Posix)
34 {
35 import core.sync.config;
36 import core.stdc.errno;
37 import core.sys.posix.pthread;
38 import core.sys.posix.time;
39 }
40 else
41 {
42 static assert(false, "Platform not supported");
43 }
44
45
46 ////////////////////////////////////////////////////////////////////////////////
47 // Condition
48 //
49 // void wait();
50 // void notify();
51 // void notifyAll();
52 ////////////////////////////////////////////////////////////////////////////////
53
54
55 /**
56 * This class represents a condition variable as conceived by C.A.R. Hoare. As
57 * per Mesa type monitors however, "signal" has been replaced with "notify" to
58 * indicate that control is not transferred to the waiter when a notification
59 * is sent.
60 */
61 class Condition
62 {
63 ////////////////////////////////////////////////////////////////////////////
64 // Initialization
65 ////////////////////////////////////////////////////////////////////////////
66
/**
 * Creates a condition variable bound to the given mutex.
 *
 * All work is forwarded to the private templated constructor; the extra
 * boolean argument merely selects that overload.
 *
 * Params:
 *  m = The mutex with which this condition will be associated.
 *
 * Throws:
 *  SyncError on error.
 */
this(Mutex m) nothrow @safe
{
    this( m, true );
}
81
/// ditto
this(shared Mutex m) shared nothrow @safe
{
    // Same forwarding as the unshared constructor, for a shared object.
    this( m, true );
}
87
// Common implementation behind both public constructors. `Q` is the type of
// the object being built and `M` the matching mutex type; the bool parameter
// is never read and only distinguishes this overload from the public ones.
private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted
    if ((is(Q == Condition) && is(M == Mutex)) ||
        (is(Q == shared Condition) && is(M == shared Mutex)))
{
    version (Windows)
    {
        // The handle fields are shared-qualified when building a shared Condition.
        static if (is(Q == shared Condition))
            alias HANDLE_TYPE = shared(void*);
        else
            alias HANDLE_TYPE = void*;

        // Semaphore with initial count 1 and maximum count 1.
        m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null );
        if ( m_blockLock == m_blockLock.init )
            throw new SyncError( "Unable to initialize condition" );
        scope(failure) CloseHandle( cast(void*) m_blockLock );

        // Semaphore with initial count 0 on which waiters block.
        m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null );
        if ( m_blockQueue == m_blockQueue.init )
            throw new SyncError( "Unable to initialize condition" );
        scope(failure) CloseHandle( cast(void*) m_blockQueue );

        InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
        m_assocMutex = m;
    }
    else version (Posix)
    {
        m_assocMutex = m;

        static if ( is( typeof( pthread_condattr_setclock ) ) )
        {
            // Where available, make the condition use CLOCK_MONOTONIC.
            () @trusted
            {
                pthread_condattr_t attr = void;

                if ( int rc = pthread_condattr_init( &attr ) )
                    throw new SyncError( "Unable to initialize condition" );
                if ( int rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC ) )
                    throw new SyncError( "Unable to initialize condition" );
                if ( int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr ) )
                    throw new SyncError( "Unable to initialize condition" );
                if ( int rc = pthread_condattr_destroy( &attr ) )
                    throw new SyncError( "Unable to initialize condition" );
            } ();
        }
        else
        {
            // No pthread_condattr_setclock: initialise with default attributes.
            if ( int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null ) )
                throw new SyncError( "Unable to initialize condition" );
        }
    }
}
146
147
/**
 * Releases the operating-system objects created by the constructor.
 *
 * Failures are only detected through assertions, so they are not reported
 * when assertions are compiled out.
 */
~this()
{
    version (Windows)
    {
        // Keep each call outside the assert so it still runs without assertions.
        const BOOL lockClosed = CloseHandle( m_blockLock );
        assert( lockClosed, "Unable to destroy condition" );

        const BOOL queueClosed = CloseHandle( m_blockQueue );
        assert( queueClosed, "Unable to destroy condition" );

        DeleteCriticalSection( &m_unblockLock );
    }
    else version (Posix)
    {
        const int status = pthread_cond_destroy( &m_hndl );
        assert( status == 0, "Unable to destroy condition" );
    }
}
164
165
166 ////////////////////////////////////////////////////////////////////////////
167 // General Properties
168 ////////////////////////////////////////////////////////////////////////////
169
170
/**
 * Accessor for the mutex this condition was constructed with.
 *
 * Returns:
 *  The mutex associated with this condition.
 */
@property Mutex mutex()
{
    return this.m_assocMutex;
}
181
/// ditto
@property shared(Mutex) mutex() shared
{
    return this.m_assocMutex;
}
187
// Undocumented, for internal use: returns the associated mutex like `mutex`,
// but is final and carries pure nothrow @safe @nogc.
final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
{
    return this.m_assocMutex;
}
193
// Undocumented, for internal use: shared counterpart of the attribute-annotated
// accessor for the associated mutex.
final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
{
    return this.m_assocMutex;
}
199
200 ////////////////////////////////////////////////////////////////////////////
201 // General Actions
202 ////////////////////////////////////////////////////////////////////////////
203
204
/**
 * Blocks the calling thread until the condition is notified.
 *
 * Forwards to the templated implementation, instantiated for the unshared
 * type.
 *
 * Throws:
 *  SyncError on error.
 */
void wait()
{
    this.wait!(typeof(this))( true );
}
215
/// ditto
void wait() shared
{
    // typeof(this) is the shared type here, selecting the shared instantiation.
    this.wait!(typeof(this))( true );
}
221
/// ditto
void wait(this Q)( bool _unused_ )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    version (Windows)
    {
        // An untimed wait is a timed wait with an infinite timeout.
        timedWait( INFINITE );
    }
    else version (Posix)
    {
        auto cond  = cast(pthread_cond_t*) &m_hndl;
        auto mtx   = (cast(Mutex) m_assocMutex).handleAddr();

        if ( pthread_cond_wait( cond, mtx ) != 0 )
            throw new SyncError( "Unable to wait for condition" );
    }
}
237
/**
 * Blocks the calling thread until the condition is notified or the given
 * amount of time has passed, whichever happens first.
 *
 * Params:
 *  val = The time to wait.
 *
 * In:
 *  val must be non-negative.
 *
 * Throws:
 *  SyncError on error.
 *
 * Returns:
 *  true if notified before the timeout and false if not.
 */
bool wait( Duration val )
{
    return this.wait!(typeof(this))( val, true );
}
258
/// ditto
bool wait( Duration val ) shared
{
    // typeof(this) is the shared type here, selecting the shared instantiation.
    return this.wait!(typeof(this))( val, true );
}
264
/// ditto
bool wait(this Q)( Duration val, bool _unused_ )
    if (is(Q == Condition) || is(Q == shared Condition))
in
{
    assert( !val.isNegative );
}
do
{
    version (Windows)
    {
        // timedWait takes a 32-bit DWORD millisecond count, so a longer
        // duration is served in slices of (uint.max - 1) milliseconds.
        auto slice = dur!("msecs")( uint.max - 1 );
        immutable uint sliceMillis = cast(uint) slice.total!"msecs";

        for ( ; val > slice; val -= slice )
        {
            // A notification during any slice ends the whole wait.
            if ( timedWait( sliceMillis ) )
                return true;
        }
        return timedWait( cast(uint) val.total!"msecs" );
    }
    else version (Posix)
    {
        timespec abstime = void;
        mktspec( abstime, val );

        immutable int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl,
                                                   (cast(Mutex) m_assocMutex).handleAddr(),
                                                   &abstime );
        switch ( rc )
        {
            case 0:
                return true;
            case ETIMEDOUT:
                return false;
            default:
                throw new SyncError( "Unable to wait for condition" );
        }
    }
}
302
/**
 * Wakes one thread waiting on this condition.
 *
 * Forwards to the templated implementation, instantiated for the unshared
 * type.
 *
 * Throws:
 *  SyncError on error.
 */
void notify()
{
    this.notify!(typeof(this))( true );
}
313
/// ditto
void notify() shared
{
    // typeof(this) is the shared type here, selecting the shared instantiation.
    this.notify!(typeof(this))( true );
}
319
/// ditto
void notify(this Q)( bool _unused_ )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    version (Windows)
    {
        notify_( false );
    }
    else version (Posix)
    {
        // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times,
        // so the call has to be repeated for as long as it returns EAGAIN.
        //
        // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
        // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
        // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
        // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
        // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
        // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
        // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
        // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

        int status;
        for ( ;; )
        {
            status = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl );
            if ( status != EAGAIN )
                break;
        }
        if ( status != 0 )
            throw new SyncError( "Unable to notify condition" );
    }
}
350
/**
 * Wakes every thread waiting on this condition.
 *
 * Forwards to the templated implementation, instantiated for the unshared
 * type.
 *
 * Throws:
 *  SyncError on error.
 */
void notifyAll()
{
    this.notifyAll!(typeof(this))( true );
}
361
/// ditto
void notifyAll() shared
{
    // typeof(this) is the shared type here, selecting the shared instantiation.
    this.notifyAll!(typeof(this))( true );
}
367
/// ditto
void notifyAll(this Q)( bool _unused_ )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    version (Windows)
    {
        notify_( true );
    }
    else version (Posix)
    {
        // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times,
        // so the call has to be repeated for as long as it returns EAGAIN.
        //
        // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
        // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
        // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
        // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
        // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
        // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
        // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
        // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

        int status;
        for ( ;; )
        {
            status = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl );
            if ( status != EAGAIN )
                break;
        }
        if ( status != 0 )
            throw new SyncError( "Unable to notify condition" );
    }
}
398
399 private:
version(Windows)400 version (Windows)
401 {
/**
 * Windows implementation of both the timed and the untimed wait.
 *
 * Registers the caller as a blocked waiter, unlocks the associated mutex,
 * blocks on the queue semaphore for at most `timeout` milliseconds, updates
 * the waiter counters under the internal critical section and finally locks
 * the associated mutex again before returning.
 *
 * Params:
 *  timeout = Timeout passed to WaitForSingleObject on the queue semaphore;
 *            the untimed wait passes INFINITE.
 *
 * Returns:
 *  false if the wait on the queue semaphore ended with WAIT_TIMEOUT,
 *  true otherwise.
 */
bool timedWait(this Q)( DWORD timeout )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    // Counter update helper: a plain compound assignment for an unshared
    // object, an atomicOp for a shared one.
    static if (is(Q == Condition))
    {
        auto op(string o, T, V1)(ref T val, V1 mod)
        {
            return mixin("val " ~ o ~ "mod");
        }
    }
    else
    {
        auto op(string o, T, V1)(ref shared T val, V1 mod)
        {
            import core.atomic: atomicOp;
            return atomicOp!o(val, mod);
        }
    }

    int   numSignalsLeft;
    int   numWaitersGone;
    DWORD rc;

    // Register as a blocked waiter while holding the block lock.
    rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
    assert( rc == WAIT_OBJECT_0 );

    op!"+="(m_numWaitersBlocked, 1);

    rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
    assert( rc );

    // Give up the user's mutex for the duration of the wait; re-acquire it
    // if anything below throws.
    m_assocMutex.unlock();
    scope(failure) m_assocMutex.lock();

    rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
    assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
    bool timedOut = (rc == WAIT_TIMEOUT);

    EnterCriticalSection( &m_unblockLock );
    scope(failure) LeaveCriticalSection( &m_unblockLock );

    if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
    {
        if ( timedOut )
        {
            // timeout (or canceled)
            if ( m_numWaitersBlocked != 0 )
            {
                op!"-="(m_numWaitersBlocked, 1);
                // do not unblock next waiter below (already unblocked)
                numSignalsLeft = 0;
            }
            else
            {
                // spurious wakeup pending!!
                m_numWaitersGone = 1;
            }
        }
        if ( op!"-="(m_numWaitersToUnblock, 1) == 0 )
        {
            if ( m_numWaitersBlocked != 0 )
            {
                // open the gate
                rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                assert( rc );
                // do not open the gate below again
                numSignalsLeft = 0;
            }
            else if ( (numWaitersGone = m_numWaitersGone) != 0 )
            {
                m_numWaitersGone = 0;
            }
        }
    }
    else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 )
    {
        // timeout/canceled or spurious event :-)
        rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
        assert( rc == WAIT_OBJECT_0 );
        // something is going on here - test of timeouts?
        op!"-="(m_numWaitersBlocked, m_numWaitersGone);
        rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
        // ReleaseSemaphore reports success with a nonzero value, so it must
        // be checked like the other releases, not against WAIT_OBJECT_0.
        assert( rc );
        m_numWaitersGone = 0;
    }

    LeaveCriticalSection( &m_unblockLock );

    if ( numSignalsLeft == 1 )
    {
        // better now than spurious later (same as ResetEvent)
        for ( ; numWaitersGone > 0; --numWaitersGone )
        {
            rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
        }
        // open the gate
        rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
        assert( rc );
    }
    else if ( numSignalsLeft != 0 )
    {
        // unblock next waiter
        rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
        assert( rc );
    }

    // Re-acquire the user's mutex before handing control back.
    m_assocMutex.lock();
    return !timedOut;
}
511
512
/**
 * Windows implementation of notify (all == false) and notifyAll
 * (all == true).
 *
 * Moves either one or all currently blocked waiters into the "to unblock"
 * count under the internal critical section and, when no earlier
 * notification is still being consumed, releases the queue semaphore once.
 *
 * Params:
 *  all = true to unblock every blocked waiter, false to unblock one.
 */
void notify_(this Q)( bool all )
    if (is(Q == Condition) || is(Q == shared Condition))
{
    // Counter update helper: a plain compound assignment for an unshared
    // object, an atomicOp for a shared one.
    static if (is(Q == Condition))
    {
        auto op(string o, T, V1)(ref T val, V1 mod)
        {
            return mixin("val " ~ o ~ "mod");
        }
    }
    else
    {
        auto op(string o, T, V1)(ref shared T val, V1 mod)
        {
            import core.atomic: atomicOp;
            return atomicOp!o(val, mod);
        }
    }

    EnterCriticalSection( &m_unblockLock );
    scope(failure) LeaveCriticalSection( &m_unblockLock );

    // Case 1: waiters from an earlier notification are still being unblocked.
    if ( m_numWaitersToUnblock != 0 )
    {
        if ( m_numWaitersBlocked != 0 )
        {
            if ( all )
            {
                op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked);
                m_numWaitersBlocked = 0;
            }
            else
            {
                op!"+="(m_numWaitersToUnblock, 1);
                op!"-="(m_numWaitersBlocked, 1);
            }
        }
        // With nobody blocked there is nothing to do.
        LeaveCriticalSection( &m_unblockLock );
        return;
    }

    // Case 2: no blocked waiter beyond those already counted as gone.
    if ( !(m_numWaitersBlocked > m_numWaitersGone) )
    {
        LeaveCriticalSection( &m_unblockLock );
        return;
    }

    // Case 3: start a new round of unblocking. The block lock is taken here
    // and not released in this function.
    DWORD rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
    assert( rc == WAIT_OBJECT_0 );

    // Fold waiters that already left into the blocked count.
    if ( 0 != m_numWaitersGone )
    {
        op!"-="(m_numWaitersBlocked, m_numWaitersGone);
        m_numWaitersGone = 0;
    }

    if ( all )
    {
        m_numWaitersToUnblock = m_numWaitersBlocked;
        m_numWaitersBlocked = 0;
    }
    else
    {
        m_numWaitersToUnblock = 1;
        op!"-="(m_numWaitersBlocked, 1);
    }

    LeaveCriticalSection( &m_unblockLock );

    // Wake the first waiter.
    rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
    assert( rc );
}
584
585
586 // NOTE: This implementation uses Algorithm 8c as described here:
587 // http://groups.google.com/group/comp.programming.threads/
588 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
589 HANDLE m_blockLock; // auto-reset event (now semaphore)
590 HANDLE m_blockQueue; // auto-reset event (now semaphore)
591 Mutex m_assocMutex; // external mutex/CS
592 CRITICAL_SECTION m_unblockLock; // internal mutex/CS
593 int m_numWaitersGone = 0;
594 int m_numWaitersBlocked = 0;
595 int m_numWaitersToUnblock = 0;
596 }
version(Posix)597 else version (Posix)
598 {
599 Mutex m_assocMutex;
600 pthread_cond_t m_hndl;
601 }
602 }
603
604
605 ////////////////////////////////////////////////////////////////////////////////
606 // Unit Tests
607 ////////////////////////////////////////////////////////////////////////////////
608
// Exercises notify, notifyAll and the timed wait on an unshared Condition
// paired with an unshared Mutex.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // numWaiters threads each run numTries iterations: wait under the mutex
    // until numReady is positive, take one unit, bump numDone and then block
    // on semDone. The main thread posts numWaiters units per try via
    // notify(), spins until numDone >= numWaiters and then releases semDone
    // numWaiters times.
    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            foreach ( attempt; 0 .. numTries )
            {
                synchronized( mutex )
                {
                    // Loop guards against wakeups without a unit available.
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach ( w; 0 .. numWaiters )
            group.create( &waiter );

        foreach ( attempt; 0 .. numTries )
        {
            foreach ( w; 0 .. numWaiters )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // numDone is cumulative and never reset, so this check is
            // already satisfied from the second try onwards.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            foreach ( w; 0 .. numWaiters )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters register in numReady and sleep until `alert` is set; the
    // main thread sets it once everyone is registered and calls notifyAll().
    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach ( w; 0 .. numWaiters )
            group.create( &waiter );

        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait is expected to be notified, the second one is
    // expected to time out.
    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}
774
// Exercises notify, notifyAll and the timed wait on a shared Condition paired
// with a shared Mutex.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // numWaiters threads each run numTries iterations: wait under the mutex
    // until numReady is positive, take one unit, bump numDone and then block
    // on semDone. The main thread posts numWaiters units per try via
    // notify(), spins until numDone >= numWaiters and then releases semDone
    // numWaiters times.
    void testNotify()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            foreach ( attempt; 0 .. numTries )
            {
                synchronized( mutex )
                {
                    // Loop guards against wakeups without a unit available.
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach ( w; 0 .. numWaiters )
            group.create( &waiter );

        foreach ( attempt; 0 .. numTries )
        {
            foreach ( w; 0 .. numWaiters )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // numDone is cumulative and never reset, so this check is
            // already satisfied from the second try onwards.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            foreach ( w; 0 .. numWaiters )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters register in numReady and sleep until `alert` is set; the
    // main thread sets it once everyone is registered and calls notifyAll().
    void testNotifyAll()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach ( w; 0 .. numWaiters )
            group.create( &waiter );

        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait is expected to be notified, the second one is
    // expected to time out.
    void testWaitTimeout()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}
940