xref: /netbsd-src/external/cddl/osnet/lib/libzfs/thread_pool.c (revision ba2539a9805a0544ff82c0003cc02fe1eee5603d)
1*ba2539a9Schs /*
2*ba2539a9Schs  * CDDL HEADER START
3*ba2539a9Schs  *
4*ba2539a9Schs  * The contents of this file are subject to the terms of the
5*ba2539a9Schs  * Common Development and Distribution License (the "License").
6*ba2539a9Schs  * You may not use this file except in compliance with the License.
7*ba2539a9Schs  *
8*ba2539a9Schs  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9*ba2539a9Schs  * or http://www.opensolaris.org/os/licensing.
10*ba2539a9Schs  * See the License for the specific language governing permissions
11*ba2539a9Schs  * and limitations under the License.
12*ba2539a9Schs  *
13*ba2539a9Schs  * When distributing Covered Code, include this CDDL HEADER in each
14*ba2539a9Schs  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15*ba2539a9Schs  * If applicable, add the following below this CDDL HEADER, with the
16*ba2539a9Schs  * fields enclosed by brackets "[]" replaced with your own identifying
17*ba2539a9Schs  * information: Portions Copyright [yyyy] [name of copyright owner]
18*ba2539a9Schs  *
19*ba2539a9Schs  * CDDL HEADER END
20*ba2539a9Schs  */
21*ba2539a9Schs 
22*ba2539a9Schs /*
23*ba2539a9Schs  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24*ba2539a9Schs  * Use is subject to license terms.
25*ba2539a9Schs  */
26*ba2539a9Schs 
27*ba2539a9Schs #include <sys/cdefs.h>
28*ba2539a9Schs /* __FBSDID("$FreeBSD: head/cddl/compat/opensolaris/misc/thread_pool.c 275595 2014-12-08 06:10:47Z delphij $"); */
29*ba2539a9Schs 
30*ba2539a9Schs #include <stdlib.h>
31*ba2539a9Schs #include <signal.h>
32*ba2539a9Schs #include <errno.h>
33*ba2539a9Schs #include "thread_pool_impl.h"
34*ba2539a9Schs 
35*ba2539a9Schs typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */
36*ba2539a9Schs 
37*ba2539a9Schs static void
delete_pool(tpool_t * tpool)38*ba2539a9Schs delete_pool(tpool_t *tpool)
39*ba2539a9Schs {
40*ba2539a9Schs 	tpool_job_t *job;
41*ba2539a9Schs 
42*ba2539a9Schs 	/*
43*ba2539a9Schs 	 * There should be no pending jobs, but just in case...
44*ba2539a9Schs 	 */
45*ba2539a9Schs 	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
46*ba2539a9Schs 		tpool->tp_head = job->tpj_next;
47*ba2539a9Schs 		free(job);
48*ba2539a9Schs 	}
49*ba2539a9Schs 	(void) pthread_attr_destroy(&tpool->tp_attr);
50*ba2539a9Schs 	free(tpool);
51*ba2539a9Schs }
52*ba2539a9Schs 
53*ba2539a9Schs /*
54*ba2539a9Schs  * Worker thread is terminating.
55*ba2539a9Schs  */
56*ba2539a9Schs static void
worker_cleanup(void * arg)57*ba2539a9Schs worker_cleanup(void *arg)
58*ba2539a9Schs {
59*ba2539a9Schs 	tpool_t *tpool = arg;
60*ba2539a9Schs 
61*ba2539a9Schs 	if (--tpool->tp_current == 0 &&
62*ba2539a9Schs 	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
63*ba2539a9Schs 		if (tpool->tp_flags & TP_ABANDON) {
64*ba2539a9Schs 			pthread_mutex_unlock(&tpool->tp_mutex);
65*ba2539a9Schs 			delete_pool(tpool);
66*ba2539a9Schs 			return;
67*ba2539a9Schs 		}
68*ba2539a9Schs 		if (tpool->tp_flags & TP_DESTROY)
69*ba2539a9Schs 			(void) pthread_cond_broadcast(&tpool->tp_busycv);
70*ba2539a9Schs 	}
71*ba2539a9Schs 	pthread_mutex_unlock(&tpool->tp_mutex);
72*ba2539a9Schs }
73*ba2539a9Schs 
74*ba2539a9Schs static void
notify_waiters(tpool_t * tpool)75*ba2539a9Schs notify_waiters(tpool_t *tpool)
76*ba2539a9Schs {
77*ba2539a9Schs 	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
78*ba2539a9Schs 		tpool->tp_flags &= ~TP_WAIT;
79*ba2539a9Schs 		(void) pthread_cond_broadcast(&tpool->tp_waitcv);
80*ba2539a9Schs 	}
81*ba2539a9Schs }
82*ba2539a9Schs 
83*ba2539a9Schs /*
84*ba2539a9Schs  * Called by a worker thread on return from a tpool_dispatch()d job.
85*ba2539a9Schs  */
86*ba2539a9Schs static void
job_cleanup(void * arg)87*ba2539a9Schs job_cleanup(void *arg)
88*ba2539a9Schs {
89*ba2539a9Schs 	tpool_t *tpool = arg;
90*ba2539a9Schs 	pthread_t my_tid = pthread_self();
91*ba2539a9Schs 	tpool_active_t *activep;
92*ba2539a9Schs 	tpool_active_t **activepp;
93*ba2539a9Schs 
94*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
95*ba2539a9Schs 	/* CSTYLED */
96*ba2539a9Schs 	for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
97*ba2539a9Schs 		activep = *activepp;
98*ba2539a9Schs 		if (activep->tpa_tid == my_tid) {
99*ba2539a9Schs 			*activepp = activep->tpa_next;
100*ba2539a9Schs 			break;
101*ba2539a9Schs 		}
102*ba2539a9Schs 	}
103*ba2539a9Schs 	if (tpool->tp_flags & TP_WAIT)
104*ba2539a9Schs 		notify_waiters(tpool);
105*ba2539a9Schs }
106*ba2539a9Schs 
107*ba2539a9Schs static void *
tpool_worker(void * arg)108*ba2539a9Schs tpool_worker(void *arg)
109*ba2539a9Schs {
110*ba2539a9Schs 	tpool_t *tpool = (tpool_t *)arg;
111*ba2539a9Schs 	int elapsed;
112*ba2539a9Schs 	tpool_job_t *job;
113*ba2539a9Schs 	void (*func)(void *);
114*ba2539a9Schs 	tpool_active_t active;
115*ba2539a9Schs 	sigset_t maskset;
116*ba2539a9Schs 
117*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
118*ba2539a9Schs 	pthread_cleanup_push(worker_cleanup, tpool);
119*ba2539a9Schs 
120*ba2539a9Schs 	/*
121*ba2539a9Schs 	 * This is the worker's main loop.
122*ba2539a9Schs 	 * It will only be left if a timeout or an error has occured.
123*ba2539a9Schs 	 */
124*ba2539a9Schs 	active.tpa_tid = pthread_self();
125*ba2539a9Schs 	for (;;) {
126*ba2539a9Schs 		elapsed = 0;
127*ba2539a9Schs 		tpool->tp_idle++;
128*ba2539a9Schs 		if (tpool->tp_flags & TP_WAIT)
129*ba2539a9Schs 			notify_waiters(tpool);
130*ba2539a9Schs 		while ((tpool->tp_head == NULL ||
131*ba2539a9Schs 		    (tpool->tp_flags & TP_SUSPEND)) &&
132*ba2539a9Schs 		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
133*ba2539a9Schs 			if (tpool->tp_current <= tpool->tp_minimum ||
134*ba2539a9Schs 			    tpool->tp_linger == 0) {
135*ba2539a9Schs 				(void) pthread_cond_wait(&tpool->tp_workcv,
136*ba2539a9Schs 				    &tpool->tp_mutex);
137*ba2539a9Schs 			} else {
138*ba2539a9Schs 				struct timespec timeout;
139*ba2539a9Schs 
140*ba2539a9Schs 				clock_gettime(CLOCK_MONOTONIC, &timeout);
141*ba2539a9Schs 				timeout.tv_sec += tpool->tp_linger;
142*ba2539a9Schs 				if (pthread_cond_timedwait(&tpool->tp_workcv,
143*ba2539a9Schs 				    &tpool->tp_mutex, &timeout) != 0) {
144*ba2539a9Schs 					elapsed = 1;
145*ba2539a9Schs 					break;
146*ba2539a9Schs 				}
147*ba2539a9Schs 			}
148*ba2539a9Schs 		}
149*ba2539a9Schs 		tpool->tp_idle--;
150*ba2539a9Schs 		if (tpool->tp_flags & TP_DESTROY)
151*ba2539a9Schs 			break;
152*ba2539a9Schs 		if (tpool->tp_flags & TP_ABANDON) {
153*ba2539a9Schs 			/* can't abandon a suspended pool */
154*ba2539a9Schs 			if (tpool->tp_flags & TP_SUSPEND) {
155*ba2539a9Schs 				tpool->tp_flags &= ~TP_SUSPEND;
156*ba2539a9Schs 				(void) pthread_cond_broadcast(&tpool->tp_workcv);
157*ba2539a9Schs 			}
158*ba2539a9Schs 			if (tpool->tp_head == NULL)
159*ba2539a9Schs 				break;
160*ba2539a9Schs 		}
161*ba2539a9Schs 		if ((job = tpool->tp_head) != NULL &&
162*ba2539a9Schs 		    !(tpool->tp_flags & TP_SUSPEND)) {
163*ba2539a9Schs 			elapsed = 0;
164*ba2539a9Schs 			func = job->tpj_func;
165*ba2539a9Schs 			arg = job->tpj_arg;
166*ba2539a9Schs 			tpool->tp_head = job->tpj_next;
167*ba2539a9Schs 			if (job == tpool->tp_tail)
168*ba2539a9Schs 				tpool->tp_tail = NULL;
169*ba2539a9Schs 			tpool->tp_njobs--;
170*ba2539a9Schs 			active.tpa_next = tpool->tp_active;
171*ba2539a9Schs 			tpool->tp_active = &active;
172*ba2539a9Schs 			pthread_mutex_unlock(&tpool->tp_mutex);
173*ba2539a9Schs 			pthread_cleanup_push(job_cleanup, tpool);
174*ba2539a9Schs 			free(job);
175*ba2539a9Schs 			/*
176*ba2539a9Schs 			 * Call the specified function.
177*ba2539a9Schs 			 */
178*ba2539a9Schs 			func(arg);
179*ba2539a9Schs 			/*
180*ba2539a9Schs 			 * We don't know what this thread has been doing,
181*ba2539a9Schs 			 * so we reset its signal mask and cancellation
182*ba2539a9Schs 			 * state back to the initial values.
183*ba2539a9Schs 			 */
184*ba2539a9Schs 			sigfillset(&maskset);
185*ba2539a9Schs 			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
186*ba2539a9Schs 			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
187*ba2539a9Schs 			    NULL);
188*ba2539a9Schs 			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
189*ba2539a9Schs 			    NULL);
190*ba2539a9Schs 			pthread_cleanup_pop(1);
191*ba2539a9Schs 		}
192*ba2539a9Schs 		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
193*ba2539a9Schs 			/*
194*ba2539a9Schs 			 * We timed out and there is no work to be done
195*ba2539a9Schs 			 * and the number of workers exceeds the minimum.
196*ba2539a9Schs 			 * Exit now to reduce the size of the pool.
197*ba2539a9Schs 			 */
198*ba2539a9Schs 			break;
199*ba2539a9Schs 		}
200*ba2539a9Schs 	}
201*ba2539a9Schs 	pthread_cleanup_pop(1);
202*ba2539a9Schs 	return (arg);
203*ba2539a9Schs }
204*ba2539a9Schs 
205*ba2539a9Schs /*
206*ba2539a9Schs  * Create a worker thread, with all signals blocked.
207*ba2539a9Schs  */
208*ba2539a9Schs static int
create_worker(tpool_t * tpool)209*ba2539a9Schs create_worker(tpool_t *tpool)
210*ba2539a9Schs {
211*ba2539a9Schs 	sigset_t maskset, oset;
212*ba2539a9Schs 	pthread_t thread;
213*ba2539a9Schs 	int error;
214*ba2539a9Schs 
215*ba2539a9Schs 	sigfillset(&maskset);
216*ba2539a9Schs 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
217*ba2539a9Schs 	error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
218*ba2539a9Schs 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
219*ba2539a9Schs 	return (error);
220*ba2539a9Schs }
221*ba2539a9Schs 
222*ba2539a9Schs tpool_t	*
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)223*ba2539a9Schs tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
224*ba2539a9Schs 	pthread_attr_t *attr)
225*ba2539a9Schs {
226*ba2539a9Schs 	tpool_t	*tpool;
227*ba2539a9Schs 	int error;
228*ba2539a9Schs 
229*ba2539a9Schs 	if (min_threads > max_threads || max_threads < 1) {
230*ba2539a9Schs 		errno = EINVAL;
231*ba2539a9Schs 		return (NULL);
232*ba2539a9Schs 	}
233*ba2539a9Schs 
234*ba2539a9Schs 	tpool = calloc(1, sizeof (*tpool));
235*ba2539a9Schs 	if (tpool == NULL) {
236*ba2539a9Schs 		errno = ENOMEM;
237*ba2539a9Schs 		return (NULL);
238*ba2539a9Schs 	}
239*ba2539a9Schs 	(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
240*ba2539a9Schs 	(void) pthread_cond_init(&tpool->tp_busycv, NULL);
241*ba2539a9Schs 	(void) pthread_cond_init(&tpool->tp_workcv, NULL);
242*ba2539a9Schs 	(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
243*ba2539a9Schs 	tpool->tp_minimum = min_threads;
244*ba2539a9Schs 	tpool->tp_maximum = max_threads;
245*ba2539a9Schs 	tpool->tp_linger = linger;
246*ba2539a9Schs 
247*ba2539a9Schs 	/* make all pool threads be detached daemon threads */
248*ba2539a9Schs 	(void) pthread_attr_init(&tpool->tp_attr);
249*ba2539a9Schs 	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
250*ba2539a9Schs 	    PTHREAD_CREATE_DETACHED);
251*ba2539a9Schs 
252*ba2539a9Schs 	return (tpool);
253*ba2539a9Schs }
254*ba2539a9Schs 
255*ba2539a9Schs /*
256*ba2539a9Schs  * Dispatch a work request to the thread pool.
257*ba2539a9Schs  * If there are idle workers, awaken one.
258*ba2539a9Schs  * Else, if the maximum number of workers has
259*ba2539a9Schs  * not been reached, spawn a new worker thread.
260*ba2539a9Schs  * Else just return with the job added to the queue.
261*ba2539a9Schs  */
262*ba2539a9Schs int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)263*ba2539a9Schs tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
264*ba2539a9Schs {
265*ba2539a9Schs 	tpool_job_t *job;
266*ba2539a9Schs 
267*ba2539a9Schs 	if ((job = calloc(1, sizeof (*job))) == NULL)
268*ba2539a9Schs 		return (-1);
269*ba2539a9Schs 	job->tpj_next = NULL;
270*ba2539a9Schs 	job->tpj_func = func;
271*ba2539a9Schs 	job->tpj_arg = arg;
272*ba2539a9Schs 
273*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
274*ba2539a9Schs 
275*ba2539a9Schs 	if (tpool->tp_head == NULL)
276*ba2539a9Schs 		tpool->tp_head = job;
277*ba2539a9Schs 	else
278*ba2539a9Schs 		tpool->tp_tail->tpj_next = job;
279*ba2539a9Schs 	tpool->tp_tail = job;
280*ba2539a9Schs 	tpool->tp_njobs++;
281*ba2539a9Schs 
282*ba2539a9Schs 	if (!(tpool->tp_flags & TP_SUSPEND)) {
283*ba2539a9Schs 		if (tpool->tp_idle > 0)
284*ba2539a9Schs 			(void) pthread_cond_signal(&tpool->tp_workcv);
285*ba2539a9Schs 		else if (tpool->tp_current < tpool->tp_maximum &&
286*ba2539a9Schs 		    create_worker(tpool) == 0)
287*ba2539a9Schs 			tpool->tp_current++;
288*ba2539a9Schs 	}
289*ba2539a9Schs 
290*ba2539a9Schs 	pthread_mutex_unlock(&tpool->tp_mutex);
291*ba2539a9Schs 	return (0);
292*ba2539a9Schs }
293*ba2539a9Schs 
294*ba2539a9Schs /*
295*ba2539a9Schs  * Assumes: by the time tpool_destroy() is called no one will use this
296*ba2539a9Schs  * thread pool in any way and no one will try to dispatch entries to it.
297*ba2539a9Schs  * Calling tpool_destroy() from a job in the pool will cause deadlock.
298*ba2539a9Schs  */
299*ba2539a9Schs void
tpool_destroy(tpool_t * tpool)300*ba2539a9Schs tpool_destroy(tpool_t *tpool)
301*ba2539a9Schs {
302*ba2539a9Schs 	tpool_active_t *activep;
303*ba2539a9Schs 
304*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
305*ba2539a9Schs 	pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
306*ba2539a9Schs 
307*ba2539a9Schs 	/* mark the pool as being destroyed; wakeup idle workers */
308*ba2539a9Schs 	tpool->tp_flags |= TP_DESTROY;
309*ba2539a9Schs 	tpool->tp_flags &= ~TP_SUSPEND;
310*ba2539a9Schs 	(void) pthread_cond_broadcast(&tpool->tp_workcv);
311*ba2539a9Schs 
312*ba2539a9Schs 	/* cancel all active workers */
313*ba2539a9Schs 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
314*ba2539a9Schs 		(void) pthread_cancel(activep->tpa_tid);
315*ba2539a9Schs 
316*ba2539a9Schs 	/* wait for all active workers to finish */
317*ba2539a9Schs 	while (tpool->tp_active != NULL) {
318*ba2539a9Schs 		tpool->tp_flags |= TP_WAIT;
319*ba2539a9Schs 		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
320*ba2539a9Schs 	}
321*ba2539a9Schs 
322*ba2539a9Schs 	/* the last worker to terminate will wake us up */
323*ba2539a9Schs 	while (tpool->tp_current != 0)
324*ba2539a9Schs 		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
325*ba2539a9Schs 
326*ba2539a9Schs 	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
327*ba2539a9Schs 	delete_pool(tpool);
328*ba2539a9Schs }
329*ba2539a9Schs 
330*ba2539a9Schs /*
331*ba2539a9Schs  * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
332*ba2539a9Schs  * The last worker to terminate will delete the pool.
333*ba2539a9Schs  */
334*ba2539a9Schs void
tpool_abandon(tpool_t * tpool)335*ba2539a9Schs tpool_abandon(tpool_t *tpool)
336*ba2539a9Schs {
337*ba2539a9Schs 
338*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
339*ba2539a9Schs 	if (tpool->tp_current == 0) {
340*ba2539a9Schs 		/* no workers, just delete the pool */
341*ba2539a9Schs 		pthread_mutex_unlock(&tpool->tp_mutex);
342*ba2539a9Schs 		delete_pool(tpool);
343*ba2539a9Schs 	} else {
344*ba2539a9Schs 		/* wake up all workers, last one will delete the pool */
345*ba2539a9Schs 		tpool->tp_flags |= TP_ABANDON;
346*ba2539a9Schs 		tpool->tp_flags &= ~TP_SUSPEND;
347*ba2539a9Schs 		(void) pthread_cond_broadcast(&tpool->tp_workcv);
348*ba2539a9Schs 		pthread_mutex_unlock(&tpool->tp_mutex);
349*ba2539a9Schs 	}
350*ba2539a9Schs }
351*ba2539a9Schs 
352*ba2539a9Schs /*
353*ba2539a9Schs  * Wait for all jobs to complete.
354*ba2539a9Schs  * Calling tpool_wait() from a job in the pool will cause deadlock.
355*ba2539a9Schs  */
356*ba2539a9Schs void
tpool_wait(tpool_t * tpool)357*ba2539a9Schs tpool_wait(tpool_t *tpool)
358*ba2539a9Schs {
359*ba2539a9Schs 
360*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
361*ba2539a9Schs 	pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
362*ba2539a9Schs 	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
363*ba2539a9Schs 		tpool->tp_flags |= TP_WAIT;
364*ba2539a9Schs 		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
365*ba2539a9Schs 	}
366*ba2539a9Schs 	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
367*ba2539a9Schs }
368*ba2539a9Schs 
369*ba2539a9Schs void
tpool_suspend(tpool_t * tpool)370*ba2539a9Schs tpool_suspend(tpool_t *tpool)
371*ba2539a9Schs {
372*ba2539a9Schs 
373*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
374*ba2539a9Schs 	tpool->tp_flags |= TP_SUSPEND;
375*ba2539a9Schs 	pthread_mutex_unlock(&tpool->tp_mutex);
376*ba2539a9Schs }
377*ba2539a9Schs 
378*ba2539a9Schs int
tpool_suspended(tpool_t * tpool)379*ba2539a9Schs tpool_suspended(tpool_t *tpool)
380*ba2539a9Schs {
381*ba2539a9Schs 	int suspended;
382*ba2539a9Schs 
383*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
384*ba2539a9Schs 	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
385*ba2539a9Schs 	pthread_mutex_unlock(&tpool->tp_mutex);
386*ba2539a9Schs 
387*ba2539a9Schs 	return (suspended);
388*ba2539a9Schs }
389*ba2539a9Schs 
390*ba2539a9Schs void
tpool_resume(tpool_t * tpool)391*ba2539a9Schs tpool_resume(tpool_t *tpool)
392*ba2539a9Schs {
393*ba2539a9Schs 	int excess;
394*ba2539a9Schs 
395*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
396*ba2539a9Schs 	if (!(tpool->tp_flags & TP_SUSPEND)) {
397*ba2539a9Schs 		pthread_mutex_unlock(&tpool->tp_mutex);
398*ba2539a9Schs 		return;
399*ba2539a9Schs 	}
400*ba2539a9Schs 	tpool->tp_flags &= ~TP_SUSPEND;
401*ba2539a9Schs 	(void) pthread_cond_broadcast(&tpool->tp_workcv);
402*ba2539a9Schs 	excess = tpool->tp_njobs - tpool->tp_idle;
403*ba2539a9Schs 	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
404*ba2539a9Schs 		if (create_worker(tpool) != 0)
405*ba2539a9Schs 			break;		/* pthread_create() failed */
406*ba2539a9Schs 		tpool->tp_current++;
407*ba2539a9Schs 	}
408*ba2539a9Schs 	pthread_mutex_unlock(&tpool->tp_mutex);
409*ba2539a9Schs }
410*ba2539a9Schs 
411*ba2539a9Schs int
tpool_member(tpool_t * tpool)412*ba2539a9Schs tpool_member(tpool_t *tpool)
413*ba2539a9Schs {
414*ba2539a9Schs 	pthread_t my_tid = pthread_self();
415*ba2539a9Schs 	tpool_active_t *activep;
416*ba2539a9Schs 
417*ba2539a9Schs 	pthread_mutex_lock(&tpool->tp_mutex);
418*ba2539a9Schs 	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
419*ba2539a9Schs 		if (activep->tpa_tid == my_tid) {
420*ba2539a9Schs 			pthread_mutex_unlock(&tpool->tp_mutex);
421*ba2539a9Schs 			return (1);
422*ba2539a9Schs 		}
423*ba2539a9Schs 	}
424*ba2539a9Schs 	pthread_mutex_unlock(&tpool->tp_mutex);
425*ba2539a9Schs 	return (0);
426*ba2539a9Schs }
427