1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #include <sys/cdefs.h>
28 /* __FBSDID("$FreeBSD: head/cddl/compat/opensolaris/misc/thread_pool.c 275595 2014-12-08 06:10:47Z delphij $"); */
29
30 #include <stdlib.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include "thread_pool_impl.h"
34
/*
 * Pointer to an extern "C" function taking a single void * argument and
 * returning nothing.  Used to cast a handler to the type expected by
 * pthread_cleanup_push().
 */
typedef void (*_Voidfp)(void *);
36
37 static void
delete_pool(tpool_t * tpool)38 delete_pool(tpool_t *tpool)
39 {
40 tpool_job_t *job;
41
42 /*
43 * There should be no pending jobs, but just in case...
44 */
45 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
46 tpool->tp_head = job->tpj_next;
47 free(job);
48 }
49 (void) pthread_attr_destroy(&tpool->tp_attr);
50 free(tpool);
51 }
52
53 /*
54 * Worker thread is terminating.
55 */
56 static void
worker_cleanup(void * arg)57 worker_cleanup(void *arg)
58 {
59 tpool_t *tpool = arg;
60
61 if (--tpool->tp_current == 0 &&
62 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
63 if (tpool->tp_flags & TP_ABANDON) {
64 pthread_mutex_unlock(&tpool->tp_mutex);
65 delete_pool(tpool);
66 return;
67 }
68 if (tpool->tp_flags & TP_DESTROY)
69 (void) pthread_cond_broadcast(&tpool->tp_busycv);
70 }
71 pthread_mutex_unlock(&tpool->tp_mutex);
72 }
73
74 static void
notify_waiters(tpool_t * tpool)75 notify_waiters(tpool_t *tpool)
76 {
77 if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
78 tpool->tp_flags &= ~TP_WAIT;
79 (void) pthread_cond_broadcast(&tpool->tp_waitcv);
80 }
81 }
82
83 /*
84 * Called by a worker thread on return from a tpool_dispatch()d job.
85 */
86 static void
job_cleanup(void * arg)87 job_cleanup(void *arg)
88 {
89 tpool_t *tpool = arg;
90 pthread_t my_tid = pthread_self();
91 tpool_active_t *activep;
92 tpool_active_t **activepp;
93
94 pthread_mutex_lock(&tpool->tp_mutex);
95 /* CSTYLED */
96 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
97 activep = *activepp;
98 if (activep->tpa_tid == my_tid) {
99 *activepp = activep->tpa_next;
100 break;
101 }
102 }
103 if (tpool->tp_flags & TP_WAIT)
104 notify_waiters(tpool);
105 }
106
107 static void *
tpool_worker(void * arg)108 tpool_worker(void *arg)
109 {
110 tpool_t *tpool = (tpool_t *)arg;
111 int elapsed;
112 tpool_job_t *job;
113 void (*func)(void *);
114 tpool_active_t active;
115 sigset_t maskset;
116
117 pthread_mutex_lock(&tpool->tp_mutex);
118 pthread_cleanup_push(worker_cleanup, tpool);
119
120 /*
121 * This is the worker's main loop.
122 * It will only be left if a timeout or an error has occured.
123 */
124 active.tpa_tid = pthread_self();
125 for (;;) {
126 elapsed = 0;
127 tpool->tp_idle++;
128 if (tpool->tp_flags & TP_WAIT)
129 notify_waiters(tpool);
130 while ((tpool->tp_head == NULL ||
131 (tpool->tp_flags & TP_SUSPEND)) &&
132 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
133 if (tpool->tp_current <= tpool->tp_minimum ||
134 tpool->tp_linger == 0) {
135 (void) pthread_cond_wait(&tpool->tp_workcv,
136 &tpool->tp_mutex);
137 } else {
138 struct timespec timeout;
139
140 clock_gettime(CLOCK_MONOTONIC, &timeout);
141 timeout.tv_sec += tpool->tp_linger;
142 if (pthread_cond_timedwait(&tpool->tp_workcv,
143 &tpool->tp_mutex, &timeout) != 0) {
144 elapsed = 1;
145 break;
146 }
147 }
148 }
149 tpool->tp_idle--;
150 if (tpool->tp_flags & TP_DESTROY)
151 break;
152 if (tpool->tp_flags & TP_ABANDON) {
153 /* can't abandon a suspended pool */
154 if (tpool->tp_flags & TP_SUSPEND) {
155 tpool->tp_flags &= ~TP_SUSPEND;
156 (void) pthread_cond_broadcast(&tpool->tp_workcv);
157 }
158 if (tpool->tp_head == NULL)
159 break;
160 }
161 if ((job = tpool->tp_head) != NULL &&
162 !(tpool->tp_flags & TP_SUSPEND)) {
163 elapsed = 0;
164 func = job->tpj_func;
165 arg = job->tpj_arg;
166 tpool->tp_head = job->tpj_next;
167 if (job == tpool->tp_tail)
168 tpool->tp_tail = NULL;
169 tpool->tp_njobs--;
170 active.tpa_next = tpool->tp_active;
171 tpool->tp_active = &active;
172 pthread_mutex_unlock(&tpool->tp_mutex);
173 pthread_cleanup_push(job_cleanup, tpool);
174 free(job);
175 /*
176 * Call the specified function.
177 */
178 func(arg);
179 /*
180 * We don't know what this thread has been doing,
181 * so we reset its signal mask and cancellation
182 * state back to the initial values.
183 */
184 sigfillset(&maskset);
185 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
186 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
187 NULL);
188 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
189 NULL);
190 pthread_cleanup_pop(1);
191 }
192 if (elapsed && tpool->tp_current > tpool->tp_minimum) {
193 /*
194 * We timed out and there is no work to be done
195 * and the number of workers exceeds the minimum.
196 * Exit now to reduce the size of the pool.
197 */
198 break;
199 }
200 }
201 pthread_cleanup_pop(1);
202 return (arg);
203 }
204
205 /*
206 * Create a worker thread, with all signals blocked.
207 */
208 static int
create_worker(tpool_t * tpool)209 create_worker(tpool_t *tpool)
210 {
211 sigset_t maskset, oset;
212 pthread_t thread;
213 int error;
214
215 sigfillset(&maskset);
216 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
217 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
218 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
219 return (error);
220 }
221
222 tpool_t *
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)223 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
224 pthread_attr_t *attr)
225 {
226 tpool_t *tpool;
227 int error;
228
229 if (min_threads > max_threads || max_threads < 1) {
230 errno = EINVAL;
231 return (NULL);
232 }
233
234 tpool = calloc(1, sizeof (*tpool));
235 if (tpool == NULL) {
236 errno = ENOMEM;
237 return (NULL);
238 }
239 (void) pthread_mutex_init(&tpool->tp_mutex, NULL);
240 (void) pthread_cond_init(&tpool->tp_busycv, NULL);
241 (void) pthread_cond_init(&tpool->tp_workcv, NULL);
242 (void) pthread_cond_init(&tpool->tp_waitcv, NULL);
243 tpool->tp_minimum = min_threads;
244 tpool->tp_maximum = max_threads;
245 tpool->tp_linger = linger;
246
247 /* make all pool threads be detached daemon threads */
248 (void) pthread_attr_init(&tpool->tp_attr);
249 (void) pthread_attr_setdetachstate(&tpool->tp_attr,
250 PTHREAD_CREATE_DETACHED);
251
252 return (tpool);
253 }
254
255 /*
256 * Dispatch a work request to the thread pool.
257 * If there are idle workers, awaken one.
258 * Else, if the maximum number of workers has
259 * not been reached, spawn a new worker thread.
260 * Else just return with the job added to the queue.
261 */
262 int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)263 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
264 {
265 tpool_job_t *job;
266
267 if ((job = calloc(1, sizeof (*job))) == NULL)
268 return (-1);
269 job->tpj_next = NULL;
270 job->tpj_func = func;
271 job->tpj_arg = arg;
272
273 pthread_mutex_lock(&tpool->tp_mutex);
274
275 if (tpool->tp_head == NULL)
276 tpool->tp_head = job;
277 else
278 tpool->tp_tail->tpj_next = job;
279 tpool->tp_tail = job;
280 tpool->tp_njobs++;
281
282 if (!(tpool->tp_flags & TP_SUSPEND)) {
283 if (tpool->tp_idle > 0)
284 (void) pthread_cond_signal(&tpool->tp_workcv);
285 else if (tpool->tp_current < tpool->tp_maximum &&
286 create_worker(tpool) == 0)
287 tpool->tp_current++;
288 }
289
290 pthread_mutex_unlock(&tpool->tp_mutex);
291 return (0);
292 }
293
294 /*
295 * Assumes: by the time tpool_destroy() is called no one will use this
296 * thread pool in any way and no one will try to dispatch entries to it.
297 * Calling tpool_destroy() from a job in the pool will cause deadlock.
298 */
299 void
tpool_destroy(tpool_t * tpool)300 tpool_destroy(tpool_t *tpool)
301 {
302 tpool_active_t *activep;
303
304 pthread_mutex_lock(&tpool->tp_mutex);
305 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
306
307 /* mark the pool as being destroyed; wakeup idle workers */
308 tpool->tp_flags |= TP_DESTROY;
309 tpool->tp_flags &= ~TP_SUSPEND;
310 (void) pthread_cond_broadcast(&tpool->tp_workcv);
311
312 /* cancel all active workers */
313 for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
314 (void) pthread_cancel(activep->tpa_tid);
315
316 /* wait for all active workers to finish */
317 while (tpool->tp_active != NULL) {
318 tpool->tp_flags |= TP_WAIT;
319 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
320 }
321
322 /* the last worker to terminate will wake us up */
323 while (tpool->tp_current != 0)
324 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
325
326 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
327 delete_pool(tpool);
328 }
329
330 /*
331 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
332 * The last worker to terminate will delete the pool.
333 */
334 void
tpool_abandon(tpool_t * tpool)335 tpool_abandon(tpool_t *tpool)
336 {
337
338 pthread_mutex_lock(&tpool->tp_mutex);
339 if (tpool->tp_current == 0) {
340 /* no workers, just delete the pool */
341 pthread_mutex_unlock(&tpool->tp_mutex);
342 delete_pool(tpool);
343 } else {
344 /* wake up all workers, last one will delete the pool */
345 tpool->tp_flags |= TP_ABANDON;
346 tpool->tp_flags &= ~TP_SUSPEND;
347 (void) pthread_cond_broadcast(&tpool->tp_workcv);
348 pthread_mutex_unlock(&tpool->tp_mutex);
349 }
350 }
351
352 /*
353 * Wait for all jobs to complete.
354 * Calling tpool_wait() from a job in the pool will cause deadlock.
355 */
356 void
tpool_wait(tpool_t * tpool)357 tpool_wait(tpool_t *tpool)
358 {
359
360 pthread_mutex_lock(&tpool->tp_mutex);
361 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
362 while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
363 tpool->tp_flags |= TP_WAIT;
364 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
365 }
366 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
367 }
368
369 void
tpool_suspend(tpool_t * tpool)370 tpool_suspend(tpool_t *tpool)
371 {
372
373 pthread_mutex_lock(&tpool->tp_mutex);
374 tpool->tp_flags |= TP_SUSPEND;
375 pthread_mutex_unlock(&tpool->tp_mutex);
376 }
377
378 int
tpool_suspended(tpool_t * tpool)379 tpool_suspended(tpool_t *tpool)
380 {
381 int suspended;
382
383 pthread_mutex_lock(&tpool->tp_mutex);
384 suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
385 pthread_mutex_unlock(&tpool->tp_mutex);
386
387 return (suspended);
388 }
389
390 void
tpool_resume(tpool_t * tpool)391 tpool_resume(tpool_t *tpool)
392 {
393 int excess;
394
395 pthread_mutex_lock(&tpool->tp_mutex);
396 if (!(tpool->tp_flags & TP_SUSPEND)) {
397 pthread_mutex_unlock(&tpool->tp_mutex);
398 return;
399 }
400 tpool->tp_flags &= ~TP_SUSPEND;
401 (void) pthread_cond_broadcast(&tpool->tp_workcv);
402 excess = tpool->tp_njobs - tpool->tp_idle;
403 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
404 if (create_worker(tpool) != 0)
405 break; /* pthread_create() failed */
406 tpool->tp_current++;
407 }
408 pthread_mutex_unlock(&tpool->tp_mutex);
409 }
410
411 int
tpool_member(tpool_t * tpool)412 tpool_member(tpool_t *tpool)
413 {
414 pthread_t my_tid = pthread_self();
415 tpool_active_t *activep;
416
417 pthread_mutex_lock(&tpool->tp_mutex);
418 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
419 if (activep->tpa_tid == my_tid) {
420 pthread_mutex_unlock(&tpool->tp_mutex);
421 return (1);
422 }
423 }
424 pthread_mutex_unlock(&tpool->tp_mutex);
425 return (0);
426 }
427