1 /**
2 * The event module provides a primitive for lightweight signaling of other threads
3 * (emulating Windows events on Posix)
4 *
5 * Copyright: Copyright (c) 2019 D Language Foundation
6 * License: Distributed under the
7 * $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
8 * (See accompanying file LICENSE)
9 * Authors: Rainer Schuetze
10 * Source: $(DRUNTIMESRC core/sync/event.d)
11 */
12 module core.sync.event;
13
version(Windows)14 version (Windows)
15 {
16 import core.sys.windows.basetsd /+: HANDLE +/;
17 import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
18 import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
19 WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
20 }
version(Posix)21 else version (Posix)
22 {
23 import core.sys.posix.pthread;
24 import core.sys.posix.sys.types;
25 import core.sys.posix.time;
26 }
27 else
28 {
29 static assert(false, "Platform not supported");
30 }
31
32 import core.time;
33 import core.internal.abort : abort;
34
35 /**
 * Represents an event. Clients of an event are suspended while waiting
 * for the event to be "signaled".
 *
 * Implemented using `pthread_mutex_t` and `pthread_cond_t` on Posix and
 * `CreateEvent` and `SetEvent` on Windows.
41 ---
42 import core.sync.event, core.thread, std.file;
43
44 struct ProcessFile
45 {
46 ThreadGroup group;
47 Event event;
48 void[] buffer;
49
50 void doProcess()
51 {
52 event.wait();
53 // process buffer
54 }
55
56 void process(string filename)
57 {
58 event.initialize(true, false);
59 group = new ThreadGroup;
60 for (int i = 0; i < 10; ++i)
61 group.create(&doProcess);
62
63 buffer = std.file.read(filename);
64 event.set();
65 group.joinAll();
66 event.terminate();
67 }
68 }
69 ---
70 */
struct Event
{
nothrow @nogc:
    /**
     * Creates an event object.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    this(bool manualReset, bool initialState)
    {
        initialize(manualReset, initialState);
    }

    /**
     * Initializes an event object. Does nothing if the event is already initialized.
     * Aborts the process if the underlying OS primitives cannot be created.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    void initialize(bool manualReset, bool initialState)
    {
        version (Windows)
        {
            if (m_event)
                return;
            m_event = CreateEvent(null, manualReset, initialState, null);
            m_event || abort("Error: CreateEvent failed.");
        }
        else version (Posix)
        {
            if (m_initialized)
                return;
            pthread_mutex_init(&m_mutex, null) == 0 ||
                abort("Error: pthread_mutex_init failed.");
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                // Bind the condition variable to the monotonic clock where the
                // platform allows selecting the clock.
                pthread_condattr_t attr = void;
                pthread_condattr_init(&attr) == 0 ||
                    abort("Error: pthread_condattr_init failed.");
                pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
                    abort("Error: pthread_condattr_setclock failed.");
                pthread_cond_init(&m_cond, &attr) == 0 ||
                    abort("Error: pthread_cond_init failed.");
                pthread_condattr_destroy(&attr) == 0 ||
                    abort("Error: pthread_condattr_destroy failed.");
            }
            else
            {
                pthread_cond_init(&m_cond, null) == 0 ||
                    abort("Error: pthread_cond_init failed.");
            }
            m_state = initialState;
            m_manualReset = manualReset;
            m_initialized = true;
        }
    }

    // copying not allowed, can produce resource leaks
    @disable this(this);
    @disable void opAssign(Event);

    /// Releases the OS resources by calling `terminate`.
    ~this()
    {
        terminate();
    }

    /**
     * Deinitializes the event. Does nothing if the event is not initialized. There must not be
     * threads currently waiting for the event to be signaled.
     */
    void terminate()
    {
        version (Windows)
        {
            if (m_event)
                CloseHandle(m_event);
            m_event = null;
        }
        else version (Posix)
        {
            if (m_initialized)
            {
                pthread_mutex_destroy(&m_mutex) == 0 ||
                    abort("Error: pthread_mutex_destroy failed.");
                pthread_cond_destroy(&m_cond) == 0 ||
                    abort("Error: pthread_cond_destroy failed.");
                m_initialized = false;
            }
        }
    }


