xref: /onnv-gate/usr/src/lib/libc/port/tpool/thread_pool.c (revision 6812:febeba71273d)
12248Sraf /*
22248Sraf  * CDDL HEADER START
32248Sraf  *
42248Sraf  * The contents of this file are subject to the terms of the
52248Sraf  * Common Development and Distribution License (the "License").
62248Sraf  * You may not use this file except in compliance with the License.
72248Sraf  *
82248Sraf  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
92248Sraf  * or http://www.opensolaris.org/os/licensing.
102248Sraf  * See the License for the specific language governing permissions
112248Sraf  * and limitations under the License.
122248Sraf  *
132248Sraf  * When distributing Covered Code, include this CDDL HEADER in each
142248Sraf  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
152248Sraf  * If applicable, add the following below this CDDL HEADER, with the
162248Sraf  * fields enclosed by brackets "[]" replaced with your own identifying
172248Sraf  * information: Portions Copyright [yyyy] [name of copyright owner]
182248Sraf  *
192248Sraf  * CDDL HEADER END
202248Sraf  */
212248Sraf 
222248Sraf /*
23*6812Sraf  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
242248Sraf  * Use is subject to license terms.
252248Sraf  */
262248Sraf 
272248Sraf #pragma ident	"%Z%%M%	%I%	%E% SMI"
282248Sraf 
29*6812Sraf #include "lint.h"
302248Sraf #include "thr_uberdata.h"
312248Sraf #include <stdlib.h>
322248Sraf #include <signal.h>
332248Sraf #include <errno.h>
342248Sraf #include "thread_pool_impl.h"
352248Sraf 
362248Sraf static mutex_t thread_pool_lock = DEFAULTMUTEX;
372248Sraf static tpool_t *thread_pools = NULL;
382248Sraf 
392248Sraf static void
delete_pool(tpool_t * tpool)402248Sraf delete_pool(tpool_t *tpool)
412248Sraf {
422248Sraf 	tpool_job_t *job;
432248Sraf 
442248Sraf 	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
452248Sraf 
462248Sraf 	/*
472248Sraf 	 * Unlink the pool from the global list of all pools.
482248Sraf 	 */
492248Sraf 	lmutex_lock(&thread_pool_lock);
502248Sraf 	if (thread_pools == tpool)
512248Sraf 		thread_pools = tpool->tp_forw;
522248Sraf 	if (thread_pools == tpool)
532248Sraf 		thread_pools = NULL;
542248Sraf 	else {
552248Sraf 		tpool->tp_back->tp_forw = tpool->tp_forw;
562248Sraf 		tpool->tp_forw->tp_back = tpool->tp_back;
572248Sraf 	}
582248Sraf 	lmutex_unlock(&thread_pool_lock);
592248Sraf 
602248Sraf 	/*
612248Sraf 	 * There should be no pending jobs, but just in case...
622248Sraf 	 */
632248Sraf 	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
642248Sraf 		tpool->tp_head = job->tpj_next;
652248Sraf 		lfree(job, sizeof (*job));
662248Sraf 	}
672248Sraf 	(void) pthread_attr_destroy(&tpool->tp_attr);
682248Sraf 	lfree(tpool, sizeof (*tpool));
692248Sraf }
702248Sraf 
712248Sraf /*
722248Sraf  * Worker thread is terminating.
732248Sraf  */
742248Sraf static void
worker_cleanup(tpool_t * tpool)752248Sraf worker_cleanup(tpool_t *tpool)
762248Sraf {
772248Sraf 	ASSERT(MUTEX_HELD(&tpool->tp_mutex));
782248Sraf 
792248Sraf 	if (--tpool->tp_current == 0 &&
802248Sraf 	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
812248Sraf 		if (tpool->tp_flags & TP_ABANDON) {
822248Sraf 			sig_mutex_unlock(&tpool->tp_mutex);
832248Sraf 			delete_pool(tpool);
842248Sraf 			return;
852248Sraf 		}
862248Sraf 		if (tpool->tp_flags & TP_DESTROY)
872248Sraf 			(void) cond_broadcast(&tpool->tp_busycv);
882248Sraf 	}
892248Sraf 	sig_mutex_unlock(&tpool->tp_mutex);
902248Sraf }
912248Sraf 
922248Sraf static void
notify_waiters(tpool_t * tpool)932248Sraf notify_waiters(tpool_t *tpool)
942248Sraf {
952248Sraf 	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
962248Sraf 		tpool->tp_flags &= ~TP_WAIT;
972248Sraf 		(void) cond_broadcast(&tpool->tp_waitcv);
982248Sraf 	}
992248Sraf }
1002248Sraf 
1012248Sraf /*
1022248Sraf  * Called by a worker thread on return from a tpool_dispatch()d job.
1032248Sraf  */
1042248Sraf static void
job_cleanup(tpool_t * tpool)1052248Sraf job_cleanup(tpool_t *tpool)
1062248Sraf {
1072248Sraf 	pthread_t my_tid = pthread_self();
1082248Sraf 	tpool_active_t *activep;
1092248Sraf 	tpool_active_t **activepp;
1102248Sraf 
1112248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
1122248Sraf 	/* CSTYLED */
1132248Sraf 	for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
1142248Sraf 		activep = *activepp;
1152248Sraf 		if (activep->tpa_tid == my_tid) {
1162248Sraf 			*activepp = activep->tpa_next;
1172248Sraf 			break;
1182248Sraf 		}
1192248Sraf 	}
1202248Sraf 	if (tpool->tp_flags & TP_WAIT)
1212248Sraf 		notify_waiters(tpool);
1222248Sraf }
1232248Sraf 
1242248Sraf static void *
tpool_worker(void * arg)1252248Sraf tpool_worker(void *arg)
1262248Sraf {
1272248Sraf 	tpool_t *tpool = (tpool_t *)arg;
1282248Sraf 	int elapsed;
1292248Sraf 	tpool_job_t *job;
1302248Sraf 	void (*func)(void *);
1312248Sraf 	tpool_active_t active;
1322248Sraf 
1332248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
1342248Sraf 	pthread_cleanup_push(worker_cleanup, tpool);
1352248Sraf 
1362248Sraf 	/*
1372248Sraf 	 * This is the worker's main loop.
1382248Sraf 	 * It will only be left if a timeout or an error has occured.
1392248Sraf 	 */
1402248Sraf 	active.tpa_tid = pthread_self();
1412248Sraf 	for (;;) {
1422248Sraf 		elapsed = 0;
1432248Sraf 		tpool->tp_idle++;
1442248Sraf 		if (tpool->tp_flags & TP_WAIT)
1452248Sraf 			notify_waiters(tpool);
1462248Sraf 		while ((tpool->tp_head == NULL ||
1472248Sraf 		    (tpool->tp_flags & TP_SUSPEND)) &&
1482248Sraf 		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
1492248Sraf 			if (tpool->tp_current <= tpool->tp_minimum ||
1502248Sraf 			    tpool->tp_linger == 0) {
1512248Sraf 				(void) sig_cond_wait(&tpool->tp_workcv,
1522248Sraf 				    &tpool->tp_mutex);
1532248Sraf 			} else {
1542248Sraf 				timestruc_t timeout;
1552248Sraf 
1562248Sraf 				timeout.tv_sec = tpool->tp_linger;
1572248Sraf 				timeout.tv_nsec = 0;
1582248Sraf 				if (sig_cond_reltimedwait(&tpool->tp_workcv,
1592248Sraf 				    &tpool->tp_mutex, &timeout) != 0) {
1602248Sraf 					elapsed = 1;
1612248Sraf 					break;
1622248Sraf 				}
1632248Sraf 			}
1642248Sraf 		}
1652248Sraf 		tpool->tp_idle--;
1662248Sraf 		if (tpool->tp_flags & TP_DESTROY)
1672248Sraf 			break;
1682248Sraf 		if (tpool->tp_flags & TP_ABANDON) {
1692248Sraf 			/* can't abandon a suspended pool */
1702248Sraf 			if (tpool->tp_flags & TP_SUSPEND) {
1712248Sraf 				tpool->tp_flags &= ~TP_SUSPEND;
1722248Sraf 				(void) cond_broadcast(&tpool->tp_workcv);
1732248Sraf 			}
1742248Sraf 			if (tpool->tp_head == NULL)
1752248Sraf 				break;
1762248Sraf 		}
1772248Sraf 		if ((job = tpool->tp_head) != NULL &&
1782248Sraf 		    !(tpool->tp_flags & TP_SUSPEND)) {
1792248Sraf 			elapsed = 0;
1802248Sraf 			func = job->tpj_func;
1812248Sraf 			arg = job->tpj_arg;
1822248Sraf 			tpool->tp_head = job->tpj_next;
1832248Sraf 			if (job == tpool->tp_tail)
1842248Sraf 				tpool->tp_tail = NULL;
1852248Sraf 			tpool->tp_njobs--;
1862248Sraf 			active.tpa_next = tpool->tp_active;
1872248Sraf 			tpool->tp_active = &active;
1882248Sraf 			sig_mutex_unlock(&tpool->tp_mutex);
1892248Sraf 			pthread_cleanup_push(job_cleanup, tpool);
1902248Sraf 			lfree(job, sizeof (*job));
1912248Sraf 			/*
1922248Sraf 			 * Call the specified function.
1932248Sraf 			 */
1942248Sraf 			func(arg);
1952248Sraf 			/*
1962248Sraf 			 * We don't know what this thread has been doing,
1972248Sraf 			 * so we reset its signal mask and cancellation
1982248Sraf 			 * state back to the initial values.
1992248Sraf 			 */
2002248Sraf 			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
2012248Sraf 			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
2022248Sraf 			    NULL);
2032248Sraf 			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
2042248Sraf 			    NULL);
2052248Sraf 			pthread_cleanup_pop(1);
2062248Sraf 		}
2072248Sraf 		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
2082248Sraf 			/*
2092248Sraf 			 * We timed out and there is no work to be done
2102248Sraf 			 * and the number of workers exceeds the minimum.
2112248Sraf 			 * Exit now to reduce the size of the pool.
2122248Sraf 			 */
2132248Sraf 			break;
2142248Sraf 		}
2152248Sraf 	}
2162248Sraf 	pthread_cleanup_pop(1);
2172248Sraf 	return (arg);
2182248Sraf }
2192248Sraf 
2202248Sraf /*
2212248Sraf  * Create a worker thread, with all signals blocked.
2222248Sraf  */
2232248Sraf static int
create_worker(tpool_t * tpool)2242248Sraf create_worker(tpool_t *tpool)
2252248Sraf {
2262248Sraf 	sigset_t oset;
2272248Sraf 	int error;
2282248Sraf 
2292248Sraf 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
2302248Sraf 	error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool);
2312248Sraf 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
2322248Sraf 	return (error);
2332248Sraf }
2342248Sraf 
2352248Sraf tpool_t	*
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)2362248Sraf tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
2372248Sraf 	pthread_attr_t *attr)
2382248Sraf {
2392248Sraf 	tpool_t	*tpool;
2402248Sraf 	void *stackaddr;
2412248Sraf 	size_t stacksize;
2422248Sraf 	size_t minstack;
2432248Sraf 	int error;
2442248Sraf 
2452248Sraf 	if (min_threads > max_threads || max_threads < 1) {
2462248Sraf 		errno = EINVAL;
2472248Sraf 		return (NULL);
2482248Sraf 	}
2492248Sraf 	if (attr != NULL) {
2502248Sraf 		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
2512248Sraf 			errno = EINVAL;
2522248Sraf 			return (NULL);
2532248Sraf 		}
2542248Sraf 		/*
2552248Sraf 		 * Allow only one thread in the pool with a specified stack.
2562248Sraf 		 * Require threads to have at least the minimum stack size.
2572248Sraf 		 */
2582248Sraf 		minstack = thr_min_stack();
2592248Sraf 		if (stackaddr != NULL) {
2602248Sraf 			if (stacksize < minstack || max_threads != 1) {
2612248Sraf 				errno = EINVAL;
2622248Sraf 				return (NULL);
2632248Sraf 			}
2642248Sraf 		} else if (stacksize != 0 && stacksize < minstack) {
2652248Sraf 			errno = EINVAL;
2662248Sraf 			return (NULL);
2672248Sraf 		}
2682248Sraf 	}
2692248Sraf 
2702248Sraf 	tpool = lmalloc(sizeof (*tpool));
2712248Sraf 	if (tpool == NULL) {
2722248Sraf 		errno = ENOMEM;
2732248Sraf 		return (NULL);
2742248Sraf 	}
2752248Sraf 	(void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
2762248Sraf 	(void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
2772248Sraf 	(void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
2782248Sraf 	(void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
2792248Sraf 	tpool->tp_minimum = min_threads;
2802248Sraf 	tpool->tp_maximum = max_threads;
2812248Sraf 	tpool->tp_linger = linger;
2822248Sraf 
2832248Sraf 	/*
2842248Sraf 	 * We cannot just copy the attribute pointer.
2852248Sraf 	 * We need to initialize a new pthread_attr_t structure
2862248Sraf 	 * with the values from the user-supplied pthread_attr_t.
2872248Sraf 	 * If the attribute pointer is NULL, we need to initialize
2882248Sraf 	 * the new pthread_attr_t structure with default values.
2892248Sraf 	 */
290*6812Sraf 	error = pthread_attr_clone(&tpool->tp_attr, attr);
2912248Sraf 	if (error) {
2922248Sraf 		lfree(tpool, sizeof (*tpool));
2932248Sraf 		errno = error;
2942248Sraf 		return (NULL);
2952248Sraf 	}
2962248Sraf 
2972248Sraf 	/* make all pool threads be detached daemon threads */
2982248Sraf 	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
2992248Sraf 	    PTHREAD_CREATE_DETACHED);
300*6812Sraf 	(void) pthread_attr_setdaemonstate_np(&tpool->tp_attr,
3012248Sraf 	    PTHREAD_CREATE_DAEMON_NP);
3022248Sraf 
3032248Sraf 	/* insert into the global list of all thread pools */
3042248Sraf 	lmutex_lock(&thread_pool_lock);
3052248Sraf 	if (thread_pools == NULL) {
3062248Sraf 		tpool->tp_forw = tpool;
3072248Sraf 		tpool->tp_back = tpool;
3082248Sraf 		thread_pools = tpool;
3092248Sraf 	} else {
3102248Sraf 		thread_pools->tp_back->tp_forw = tpool;
3112248Sraf 		tpool->tp_forw = thread_pools;
3122248Sraf 		tpool->tp_back = thread_pools->tp_back;
3132248Sraf 		thread_pools->tp_back = tpool;
3142248Sraf 	}
3152248Sraf 	lmutex_unlock(&thread_pool_lock);
3162248Sraf 
3172248Sraf 	return (tpool);
3182248Sraf }
3192248Sraf 
3202248Sraf /*
3212248Sraf  * Dispatch a work request to the thread pool.
3222248Sraf  * If there are idle workers, awaken one.
3232248Sraf  * Else, if the maximum number of workers has
3242248Sraf  * not been reached, spawn a new worker thread.
3252248Sraf  * Else just return with the job added to the queue.
3262248Sraf  */
3272248Sraf int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)3282248Sraf tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
3292248Sraf {
3302248Sraf 	tpool_job_t *job;
3312248Sraf 
3322248Sraf 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
3332248Sraf 
3342248Sraf 	if ((job = lmalloc(sizeof (*job))) == NULL)
3352248Sraf 		return (-1);
3362248Sraf 	job->tpj_next = NULL;
3372248Sraf 	job->tpj_func = func;
3382248Sraf 	job->tpj_arg = arg;
3392248Sraf 
3402248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
3412248Sraf 
3422248Sraf 	if (tpool->tp_head == NULL)
3432248Sraf 		tpool->tp_head = job;
3442248Sraf 	else
3452248Sraf 		tpool->tp_tail->tpj_next = job;
3462248Sraf 	tpool->tp_tail = job;
3472248Sraf 	tpool->tp_njobs++;
3482248Sraf 
3492248Sraf 	if (!(tpool->tp_flags & TP_SUSPEND)) {
3502248Sraf 		if (tpool->tp_idle > 0)
3512248Sraf 			(void) cond_signal(&tpool->tp_workcv);
3522248Sraf 		else if (tpool->tp_current < tpool->tp_maximum &&
3532248Sraf 		    create_worker(tpool) == 0)
3542248Sraf 			tpool->tp_current++;
3552248Sraf 	}
3562248Sraf 
3572248Sraf 	sig_mutex_unlock(&tpool->tp_mutex);
3582248Sraf 	return (0);
3592248Sraf }
3602248Sraf 
3612248Sraf /*
3622248Sraf  * Assumes: by the time tpool_destroy() is called no one will use this
3632248Sraf  * thread pool in any way and no one will try to dispatch entries to it.
3642248Sraf  * Calling tpool_destroy() from a job in the pool will cause deadlock.
3652248Sraf  */
3662248Sraf void
tpool_destroy(tpool_t * tpool)3672248Sraf tpool_destroy(tpool_t *tpool)
3682248Sraf {
3692248Sraf 	tpool_active_t *activep;
3702248Sraf 
3712248Sraf 	ASSERT(!tpool_member(tpool));
3722248Sraf 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
3732248Sraf 
3742248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
3752248Sraf 	pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
3762248Sraf 
3772248Sraf 	/* mark the pool as being destroyed; wakeup idle workers */
3782248Sraf 	tpool->tp_flags |= TP_DESTROY;
3792248Sraf 	tpool->tp_flags &= ~TP_SUSPEND;
3802248Sraf 	(void) cond_broadcast(&tpool->tp_workcv);
3812248Sraf 
3822248Sraf 	/* cancel all active workers */
3832248Sraf 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
3842248Sraf 		(void) pthread_cancel(activep->tpa_tid);
3852248Sraf 
3862248Sraf 	/* wait for all active workers to finish */
3872248Sraf 	while (tpool->tp_active != NULL) {
3882248Sraf 		tpool->tp_flags |= TP_WAIT;
3892248Sraf 		(void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
3902248Sraf 	}
3912248Sraf 
3922248Sraf 	/* the last worker to terminate will wake us up */
3932248Sraf 	while (tpool->tp_current != 0)
3942248Sraf 		(void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
3952248Sraf 
3962248Sraf 	pthread_cleanup_pop(1);	/* sig_mutex_unlock(&tpool->tp_mutex); */
3972248Sraf 	delete_pool(tpool);
3982248Sraf }
3992248Sraf 
4002248Sraf /*
4012248Sraf  * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
4022248Sraf  * The last worker to terminate will delete the pool.
4032248Sraf  */
4042248Sraf void
tpool_abandon(tpool_t * tpool)4052248Sraf tpool_abandon(tpool_t *tpool)
4062248Sraf {
4072248Sraf 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
4082248Sraf 
4092248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
4102248Sraf 	if (tpool->tp_current == 0) {
4112248Sraf 		/* no workers, just delete the pool */
4122248Sraf 		sig_mutex_unlock(&tpool->tp_mutex);
4132248Sraf 		delete_pool(tpool);
4142248Sraf 	} else {
4152248Sraf 		/* wake up all workers, last one will delete the pool */
4162248Sraf 		tpool->tp_flags |= TP_ABANDON;
4172248Sraf 		tpool->tp_flags &= ~TP_SUSPEND;
4182248Sraf 		(void) cond_broadcast(&tpool->tp_workcv);
4192248Sraf 		sig_mutex_unlock(&tpool->tp_mutex);
4202248Sraf 	}
4212248Sraf }
4222248Sraf 
4232248Sraf /*
4242248Sraf  * Wait for all jobs to complete.
4252248Sraf  * Calling tpool_wait() from a job in the pool will cause deadlock.
4262248Sraf  */
4272248Sraf void
tpool_wait(tpool_t * tpool)4282248Sraf tpool_wait(tpool_t *tpool)
4292248Sraf {
4302248Sraf 	ASSERT(!tpool_member(tpool));
4312248Sraf 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
4322248Sraf 
4332248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
4342248Sraf 	pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
4352248Sraf 	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
4362248Sraf 		tpool->tp_flags |= TP_WAIT;
4372248Sraf 		(void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
4382248Sraf 		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
4392248Sraf 	}
4402248Sraf 	pthread_cleanup_pop(1);	/* sig_mutex_unlock(&tpool->tp_mutex); */
4412248Sraf }
4422248Sraf 
4432248Sraf void
tpool_suspend(tpool_t * tpool)4442248Sraf tpool_suspend(tpool_t *tpool)
4452248Sraf {
4462248Sraf 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
4472248Sraf 
4482248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
4492248Sraf 	tpool->tp_flags |= TP_SUSPEND;
4502248Sraf 	sig_mutex_unlock(&tpool->tp_mutex);
4512248Sraf }
4522248Sraf 
4532248Sraf int
tpool_suspended(tpool_t * tpool)4542248Sraf tpool_suspended(tpool_t *tpool)
4552248Sraf {
4562248Sraf 	int suspended;
4572248Sraf 
4582248Sraf 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
4592248Sraf 
4602248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
4612248Sraf 	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
4622248Sraf 	sig_mutex_unlock(&tpool->tp_mutex);
4632248Sraf 
4642248Sraf 	return (suspended);
4652248Sraf }
4662248Sraf 
4672248Sraf void
tpool_resume(tpool_t * tpool)4682248Sraf tpool_resume(tpool_t *tpool)
4692248Sraf {
4702248Sraf 	int excess;
4712248Sraf 
4722248Sraf 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
4732248Sraf 
4742248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
4752248Sraf 	if (!(tpool->tp_flags & TP_SUSPEND)) {
4762248Sraf 		sig_mutex_unlock(&tpool->tp_mutex);
4772248Sraf 		return;
4782248Sraf 	}
4792248Sraf 	tpool->tp_flags &= ~TP_SUSPEND;
4802248Sraf 	(void) cond_broadcast(&tpool->tp_workcv);
4812248Sraf 	excess = tpool->tp_njobs - tpool->tp_idle;
4822248Sraf 	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
4832248Sraf 		if (create_worker(tpool) != 0)
4842248Sraf 			break;		/* pthread_create() failed */
4852248Sraf 		tpool->tp_current++;
4862248Sraf 	}
4872248Sraf 	sig_mutex_unlock(&tpool->tp_mutex);
4882248Sraf }
4892248Sraf 
4902248Sraf int
tpool_member(tpool_t * tpool)4912248Sraf tpool_member(tpool_t *tpool)
4922248Sraf {
4932248Sraf 	pthread_t my_tid = pthread_self();
4942248Sraf 	tpool_active_t *activep;
4952248Sraf 
4962248Sraf 	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
4972248Sraf 
4982248Sraf 	sig_mutex_lock(&tpool->tp_mutex);
4992248Sraf 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
5002248Sraf 		if (activep->tpa_tid == my_tid) {
5012248Sraf 			sig_mutex_unlock(&tpool->tp_mutex);
5022248Sraf 			return (1);
5032248Sraf 		}
5042248Sraf 	}
5052248Sraf 	sig_mutex_unlock(&tpool->tp_mutex);
5062248Sraf 	return (0);
5072248Sraf }
5082248Sraf 
5092248Sraf void
postfork1_child_tpool(void)5102248Sraf postfork1_child_tpool(void)
5112248Sraf {
5122248Sraf 	pthread_t my_tid = pthread_self();
5132248Sraf 	tpool_t *tpool;
5142248Sraf 	tpool_job_t *job;
5152248Sraf 
5162248Sraf 	/*
5172248Sraf 	 * All of the thread pool workers are gone, except possibly
5182248Sraf 	 * for the current thread, if it is a thread pool worker thread.
5192248Sraf 	 * Retain the thread pools, but make them all empty.  Whatever
5202248Sraf 	 * jobs were queued or running belong to the parent process.
5212248Sraf 	 */
5222248Sraf top:
5232248Sraf 	if ((tpool = thread_pools) == NULL)
5242248Sraf 		return;
5252248Sraf 
5262248Sraf 	do {
5272248Sraf 		tpool_active_t *activep;
5282248Sraf 
5292248Sraf 		(void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
5302248Sraf 		(void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
5312248Sraf 		(void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
5322248Sraf 		(void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
5332248Sraf 		for (job = tpool->tp_head; job; job = tpool->tp_head) {
5342248Sraf 			tpool->tp_head = job->tpj_next;
5352248Sraf 			lfree(job, sizeof (*job));
5362248Sraf 		}
5372248Sraf 		tpool->tp_tail = NULL;
5382248Sraf 		tpool->tp_njobs = 0;
5392248Sraf 		for (activep = tpool->tp_active; activep;
5402248Sraf 		    activep = activep->tpa_next) {
5412248Sraf 			if (activep->tpa_tid == my_tid) {
5422248Sraf 				activep->tpa_next = NULL;
5432248Sraf 				break;
5442248Sraf 			}
5452248Sraf 		}
5462248Sraf 		tpool->tp_idle = 0;
5472248Sraf 		tpool->tp_current = 0;
5482248Sraf 		if ((tpool->tp_active = activep) != NULL)
5492248Sraf 			tpool->tp_current = 1;
5502248Sraf 		tpool->tp_flags &= ~TP_WAIT;
5512248Sraf 		if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) {
5522248Sraf 			tpool->tp_flags &= ~TP_DESTROY;
5532248Sraf 			tpool->tp_flags |= TP_ABANDON;
5542248Sraf 			if (tpool->tp_current == 0) {
5552248Sraf 				delete_pool(tpool);
5562248Sraf 				goto top;	/* start over */
5572248Sraf 			}
5582248Sraf 		}
5592248Sraf 	} while ((tpool = tpool->tp_forw) != thread_pools);
5602248Sraf }
561