/* /netbsd-src/external/cddl/osnet/lib/libzfs/thread_pool.c (revision ba2539a9805a0544ff82c0003cc02fe1eee5603d) */
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/cdefs.h>
/* __FBSDID("$FreeBSD: head/cddl/compat/opensolaris/misc/thread_pool.c 275595 2014-12-08 06:10:47Z delphij $"); */

#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include "thread_pool_impl.h"

typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */

static void
delete_pool(tpool_t *tpool)
{
	tpool_job_t *job;

	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
		tpool->tp_head = job->tpj_next;
		free(job);
	}
	(void) pthread_attr_destroy(&tpool->tp_attr);
	free(tpool);
}

/*
 * Worker thread is terminating.
 */
static void
worker_cleanup(void *arg)
{
	tpool_t *tpool = arg;

	if (--tpool->tp_current == 0 &&
	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
		if (tpool->tp_flags & TP_ABANDON) {
			pthread_mutex_unlock(&tpool->tp_mutex);
			delete_pool(tpool);
			return;
		}
		if (tpool->tp_flags & TP_DESTROY)
			(void) pthread_cond_broadcast(&tpool->tp_busycv);
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}

static void
notify_waiters(tpool_t *tpool)
{
	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
		tpool->tp_flags &= ~TP_WAIT;
		(void) pthread_cond_broadcast(&tpool->tp_waitcv);
	}
}

/*
 * Called by a worker thread on return from a tpool_dispatch()d job.
 */
static void
job_cleanup(void *arg)
{
	tpool_t *tpool = arg;
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;
	tpool_active_t **activepp;

	pthread_mutex_lock(&tpool->tp_mutex);
	/* CSTYLED */
	for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
		activep = *activepp;
		if (activep->tpa_tid == my_tid) {
			*activepp = activep->tpa_next;
			break;
		}
	}
	if (tpool->tp_flags & TP_WAIT)
		notify_waiters(tpool);
}

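/*
 * Note on the locking and cleanup protocol used by the worker below:
 * tpool_worker() holds tp_mutex at the top of every loop iteration and
 * releases it only while a job function is running.  worker_cleanup() is
 * pushed for the lifetime of the thread and always runs with tp_mutex held;
 * it drops the lock (and may delete an abandoned pool) when the thread
 * exits or is cancelled.  job_cleanup() is pushed around each job: it
 * re-acquires tp_mutex, unlinks this thread from tp_active, and
 * deliberately leaves the mutex held so the worker loop resumes owning it.
 */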
static void *
tpool_worker(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;
	int elapsed;
	tpool_job_t *job;
	void (*func)(void *);
	tpool_active_t active;
	sigset_t maskset;

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(worker_cleanup, tpool);

	/*
	 * This is the worker's main loop.
	 * It is left only if an idle timeout (or a condition-wait error)
	 * has occurred, or if the pool is being destroyed or abandoned.
	 */
	active.tpa_tid = pthread_self();
	for (;;) {
		elapsed = 0;
		tpool->tp_idle++;
		if (tpool->tp_flags & TP_WAIT)
			notify_waiters(tpool);
		while ((tpool->tp_head == NULL ||
		    (tpool->tp_flags & TP_SUSPEND)) &&
		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
			if (tpool->tp_current <= tpool->tp_minimum ||
			    tpool->tp_linger == 0) {
				(void) pthread_cond_wait(&tpool->tp_workcv,
				    &tpool->tp_mutex);
			} else {
				struct timespec timeout;

				clock_gettime(CLOCK_MONOTONIC, &timeout);
				timeout.tv_sec += tpool->tp_linger;
				if (pthread_cond_timedwait(&tpool->tp_workcv,
				    &tpool->tp_mutex, &timeout) != 0) {
					elapsed = 1;
					break;
				}
			}
		}
		tpool->tp_idle--;
		if (tpool->tp_flags & TP_DESTROY)
			break;
		if (tpool->tp_flags & TP_ABANDON) {
			/* can't abandon a suspended pool */
			if (tpool->tp_flags & TP_SUSPEND) {
				tpool->tp_flags &= ~TP_SUSPEND;
				(void) pthread_cond_broadcast(&tpool->tp_workcv);
			}
			if (tpool->tp_head == NULL)
				break;
		}
		if ((job = tpool->tp_head) != NULL &&
		    !(tpool->tp_flags & TP_SUSPEND)) {
			elapsed = 0;
			func = job->tpj_func;
			arg = job->tpj_arg;
			tpool->tp_head = job->tpj_next;
			if (job == tpool->tp_tail)
				tpool->tp_tail = NULL;
			tpool->tp_njobs--;
			active.tpa_next = tpool->tp_active;
			tpool->tp_active = &active;
			pthread_mutex_unlock(&tpool->tp_mutex);
			pthread_cleanup_push(job_cleanup, tpool);
			free(job);
			/*
			 * Call the specified function.
			 */
			func(arg);
			/*
			 * We don't know what this thread has been doing,
			 * so we reset its signal mask and cancellation
			 * state back to the initial values.
			 */
			sigfillset(&maskset);
			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
			    NULL);
			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
			    NULL);
			pthread_cleanup_pop(1);
		}
		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);
	return (arg);
}

/*
 * Create a worker thread, with all signals blocked.
 */
static int
create_worker(tpool_t *tpool)
{
	sigset_t maskset, oset;
	pthread_t thread;
	int error;

	sigfillset(&maskset);
	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	return (error);
}

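/*
 * Create an empty thread pool.  No worker threads are created here; they
 * are spawned on demand by tpool_dispatch() (and tpool_resume()), up to
 * max_threads.  Workers beyond min_threads exit after lingering idle for
 * `linger' seconds.  Note that this port ignores the caller-supplied attr
 * and always creates detached workers using the pool's own attributes.
 */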
tpool_t	*
tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
	pthread_attr_t *attr)
{
	tpool_t	*tpool;

	if (min_threads > max_threads || max_threads < 1) {
		errno = EINVAL;
		return (NULL);
	}

	tpool = calloc(1, sizeof (*tpool));
	if (tpool == NULL) {
		errno = ENOMEM;
		return (NULL);
	}
	(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
	(void) pthread_cond_init(&tpool->tp_busycv, NULL);
	(void) pthread_cond_init(&tpool->tp_workcv, NULL);
	(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
	tpool->tp_minimum = min_threads;
	tpool->tp_maximum = max_threads;
	tpool->tp_linger = linger;

	/* make all pool threads be detached daemon threads */
	(void) pthread_attr_init(&tpool->tp_attr);
	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
	    PTHREAD_CREATE_DETACHED);

	return (tpool);
}

/*
 * Dispatch a work request to the thread pool.
 * If there are idle workers, awaken one.
 * Else, if the maximum number of workers has
 * not been reached, spawn a new worker thread.
 * Else just return with the job added to the queue.
 */
int
tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
{
	tpool_job_t *job;

	if ((job = calloc(1, sizeof (*job))) == NULL)
		return (-1);
	job->tpj_next = NULL;
	job->tpj_func = func;
	job->tpj_arg = arg;

	pthread_mutex_lock(&tpool->tp_mutex);

	if (tpool->tp_head == NULL)
		tpool->tp_head = job;
	else
		tpool->tp_tail->tpj_next = job;
	tpool->tp_tail = job;
	tpool->tp_njobs++;

	if (!(tpool->tp_flags & TP_SUSPEND)) {
		if (tpool->tp_idle > 0)
			(void) pthread_cond_signal(&tpool->tp_workcv);
		else if (tpool->tp_current < tpool->tp_maximum &&
		    create_worker(tpool) == 0)
			tpool->tp_current++;
	}

	pthread_mutex_unlock(&tpool->tp_mutex);
	return (0);
}
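
/*
 * Illustrative usage sketch (not part of this file): a typical consumer
 * creates a pool, dispatches jobs, waits for the queue to drain, and then
 * destroys the pool.  The header name, callback, and job count below are
 * assumptions for the example only.
 *
 *	#include <stdint.h>
 *	#include <thread_pool.h>	// assumed public header for the tpool_* API
 *
 *	static void
 *	do_work(void *arg)
 *	{
 *		// process item number (intptr_t)arg
 *	}
 *
 *	tpool_t *tp = tpool_create(1, 8, 30, NULL);
 *	if (tp != NULL) {
 *		for (intptr_t i = 0; i < 100; i++)
 *			(void) tpool_dispatch(tp, do_work, (void *)i);
 *		tpool_wait(tp);		// returns once no queued or active jobs remain
 *		tpool_destroy(tp);
 *	}
 */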

/*
 * Assumes: by the time tpool_destroy() is called no one will use this
 * thread pool in any way and no one will try to dispatch entries to it.
 * Calling tpool_destroy() from a job in the pool will cause deadlock.
 */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
	delete_pool(tpool);
}

/*
 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
 * The last worker to terminate will delete the pool.
 */
void
tpool_abandon(tpool_t *tpool)
{

	pthread_mutex_lock(&tpool->tp_mutex);
	if (tpool->tp_current == 0) {
		/* no workers, just delete the pool */
		pthread_mutex_unlock(&tpool->tp_mutex);
		delete_pool(tpool);
	} else {
		/* wake up all workers, last one will delete the pool */
		tpool->tp_flags |= TP_ABANDON;
		tpool->tp_flags &= ~TP_SUSPEND;
		(void) pthread_cond_broadcast(&tpool->tp_workcv);
		pthread_mutex_unlock(&tpool->tp_mutex);
	}
}

/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
 */
void
tpool_wait(tpool_t *tpool)
{

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}
	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
}

void
tpool_suspend(tpool_t *tpool)
{

	pthread_mutex_lock(&tpool->tp_mutex);
	tpool->tp_flags |= TP_SUSPEND;
	pthread_mutex_unlock(&tpool->tp_mutex);
}

int
tpool_suspended(tpool_t *tpool)
{
	int suspended;

	pthread_mutex_lock(&tpool->tp_mutex);
	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
	pthread_mutex_unlock(&tpool->tp_mutex);

	return (suspended);
}

void
tpool_resume(tpool_t *tpool)
{
	int excess;

	pthread_mutex_lock(&tpool->tp_mutex);
	if (!(tpool->tp_flags & TP_SUSPEND)) {
		pthread_mutex_unlock(&tpool->tp_mutex);
		return;
	}
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);
	excess = tpool->tp_njobs - tpool->tp_idle;
	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
		if (create_worker(tpool) != 0)
			break;		/* pthread_create() failed */
		tpool->tp_current++;
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}

int
tpool_member(tpool_t *tpool)
{
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;

	pthread_mutex_lock(&tpool->tp_mutex);
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
		if (activep->tpa_tid == my_tid) {
			pthread_mutex_unlock(&tpool->tp_mutex);
			return (1);
		}
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
	return (0);
}