1eda14cbcSMatt Macy /* 2eda14cbcSMatt Macy * CDDL HEADER START 3eda14cbcSMatt Macy * 4eda14cbcSMatt Macy * The contents of this file are subject to the terms of the 5eda14cbcSMatt Macy * Common Development and Distribution License (the "License"). 6eda14cbcSMatt Macy * You may not use this file except in compliance with the License. 7eda14cbcSMatt Macy * 8eda14cbcSMatt Macy * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9271171e0SMartin Matuska * or https://opensource.org/licenses/CDDL-1.0. 10eda14cbcSMatt Macy * See the License for the specific language governing permissions 11eda14cbcSMatt Macy * and limitations under the License. 12eda14cbcSMatt Macy * 13eda14cbcSMatt Macy * When distributing Covered Code, include this CDDL HEADER in each 14eda14cbcSMatt Macy * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15eda14cbcSMatt Macy * If applicable, add the following below this CDDL HEADER, with the 16eda14cbcSMatt Macy * fields enclosed by brackets "[]" replaced with your own identifying 17eda14cbcSMatt Macy * information: Portions Copyright [yyyy] [name of copyright owner] 18eda14cbcSMatt Macy * 19eda14cbcSMatt Macy * CDDL HEADER END 20eda14cbcSMatt Macy */ 21eda14cbcSMatt Macy 22eda14cbcSMatt Macy /* 23eda14cbcSMatt Macy * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24eda14cbcSMatt Macy * Use is subject to license terms. 25eda14cbcSMatt Macy */ 26eda14cbcSMatt Macy 27eda14cbcSMatt Macy #include <stdlib.h> 28eda14cbcSMatt Macy #include <signal.h> 29eda14cbcSMatt Macy #include <errno.h> 30eda14cbcSMatt Macy #include <assert.h> 31*bb2d13b6SMartin Matuska #include <limits.h> 32eda14cbcSMatt Macy #include "thread_pool_impl.h" 33eda14cbcSMatt Macy 34eda14cbcSMatt Macy static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER; 35eda14cbcSMatt Macy static tpool_t *thread_pools = NULL; 36eda14cbcSMatt Macy 37eda14cbcSMatt Macy static void 38eda14cbcSMatt Macy delete_pool(tpool_t *tpool) 39eda14cbcSMatt Macy { 40eda14cbcSMatt Macy tpool_job_t *job; 41eda14cbcSMatt Macy 42eda14cbcSMatt Macy ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL); 43eda14cbcSMatt Macy 44eda14cbcSMatt Macy /* 45eda14cbcSMatt Macy * Unlink the pool from the global list of all pools. 46eda14cbcSMatt Macy */ 47eda14cbcSMatt Macy (void) pthread_mutex_lock(&thread_pool_lock); 48eda14cbcSMatt Macy if (thread_pools == tpool) 49eda14cbcSMatt Macy thread_pools = tpool->tp_forw; 50eda14cbcSMatt Macy if (thread_pools == tpool) 51eda14cbcSMatt Macy thread_pools = NULL; 52eda14cbcSMatt Macy else { 53eda14cbcSMatt Macy tpool->tp_back->tp_forw = tpool->tp_forw; 54eda14cbcSMatt Macy tpool->tp_forw->tp_back = tpool->tp_back; 55eda14cbcSMatt Macy } 56eda14cbcSMatt Macy pthread_mutex_unlock(&thread_pool_lock); 57eda14cbcSMatt Macy 58eda14cbcSMatt Macy /* 59eda14cbcSMatt Macy * There should be no pending jobs, but just in case... 60eda14cbcSMatt Macy */ 61eda14cbcSMatt Macy for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 62eda14cbcSMatt Macy tpool->tp_head = job->tpj_next; 63eda14cbcSMatt Macy free(job); 64eda14cbcSMatt Macy } 65eda14cbcSMatt Macy (void) pthread_attr_destroy(&tpool->tp_attr); 66eda14cbcSMatt Macy free(tpool); 67eda14cbcSMatt Macy } 68eda14cbcSMatt Macy 69eda14cbcSMatt Macy /* 70eda14cbcSMatt Macy * Worker thread is terminating. 71eda14cbcSMatt Macy */ 72eda14cbcSMatt Macy static void 73eda14cbcSMatt Macy worker_cleanup(void *arg) 74eda14cbcSMatt Macy { 75eda14cbcSMatt Macy tpool_t *tpool = (tpool_t *)arg; 76eda14cbcSMatt Macy 77eda14cbcSMatt Macy if (--tpool->tp_current == 0 && 78eda14cbcSMatt Macy (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 79eda14cbcSMatt Macy if (tpool->tp_flags & TP_ABANDON) { 80eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 81eda14cbcSMatt Macy delete_pool(tpool); 82eda14cbcSMatt Macy return; 83eda14cbcSMatt Macy } 84eda14cbcSMatt Macy if (tpool->tp_flags & TP_DESTROY) 85eda14cbcSMatt Macy (void) pthread_cond_broadcast(&tpool->tp_busycv); 86eda14cbcSMatt Macy } 87eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 88eda14cbcSMatt Macy } 89eda14cbcSMatt Macy 90eda14cbcSMatt Macy static void 91eda14cbcSMatt Macy notify_waiters(tpool_t *tpool) 92eda14cbcSMatt Macy { 93eda14cbcSMatt Macy if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 94eda14cbcSMatt Macy tpool->tp_flags &= ~TP_WAIT; 95eda14cbcSMatt Macy (void) pthread_cond_broadcast(&tpool->tp_waitcv); 96eda14cbcSMatt Macy } 97eda14cbcSMatt Macy } 98eda14cbcSMatt Macy 99eda14cbcSMatt Macy /* 100eda14cbcSMatt Macy * Called by a worker thread on return from a tpool_dispatch()d job. 101eda14cbcSMatt Macy */ 102eda14cbcSMatt Macy static void 103eda14cbcSMatt Macy job_cleanup(void *arg) 104eda14cbcSMatt Macy { 105eda14cbcSMatt Macy tpool_t *tpool = (tpool_t *)arg; 106eda14cbcSMatt Macy 107eda14cbcSMatt Macy pthread_t my_tid = pthread_self(); 108eda14cbcSMatt Macy tpool_active_t *activep; 109eda14cbcSMatt Macy tpool_active_t **activepp; 110eda14cbcSMatt Macy 111eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 112eda14cbcSMatt Macy for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) { 113eda14cbcSMatt Macy activep = *activepp; 114eda14cbcSMatt Macy if (activep->tpa_tid == my_tid) { 115eda14cbcSMatt Macy *activepp = activep->tpa_next; 116eda14cbcSMatt Macy break; 117eda14cbcSMatt Macy } 118eda14cbcSMatt Macy } 119eda14cbcSMatt Macy if (tpool->tp_flags & TP_WAIT) 120eda14cbcSMatt Macy notify_waiters(tpool); 121eda14cbcSMatt Macy } 122eda14cbcSMatt Macy 123eda14cbcSMatt Macy static void * 124eda14cbcSMatt Macy tpool_worker(void *arg) 125eda14cbcSMatt Macy { 126eda14cbcSMatt Macy tpool_t *tpool = (tpool_t *)arg; 127eda14cbcSMatt Macy int elapsed; 128eda14cbcSMatt Macy tpool_job_t *job; 129eda14cbcSMatt Macy void (*func)(void *); 130eda14cbcSMatt Macy tpool_active_t active; 131eda14cbcSMatt Macy 132eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 133eda14cbcSMatt Macy pthread_cleanup_push(worker_cleanup, tpool); 134eda14cbcSMatt Macy 135eda14cbcSMatt Macy /* 136eda14cbcSMatt Macy * This is the worker's main loop. 137eda14cbcSMatt Macy * It will only be left if a timeout or an error has occurred. 138eda14cbcSMatt Macy */ 139eda14cbcSMatt Macy active.tpa_tid = pthread_self(); 140eda14cbcSMatt Macy for (;;) { 141eda14cbcSMatt Macy elapsed = 0; 142eda14cbcSMatt Macy tpool->tp_idle++; 143eda14cbcSMatt Macy if (tpool->tp_flags & TP_WAIT) 144eda14cbcSMatt Macy notify_waiters(tpool); 145eda14cbcSMatt Macy while ((tpool->tp_head == NULL || 146eda14cbcSMatt Macy (tpool->tp_flags & TP_SUSPEND)) && 147eda14cbcSMatt Macy !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 148eda14cbcSMatt Macy if (tpool->tp_current <= tpool->tp_minimum || 149eda14cbcSMatt Macy tpool->tp_linger == 0) { 150eda14cbcSMatt Macy (void) pthread_cond_wait(&tpool->tp_workcv, 151eda14cbcSMatt Macy &tpool->tp_mutex); 152eda14cbcSMatt Macy } else { 153eda14cbcSMatt Macy struct timespec ts; 154eda14cbcSMatt Macy 155eda14cbcSMatt Macy clock_gettime(CLOCK_REALTIME, &ts); 156eda14cbcSMatt Macy ts.tv_sec += tpool->tp_linger; 157eda14cbcSMatt Macy 158eda14cbcSMatt Macy if (pthread_cond_timedwait(&tpool->tp_workcv, 159eda14cbcSMatt Macy &tpool->tp_mutex, &ts) != 0) { 160eda14cbcSMatt Macy elapsed = 1; 161eda14cbcSMatt Macy break; 162eda14cbcSMatt Macy } 163eda14cbcSMatt Macy } 164eda14cbcSMatt Macy } 165eda14cbcSMatt Macy tpool->tp_idle--; 166eda14cbcSMatt Macy if (tpool->tp_flags & TP_DESTROY) 167eda14cbcSMatt Macy break; 168eda14cbcSMatt Macy if (tpool->tp_flags & TP_ABANDON) { 169eda14cbcSMatt Macy /* can't abandon a suspended pool */ 170eda14cbcSMatt Macy if (tpool->tp_flags & TP_SUSPEND) { 171eda14cbcSMatt Macy tpool->tp_flags &= ~TP_SUSPEND; 172eda14cbcSMatt Macy (void) pthread_cond_broadcast( 173eda14cbcSMatt Macy &tpool->tp_workcv); 174eda14cbcSMatt Macy } 175eda14cbcSMatt Macy if (tpool->tp_head == NULL) 176eda14cbcSMatt Macy break; 177eda14cbcSMatt Macy } 178eda14cbcSMatt Macy if ((job = tpool->tp_head) != NULL && 179eda14cbcSMatt Macy !(tpool->tp_flags & TP_SUSPEND)) { 180eda14cbcSMatt Macy elapsed = 0; 181eda14cbcSMatt Macy func = job->tpj_func; 182eda14cbcSMatt Macy arg = job->tpj_arg; 183eda14cbcSMatt Macy tpool->tp_head = job->tpj_next; 184eda14cbcSMatt Macy if (job == tpool->tp_tail) 185eda14cbcSMatt Macy tpool->tp_tail = NULL; 186eda14cbcSMatt Macy tpool->tp_njobs--; 187eda14cbcSMatt Macy active.tpa_next = tpool->tp_active; 188eda14cbcSMatt Macy tpool->tp_active = &active; 189eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 190eda14cbcSMatt Macy pthread_cleanup_push(job_cleanup, tpool); 191eda14cbcSMatt Macy free(job); 192eda14cbcSMatt Macy 193eda14cbcSMatt Macy sigset_t maskset; 194eda14cbcSMatt Macy (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset); 195eda14cbcSMatt Macy 196eda14cbcSMatt Macy /* 197eda14cbcSMatt Macy * Call the specified function. 198eda14cbcSMatt Macy */ 199eda14cbcSMatt Macy func(arg); 200eda14cbcSMatt Macy /* 201eda14cbcSMatt Macy * We don't know what this thread has been doing, 202eda14cbcSMatt Macy * so we reset its signal mask and cancellation 203eda14cbcSMatt Macy * state back to the values prior to calling func(). 204eda14cbcSMatt Macy */ 205eda14cbcSMatt Macy (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 206eda14cbcSMatt Macy (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 207eda14cbcSMatt Macy NULL); 208eda14cbcSMatt Macy (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 209eda14cbcSMatt Macy NULL); 210eda14cbcSMatt Macy pthread_cleanup_pop(1); 211eda14cbcSMatt Macy } 212eda14cbcSMatt Macy if (elapsed && tpool->tp_current > tpool->tp_minimum) { 213eda14cbcSMatt Macy /* 214eda14cbcSMatt Macy * We timed out and there is no work to be done 215eda14cbcSMatt Macy * and the number of workers exceeds the minimum. 216eda14cbcSMatt Macy * Exit now to reduce the size of the pool. 217eda14cbcSMatt Macy */ 218eda14cbcSMatt Macy break; 219eda14cbcSMatt Macy } 220eda14cbcSMatt Macy } 221eda14cbcSMatt Macy pthread_cleanup_pop(1); 222eda14cbcSMatt Macy return (arg); 223eda14cbcSMatt Macy } 224eda14cbcSMatt Macy 225eda14cbcSMatt Macy /* 226eda14cbcSMatt Macy * Create a worker thread, with default signals blocked. 227eda14cbcSMatt Macy */ 228eda14cbcSMatt Macy static int 229eda14cbcSMatt Macy create_worker(tpool_t *tpool) 230eda14cbcSMatt Macy { 231eda14cbcSMatt Macy pthread_t thread; 232eda14cbcSMatt Macy sigset_t oset; 233eda14cbcSMatt Macy int error; 234eda14cbcSMatt Macy 235eda14cbcSMatt Macy (void) pthread_sigmask(SIG_SETMASK, NULL, &oset); 236eda14cbcSMatt Macy error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); 237eda14cbcSMatt Macy (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 238eda14cbcSMatt Macy return (error); 239eda14cbcSMatt Macy } 240eda14cbcSMatt Macy 241eda14cbcSMatt Macy 242eda14cbcSMatt Macy /* 243eda14cbcSMatt Macy * pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr 244eda14cbcSMatt Macy * is NULL initialize the cloned attr using default values. 245eda14cbcSMatt Macy */ 246eda14cbcSMatt Macy static int 247eda14cbcSMatt Macy pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr) 248eda14cbcSMatt Macy { 249eda14cbcSMatt Macy int error; 250eda14cbcSMatt Macy 251eda14cbcSMatt Macy error = pthread_attr_init(attr); 252eda14cbcSMatt Macy if (error || (old_attr == NULL)) 253eda14cbcSMatt Macy return (error); 254eda14cbcSMatt Macy 255eda14cbcSMatt Macy #ifdef __GLIBC__ 256eda14cbcSMatt Macy cpu_set_t cpuset; 257eda14cbcSMatt Macy size_t cpusetsize = sizeof (cpuset); 258eda14cbcSMatt Macy error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset); 259eda14cbcSMatt Macy if (error == 0) 260eda14cbcSMatt Macy error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset); 261eda14cbcSMatt Macy if (error) 262eda14cbcSMatt Macy goto error; 263eda14cbcSMatt Macy #endif /* __GLIBC__ */ 264eda14cbcSMatt Macy 265eda14cbcSMatt Macy int detachstate; 266eda14cbcSMatt Macy error = pthread_attr_getdetachstate(old_attr, &detachstate); 267eda14cbcSMatt Macy if (error == 0) 268eda14cbcSMatt Macy error = pthread_attr_setdetachstate(attr, detachstate); 269eda14cbcSMatt Macy if (error) 270eda14cbcSMatt Macy goto error; 271eda14cbcSMatt Macy 272eda14cbcSMatt Macy size_t guardsize; 273eda14cbcSMatt Macy error = pthread_attr_getguardsize(old_attr, &guardsize); 274eda14cbcSMatt Macy if (error == 0) 275eda14cbcSMatt Macy error = pthread_attr_setguardsize(attr, guardsize); 276eda14cbcSMatt Macy if (error) 277eda14cbcSMatt Macy goto error; 278eda14cbcSMatt Macy 279eda14cbcSMatt Macy int inheritsched; 280eda14cbcSMatt Macy error = pthread_attr_getinheritsched(old_attr, &inheritsched); 281eda14cbcSMatt Macy if (error == 0) 282eda14cbcSMatt Macy error = pthread_attr_setinheritsched(attr, inheritsched); 283eda14cbcSMatt Macy if (error) 284eda14cbcSMatt Macy goto error; 285eda14cbcSMatt Macy 286eda14cbcSMatt Macy struct sched_param param; 287eda14cbcSMatt Macy error = pthread_attr_getschedparam(old_attr, ¶m); 288eda14cbcSMatt Macy if (error == 0) 289eda14cbcSMatt Macy error = pthread_attr_setschedparam(attr, ¶m); 290eda14cbcSMatt Macy if (error) 291eda14cbcSMatt Macy goto error; 292eda14cbcSMatt Macy 293eda14cbcSMatt Macy int policy; 294eda14cbcSMatt Macy error = pthread_attr_getschedpolicy(old_attr, &policy); 295eda14cbcSMatt Macy if (error == 0) 296eda14cbcSMatt Macy error = pthread_attr_setschedpolicy(attr, policy); 297eda14cbcSMatt Macy if (error) 298eda14cbcSMatt Macy goto error; 299eda14cbcSMatt Macy 300eda14cbcSMatt Macy int scope; 301eda14cbcSMatt Macy error = pthread_attr_getscope(old_attr, &scope); 302eda14cbcSMatt Macy if (error == 0) 303eda14cbcSMatt Macy error = pthread_attr_setscope(attr, scope); 304eda14cbcSMatt Macy if (error) 305eda14cbcSMatt Macy goto error; 306eda14cbcSMatt Macy 307eda14cbcSMatt Macy void *stackaddr; 308eda14cbcSMatt Macy size_t stacksize; 309eda14cbcSMatt Macy error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize); 310eda14cbcSMatt Macy if (error == 0) 311eda14cbcSMatt Macy error = pthread_attr_setstack(attr, stackaddr, stacksize); 312eda14cbcSMatt Macy if (error) 313eda14cbcSMatt Macy goto error; 314eda14cbcSMatt Macy 315eda14cbcSMatt Macy return (0); 316eda14cbcSMatt Macy error: 317eda14cbcSMatt Macy pthread_attr_destroy(attr); 318eda14cbcSMatt Macy return (error); 319eda14cbcSMatt Macy } 320eda14cbcSMatt Macy 321eda14cbcSMatt Macy tpool_t * 322eda14cbcSMatt Macy tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 323eda14cbcSMatt Macy pthread_attr_t *attr) 324eda14cbcSMatt Macy { 325eda14cbcSMatt Macy tpool_t *tpool; 326eda14cbcSMatt Macy void *stackaddr; 327eda14cbcSMatt Macy size_t stacksize; 328eda14cbcSMatt Macy size_t minstack; 329eda14cbcSMatt Macy int error; 330eda14cbcSMatt Macy 331eda14cbcSMatt Macy if (min_threads > max_threads || max_threads < 1) { 332eda14cbcSMatt Macy errno = EINVAL; 333eda14cbcSMatt Macy return (NULL); 334eda14cbcSMatt Macy } 335eda14cbcSMatt Macy if (attr != NULL) { 336eda14cbcSMatt Macy if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) { 337eda14cbcSMatt Macy errno = EINVAL; 338eda14cbcSMatt Macy return (NULL); 339eda14cbcSMatt Macy } 340eda14cbcSMatt Macy /* 341eda14cbcSMatt Macy * Allow only one thread in the pool with a specified stack. 342eda14cbcSMatt Macy * Require threads to have at least the minimum stack size. 343eda14cbcSMatt Macy */ 344eda14cbcSMatt Macy minstack = PTHREAD_STACK_MIN; 345eda14cbcSMatt Macy if (stackaddr != NULL) { 346eda14cbcSMatt Macy if (stacksize < minstack || max_threads != 1) { 347eda14cbcSMatt Macy errno = EINVAL; 348eda14cbcSMatt Macy return (NULL); 349eda14cbcSMatt Macy } 350eda14cbcSMatt Macy } else if (stacksize != 0 && stacksize < minstack) { 351eda14cbcSMatt Macy errno = EINVAL; 352eda14cbcSMatt Macy return (NULL); 353eda14cbcSMatt Macy } 354eda14cbcSMatt Macy } 355eda14cbcSMatt Macy 356eda14cbcSMatt Macy tpool = calloc(1, sizeof (*tpool)); 357eda14cbcSMatt Macy if (tpool == NULL) { 358eda14cbcSMatt Macy errno = ENOMEM; 359eda14cbcSMatt Macy return (NULL); 360eda14cbcSMatt Macy } 361eda14cbcSMatt Macy (void) pthread_mutex_init(&tpool->tp_mutex, NULL); 362eda14cbcSMatt Macy (void) pthread_cond_init(&tpool->tp_busycv, NULL); 363eda14cbcSMatt Macy (void) pthread_cond_init(&tpool->tp_workcv, NULL); 364eda14cbcSMatt Macy (void) pthread_cond_init(&tpool->tp_waitcv, NULL); 365eda14cbcSMatt Macy tpool->tp_minimum = min_threads; 366eda14cbcSMatt Macy tpool->tp_maximum = max_threads; 367eda14cbcSMatt Macy tpool->tp_linger = linger; 368eda14cbcSMatt Macy 369eda14cbcSMatt Macy /* 370eda14cbcSMatt Macy * We cannot just copy the attribute pointer. 371eda14cbcSMatt Macy * We need to initialize a new pthread_attr_t structure 372eda14cbcSMatt Macy * with the values from the user-supplied pthread_attr_t. 373eda14cbcSMatt Macy * If the attribute pointer is NULL, we need to initialize 374eda14cbcSMatt Macy * the new pthread_attr_t structure with default values. 375eda14cbcSMatt Macy */ 376eda14cbcSMatt Macy error = pthread_attr_clone(&tpool->tp_attr, attr); 377eda14cbcSMatt Macy if (error) { 378eda14cbcSMatt Macy free(tpool); 379eda14cbcSMatt Macy errno = error; 380eda14cbcSMatt Macy return (NULL); 381eda14cbcSMatt Macy } 382eda14cbcSMatt Macy 383eda14cbcSMatt Macy /* make all pool threads be detached daemon threads */ 384eda14cbcSMatt Macy (void) pthread_attr_setdetachstate(&tpool->tp_attr, 385eda14cbcSMatt Macy PTHREAD_CREATE_DETACHED); 386eda14cbcSMatt Macy 387eda14cbcSMatt Macy /* insert into the global list of all thread pools */ 388eda14cbcSMatt Macy pthread_mutex_lock(&thread_pool_lock); 389eda14cbcSMatt Macy if (thread_pools == NULL) { 390eda14cbcSMatt Macy tpool->tp_forw = tpool; 391eda14cbcSMatt Macy tpool->tp_back = tpool; 392eda14cbcSMatt Macy thread_pools = tpool; 393eda14cbcSMatt Macy } else { 394eda14cbcSMatt Macy thread_pools->tp_back->tp_forw = tpool; 395eda14cbcSMatt Macy tpool->tp_forw = thread_pools; 396eda14cbcSMatt Macy tpool->tp_back = thread_pools->tp_back; 397eda14cbcSMatt Macy thread_pools->tp_back = tpool; 398eda14cbcSMatt Macy } 399eda14cbcSMatt Macy pthread_mutex_unlock(&thread_pool_lock); 400eda14cbcSMatt Macy 401eda14cbcSMatt Macy return (tpool); 402eda14cbcSMatt Macy } 403eda14cbcSMatt Macy 404eda14cbcSMatt Macy /* 405eda14cbcSMatt Macy * Dispatch a work request to the thread pool. 406eda14cbcSMatt Macy * If there are idle workers, awaken one. 407eda14cbcSMatt Macy * Else, if the maximum number of workers has 408eda14cbcSMatt Macy * not been reached, spawn a new worker thread. 409eda14cbcSMatt Macy * Else just return with the job added to the queue. 410eda14cbcSMatt Macy */ 411eda14cbcSMatt Macy int 412eda14cbcSMatt Macy tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 413eda14cbcSMatt Macy { 414eda14cbcSMatt Macy tpool_job_t *job; 415eda14cbcSMatt Macy 416eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 417eda14cbcSMatt Macy 418eda14cbcSMatt Macy if ((job = calloc(1, sizeof (*job))) == NULL) 419eda14cbcSMatt Macy return (-1); 420eda14cbcSMatt Macy job->tpj_next = NULL; 421eda14cbcSMatt Macy job->tpj_func = func; 422eda14cbcSMatt Macy job->tpj_arg = arg; 423eda14cbcSMatt Macy 424eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 425eda14cbcSMatt Macy 426eda14cbcSMatt Macy if (tpool->tp_head == NULL) 427eda14cbcSMatt Macy tpool->tp_head = job; 428eda14cbcSMatt Macy else 429eda14cbcSMatt Macy tpool->tp_tail->tpj_next = job; 430eda14cbcSMatt Macy tpool->tp_tail = job; 431eda14cbcSMatt Macy tpool->tp_njobs++; 432eda14cbcSMatt Macy 433eda14cbcSMatt Macy if (!(tpool->tp_flags & TP_SUSPEND)) { 434eda14cbcSMatt Macy if (tpool->tp_idle > 0) 435eda14cbcSMatt Macy (void) pthread_cond_signal(&tpool->tp_workcv); 436eda14cbcSMatt Macy else if (tpool->tp_current < tpool->tp_maximum && 437eda14cbcSMatt Macy create_worker(tpool) == 0) 438eda14cbcSMatt Macy tpool->tp_current++; 439eda14cbcSMatt Macy } 440eda14cbcSMatt Macy 441eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 442eda14cbcSMatt Macy return (0); 443eda14cbcSMatt Macy } 444eda14cbcSMatt Macy 445eda14cbcSMatt Macy static void 446eda14cbcSMatt Macy tpool_cleanup(void *arg) 447eda14cbcSMatt Macy { 448eda14cbcSMatt Macy tpool_t *tpool = (tpool_t *)arg; 449eda14cbcSMatt Macy 450eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 451eda14cbcSMatt Macy } 452eda14cbcSMatt Macy 453eda14cbcSMatt Macy /* 454eda14cbcSMatt Macy * Assumes: by the time tpool_destroy() is called no one will use this 455eda14cbcSMatt Macy * thread pool in any way and no one will try to dispatch entries to it. 456eda14cbcSMatt Macy * Calling tpool_destroy() from a job in the pool will cause deadlock. 457eda14cbcSMatt Macy */ 458eda14cbcSMatt Macy void 459eda14cbcSMatt Macy tpool_destroy(tpool_t *tpool) 460eda14cbcSMatt Macy { 461eda14cbcSMatt Macy tpool_active_t *activep; 462eda14cbcSMatt Macy 463eda14cbcSMatt Macy ASSERT(!tpool_member(tpool)); 464eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 465eda14cbcSMatt Macy 466eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 467eda14cbcSMatt Macy pthread_cleanup_push(tpool_cleanup, tpool); 468eda14cbcSMatt Macy 469eda14cbcSMatt Macy /* mark the pool as being destroyed; wakeup idle workers */ 470eda14cbcSMatt Macy tpool->tp_flags |= TP_DESTROY; 471eda14cbcSMatt Macy tpool->tp_flags &= ~TP_SUSPEND; 472eda14cbcSMatt Macy (void) pthread_cond_broadcast(&tpool->tp_workcv); 473eda14cbcSMatt Macy 474eda14cbcSMatt Macy /* cancel all active workers */ 475eda14cbcSMatt Macy for (activep = tpool->tp_active; activep; activep = activep->tpa_next) 476eda14cbcSMatt Macy (void) pthread_cancel(activep->tpa_tid); 477eda14cbcSMatt Macy 478eda14cbcSMatt Macy /* wait for all active workers to finish */ 479eda14cbcSMatt Macy while (tpool->tp_active != NULL) { 480eda14cbcSMatt Macy tpool->tp_flags |= TP_WAIT; 481eda14cbcSMatt Macy (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 482eda14cbcSMatt Macy } 483eda14cbcSMatt Macy 484eda14cbcSMatt Macy /* the last worker to terminate will wake us up */ 485eda14cbcSMatt Macy while (tpool->tp_current != 0) 486eda14cbcSMatt Macy (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); 487eda14cbcSMatt Macy 488eda14cbcSMatt Macy pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 489eda14cbcSMatt Macy delete_pool(tpool); 490eda14cbcSMatt Macy } 491eda14cbcSMatt Macy 492eda14cbcSMatt Macy /* 493eda14cbcSMatt Macy * Like tpool_destroy(), but don't cancel workers or wait for them to finish. 494eda14cbcSMatt Macy * The last worker to terminate will delete the pool. 495eda14cbcSMatt Macy */ 496eda14cbcSMatt Macy void 497eda14cbcSMatt Macy tpool_abandon(tpool_t *tpool) 498eda14cbcSMatt Macy { 499eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 500eda14cbcSMatt Macy 501eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 502eda14cbcSMatt Macy if (tpool->tp_current == 0) { 503eda14cbcSMatt Macy /* no workers, just delete the pool */ 504eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 505eda14cbcSMatt Macy delete_pool(tpool); 506eda14cbcSMatt Macy } else { 507eda14cbcSMatt Macy /* wake up all workers, last one will delete the pool */ 508eda14cbcSMatt Macy tpool->tp_flags |= TP_ABANDON; 509eda14cbcSMatt Macy tpool->tp_flags &= ~TP_SUSPEND; 510eda14cbcSMatt Macy (void) pthread_cond_broadcast(&tpool->tp_workcv); 511eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 512eda14cbcSMatt Macy } 513eda14cbcSMatt Macy } 514eda14cbcSMatt Macy 515eda14cbcSMatt Macy /* 516eda14cbcSMatt Macy * Wait for all jobs to complete. 517eda14cbcSMatt Macy * Calling tpool_wait() from a job in the pool will cause deadlock. 518eda14cbcSMatt Macy */ 519eda14cbcSMatt Macy void 520eda14cbcSMatt Macy tpool_wait(tpool_t *tpool) 521eda14cbcSMatt Macy { 522eda14cbcSMatt Macy ASSERT(!tpool_member(tpool)); 523eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 524eda14cbcSMatt Macy 525eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 526eda14cbcSMatt Macy pthread_cleanup_push(tpool_cleanup, tpool); 527eda14cbcSMatt Macy while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 528eda14cbcSMatt Macy tpool->tp_flags |= TP_WAIT; 529eda14cbcSMatt Macy (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 530eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 531eda14cbcSMatt Macy } 532eda14cbcSMatt Macy pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 533eda14cbcSMatt Macy } 534eda14cbcSMatt Macy 535eda14cbcSMatt Macy void 536eda14cbcSMatt Macy tpool_suspend(tpool_t *tpool) 537eda14cbcSMatt Macy { 538eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 539eda14cbcSMatt Macy 540eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 541eda14cbcSMatt Macy tpool->tp_flags |= TP_SUSPEND; 542eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 543eda14cbcSMatt Macy } 544eda14cbcSMatt Macy 545eda14cbcSMatt Macy int 546eda14cbcSMatt Macy tpool_suspended(tpool_t *tpool) 547eda14cbcSMatt Macy { 548eda14cbcSMatt Macy int suspended; 549eda14cbcSMatt Macy 550eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 551eda14cbcSMatt Macy 552eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 553eda14cbcSMatt Macy suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 554eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 555eda14cbcSMatt Macy 556eda14cbcSMatt Macy return (suspended); 557eda14cbcSMatt Macy } 558eda14cbcSMatt Macy 559eda14cbcSMatt Macy void 560eda14cbcSMatt Macy tpool_resume(tpool_t *tpool) 561eda14cbcSMatt Macy { 562eda14cbcSMatt Macy int excess; 563eda14cbcSMatt Macy 564eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 565eda14cbcSMatt Macy 566eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 567eda14cbcSMatt Macy if (!(tpool->tp_flags & TP_SUSPEND)) { 568eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 569eda14cbcSMatt Macy return; 570eda14cbcSMatt Macy } 571eda14cbcSMatt Macy tpool->tp_flags &= ~TP_SUSPEND; 572eda14cbcSMatt Macy (void) pthread_cond_broadcast(&tpool->tp_workcv); 573eda14cbcSMatt Macy excess = tpool->tp_njobs - tpool->tp_idle; 574eda14cbcSMatt Macy while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 575eda14cbcSMatt Macy if (create_worker(tpool) != 0) 576eda14cbcSMatt Macy break; /* pthread_create() failed */ 577eda14cbcSMatt Macy tpool->tp_current++; 578eda14cbcSMatt Macy } 579eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 580eda14cbcSMatt Macy } 581eda14cbcSMatt Macy 582eda14cbcSMatt Macy int 583eda14cbcSMatt Macy tpool_member(tpool_t *tpool) 584eda14cbcSMatt Macy { 585eda14cbcSMatt Macy pthread_t my_tid = pthread_self(); 586eda14cbcSMatt Macy tpool_active_t *activep; 587eda14cbcSMatt Macy 588eda14cbcSMatt Macy ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))); 589eda14cbcSMatt Macy 590eda14cbcSMatt Macy pthread_mutex_lock(&tpool->tp_mutex); 591eda14cbcSMatt Macy for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 592eda14cbcSMatt Macy if (activep->tpa_tid == my_tid) { 593eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 594eda14cbcSMatt Macy return (1); 595eda14cbcSMatt Macy } 596eda14cbcSMatt Macy } 597eda14cbcSMatt Macy pthread_mutex_unlock(&tpool->tp_mutex); 598eda14cbcSMatt Macy return (0); 599eda14cbcSMatt Macy } 600