    /// Set the event to "signaled", so that waiting clients are resumed.
    /// Does nothing if the event is not initialized.
    void set()
    {
        version (Windows)
        {
            if (m_event)
                SetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initialized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = true;
                // Advancing the generation lets a thread blocked in wait()
                // tell this wake-up apart from a spurious one, even if the
                // state is cleared again before that thread gets to run.
                ++m_generation;
                pthread_cond_broadcast(&m_cond);
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /// Reset the event manually. Does nothing if the event is not initialized.
    void reset()
    {
        version (Windows)
        {
            if (m_event)
                ResetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initialized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = false;
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /**
     * Wait for the event to be signaled without timeout.
     *
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occurred
     */
    bool wait()
    {
        version (Windows)
        {
            return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            return wait(Duration.max);
        }
    }

    /**
     * Wait for the event to be signaled with timeout.
     *
     * A negative timeout is treated like a zero timeout: the call only polls
     * the current state. On Posix, `Duration.max` waits without timeout.
     *
     * Params:
     *  tmout = the maximum time to wait
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
     *  the event is uninitialized or another error occurred
     */
    bool wait(Duration tmout)
    {
        version (Windows)
        {
            if (!m_event)
                return false;

            // A negative value cast to uint would turn into a huge timeout
            // (-1 ms even becomes INFINITE), so clamp it to "poll only".
            if (tmout < Duration.zero)
                tmout = Duration.zero;

            // WaitForSingleObject takes a 32-bit millisecond count whose
            // maximum value means INFINITE; longer waits are done in chunks.
            auto maxWaitMillis = dur!("msecs")(uint.max - 1);

            while (tmout > maxWaitMillis)
            {
                auto res = WaitForSingleObject(m_event, uint.max - 1);
                if (res != WAIT_TIMEOUT)
                    return res == WAIT_OBJECT_0;
                tmout -= maxWaitMillis;
            }
            auto ms = cast(uint)(tmout.total!"msecs");
            return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            if (!m_initialized)
                return false;

            pthread_mutex_lock(&m_mutex);

            int result = 0;
            if (!m_state)
            {
                // POSIX permits spurious wake-ups, so keep waiting until the
                // event is signaled or set() has been called since we started
                // waiting (generation changed). Every thread that is blocked
                // here when set() broadcasts is released.
                const startGeneration = m_generation;

                if (tmout == Duration.max)
                {
                    do
                        result = pthread_cond_wait(&m_cond, &m_mutex);
                    while (result == 0 && !m_state && m_generation == startGeneration);
                }
                else
                {
                    import core.sync.config;

                    // The deadline is computed once, so waiting again after a
                    // spurious wake-up does not extend the total timeout.
                    timespec t = void;
                    mktspec(t, tmout);

                    do
                        result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
                    while (result == 0 && !m_state && m_generation == startGeneration);
                }
            }
            // An auto-reset event returns to nonsignaled once a waiter was resumed.
            if (result == 0 && !m_manualReset)
                m_state = false;

            pthread_mutex_unlock(&m_mutex);

            return result == 0;
        }
    }

private:
    version (Windows)
    {
        HANDLE m_event;             // null while the event is not initialized
    }
    else version (Posix)
    {
        pthread_mutex_t m_mutex;    // protects m_state and m_generation
        pthread_cond_t m_cond;      // broadcast by set()
        bool m_initialized;         // true between initialize() and terminate()
        bool m_state;               // true while the event is signaled
        bool m_manualReset;         // false: wait() clears m_state on success
        uint m_generation;          // incremented by every set(); wrap-around is harmless
    }
}
298
// Exercise both reset modes from a single thread (non-shared use).
@nogc nothrow unittest
{
    // Auto-reset event that starts out nonsignaled: a successful wait
    // consumes the signal again.
    Event autoEvent = Event(false, false);
    assert(autoEvent.wait(dur!"msecs"(1)) == false);
    autoEvent.set();
    assert(autoEvent.wait() == true);
    assert(autoEvent.wait(dur!"msecs"(1)) == false);

    // Manual-reset event that starts out signaled: it stays signaled
    // until reset() is called.
    Event manualEvent = Event(true, true);
    assert(manualEvent.wait() == true);
    assert(manualEvent.wait() == true);
    manualEvent.reset();
    assert(manualEvent.wait(dur!"msecs"(1)) == false);
}
316
// Test that setting a manual-reset event releases all threads waiting for it.
unittest
{
    import core.thread, core.atomic;

    scope event = new Event(true, false);
    enum int numThreads = 10;
    shared int numRunning = 0;

    void testFn()
    {
        // timeout below limit for druntime test_runner;
        // only threads that were actually signaled are counted
        if (event.wait(8.dur!"seconds"))
            numRunning.atomicOp!"+="(1);
    }

    auto group = new ThreadGroup;

    foreach (_; 0 .. numThreads)
        group.create(&testFn);

    auto start = MonoTime.currTime;
    // nobody may get past the wait before the event is set
    assert(atomicLoad(numRunning) == 0);

    event.set();
    group.joinAll();

    assert(atomicLoad(numRunning) == numThreads);

    // the threads must have been released by set(), not by their timeout
    assert(MonoTime.currTime - start < 5.dur!"seconds");

    // all waiters have been joined, so the event can be released safely
    event.terminate();
}
346