1 /**
2 * The read/write mutex module provides a primitive for maintaining shared read
3 * access and mutually exclusive write access.
4 *
5 * Copyright: Copyright Sean Kelly 2005 - 2009.
6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7 * Authors: Sean Kelly
8 * Source: $(DRUNTIMESRC core/sync/_rwmutex.d)
9 */
10
11 /* Copyright Sean Kelly 2005 - 2009.
12 * Distributed under the Boost Software License, Version 1.0.
13 * (See accompanying file LICENSE or copy at
14 * http://www.boost.org/LICENSE_1_0.txt)
15 */
16 module core.sync.rwmutex;
17
18
19 public import core.sync.exception;
20 import core.sync.condition;
21 import core.sync.mutex;
22 import core.memory;
23
24 version (Posix)
25 {
26 import core.sys.posix.pthread;
27 }
28
29
30 ////////////////////////////////////////////////////////////////////////////////
31 // ReadWriteMutex
32 //
33 // Reader reader();
34 // Writer writer();
35 ////////////////////////////////////////////////////////////////////////////////
36
37
38 /**
39 * This class represents a mutex that allows any number of readers to enter,
40 * but when a writer enters, all other readers and writers are blocked.
41 *
42 * Please note that this mutex is not recursive and is intended to guard access
43 * to data only. Also, no deadlock checking is in place because doing so would
44 * require dynamic memory allocation, which would reduce performance by an
45 * unacceptable amount. As a result, any attempt to recursively acquire this
46 * mutex may well deadlock the caller, particularly if a write lock is acquired
47 * while holding a read lock, or vice-versa. In practice, however, this should
48 * not be an issue, because it is uncommon to call deeply into unknown code
49 * while holding a lock that simply protects data.
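 *
 * Example: a minimal sketch of guarding shared data (the names rwlock,
 * sharedData, readData and writeData are illustrative, not part of this
 * module):
 * ---
 * auto rwlock = new ReadWriteMutex;
 * int[] sharedData;
 *
 * void readData()
 * {
 *     // Any number of readers may hold the read lock concurrently.
 *     synchronized (rwlock.reader)
 *     {
 *         foreach (v; sharedData) { /+ read only +/ }
 *     }
 * }
 *
 * void writeData(int v)
 * {
 *     // A writer excludes all readers and all other writers.
 *     synchronized (rwlock.writer)
 *     {
 *         sharedData ~= v;
 *     }
 * }
 * ---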
50 */
51 class ReadWriteMutex
52 {
53 /**
54 * Defines the policy used by this mutex. Currently, two policies are
55 * defined.
56 *
57 * The first, PREFER_READERS, will queue writers until no readers hold the mutex, then
58 * pass the writers through one at a time. If a reader acquires the mutex
59 * while there are still writers queued, the reader will take precedence.
60 *
61 * The second, PREFER_WRITERS, will queue readers if there are any writers queued. Writers
62 * are passed through one at a time, and once there are no writers present,
63 * all queued readers will be alerted.
64 *
65 * Future policies may offer a more even balance between reader and writer
66 * precedence.
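 *
 * A minimal sketch of selecting a policy explicitly (the constructor
 * defaults to Policy.PREFER_WRITERS):
 * ---
 * auto rwlock = new ReadWriteMutex(ReadWriteMutex.Policy.PREFER_READERS);
 * assert(rwlock.policy == ReadWriteMutex.Policy.PREFER_READERS);
 * ---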
67 */
68 enum Policy
69 {
70 PREFER_READERS, /// Readers get preference. This may starve writers.
71 PREFER_WRITERS /// Writers get preference. This may starve readers.
72 }
73
74
75 ////////////////////////////////////////////////////////////////////////////
76 // Initialization
77 ////////////////////////////////////////////////////////////////////////////
78
79
80 /**
81 * Initializes a read/write mutex object with the supplied policy.
82 *
83 * Params:
84 * policy = The policy to use.
85 *
86 * Throws:
87 * SyncError on error.
88 */
89 this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
90 {
91 m_commonMutex = new Mutex;
92 if ( !m_commonMutex )
93 throw new SyncError( "Unable to initialize mutex" );
94
95 m_readerQueue = new Condition( m_commonMutex );
96 if ( !m_readerQueue )
97 throw new SyncError( "Unable to initialize condition" );
98
99 m_writerQueue = new Condition( m_commonMutex );
100 if ( !m_writerQueue )
101 throw new SyncError( "Unable to initialize condition" );
102
103 m_policy = policy;
104 m_reader = new Reader;
105 m_writer = new Writer;
106 }
107
108 /// ditto
109 shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
110 {
111 m_commonMutex = new shared Mutex;
112 if ( !m_commonMutex )
113 throw new SyncError( "Unable to initialize mutex" );
114
115 m_readerQueue = new shared Condition( m_commonMutex );
116 if ( !m_readerQueue )
117 throw new SyncError( "Unable to initialize condition" );
118
119 m_writerQueue = new shared Condition( m_commonMutex );
120 if ( !m_writerQueue )
121 throw new SyncError( "Unable to initialize condition" );
122
123 m_policy = policy;
124 m_reader = new shared Reader;
125 m_writer = new shared Writer;
126 }
127
128 ////////////////////////////////////////////////////////////////////////////
129 // General Properties
130 ////////////////////////////////////////////////////////////////////////////
131
132
133 /**
134 * Gets the policy used by this mutex.
135 *
136 * Returns:
137 * The policy used by this mutex.
138 */
139 @property Policy policy() @safe nothrow
140 {
141 return m_policy;
142 }
143
144 ///ditto
145 @property Policy policy() shared @safe nothrow
146 {
147 return m_policy;
148 }
149
150 ////////////////////////////////////////////////////////////////////////////
151 // Reader/Writer Handles
152 ////////////////////////////////////////////////////////////////////////////
153
154
155 /**
156 * Gets an object representing the reader lock for the associated mutex.
157 *
158 * Returns:
159 * A reader sub-mutex.
160 */
161 @property Reader reader() @safe nothrow
162 {
163 return m_reader;
164 }
165
166 ///ditto
167 @property shared(Reader) reader() shared @safe nothrow
168 {
169 return m_reader;
170 }
171
172 /**
173 * Gets an object representing the writer lock for the associated mutex.
174 *
175 * Returns:
176 * A writer sub-mutex.
177 */
178 @property Writer writer() @safe nothrow
179 {
180 return m_writer;
181 }
182
183 ///ditto
184 @property shared(Writer) writer() shared @safe nothrow
185 {
186 return m_writer;
187 }
188
189
190 ////////////////////////////////////////////////////////////////////////////
191 // Reader
192 ////////////////////////////////////////////////////////////////////////////
193
194
195 /**
196 * This class can be considered a mutex in its own right, and is used to
197 * negotiate a read lock for the enclosing mutex.
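 *
 * A minimal usage sketch (rwlock is an assumed ReadWriteMutex instance;
 * statement-level `synchronized (rwlock.reader)` works as well, as the
 * unit tests below demonstrate):
 * ---
 * rwlock.reader.lock();
 * scope (exit) rwlock.reader.unlock();
 * // ... read the protected data here ...
 * ---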
198 */
199 class Reader :
200 Object.Monitor
201 {
202 /**
203 * Initializes a read/write mutex reader proxy object.
204 */
205 this(this Q)() @trusted nothrow
206 if (is(Q == Reader) || is(Q == shared Reader))
207 {
208 m_proxy.link = this;
209 this.__monitor = cast(void*) &m_proxy;
210 }
211
212 /**
213 * Acquires a read lock on the enclosing mutex.
214 */
215 @trusted void lock()
216 {
217 synchronized( m_commonMutex )
218 {
219 ++m_numQueuedReaders;
220 scope(exit) --m_numQueuedReaders;
221
222 while ( shouldQueueReader )
223 m_readerQueue.wait();
224 ++m_numActiveReaders;
225 }
226 }
227
228 /// ditto
229 @trusted void lock() shared
230 {
231 synchronized( m_commonMutex )
232 {
233 ++(cast()m_numQueuedReaders);
234 scope(exit) --(cast()m_numQueuedReaders);
235
236 while ( shouldQueueReader )
237 m_readerQueue.wait();
238 ++(cast()m_numActiveReaders);
239 }
240 }
241
242 /**
243 * Releases a read lock on the enclosing mutex.
244 */
245 @trusted void unlock()
246 {
247 synchronized( m_commonMutex )
248 {
249 if ( --m_numActiveReaders < 1 )
250 {
251 if ( m_numQueuedWriters > 0 )
252 m_writerQueue.notify();
253 }
254 }
255 }
256
257 /// ditto
258 @trusted void unlock() shared
259 {
260 synchronized( m_commonMutex )
261 {
262 if ( --(cast()m_numActiveReaders) < 1 )
263 {
264 if ( m_numQueuedWriters > 0 )
265 m_writerQueue.notify();
266 }
267 }
268 }
269
270 /**
271 * Attempts to acquire a read lock on the enclosing mutex. If one can
272 * be obtained without blocking, the lock is acquired and true is
273 * returned. If not, the lock is not acquired and false is returned.
274 *
275 * Returns:
276 * true if the lock was acquired and false if not.
277 */
278 @trusted bool tryLock()
279 {
280 synchronized( m_commonMutex )
281 {
282 if ( shouldQueueReader )
283 return false;
284 ++m_numActiveReaders;
285 return true;
286 }
287 }
288
289 /// ditto
290 @trusted bool tryLock() shared
291 {
292 synchronized( m_commonMutex )
293 {
294 if ( shouldQueueReader )
295 return false;
296 ++(cast()m_numActiveReaders);
297 return true;
298 }
299 }
300
301 /**
302 * Attempts to acquire a read lock on the enclosing mutex. If one can
303 * be obtained without blocking, the lock is acquired and true is
304 * returned. If not, the function blocks until either the lock can be
305 * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
306 * true if the lock was acquired and false if the function timed out.
307 *
308 * Params:
309 * timeout = maximum amount of time to wait for the lock
310 * Returns:
311 * true if the lock was acquired and false if not.
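 *
 * A minimal sketch (rwlock is an assumed ReadWriteMutex instance; the
 * one-second timeout is an arbitrary choice):
 * ---
 * if (rwlock.reader.tryLock(dur!"seconds"(1)))
 * {
 *     scope (exit) rwlock.reader.unlock();
 *     // ... read the protected data here ...
 * }
 * else
 * {
 *     // Timed out without acquiring the read lock.
 * }
 * ---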
312 */
313 @trusted bool tryLock(Duration timeout)
314 {
315 synchronized( m_commonMutex )
316 {
317 if (!shouldQueueReader)
318 {
319 ++m_numActiveReaders;
320 return true;
321 }
322
323 enum zero = Duration.zero();
324 if (timeout <= zero)
325 return false;
326
327 ++m_numQueuedReaders;
328 scope(exit) --m_numQueuedReaders;
329
330 enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
331 const initialTime = MonoTime.currTime;
332 m_readerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
333 while (shouldQueueReader)
334 {
335 const timeElapsed = MonoTime.currTime - initialTime;
336 if (timeElapsed >= timeout)
337 return false;
338 auto nextWait = timeout - timeElapsed;
339 m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
340 }
341 ++m_numActiveReaders;
342 return true;
343 }
344 }
345
346 /// ditto
347 @trusted bool tryLock(Duration timeout) shared
348 {
349 const initialTime = MonoTime.currTime;
350 synchronized( m_commonMutex )
351 {
352 ++(cast()m_numQueuedReaders);
353 scope(exit) --(cast()m_numQueuedReaders);
354
355 while (shouldQueueReader)
356 {
357 const timeElapsed = MonoTime.currTime - initialTime;
358 if (timeElapsed >= timeout)
359 return false;
360 auto nextWait = timeout - timeElapsed;
361 // Avoid problems calling wait(Duration) with huge arguments.
362 enum maxWaitPerCall = dur!"hours"(24 * 365);
363 m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
364 }
365 ++(cast()m_numActiveReaders);
366 return true;
367 }
368 }
369
370
371 private:
372 @property bool shouldQueueReader(this Q)() nothrow @safe @nogc
373 if (is(Q == Reader) || is(Q == shared Reader))
374 {
375 if ( m_numActiveWriters > 0 )
376 return true;
377
378 switch ( m_policy )
379 {
380 case Policy.PREFER_WRITERS:
381 return m_numQueuedWriters > 0;
382
383 case Policy.PREFER_READERS:
384 default:
385 break;
386 }
387
388 return false;
389 }
390
391 struct MonitorProxy
392 {
393 Object.Monitor link;
394 }
395
396 MonitorProxy m_proxy;
397 }
398
399
400 ////////////////////////////////////////////////////////////////////////////
401 // Writer
402 ////////////////////////////////////////////////////////////////////////////
403
404
405 /**
406 * This class can be considered a mutex in its own right, and is used to
407 * negotiate a write lock for the enclosing mutex.
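 *
 * A minimal usage sketch (rwlock is an assumed ReadWriteMutex instance):
 * ---
 * rwlock.writer.lock();
 * scope (exit) rwlock.writer.unlock();
 * // ... modify the protected data here ...
 * ---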
408 */
409 class Writer :
410 Object.Monitor
411 {
412 /**
413 * Initializes a read/write mutex writer proxy object.
414 */
415 this(this Q)() @trusted nothrow
416 if (is(Q == Writer) || is(Q == shared Writer))
417 {
418 m_proxy.link = this;
419 this.__monitor = cast(void*) &m_proxy;
420 }
421
422
423 /**
424 * Acquires a write lock on the enclosing mutex.
425 */
426 @trusted void lock()
427 {
428 synchronized( m_commonMutex )
429 {
430 ++m_numQueuedWriters;
431 scope(exit) --m_numQueuedWriters;
432
433 while ( shouldQueueWriter )
434 m_writerQueue.wait();
435 ++m_numActiveWriters;
436 }
437 }
438
439 /// ditto
440 @trusted void lock() shared
441 {
442 synchronized( m_commonMutex )
443 {
444 ++(cast()m_numQueuedWriters);
445 scope(exit) --(cast()m_numQueuedWriters);
446
447 while ( shouldQueueWriter )
448 m_writerQueue.wait();
449 ++(cast()m_numActiveWriters);
450 }
451 }
452
453
454 /**
455 * Releases a write lock on the enclosing mutex.
456 */
457 @trusted void unlock()
458 {
459 synchronized( m_commonMutex )
460 {
461 if ( --m_numActiveWriters < 1 )
462 {
463 switch ( m_policy )
464 {
465 default:
466 case Policy.PREFER_READERS:
467 if ( m_numQueuedReaders > 0 )
468 m_readerQueue.notifyAll();
469 else if ( m_numQueuedWriters > 0 )
470 m_writerQueue.notify();
471 break;
472 case Policy.PREFER_WRITERS:
473 if ( m_numQueuedWriters > 0 )
474 m_writerQueue.notify();
475 else if ( m_numQueuedReaders > 0 )
476 m_readerQueue.notifyAll();
477 }
478 }
479 }
480 }
481
482 /// ditto
483 @trusted void unlock() shared
484 {
485 synchronized( m_commonMutex )
486 {
487 if ( --(cast()m_numActiveWriters) < 1 )
488 {
489 switch ( m_policy )
490 {
491 default:
492 case Policy.PREFER_READERS:
493 if ( m_numQueuedReaders > 0 )
494 m_readerQueue.notifyAll();
495 else if ( m_numQueuedWriters > 0 )
496 m_writerQueue.notify();
497 break;
498 case Policy.PREFER_WRITERS:
499 if ( m_numQueuedWriters > 0 )
500 m_writerQueue.notify();
501 else if ( m_numQueuedReaders > 0 )
502 m_readerQueue.notifyAll();
503 }
504 }
505 }
506 }
507
508
509 /**
510 * Attempts to acquire a write lock on the enclosing mutex. If one can
511 * be obtained without blocking, the lock is acquired and true is
512 * returned. If not, the lock is not acquired and false is returned.
513 *
514 * Returns:
515 * true if the lock was acquired and false if not.
516 */
517 @trusted bool tryLock()
518 {
519 synchronized( m_commonMutex )
520 {
521 if ( shouldQueueWriter )
522 return false;
523 ++m_numActiveWriters;
524 return true;
525 }
526 }
527
528 /// ditto
529 @trusted bool tryLock() shared
530 {
531 synchronized( m_commonMutex )
532 {
533 if ( shouldQueueWriter )
534 return false;
535 ++(cast()m_numActiveWriters);
536 return true;
537 }
538 }
539
540 /**
541 * Attempts to acquire a write lock on the enclosing mutex. If one can
542 * be obtained without blocking, the lock is acquired and true is
543 * returned. If not, the function blocks until either the lock can be
544 * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
545 * true if the lock was acquired and false if the function timed out.
546 *
547 * Params:
548 * timeout = maximum amount of time to wait for the lock
549 * Returns:
550 * true if the lock was acquired and false if not.
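 *
 * A minimal sketch (rwlock is an assumed ReadWriteMutex instance; the
 * one-second timeout is an arbitrary choice):
 * ---
 * if (rwlock.writer.tryLock(dur!"seconds"(1)))
 * {
 *     scope (exit) rwlock.writer.unlock();
 *     // ... modify the protected data here ...
 * }
 * else
 * {
 *     // Timed out without acquiring the write lock.
 * }
 * ---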
551 */
552 @trusted bool tryLock(Duration timeout)
553 {
554 synchronized( m_commonMutex )
555 {
556 if (!shouldQueueWriter)
557 {
558 ++m_numActiveWriters;
559 return true;
560 }
561
562 enum zero = Duration.zero();
563 if (timeout <= zero)
564 return false;
565
566 ++m_numQueuedWriters;
567 scope(exit) --m_numQueuedWriters;
568
569 enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
570 const initialTime = MonoTime.currTime;
571 m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
572 while (shouldQueueWriter)
573 {
574 const timeElapsed = MonoTime.currTime - initialTime;
575 if (timeElapsed >= timeout)
576 return false;
577 auto nextWait = timeout - timeElapsed;
578 m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
579 }
580 ++m_numActiveWriters;
581 return true;
582 }
583 }
584
585 /// ditto
586 @trusted bool tryLock(Duration timeout) shared
587 {
588 const initialTime = MonoTime.currTime;
589 synchronized( m_commonMutex )
590 {
591 ++(cast()m_numQueuedWriters);
592 scope(exit) --(cast()m_numQueuedWriters);
593
594 while (shouldQueueWriter)
595 {
596 const timeElapsed = MonoTime.currTime - initialTime;
597 if (timeElapsed >= timeout)
598 return false;
599 auto nextWait = timeout - timeElapsed;
600 // Avoid problems calling wait(Duration) with huge arguments.
601 enum maxWaitPerCall = dur!"hours"(24 * 365);
602 m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
603 }
604 ++(cast()m_numActiveWriters);
605 return true;
606 }
607 }
608
609 private:
610 @property bool shouldQueueWriter(this Q)()
611 if (is(Q == Writer) || is(Q == shared Writer))
612 {
613 if ( m_numActiveWriters > 0 ||
614 m_numActiveReaders > 0 )
615 return true;
616 switch ( m_policy )
617 {
618 case Policy.PREFER_READERS:
619 return m_numQueuedReaders > 0;
620
621 case Policy.PREFER_WRITERS:
622 default:
623 break;
624 }
625
626 return false;
627 }
628
629 struct MonitorProxy
630 {
631 Object.Monitor link;
632 }
633
634 MonitorProxy m_proxy;
635 }
636
637
638 private:
639 Policy m_policy;
640 Reader m_reader;
641 Writer m_writer;
642
643 Mutex m_commonMutex;
644 Condition m_readerQueue;
645 Condition m_writerQueue;
646
647 int m_numQueuedReaders;
648 int m_numActiveReaders;
649 int m_numQueuedWriters;
650 int m_numActiveWriters;
651 }
652
653
654 ////////////////////////////////////////////////////////////////////////////////
655 // Unit Tests
656 ////////////////////////////////////////////////////////////////////////////////
657
658
659 unittest
660 {
661 import core.atomic, core.thread, core.sync.semaphore;
662
663 static void runTest(ReadWriteMutex.Policy policy)
664 {
665 scope mutex = new ReadWriteMutex(policy);
666 scope rdSemA = new Semaphore, rdSemB = new Semaphore,
667 wrSemA = new Semaphore, wrSemB = new Semaphore;
668 shared size_t numReaders, numWriters;
669
670 void readerFn()
671 {
672 synchronized (mutex.reader)
673 {
674 atomicOp!"+="(numReaders, 1);
675 rdSemA.notify();
676 rdSemB.wait();
677 atomicOp!"-="(numReaders, 1);
678 }
679 }
680
681 void writerFn()
682 {
683 synchronized (mutex.writer)
684 {
685 atomicOp!"+="(numWriters, 1);
686 wrSemA.notify();
687 wrSemB.wait();
688 atomicOp!"-="(numWriters, 1);
689 }
690 }
691
692 void waitQueued(size_t queuedReaders, size_t queuedWriters)
693 {
694 for (;;)
695 {
696 synchronized (mutex.m_commonMutex)
697 {
698 if (mutex.m_numQueuedReaders == queuedReaders &&
699 mutex.m_numQueuedWriters == queuedWriters)
700 break;
701 }
702 Thread.yield();
703 }
704 }
705
706 scope group = new ThreadGroup;
707
708 // 2 simultaneous readers
709 group.create(&readerFn); group.create(&readerFn);
710 rdSemA.wait(); rdSemA.wait();
711 assert(numReaders == 2);
712 rdSemB.notify(); rdSemB.notify();
713 group.joinAll();
714 assert(numReaders == 0);
715 foreach (t; group) group.remove(t);
716
717 // 1 writer at a time
718 group.create(&writerFn); group.create(&writerFn);
719 wrSemA.wait();
720 assert(!wrSemA.tryWait());
721 assert(numWriters == 1);
722 wrSemB.notify();
723 wrSemA.wait();
724 assert(numWriters == 1);
725 wrSemB.notify();
726 group.joinAll();
727 assert(numWriters == 0);
728 foreach (t; group) group.remove(t);
729
730 // reader and writer are mutually exclusive
731 group.create(&readerFn);
732 rdSemA.wait();
733 group.create(&writerFn);
734 waitQueued(0, 1);
735 assert(!wrSemA.tryWait());
736 assert(numReaders == 1 && numWriters == 0);
737 rdSemB.notify();
738 wrSemA.wait();
739 assert(numReaders == 0 && numWriters == 1);
740 wrSemB.notify();
741 group.joinAll();
742 assert(numReaders == 0 && numWriters == 0);
743 foreach (t; group) group.remove(t);
744
745 // writer and reader are mutually exclusive
746 group.create(&writerFn);
747 wrSemA.wait();
748 group.create(&readerFn);
749 waitQueued(1, 0);
750 assert(!rdSemA.tryWait());
751 assert(numReaders == 0 && numWriters == 1);
752 wrSemB.notify();
753 rdSemA.wait();
754 assert(numReaders == 1 && numWriters == 0);
755 rdSemB.notify();
756 group.joinAll();
757 assert(numReaders == 0 && numWriters == 0);
758 foreach (t; group) group.remove(t);
759
760 // policy determines whether queued reader or writers progress first
761 group.create(&writerFn);
762 wrSemA.wait();
763 group.create(&readerFn);
764 group.create(&writerFn);
765 waitQueued(1, 1);
766 assert(numReaders == 0 && numWriters == 1);
767 wrSemB.notify();
768
769 if (policy == ReadWriteMutex.Policy.PREFER_READERS)
770 {
771 rdSemA.wait();
772 assert(numReaders == 1 && numWriters == 0);
773 rdSemB.notify();
774 wrSemA.wait();
775 assert(numReaders == 0 && numWriters == 1);
776 wrSemB.notify();
777 }
778 else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
779 {
780 wrSemA.wait();
781 assert(numReaders == 0 && numWriters == 1);
782 wrSemB.notify();
783 rdSemA.wait();
784 assert(numReaders == 1 && numWriters == 0);
785 rdSemB.notify();
786 }
787 group.joinAll();
788 assert(numReaders == 0 && numWriters == 0);
789 foreach (t; group) group.remove(t);
790 }
791 runTest(ReadWriteMutex.Policy.PREFER_READERS);
792 runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
793 }
794
795 unittest
796 {
797 import core.atomic, core.thread;
798 __gshared ReadWriteMutex rwmutex;
799 shared static bool threadTriedOnceToGetLock;
800 shared static bool threadFinallyGotLock;
801
802 rwmutex = new ReadWriteMutex();
803 atomicFence;
804 const maxTimeAllowedForTest = dur!"seconds"(20);
805 // Test ReadWriteMutex.Reader.tryLock(Duration).
806 {
807 static void testReaderTryLock()
808 {
809 assert(!rwmutex.reader.tryLock(Duration.min));
810 threadTriedOnceToGetLock.atomicStore(true);
811 assert(rwmutex.reader.tryLock(Duration.max));
812 threadFinallyGotLock.atomicStore(true);
813 rwmutex.reader.unlock;
814 }
815 assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
816 auto otherThread = new Thread(&testReaderTryLock).start;
817 const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
818 Thread.yield;
819 // We started otherThread with the writer lock held so otherThread's
820 // first rwmutex.reader.tryLock with timeout Duration.min should fail.
821 while (!threadTriedOnceToGetLock.atomicLoad)
822 {
823 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
824 Thread.yield;
825 }
826 rwmutex.writer.unlock;
827 // Soon after we release the writer lock otherThread's second
828 // rwmutex.reader.tryLock with timeout Duration.max should succeed.
829 while (!threadFinallyGotLock.atomicLoad)
830 {
831 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
832 Thread.yield;
833 }
834 otherThread.join;
835 }
836 threadTriedOnceToGetLock.atomicStore(false); // Reset.
837 threadFinallyGotLock.atomicStore(false); // Reset.
838 // Test ReadWriteMutex.Writer.tryLock(Duration).
839 {
840 static void testWriterTryLock()
841 {
842 assert(!rwmutex.writer.tryLock(Duration.min));
843 threadTriedOnceToGetLock.atomicStore(true);
844 assert(rwmutex.writer.tryLock(Duration.max));
845 threadFinallyGotLock.atomicStore(true);
846 rwmutex.writer.unlock;
847 }
848 assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
849 auto otherThread = new Thread(&testWriterTryLock).start;
850 const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
851 Thread.yield;
852 // We started otherThread with the reader lock held so otherThread's
853 // first rwmutex.writer.tryLock with timeout Duration.min should fail.
854 while (!threadTriedOnceToGetLock.atomicLoad)
855 {
856 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
857 Thread.yield;
858 }
859 rwmutex.reader.unlock;
860 // Soon after we release the reader lock otherThread's second
861 // rwmutex.writer.tryLock with timeout Duration.max should succeed.
862 while (!threadFinallyGotLock.atomicLoad)
863 {
864 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
865 Thread.yield;
866 }
867 otherThread.join;
868 }
869 }
870
871 unittest
872 {
873 import core.atomic, core.thread, core.sync.semaphore;
874
875 static void runTest(ReadWriteMutex.Policy policy)
876 {
877 shared scope mutex = new shared ReadWriteMutex(policy);
878 scope rdSemA = new Semaphore, rdSemB = new Semaphore,
879 wrSemA = new Semaphore, wrSemB = new Semaphore;
880 shared size_t numReaders, numWriters;
881
882 void readerFn()
883 {
884 synchronized (mutex.reader)
885 {
886 atomicOp!"+="(numReaders, 1);
887 rdSemA.notify();
888 rdSemB.wait();
889 atomicOp!"-="(numReaders, 1);
890 }
891 }
892
893 void writerFn()
894 {
895 synchronized (mutex.writer)
896 {
897 atomicOp!"+="(numWriters, 1);
898 wrSemA.notify();
899 wrSemB.wait();
900 atomicOp!"-="(numWriters, 1);
901 }
902 }
903
904 void waitQueued(size_t queuedReaders, size_t queuedWriters)
905 {
906 for (;;)
907 {
908 synchronized (mutex.m_commonMutex)
909 {
910 if (mutex.m_numQueuedReaders == queuedReaders &&
911 mutex.m_numQueuedWriters == queuedWriters)
912 break;
913 }
914 Thread.yield();
915 }
916 }
917
918 scope group = new ThreadGroup;
919
920 // 2 simultaneous readers
921 group.create(&readerFn); group.create(&readerFn);
922 rdSemA.wait(); rdSemA.wait();
923 assert(numReaders == 2);
924 rdSemB.notify(); rdSemB.notify();
925 group.joinAll();
926 assert(numReaders == 0);
927 foreach (t; group) group.remove(t);
928
929 // 1 writer at a time
930 group.create(&writerFn); group.create(&writerFn);
931 wrSemA.wait();
932 assert(!wrSemA.tryWait());
933 assert(numWriters == 1);
934 wrSemB.notify();
935 wrSemA.wait();
936 assert(numWriters == 1);
937 wrSemB.notify();
938 group.joinAll();
939 assert(numWriters == 0);
940 foreach (t; group) group.remove(t);
941
942 // reader and writer are mutually exclusive
943 group.create(&readerFn);
944 rdSemA.wait();
945 group.create(&writerFn);
946 waitQueued(0, 1);
947 assert(!wrSemA.tryWait());
948 assert(numReaders == 1 && numWriters == 0);
949 rdSemB.notify();
950 wrSemA.wait();
951 assert(numReaders == 0 && numWriters == 1);
952 wrSemB.notify();
953 group.joinAll();
954 assert(numReaders == 0 && numWriters == 0);
955 foreach (t; group) group.remove(t);
956
957 // writer and reader are mutually exclusive
958 group.create(&writerFn);
959 wrSemA.wait();
960 group.create(&readerFn);
961 waitQueued(1, 0);
962 assert(!rdSemA.tryWait());
963 assert(numReaders == 0 && numWriters == 1);
964 wrSemB.notify();
965 rdSemA.wait();
966 assert(numReaders == 1 && numWriters == 0);
967 rdSemB.notify();
968 group.joinAll();
969 assert(numReaders == 0 && numWriters == 0);
970 foreach (t; group) group.remove(t);
971
972 // policy determines whether queued reader or writers progress first
973 group.create(&writerFn);
974 wrSemA.wait();
975 group.create(&readerFn);
976 group.create(&writerFn);
977 waitQueued(1, 1);
978 assert(numReaders == 0 && numWriters == 1);
979 wrSemB.notify();
980
981 if (policy == ReadWriteMutex.Policy.PREFER_READERS)
982 {
983 rdSemA.wait();
984 assert(numReaders == 1 && numWriters == 0);
985 rdSemB.notify();
986 wrSemA.wait();
987 assert(numReaders == 0 && numWriters == 1);
988 wrSemB.notify();
989 }
990 else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
991 {
992 wrSemA.wait();
993 assert(numReaders == 0 && numWriters == 1);
994 wrSemB.notify();
995 rdSemA.wait();
996 assert(numReaders == 1 && numWriters == 0);
997 rdSemB.notify();
998 }
999 group.joinAll();
1000 assert(numReaders == 0 && numWriters == 0);
1001 foreach (t; group) group.remove(t);
1002 }
1003 runTest(ReadWriteMutex.Policy.PREFER_READERS);
1004 runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
1005 }
1006
1007 unittest
1008 {
1009 import core.atomic, core.thread;
1010 shared static ReadWriteMutex rwmutex;
1011 shared static bool threadTriedOnceToGetLock;
1012 shared static bool threadFinallyGotLock;
1013
1014 rwmutex = new shared ReadWriteMutex();
1015 atomicFence;
1016 const maxTimeAllowedForTest = dur!"seconds"(20);
1017 // Test ReadWriteMutex.Reader.tryLock(Duration).
1018 {
1019 static void testReaderTryLock()
1020 {
1021 assert(!rwmutex.reader.tryLock(Duration.min));
1022 threadTriedOnceToGetLock.atomicStore(true);
1023 assert(rwmutex.reader.tryLock(Duration.max));
1024 threadFinallyGotLock.atomicStore(true);
1025 rwmutex.reader.unlock;
1026 }
1027 assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
1028 auto otherThread = new Thread(&testReaderTryLock).start;
1029 const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
1030 Thread.yield;
1031 // We started otherThread with the writer lock held so otherThread's
1032 // first rwmutex.reader.tryLock with timeout Duration.min should fail.
1033 while (!threadTriedOnceToGetLock.atomicLoad)
1034 {
1035 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
1036 Thread.yield;
1037 }
1038 rwmutex.writer.unlock;
1039 // Soon after we release the writer lock otherThread's second
1040 // rwmutex.reader.tryLock with timeout Duration.max should succeed.
1041 while (!threadFinallyGotLock.atomicLoad)
1042 {
1043 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
1044 Thread.yield;
1045 }
1046 otherThread.join;
1047 }
1048 threadTriedOnceToGetLock.atomicStore(false); // Reset.
1049 threadFinallyGotLock.atomicStore(false); // Reset.
1050 // Test ReadWriteMutex.Writer.tryLock(Duration).
1051 {
1052 static void testWriterTryLock()
1053 {
1054 assert(!rwmutex.writer.tryLock(Duration.min));
1055 threadTriedOnceToGetLock.atomicStore(true);
1056 assert(rwmutex.writer.tryLock(Duration.max));
1057 threadFinallyGotLock.atomicStore(true);
1058 rwmutex.writer.unlock;
1059 }
1060 assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
1061 auto otherThread = new Thread(&testWriterTryLock).start;
1062 const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
1063 Thread.yield;
1064 // We started otherThread with the reader lock held so otherThread's
1065 // first rwmutex.writer.tryLock with timeout Duration.min should fail.
1066 while (!threadTriedOnceToGetLock.atomicLoad)
1067 {
1068 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
1069 Thread.yield;
1070 }
1071 rwmutex.reader.unlock;
1072 // Soon after we release the reader lock otherThread's second
1073 // rwmutex.writer.tryLock with timeout Duration.max should succeed.
1074 while (!threadFinallyGotLock.atomicLoad)
1075 {
1076 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
1077 Thread.yield;
1078 }
1079 otherThread.join;
1080 }
1081 }
1082