1 /**
2 * The semaphore module provides a general use semaphore for synchronization.
3 *
4 * Copyright: Copyright Sean Kelly 2005 - 2009.
5 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6 * Authors: Sean Kelly
7 * Source: $(DRUNTIMESRC core/sync/_semaphore.d)
8 */
9
10 /* Copyright Sean Kelly 2005 - 2009.
11 * Distributed under the Boost Software License, Version 1.0.
12 * (See accompanying file LICENSE or copy at
13 * http://www.boost.org/LICENSE_1_0.txt)
14 */
15 module core.sync.semaphore;
16
17
18 public import core.sync.exception;
19 public import core.time;
20
21 version (OSX)
22 version = Darwin;
23 else version (iOS)
24 version = Darwin;
25 else version (TVOS)
26 version = Darwin;
27 else version (WatchOS)
28 version = Darwin;
29
version(Windows)30 version (Windows)
31 {
32 import core.sys.windows.basetsd /+: HANDLE+/;
33 import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE,
34 ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
35 import core.sys.windows.windef /+: BOOL, DWORD+/;
36 import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
37 }
version(Darwin)38 else version (Darwin)
39 {
40 import core.sync.config;
41 import core.stdc.errno;
42 import core.sys.posix.time;
43 import core.sys.darwin.mach.semaphore;
44 }
version(Posix)45 else version (Posix)
46 {
47 import core.sync.config;
48 import core.stdc.errno;
49 import core.sys.posix.pthread;
50 import core.sys.posix.semaphore;
51 }
52 else
53 {
54 static assert(false, "Platform not supported");
55 }
56
57
58 ////////////////////////////////////////////////////////////////////////////////
59 // Semaphore
60 //
//  void wait();
//  bool wait( Duration period );
//  void notify();
//  bool tryWait();
64 ////////////////////////////////////////////////////////////////////////////////
65
66
67 /**
 * This class represents a general counting semaphore as conceived by Edsger
69 * Dijkstra. As per Mesa type monitors however, "signal" has been replaced
70 * with "notify" to indicate that control is not transferred to the waiter when
71 * a notification is sent.
72 */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a semaphore object with the specified initial count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncError on error.
     */
    this( uint count = 0 )
    {
        version (Windows)
        {
            // int.max is passed as the maximum count the semaphore may reach.
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if ( m_hndl == m_hndl.init )
                throw new SyncError( "Unable to create semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count );
            if ( rc )
                throw new SyncError( "Unable to create semaphore" );
        }
        else version (Posix)
        {
            // The second argument (pshared) is 0.
            int rc = sem_init( &m_hndl, 0, count );
            if ( rc )
                throw new SyncError( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the operating-system semaphore owned by this object.
     *
     * A failure to release the handle is only reported through an assert;
     * nothing is thrown from the destructor.
     */
    ~this()
    {
        version (Windows)
        {
            BOOL rc = CloseHandle( m_hndl );
            assert( rc, "Unable to destroy semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_destroy( mach_task_self(), m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
        else version (Posix)
        {
            int rc = sem_destroy( &m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until the current count is above zero, then atomically decrement
     * the count by one and return.
     *
     * A wait that is interrupted before the semaphore was acquired is
     * transparently restarted.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        version (Windows)
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            if ( rc != WAIT_OBJECT_0 )
                throw new SyncError( "Unable to wait for semaphore" );
        }
        else version (Darwin)
        {
            while ( true )
            {
                auto rc = semaphore_wait( m_hndl );
                if ( !rc )
                    return;
                // Mach reports an interrupted wait through the returned
                // kern_return_t (KERN_ABORTED).  errno is not a reliable
                // indicator for Mach calls, so it is deliberately not
                // consulted: a stale errno value must not turn a harmless
                // interruption into a SyncError.
                if ( rc == KERN_ABORTED )
                    continue;
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Posix)
        {
            while ( true )
            {
                if ( !sem_wait( &m_hndl ) )
                    return;
                // EINTR means the call was interrupted; simply retry.
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Suspends the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero in this interval, then atomically decrement the count by one and
     * return true.  Otherwise, return false.
     *
     * If the wait is interrupted it is resumed for the time that is still
     * left of period, so interruptions do not extend the overall timeout.
     *
     * Params:
     *  period = The time to wait.
     *
     * In:
     *  period must be non-negative.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration period )
    in
    {
        assert( !period.isNegative );
    }
    do
    {
        version (Windows)
        {
            // uint.max is INFINITE, so the longest finite wait that can be
            // requested in a single call is uint.max - 1 milliseconds.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            // Consume periods longer than one call can express in chunks.
            while ( period > maxWaitMillis )
            {
                auto rc = WaitForSingleObject( m_hndl, cast(uint)
                                                       maxWaitMillis.total!"msecs" );
                switch ( rc )
                {
                case WAIT_OBJECT_0:
                    return true;
                case WAIT_TIMEOUT:
                    period -= maxWaitMillis;
                    continue;
                default:
                    throw new SyncError( "Unable to wait for semaphore" );
                }
            }
            switch ( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Darwin)
        {
            // semaphore_timedwait takes a *relative* timeout.  Remember when
            // the wait began so that a restarted wait only covers the time
            // that is still left instead of the whole period again.
            const start = MonoTime.currTime;
            auto remaining = period;

            while ( true )
            {
                mach_timespec_t t = void;
                (cast(byte*) &t)[0 .. t.sizeof] = 0;

                // Clamp the seconds field if the duration does not fit.
                if ( remaining.total!"seconds" > t.tv_sec.max )
                {
                    t.tv_sec  = t.tv_sec.max;
                    t.tv_nsec = cast(typeof(t.tv_nsec)) remaining.split!("seconds", "nsecs")().nsecs;
                }
                else
                    remaining.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec);

                auto rc = semaphore_timedwait( m_hndl, t );
                if ( !rc )
                    return true;
                if ( rc == KERN_OPERATION_TIMED_OUT )
                    return false;
                // Interruption is signalled by KERN_ABORTED alone; errno is
                // not a reliable indicator for Mach calls and is not checked.
                if ( rc != KERN_ABORTED )
                    throw new SyncError( "Unable to wait for semaphore" );

                // Interrupted: retry with whatever is left of the period.
                // Once nothing is left, one final zero-timeout attempt is
                // made, which either acquires the semaphore or times out.
                remaining = period - (MonoTime.currTime - start);
                if ( remaining.isNegative )
                    remaining = Duration.zero;
            }
        }
        else version (Posix)
        {
            import core.sys.posix.time : clock_gettime, CLOCK_REALTIME;

            // sem_timedwait takes an absolute deadline: current time + period.
            timespec t = void;
            clock_gettime( CLOCK_REALTIME, &t );
            mvtspec( t, period );

            while ( true )
            {
                if ( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if ( errno == ETIMEDOUT )
                    return false;
                // EINTR means the call was interrupted; retry with the same
                // absolute deadline.
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Atomically increment the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncError on error.
     */
    void notify()
    {
        version (Windows)
        {
            if ( !ReleaseSemaphore( m_hndl, 1, null ) )
                throw new SyncError( "Unable to notify semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_signal( m_hndl );
            if ( rc )
                throw new SyncError( "Unable to notify semaphore" );
        }
        else version (Posix)
        {
            int rc = sem_post( &m_hndl );
            if ( rc )
                throw new SyncError( "Unable to notify semaphore" );
        }
    }


    /**
     * If the current count is equal to zero, return false.  Otherwise,
     * atomically decrement the count by one and return true.  This call
     * does not wait for the count to change.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     */
    bool tryWait()
    {
        version (Windows)
        {
            // A zero timeout makes the call a pure poll.
            switch ( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Darwin)
        {
            // Implemented as a timed wait with a zero-length period.
            return wait( dur!"hnsecs"(0) );
        }
        else version (Posix)
        {
            while ( true )
            {
                if ( !sem_trywait( &m_hndl ) )
                    return true;
                // EAGAIN: the count is currently zero.
                if ( errno == EAGAIN )
                    return false;
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
    }


protected:

    /// Aliases the operating-system-specific semaphore type.
    version (Windows)        alias Handle = HANDLE;
    /// ditto
    else version (Darwin)    alias Handle = semaphore_t;
    /// ditto
    else version (Posix)     alias Handle = sem_t;

    /// Handle to the system-specific semaphore.
    Handle m_hndl;
}
359
360
361 ////////////////////////////////////////////////////////////////////////////////
362 // Unit Tests
363 ////////////////////////////////////////////////////////////////////////////////
364
unittest
{
    import core.thread, core.atomic;

    // Producer/consumer scenario: a single producer posts a fixed number of
    // notifications, a pool of consumers drains them, and afterwards every
    // consumer is woken once more so that it can observe the stop flag and exit.
    void testWait()
    {
        auto sem = new Semaphore;
        shared bool finished = false;
        immutable numToProduce = 20;
        immutable numConsumers = 10;
        shared size_t consumedCount;
        shared size_t exitedCount;

        void consumer()
        {
            for (;;)
            {
                sem.wait();

                // Once the stop flag is set, a wake-up means "exit", not "item".
                if (atomicLoad(finished))
                {
                    atomicOp!"+="(exitedCount, 1);
                    return;
                }
                atomicOp!"+="(consumedCount, 1);
            }
        }

        void producer()
        {
            // Nothing has been posted yet, so a poll must fail.
            assert(!sem.tryWait());

            for (int produced = 0; produced < numToProduce; ++produced)
                sem.notify();

            // Spin until every produced item has been consumed.
            while (atomicLoad(consumedCount) != numToProduce)
                Thread.yield();

            // Raise the stop flag ...
            atomicStore(finished, true);

            // ... and give each consumer one wake-up so it can see it.
            for (int woken = 0; woken < numConsumers; ++woken)
                sem.notify();

            // Spin until every consumer has left its loop.
            while (atomicLoad(exitedCount) != numConsumers)
                Thread.yield();

            // The count must be back at zero, and a single notify must be
            // consumable exactly once.
            assert(!sem.tryWait());
            sem.notify();
            assert(sem.tryWait());
            assert(!sem.tryWait());
        }

        auto workers = new ThreadGroup;

        foreach (_; 0 .. numConsumers)
            workers.create(&consumer);
        workers.create(&producer);
        workers.joinAll();
    }


    // Timed wait: with exactly one notification pending, the first timed wait
    // must succeed and the second one must time out.
    void testWaitTimeout()
    {
        auto sem = new Semaphore;
        shared bool semReady;
        bool firstResult, secondResult;

        void waiter()
        {
            // Hold off until the main thread has posted its notification.
            while (!atomicLoad(semReady))
                Thread.yield();

            immutable shortWait = dur!"msecs"(1);
            firstResult  = sem.wait(shortWait);
            secondResult = sem.wait(shortWait);
            assert(firstResult && !secondResult);
        }

        auto waiterThread = new Thread(&waiter);
        waiterThread.start();

        sem.notify();
        atomicStore(semReady, true);
        waiterThread.join();
        assert(firstResult && !secondResult);
    }

    testWait();
    testWaitTimeout();
}
455