1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include <sys/cdefs.h> 28 /* __FBSDID("$FreeBSD: head/cddl/compat/opensolaris/misc/thread_pool.c 275595 2014-12-08 06:10:47Z delphij $"); */ 29 30 #include <stdlib.h> 31 #include <signal.h> 32 #include <errno.h> 33 #include "thread_pool_impl.h" 34 35 typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */ 36 37 static void 38 delete_pool(tpool_t *tpool) 39 { 40 tpool_job_t *job; 41 42 /* 43 * There should be no pending jobs, but just in case... 44 */ 45 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 46 tpool->tp_head = job->tpj_next; 47 free(job); 48 } 49 (void) pthread_attr_destroy(&tpool->tp_attr); 50 free(tpool); 51 } 52 53 /* 54 * Worker thread is terminating. 55 */ 56 static void 57 worker_cleanup(void *arg) 58 { 59 tpool_t *tpool = arg; 60 61 if (--tpool->tp_current == 0 && 62 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 63 if (tpool->tp_flags & TP_ABANDON) { 64 pthread_mutex_unlock(&tpool->tp_mutex); 65 delete_pool(tpool); 66 return; 67 } 68 if (tpool->tp_flags & TP_DESTROY) 69 (void) pthread_cond_broadcast(&tpool->tp_busycv); 70 } 71 pthread_mutex_unlock(&tpool->tp_mutex); 72 } 73 74 static void 75 notify_waiters(tpool_t *tpool) 76 { 77 if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 78 tpool->tp_flags &= ~TP_WAIT; 79 (void) pthread_cond_broadcast(&tpool->tp_waitcv); 80 } 81 } 82 83 /* 84 * Called by a worker thread on return from a tpool_dispatch()d job. 85 */ 86 static void 87 job_cleanup(void *arg) 88 { 89 tpool_t *tpool = arg; 90 pthread_t my_tid = pthread_self(); 91 tpool_active_t *activep; 92 tpool_active_t **activepp; 93 94 pthread_mutex_lock(&tpool->tp_mutex); 95 /* CSTYLED */ 96 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) { 97 activep = *activepp; 98 if (activep->tpa_tid == my_tid) { 99 *activepp = activep->tpa_next; 100 break; 101 } 102 } 103 if (tpool->tp_flags & TP_WAIT) 104 notify_waiters(tpool); 105 } 106 107 static void * 108 tpool_worker(void *arg) 109 { 110 tpool_t *tpool = (tpool_t *)arg; 111 int elapsed; 112 tpool_job_t *job; 113 void (*func)(void *); 114 tpool_active_t active; 115 sigset_t maskset; 116 117 pthread_mutex_lock(&tpool->tp_mutex); 118 pthread_cleanup_push(worker_cleanup, tpool); 119 120 /* 121 * This is the worker's main loop. 122 * It will only be left if a timeout or an error has occured. 123 */ 124 active.tpa_tid = pthread_self(); 125 for (;;) { 126 elapsed = 0; 127 tpool->tp_idle++; 128 if (tpool->tp_flags & TP_WAIT) 129 notify_waiters(tpool); 130 while ((tpool->tp_head == NULL || 131 (tpool->tp_flags & TP_SUSPEND)) && 132 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 133 if (tpool->tp_current <= tpool->tp_minimum || 134 tpool->tp_linger == 0) { 135 (void) pthread_cond_wait(&tpool->tp_workcv, 136 &tpool->tp_mutex); 137 } else { 138 struct timespec timeout; 139 140 clock_gettime(CLOCK_MONOTONIC, &timeout); 141 timeout.tv_sec += tpool->tp_linger; 142 if (pthread_cond_timedwait(&tpool->tp_workcv, 143 &tpool->tp_mutex, &timeout) != 0) { 144 elapsed = 1; 145 break; 146 } 147 } 148 } 149 tpool->tp_idle--; 150 if (tpool->tp_flags & TP_DESTROY) 151 break; 152 if (tpool->tp_flags & TP_ABANDON) { 153 /* can't abandon a suspended pool */ 154 if (tpool->tp_flags & TP_SUSPEND) { 155 tpool->tp_flags &= ~TP_SUSPEND; 156 (void) pthread_cond_broadcast(&tpool->tp_workcv); 157 } 158 if (tpool->tp_head == NULL) 159 break; 160 } 161 if ((job = tpool->tp_head) != NULL && 162 !(tpool->tp_flags & TP_SUSPEND)) { 163 elapsed = 0; 164 func = job->tpj_func; 165 arg = job->tpj_arg; 166 tpool->tp_head = job->tpj_next; 167 if (job == tpool->tp_tail) 168 tpool->tp_tail = NULL; 169 tpool->tp_njobs--; 170 active.tpa_next = tpool->tp_active; 171 tpool->tp_active = &active; 172 pthread_mutex_unlock(&tpool->tp_mutex); 173 pthread_cleanup_push(job_cleanup, tpool); 174 free(job); 175 /* 176 * Call the specified function. 177 */ 178 func(arg); 179 /* 180 * We don't know what this thread has been doing, 181 * so we reset its signal mask and cancellation 182 * state back to the initial values. 183 */ 184 sigfillset(&maskset); 185 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 186 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 187 NULL); 188 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 189 NULL); 190 pthread_cleanup_pop(1); 191 } 192 if (elapsed && tpool->tp_current > tpool->tp_minimum) { 193 /* 194 * We timed out and there is no work to be done 195 * and the number of workers exceeds the minimum. 196 * Exit now to reduce the size of the pool. 197 */ 198 break; 199 } 200 } 201 pthread_cleanup_pop(1); 202 return (arg); 203 } 204 205 /* 206 * Create a worker thread, with all signals blocked. 207 */ 208 static int 209 create_worker(tpool_t *tpool) 210 { 211 sigset_t maskset, oset; 212 pthread_t thread; 213 int error; 214 215 sigfillset(&maskset); 216 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 217 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); 218 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 219 return (error); 220 } 221 222 tpool_t * 223 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 224 pthread_attr_t *attr) 225 { 226 tpool_t *tpool; 227 int error; 228 229 if (min_threads > max_threads || max_threads < 1) { 230 errno = EINVAL; 231 return (NULL); 232 } 233 234 tpool = calloc(1, sizeof (*tpool)); 235 if (tpool == NULL) { 236 errno = ENOMEM; 237 return (NULL); 238 } 239 (void) pthread_mutex_init(&tpool->tp_mutex, NULL); 240 (void) pthread_cond_init(&tpool->tp_busycv, NULL); 241 (void) pthread_cond_init(&tpool->tp_workcv, NULL); 242 (void) pthread_cond_init(&tpool->tp_waitcv, NULL); 243 tpool->tp_minimum = min_threads; 244 tpool->tp_maximum = max_threads; 245 tpool->tp_linger = linger; 246 247 /* make all pool threads be detached daemon threads */ 248 (void) pthread_attr_init(&tpool->tp_attr); 249 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 250 PTHREAD_CREATE_DETACHED); 251 252 return (tpool); 253 } 254 255 /* 256 * Dispatch a work request to the thread pool. 257 * If there are idle workers, awaken one. 258 * Else, if the maximum number of workers has 259 * not been reached, spawn a new worker thread. 260 * Else just return with the job added to the queue. 261 */ 262 int 263 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 264 { 265 tpool_job_t *job; 266 267 if ((job = calloc(1, sizeof (*job))) == NULL) 268 return (-1); 269 job->tpj_next = NULL; 270 job->tpj_func = func; 271 job->tpj_arg = arg; 272 273 pthread_mutex_lock(&tpool->tp_mutex); 274 275 if (tpool->tp_head == NULL) 276 tpool->tp_head = job; 277 else 278 tpool->tp_tail->tpj_next = job; 279 tpool->tp_tail = job; 280 tpool->tp_njobs++; 281 282 if (!(tpool->tp_flags & TP_SUSPEND)) { 283 if (tpool->tp_idle > 0) 284 (void) pthread_cond_signal(&tpool->tp_workcv); 285 else if (tpool->tp_current < tpool->tp_maximum && 286 create_worker(tpool) == 0) 287 tpool->tp_current++; 288 } 289 290 pthread_mutex_unlock(&tpool->tp_mutex); 291 return (0); 292 } 293 294 /* 295 * Assumes: by the time tpool_destroy() is called no one will use this 296 * thread pool in any way and no one will try to dispatch entries to it. 297 * Calling tpool_destroy() from a job in the pool will cause deadlock. 298 */ 299 void 300 tpool_destroy(tpool_t *tpool) 301 { 302 tpool_active_t *activep; 303 304 pthread_mutex_lock(&tpool->tp_mutex); 305 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex); 306 307 /* mark the pool as being destroyed; wakeup idle workers */ 308 tpool->tp_flags |= TP_DESTROY; 309 tpool->tp_flags &= ~TP_SUSPEND; 310 (void) pthread_cond_broadcast(&tpool->tp_workcv); 311 312 /* cancel all active workers */ 313 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) 314 (void) pthread_cancel(activep->tpa_tid); 315 316 /* wait for all active workers to finish */ 317 while (tpool->tp_active != NULL) { 318 tpool->tp_flags |= TP_WAIT; 319 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 320 } 321 322 /* the last worker to terminate will wake us up */ 323 while (tpool->tp_current != 0) 324 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); 325 326 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 327 delete_pool(tpool); 328 } 329 330 /* 331 * Like tpool_destroy(), but don't cancel workers or wait for them to finish. 332 * The last worker to terminate will delete the pool. 333 */ 334 void 335 tpool_abandon(tpool_t *tpool) 336 { 337 338 pthread_mutex_lock(&tpool->tp_mutex); 339 if (tpool->tp_current == 0) { 340 /* no workers, just delete the pool */ 341 pthread_mutex_unlock(&tpool->tp_mutex); 342 delete_pool(tpool); 343 } else { 344 /* wake up all workers, last one will delete the pool */ 345 tpool->tp_flags |= TP_ABANDON; 346 tpool->tp_flags &= ~TP_SUSPEND; 347 (void) pthread_cond_broadcast(&tpool->tp_workcv); 348 pthread_mutex_unlock(&tpool->tp_mutex); 349 } 350 } 351 352 /* 353 * Wait for all jobs to complete. 354 * Calling tpool_wait() from a job in the pool will cause deadlock. 355 */ 356 void 357 tpool_wait(tpool_t *tpool) 358 { 359 360 pthread_mutex_lock(&tpool->tp_mutex); 361 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex); 362 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 363 tpool->tp_flags |= TP_WAIT; 364 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 365 } 366 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 367 } 368 369 void 370 tpool_suspend(tpool_t *tpool) 371 { 372 373 pthread_mutex_lock(&tpool->tp_mutex); 374 tpool->tp_flags |= TP_SUSPEND; 375 pthread_mutex_unlock(&tpool->tp_mutex); 376 } 377 378 int 379 tpool_suspended(tpool_t *tpool) 380 { 381 int suspended; 382 383 pthread_mutex_lock(&tpool->tp_mutex); 384 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 385 pthread_mutex_unlock(&tpool->tp_mutex); 386 387 return (suspended); 388 } 389 390 void 391 tpool_resume(tpool_t *tpool) 392 { 393 int excess; 394 395 pthread_mutex_lock(&tpool->tp_mutex); 396 if (!(tpool->tp_flags & TP_SUSPEND)) { 397 pthread_mutex_unlock(&tpool->tp_mutex); 398 return; 399 } 400 tpool->tp_flags &= ~TP_SUSPEND; 401 (void) pthread_cond_broadcast(&tpool->tp_workcv); 402 excess = tpool->tp_njobs - tpool->tp_idle; 403 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 404 if (create_worker(tpool) != 0) 405 break; /* pthread_create() failed */ 406 tpool->tp_current++; 407 } 408 pthread_mutex_unlock(&tpool->tp_mutex); 409 } 410 411 int 412 tpool_member(tpool_t *tpool) 413 { 414 pthread_t my_tid = pthread_self(); 415 tpool_active_t *activep; 416 417 pthread_mutex_lock(&tpool->tp_mutex); 418 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 419 if (activep->tpa_tid == my_tid) { 420 pthread_mutex_unlock(&tpool->tp_mutex); 421 return (1); 422 } 423 } 424 pthread_mutex_unlock(&tpool->tp_mutex); 425 return (0); 426 } 427