xref: /netbsd-src/sys/kern/kern_threadpool.c (revision caa99e8b408b0483e1a9e4c77ceb0b30422e647f)
1 /*	$NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $	*/
2 
3 /*-
4  * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
5  * All rights reserved.
6  *
7  * This code is derived from software contributed to The NetBSD Foundation
8  * by Taylor R. Campbell and Jason R. Thorpe.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 /*
33  * Thread pools.
34  *
35  * A thread pool is a collection of worker threads, idle or running
36  * jobs, together with a dispatcher thread that does not run jobs but
37  * can be given jobs to assign to a worker thread.  Unlike
38  * kthread_create, scheduling a job on a thread pool does not allocate
39  * memory or sleep at all, except perhaps on an adaptive lock.  Jobs
40  * reuse threads, so they do not incur the expense of creating and
41  * destroying kthreads unless there is little work to be done.
42  *
43  * A per-CPU thread pool (threadpool_percpu) is a collection of thread
44  * pools, one per CPU, each bound to that CPU.  For each priority level
45  * in use, there is one shared unbound thread pool (i.e., a pool of
46  * threads not bound to any CPU) and one shared per-CPU thread pool.
47  *
48  * To use the unbound thread pool at priority pri, call
49  * threadpool_get(&pool, pri).  When you're done, call
50  * threadpool_put(pool, pri).
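 *
 * For example, a driver might use the unbound pool roughly as
 * follows.  This is a hedged sketch only: example_job_fn and the
 * example softc sc (with its sc_lock, sc_pool, and sc_job members)
 * are hypothetical and not part of this file.
 *
 *	error = threadpool_get(&sc->sc_pool, PRI_NONE);
 *	if (error)
 *		goto fail;
 *	threadpool_job_init(&sc->sc_job, &example_job_fn, &sc->sc_lock,
 *	    "example");
 *	...
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(sc->sc_pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *	...
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_cancel_job(sc->sc_pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *	threadpool_job_destroy(&sc->sc_job);
 *	threadpool_put(sc->sc_pool, PRI_NONE);
 *
 * The job lock passed to threadpool_job_init (here sc_lock) must be
 * held when scheduling or cancelling the job, and the job callback
 * must call threadpool_job_done with that lock held before it
 * returns.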
51  *
52  * To use the per-CPU thread pools at priority pri, call
53  * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
54  * pool returned by threadpool_percpu_ref(pool_percpu) for the current
55  * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
56  * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
57  * pri).
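 *
 * A per-CPU sketch, under the same hypothetical names as above:
 *
 *	struct threadpool *pool;
 *
 *	error = threadpool_percpu_get(&sc->sc_pool_percpu, PRI_NONE);
 *	if (error)
 *		goto fail;
 *	...
 *	pool = threadpool_percpu_ref(sc->sc_pool_percpu);
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *	...
 *	threadpool_percpu_put(sc->sc_pool_percpu, PRI_NONE);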
58  *
59  * +--MACHINE-----------------------------------------------------+
60  * | +--CPU 0---------+ +--CPU 1---------+     +--CPU n---------+ |
61  * | | <dispatcher 0> | | <dispatcher 1> | ... | <dispatcher n> | |
62  * | | <idle 0a>      | | <running 1a>   | ... | <idle na>      | |
63  * | | <running 0b>   | | <running 1b>   | ... | <idle nb>      | |
64  * | | .              | | .              | ... | .              | |
65  * | | .              | | .              | ... | .              | |
66  * | | .              | | .              | ... | .              | |
67  * | +----------------+ +----------------+     +----------------+ |
68  * |            +--unbound-----------+                            |
69  * |            | <dispatcher n+1>   |                            |
70  * |            | <idle (n+1)a>      |                            |
71  * |            | <running (n+1)b>   |                            |
72  * |            +--------------------+                            |
73  * +--------------------------------------------------------------+
74  *
75  * XXX Why one dispatcher per CPU?  I did that originally to avoid
76  * touching remote CPUs' memory when scheduling a job, but that still
77  * requires interprocessor synchronization.  Perhaps we could get by
78  * with a single dispatcher thread, at the expense of another pointer
79  * in struct threadpool_job to identify the CPU on which it must run in
80  * order for the dispatcher to schedule it correctly.
81  */
82 
83 #include <sys/cdefs.h>
84 __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $");
85 
86 #include <sys/types.h>
87 #include <sys/param.h>
88 #include <sys/atomic.h>
89 #include <sys/condvar.h>
90 #include <sys/cpu.h>
91 #include <sys/kernel.h>
92 #include <sys/kmem.h>
93 #include <sys/kthread.h>
94 #include <sys/mutex.h>
95 #include <sys/once.h>
96 #include <sys/percpu.h>
97 #include <sys/pool.h>
98 #include <sys/proc.h>
99 #include <sys/queue.h>
100 #include <sys/sdt.h>
101 #include <sys/sysctl.h>
102 #include <sys/systm.h>
103 #include <sys/threadpool.h>
104 
105 /* Probes */
106 
107 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get,
108     "pri_t"/*pri*/);
109 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__create,
110     "pri_t"/*pri*/);
111 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__race,
112     "pri_t"/*pri*/);
113 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put,
114     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
115 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put__destroy,
116     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
117 
118 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get,
119     "pri_t"/*pri*/);
120 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__create,
121     "pri_t"/*pri*/);
122 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__race,
123     "pri_t"/*pri*/);
124 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put,
125     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
126 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put__destroy,
127     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
128 
129 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, create,
130     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/);
131 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__success,
132     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "struct threadpool *"/*pool*/);
133 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__failure,
134     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "int"/*error*/);
135 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, destroy,
136     "struct threadpool *"/*pool*/);
137 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, destroy__wait,
138     "struct threadpool *"/*pool*/, "uint64_t"/*refcnt*/);
139 
140 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job,
141     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
142 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running,
143     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
144 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__dispatcher,
145     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
146 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread,
147     "struct threadpool *"/*pool*/,
148     "struct threadpool_job *"/*job*/,
149     "struct lwp *"/*thread*/);
150 
151 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__start,
152     "struct threadpool *"/*pool*/);
153 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__dying,
154     "struct threadpool *"/*pool*/);
155 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__spawn,
156     "struct threadpool *"/*pool*/);
157 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, dispatcher__race,
158     "struct threadpool *"/*pool*/,
159     "struct threadpool_job *"/*job*/);
160 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, dispatcher__assign,
161     "struct threadpool *"/*pool*/,
162     "struct threadpool_job *"/*job*/,
163     "struct lwp *"/*thread*/);
164 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__exit,
165     "struct threadpool *"/*pool*/);
166 
167 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start,
168     "struct threadpool *"/*pool*/);
169 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__dying,
170     "struct threadpool *"/*pool*/);
171 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, thread__job,
172     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
173 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__exit,
174     "struct threadpool *"/*pool*/);
175 
176 /* Data structures */
177 
178 TAILQ_HEAD(job_head, threadpool_job);
179 TAILQ_HEAD(thread_head, threadpool_thread);
180 
181 struct threadpool_thread {
182 	struct lwp			*tpt_lwp;
183 	char				*tpt_lwp_savedname;
184 	struct threadpool		*tpt_pool;
185 	struct threadpool_job		*tpt_job;
186 	kcondvar_t			tpt_cv;
187 	TAILQ_ENTRY(threadpool_thread)	tpt_entry;
188 };
189 
190 struct threadpool {
191 	kmutex_t			tp_lock;
192 	struct threadpool_thread	tp_dispatcher;
193 	struct job_head			tp_jobs;
194 	struct thread_head		tp_idle_threads;
195 	uint64_t			tp_refcnt;
196 	int				tp_flags;
197 #define	THREADPOOL_DYING	0x01
198 	struct cpu_info			*tp_cpu;
199 	pri_t				tp_pri;
200 };
201 
202 static void	threadpool_hold(struct threadpool *);
203 static void	threadpool_rele(struct threadpool *);
204 
205 static int	threadpool_percpu_create(struct threadpool_percpu **, pri_t);
206 static void	threadpool_percpu_destroy(struct threadpool_percpu *);
207 static void	threadpool_percpu_init(void *, void *, struct cpu_info *);
208 static void	threadpool_percpu_ok(void *, void *, struct cpu_info *);
209 static void	threadpool_percpu_fini(void *, void *, struct cpu_info *);
210 
211 static threadpool_job_fn_t threadpool_job_dead;
212 
213 static void	threadpool_job_hold(struct threadpool_job *);
214 static void	threadpool_job_rele(struct threadpool_job *);
215 
216 static void	threadpool_dispatcher_thread(void *) __dead;
217 static void	threadpool_thread(void *) __dead;
218 
219 static pool_cache_t	threadpool_thread_pc __read_mostly;
220 
221 static kmutex_t		threadpools_lock __cacheline_aligned;
222 
223 	/* Default to 30 second idle timeout for pool threads. */
224 static int	threadpool_idle_time_ms = 30 * 1000;
225 
226 struct threadpool_unbound {
227 	struct threadpool		tpu_pool;
228 
229 	/* protected by threadpools_lock */
230 	LIST_ENTRY(threadpool_unbound)	tpu_link;
231 	uint64_t			tpu_refcnt;
232 };
233 
234 static LIST_HEAD(, threadpool_unbound) unbound_threadpools;
235 
236 static struct threadpool_unbound *
237 threadpool_lookup_unbound(pri_t pri)
238 {
239 	struct threadpool_unbound *tpu;
240 
241 	LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
242 		if (tpu->tpu_pool.tp_pri == pri)
243 			return tpu;
244 	}
245 	return NULL;
246 }
247 
248 static void
249 threadpool_insert_unbound(struct threadpool_unbound *tpu)
250 {
251 	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
252 	LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
253 }
254 
255 static void
256 threadpool_remove_unbound(struct threadpool_unbound *tpu)
257 {
258 	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
259 	LIST_REMOVE(tpu, tpu_link);
260 }
261 
262 struct threadpool_percpu {
263 	percpu_t *			tpp_percpu;
264 	pri_t				tpp_pri;
265 
266 	/* protected by threadpools_lock */
267 	LIST_ENTRY(threadpool_percpu)	tpp_link;
268 	uint64_t			tpp_refcnt;
269 };
270 
271 static LIST_HEAD(, threadpool_percpu) percpu_threadpools;
272 
273 static struct threadpool_percpu *
274 threadpool_lookup_percpu(pri_t pri)
275 {
276 	struct threadpool_percpu *tpp;
277 
278 	LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
279 		if (tpp->tpp_pri == pri)
280 			return tpp;
281 	}
282 	return NULL;
283 }
284 
285 static void
286 threadpool_insert_percpu(struct threadpool_percpu *tpp)
287 {
288 	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
289 	LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
290 }
291 
292 static void
293 threadpool_remove_percpu(struct threadpool_percpu *tpp)
294 {
295 	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
296 	LIST_REMOVE(tpp, tpp_link);
297 }
298 
299 static int
300 sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)
301 {
302 	struct sysctlnode node;
303 	int val, error;
304 
305 	node = *rnode;
306 
307 	val = threadpool_idle_time_ms;
308 	node.sysctl_data = &val;
309 	error = sysctl_lookup(SYSCTLFN_CALL(&node));
310 	if (error == 0 && newp != NULL) {
311 		/* Disallow negative values and 0 (forever). */
312 		if (val < 1)
313 			error = EINVAL;
314 		else
315 			threadpool_idle_time_ms = val;
316 	}
317 
318 	return error;
319 }
320 
321 SYSCTL_SETUP_PROTO(sysctl_threadpool_setup);
322 
323 SYSCTL_SETUP(sysctl_threadpool_setup,
324     "sysctl kern.threadpool subtree setup")
325 {
326 	const struct sysctlnode *rnode, *cnode;
327 	int error __diagused;
328 
329 	error = sysctl_createv(clog, 0, NULL, &rnode,
330 	    CTLFLAG_PERMANENT,
331 	    CTLTYPE_NODE, "threadpool",
332 	    SYSCTL_DESCR("threadpool subsystem options"),
333 	    NULL, 0, NULL, 0,
334 	    CTL_KERN, CTL_CREATE, CTL_EOL);
335 	KASSERT(error == 0);
336 
337 	error = sysctl_createv(clog, 0, &rnode, &cnode,
338 	    CTLFLAG_PERMANENT | CTLFLAG_READWRITE,
339 	    CTLTYPE_INT, "idle_ms",
340 	    SYSCTL_DESCR("idle thread timeout in ms"),
341 	    sysctl_kern_threadpool_idle_ms, 0, NULL, 0,
342 	    CTL_CREATE, CTL_EOL);
343 	KASSERT(error == 0);
344 }
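
/*
 * The knob created above is exported as kern.threadpool.idle_ms.  For
 * example, an administrator could shorten the idle timeout to ten
 * seconds with "sysctl -w kern.threadpool.idle_ms=10000" (an
 * illustrative sysctl(8) invocation, not part of this file).
 */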
345 
346 void
347 threadpools_init(void)
348 {
349 
350 	threadpool_thread_pc =
351 	    pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
352 		"thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);
353 
354 	LIST_INIT(&unbound_threadpools);
355 	LIST_INIT(&percpu_threadpools);
356 	mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
357 }
358 
359 static void
360 threadnamesuffix(char *buf, size_t buflen, struct cpu_info *ci, int pri)
361 {
362 
363 	buf[0] = '\0';
364 	if (ci)
365 		snprintf(buf + strlen(buf), buflen - strlen(buf), "/%d",
366 		    cpu_index(ci));
367 	if (pri != PRI_NONE)
368 		snprintf(buf + strlen(buf), buflen - strlen(buf), "@%d", pri);
369 }
370 
371 /* Thread pool creation */
372 
373 static bool
374 threadpool_pri_is_valid(pri_t pri)
375 {
376 	return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
377 }
378 
379 static int
380 threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
381     pri_t pri)
382 {
383 	struct lwp *lwp;
384 	char suffix[16];
385 	int ktflags;
386 	int error;
387 
388 	KASSERT(threadpool_pri_is_valid(pri));
389 
390 	SDT_PROBE2(sdt, kernel, threadpool, create,  ci, pri);
391 
392 	mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
393 	/* XXX dispatcher */
394 	TAILQ_INIT(&pool->tp_jobs);
395 	TAILQ_INIT(&pool->tp_idle_threads);
396 	pool->tp_refcnt = 1;		/* dispatcher's reference */
397 	pool->tp_flags = 0;
398 	pool->tp_cpu = ci;
399 	pool->tp_pri = pri;
400 
401 	pool->tp_dispatcher.tpt_lwp = NULL;
402 	pool->tp_dispatcher.tpt_pool = pool;
403 	pool->tp_dispatcher.tpt_job = NULL;
404 	cv_init(&pool->tp_dispatcher.tpt_cv, "pooldisp");
405 
406 	ktflags = 0;
407 	ktflags |= KTHREAD_MPSAFE;
408 	if (pri < PRI_KERNEL)
409 		ktflags |= KTHREAD_TS;
410 	threadnamesuffix(suffix, sizeof(suffix), ci, pri);
411 	error = kthread_create(pri, ktflags, ci, &threadpool_dispatcher_thread,
412 	    &pool->tp_dispatcher, &lwp, "pooldisp%s", suffix);
413 	if (error)
414 		goto fail0;
415 
416 	mutex_spin_enter(&pool->tp_lock);
417 	pool->tp_dispatcher.tpt_lwp = lwp;
418 	cv_broadcast(&pool->tp_dispatcher.tpt_cv);
419 	mutex_spin_exit(&pool->tp_lock);
420 
421 	SDT_PROBE3(sdt, kernel, threadpool, create__success,  ci, pri, pool);
422 	return 0;
423 
424 fail0:	KASSERT(error);
425 	KASSERT(pool->tp_dispatcher.tpt_job == NULL);
426 	KASSERT(pool->tp_dispatcher.tpt_pool == pool);
427 	KASSERT(pool->tp_flags == 0);
428 	KASSERT(pool->tp_refcnt == 1);	/* dispatcher's ref; thread never started */
429 	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
430 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
431 	KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
432 	cv_destroy(&pool->tp_dispatcher.tpt_cv);
433 	mutex_destroy(&pool->tp_lock);
434 	SDT_PROBE3(sdt, kernel, threadpool, create__failure,  ci, pri, error);
435 	return error;
436 }
437 
438 /* Thread pool destruction */
439 
440 static void
441 threadpool_destroy(struct threadpool *pool)
442 {
443 	struct threadpool_thread *thread;
444 
445 	SDT_PROBE1(sdt, kernel, threadpool, destroy,  pool);
446 
447 	/* Mark the pool dying and wait for threads to commit suicide.  */
448 	mutex_spin_enter(&pool->tp_lock);
449 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
450 	pool->tp_flags |= THREADPOOL_DYING;
451 	cv_broadcast(&pool->tp_dispatcher.tpt_cv);
452 	TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
453 		cv_broadcast(&thread->tpt_cv);
454 	while (0 < pool->tp_refcnt) {
455 		SDT_PROBE2(sdt, kernel, threadpool, destroy__wait,
456 		    pool, pool->tp_refcnt);
457 		cv_wait(&pool->tp_dispatcher.tpt_cv, &pool->tp_lock);
458 	}
459 	mutex_spin_exit(&pool->tp_lock);
460 
461 	KASSERT(pool->tp_dispatcher.tpt_job == NULL);
462 	KASSERT(pool->tp_dispatcher.tpt_pool == pool);
463 	KASSERT(pool->tp_flags == THREADPOOL_DYING);
464 	KASSERT(pool->tp_refcnt == 0);
465 	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
466 	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
467 	KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
468 	cv_destroy(&pool->tp_dispatcher.tpt_cv);
469 	mutex_destroy(&pool->tp_lock);
470 }
471 
472 static void
473 threadpool_hold(struct threadpool *pool)
474 {
475 
476 	KASSERT(mutex_owned(&pool->tp_lock));
477 	pool->tp_refcnt++;
478 	KASSERT(pool->tp_refcnt != 0);
479 }
480 
481 static void
482 threadpool_rele(struct threadpool *pool)
483 {
484 
485 	KASSERT(mutex_owned(&pool->tp_lock));
486 	KASSERT(0 < pool->tp_refcnt);
487 	if (--pool->tp_refcnt == 0)
488 		cv_broadcast(&pool->tp_dispatcher.tpt_cv);
489 }
490 
491 /* Unbound thread pools */
492 
493 int
494 threadpool_get(struct threadpool **poolp, pri_t pri)
495 {
496 	struct threadpool_unbound *tpu, *tmp = NULL;
497 	int error;
498 
499 	ASSERT_SLEEPABLE();
500 
501 	SDT_PROBE1(sdt, kernel, threadpool, get,  pri);
502 
503 	if (! threadpool_pri_is_valid(pri))
504 		return EINVAL;
505 
506 	mutex_enter(&threadpools_lock);
507 	tpu = threadpool_lookup_unbound(pri);
508 	if (tpu == NULL) {
509 		mutex_exit(&threadpools_lock);
510 		SDT_PROBE1(sdt, kernel, threadpool, get__create,  pri);
511 		tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
512 		error = threadpool_create(&tmp->tpu_pool, NULL, pri);
513 		if (error) {
514 			kmem_free(tmp, sizeof(*tmp));
515 			return error;
516 		}
517 		mutex_enter(&threadpools_lock);
518 		tpu = threadpool_lookup_unbound(pri);
519 		if (tpu == NULL) {
520 			tpu = tmp;
521 			tmp = NULL;
522 			threadpool_insert_unbound(tpu);
523 		} else {
524 			SDT_PROBE1(sdt, kernel, threadpool, get__race,  pri);
525 		}
526 	}
527 	KASSERT(tpu != NULL);
528 	tpu->tpu_refcnt++;
529 	KASSERT(tpu->tpu_refcnt != 0);
530 	mutex_exit(&threadpools_lock);
531 
532 	if (tmp != NULL) {
533 		threadpool_destroy(&tmp->tpu_pool);
534 		kmem_free(tmp, sizeof(*tmp));
535 	}
536 	KASSERT(tpu != NULL);
537 	*poolp = &tpu->tpu_pool;
538 	return 0;
539 }
540 
541 void
542 threadpool_put(struct threadpool *pool, pri_t pri)
543 {
544 	struct threadpool_unbound *tpu =
545 	    container_of(pool, struct threadpool_unbound, tpu_pool);
546 
547 	ASSERT_SLEEPABLE();
548 	KASSERT(threadpool_pri_is_valid(pri));
549 
550 	SDT_PROBE2(sdt, kernel, threadpool, put,  pool, pri);
551 
552 	mutex_enter(&threadpools_lock);
553 	KASSERT(tpu == threadpool_lookup_unbound(pri));
554 	KASSERT(0 < tpu->tpu_refcnt);
555 	if (--tpu->tpu_refcnt == 0) {
556 		SDT_PROBE2(sdt, kernel, threadpool, put__destroy,  pool, pri);
557 		threadpool_remove_unbound(tpu);
558 	} else {
559 		tpu = NULL;
560 	}
561 	mutex_exit(&threadpools_lock);
562 
563 	if (tpu) {
564 		threadpool_destroy(&tpu->tpu_pool);
565 		kmem_free(tpu, sizeof(*tpu));
566 	}
567 }
568 
569 /* Per-CPU thread pools */
570 
571 int
572 threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
573 {
574 	struct threadpool_percpu *pool_percpu, *tmp = NULL;
575 	int error;
576 
577 	ASSERT_SLEEPABLE();
578 
579 	SDT_PROBE1(sdt, kernel, threadpool, percpu__get,  pri);
580 
581 	if (! threadpool_pri_is_valid(pri))
582 		return EINVAL;
583 
584 	mutex_enter(&threadpools_lock);
585 	pool_percpu = threadpool_lookup_percpu(pri);
586 	if (pool_percpu == NULL) {
587 		mutex_exit(&threadpools_lock);
588 		SDT_PROBE1(sdt, kernel, threadpool, percpu__get__create,  pri);
589 		error = threadpool_percpu_create(&tmp, pri);
590 		if (error)
591 			return error;
592 		KASSERT(tmp != NULL);
593 		mutex_enter(&threadpools_lock);
594 		pool_percpu = threadpool_lookup_percpu(pri);
595 		if (pool_percpu == NULL) {
596 			pool_percpu = tmp;
597 			tmp = NULL;
598 			threadpool_insert_percpu(pool_percpu);
599 		} else {
600 			SDT_PROBE1(sdt, kernel, threadpool, percpu__get__race,
601 			    pri);
602 		}
603 	}
604 	KASSERT(pool_percpu != NULL);
605 	pool_percpu->tpp_refcnt++;
606 	KASSERT(pool_percpu->tpp_refcnt != 0);
607 	mutex_exit(&threadpools_lock);
608 
609 	if (tmp != NULL)
610 		threadpool_percpu_destroy(tmp);
611 	KASSERT(pool_percpu != NULL);
612 	*pool_percpup = pool_percpu;
613 	return 0;
614 }
615 
616 void
617 threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
618 {
619 
620 	ASSERT_SLEEPABLE();
621 
622 	KASSERT(threadpool_pri_is_valid(pri));
623 
624 	SDT_PROBE2(sdt, kernel, threadpool, percpu__put,  pool_percpu, pri);
625 
626 	mutex_enter(&threadpools_lock);
627 	KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
628 	KASSERT(0 < pool_percpu->tpp_refcnt);
629 	if (--pool_percpu->tpp_refcnt == 0) {
630 		SDT_PROBE2(sdt, kernel, threadpool, percpu__put__destroy,
631 		    pool_percpu, pri);
632 		threadpool_remove_percpu(pool_percpu);
633 	} else {
634 		pool_percpu = NULL;
635 	}
636 	mutex_exit(&threadpools_lock);
637 
638 	if (pool_percpu)
639 		threadpool_percpu_destroy(pool_percpu);
640 }
641 
642 struct threadpool *
643 threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
644 {
645 	struct threadpool **poolp, *pool;
646 
647 	poolp = percpu_getref(pool_percpu->tpp_percpu);
648 	pool = *poolp;
649 	percpu_putref(pool_percpu->tpp_percpu);
650 
651 	return pool;
652 }
653 
654 struct threadpool *
655 threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
656     struct cpu_info *ci)
657 {
658 	struct threadpool **poolp, *pool;
659 
660 	/*
661 	 * As long as xcalls are blocked -- e.g., by kpreempt_disable
662 	 * -- the percpu object will not be swapped and destroyed.  We
663 	 * can't write to it, because the data may have already been
664 	 * moved to a new buffer, but we can safely read from it.
665 	 */
666 	kpreempt_disable();
667 	poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
668 	pool = *poolp;
669 	kpreempt_enable();
670 
671 	return pool;
672 }
673 
674 static int
675 threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
676 {
677 	struct threadpool_percpu *pool_percpu;
678 	bool ok = true;
679 
680 	pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
681 	pool_percpu->tpp_pri = pri;
682 	pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *),
683 	    threadpool_percpu_init, threadpool_percpu_fini,
684 	    (void *)(intptr_t)pri);
685 
686 	/*
687 	 * Verify that all of the CPUs were initialized.
688 	 *
689 	 * XXX What to do if we add CPU hotplug?
690 	 */
691 	percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok);
692 	if (!ok)
693 		goto fail;
694 
695 	/* Success!  */
696 	*pool_percpup = (struct threadpool_percpu *)pool_percpu;
697 	return 0;
698 
699 fail:	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
700 	kmem_free(pool_percpu, sizeof(*pool_percpu));
701 	return ENOMEM;
702 }
703 
704 static void
705 threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
706 {
707 
708 	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
709 	kmem_free(pool_percpu, sizeof(*pool_percpu));
710 }
711 
712 static void
713 threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci)
714 {
715 	struct threadpool **const poolp = vpoolp;
716 	pri_t pri = (intptr_t)(void *)vpri;
717 	int error;
718 
719 	*poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP);
720 	error = threadpool_create(*poolp, ci, pri);
721 	if (error) {
722 		KASSERT(error == ENOMEM);
723 		kmem_free(*poolp, sizeof(**poolp));
724 		*poolp = NULL;
725 	}
726 }
727 
728 static void
729 threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci)
730 {
731 	struct threadpool **const poolp = vpoolp;
732 	bool *okp = vokp;
733 
734 	if (*poolp == NULL)
735 		atomic_store_relaxed(okp, false);
736 }
737 
738 static void
739 threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci)
740 {
741 	struct threadpool **const poolp = vpoolp;
742 
743 	if (*poolp == NULL)	/* initialization failed */
744 		return;
745 	threadpool_destroy(*poolp);
746 	kmem_free(*poolp, sizeof(**poolp));
747 }
748 
749 /* Thread pool jobs */
750 
751 void __printflike(4,5)
752 threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
753     kmutex_t *lock, const char *fmt, ...)
754 {
755 	va_list ap;
756 
757 	va_start(ap, fmt);
758 	(void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
759 	va_end(ap);
760 
761 	job->job_lock = lock;
762 	job->job_thread = NULL;
763 	job->job_refcnt = 0;
764 	cv_init(&job->job_cv, job->job_name);
765 	job->job_fn = fn;
766 }
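
/*
 * Example shape of a job callback.  This is a sketch only:
 * example_job_fn, example_softc, and example_do_work are
 * hypothetical.  The callback is invoked without the job lock held,
 * and it must call threadpool_job_done() with the job lock held
 * before returning.
 *
 *	static void
 *	example_job_fn(struct threadpool_job *job)
 *	{
 *		struct example_softc *sc =
 *		    container_of(job, struct example_softc, sc_job);
 *
 *		example_do_work(sc);
 *
 *		mutex_enter(&sc->sc_lock);
 *		threadpool_job_done(job);
 *		mutex_exit(&sc->sc_lock);
 *	}
 */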
767 
768 static void
769 threadpool_job_dead(struct threadpool_job *job)
770 {
771 
772 	panic("threadpool job %p ran after destruction", job);
773 }
774 
775 void
776 threadpool_job_destroy(struct threadpool_job *job)
777 {
778 
779 	ASSERT_SLEEPABLE();
780 
781 	KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
782 
783 	mutex_enter(job->job_lock);
784 	while (0 < atomic_load_relaxed(&job->job_refcnt))
785 		cv_wait(&job->job_cv, job->job_lock);
786 	mutex_exit(job->job_lock);
787 
788 	job->job_lock = NULL;
789 	KASSERT(job->job_thread == NULL);
790 	KASSERT(job->job_refcnt == 0);
791 	KASSERT(!cv_has_waiters(&job->job_cv));
792 	cv_destroy(&job->job_cv);
793 	job->job_fn = threadpool_job_dead;
794 	(void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
795 }
796 
797 static void
798 threadpool_job_hold(struct threadpool_job *job)
799 {
800 	unsigned int refcnt __diagused;
801 
802 	refcnt = atomic_inc_uint_nv(&job->job_refcnt);
803 	KASSERT(refcnt != 0);
804 }
805 
806 static void
807 threadpool_job_rele(struct threadpool_job *job)
808 {
809 	unsigned int refcnt;
810 
811 	KASSERT(mutex_owned(job->job_lock));
812 
813 	refcnt = atomic_dec_uint_nv(&job->job_refcnt);
814 	KASSERT(refcnt != UINT_MAX);
815 	if (refcnt == 0)
816 		cv_broadcast(&job->job_cv);
817 }
818 
819 void
820 threadpool_job_done(struct threadpool_job *job)
821 {
822 
823 	KASSERT(mutex_owned(job->job_lock));
824 	KASSERT(job->job_thread != NULL);
825 	KASSERT(job->job_thread->tpt_lwp == curlwp);
826 
827 	/*
828 	 * We can safely read this field; it's only modified right before
829 	 * we call the job work function, and we are only preserving it
830 	 * to use here; no one cares if it contains junk afterward.
831 	 */
832 	lwp_lock(curlwp);
833 	curlwp->l_name = job->job_thread->tpt_lwp_savedname;
834 	lwp_unlock(curlwp);
835 
836 	/*
837 	 * Inline the work of threadpool_job_rele(); the job is already
838 	 * locked, the most likely scenario (XXXJRT only scenario?) is
839 	 * that we're dropping the last reference (the one taken in
840 	 * threadpool_schedule_job()), and we always do the cv_broadcast()
841 	 * anyway.
842 	 */
843 	KASSERT(0 < atomic_load_relaxed(&job->job_refcnt));
844 	unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
845 	KASSERT(refcnt != UINT_MAX);
846 	cv_broadcast(&job->job_cv);
847 	job->job_thread = NULL;
848 }
849 
850 void
851 threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
852 {
853 
854 	KASSERT(mutex_owned(job->job_lock));
855 
856 	SDT_PROBE2(sdt, kernel, threadpool, schedule__job,  pool, job);
857 
858 	/*
859 	 * If the job's already running, let it keep running.  The job
860 	 * is guaranteed by the interlock not to end early -- if it had
861 	 * ended early, threadpool_job_done would have set job_thread
862 	 * to NULL under the interlock.
863 	 */
864 	if (__predict_true(job->job_thread != NULL)) {
865 		SDT_PROBE2(sdt, kernel, threadpool, schedule__job__running,
866 		    pool, job);
867 		return;
868 	}
869 
870 	threadpool_job_hold(job);
871 
872 	/* Otherwise, try to assign a thread to the job.  */
873 	mutex_spin_enter(&pool->tp_lock);
874 	if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
875 		/* Nobody's idle.  Give it to the dispatcher.  */
876 		SDT_PROBE2(sdt, kernel, threadpool, schedule__job__dispatcher,
877 		    pool, job);
878 		job->job_thread = &pool->tp_dispatcher;
879 		TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
880 	} else {
881 		/* Assign it to the first idle thread.  */
882 		job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
883 		SDT_PROBE3(sdt, kernel, threadpool, schedule__job__thread,
884 		    pool, job, job->job_thread->tpt_lwp);
885 		TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
886 		    tpt_entry);
887 		job->job_thread->tpt_job = job;
888 	}
889 
890 	/* Notify whomever we gave it to, dispatcher or idle thread.  */
891 	KASSERT(job->job_thread != NULL);
892 	cv_broadcast(&job->job_thread->tpt_cv);
893 	mutex_spin_exit(&pool->tp_lock);
894 }
895 
896 bool
897 threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
898 {
899 
900 	KASSERT(mutex_owned(job->job_lock));
901 
902 	/*
903 	 * XXXJRT This fails (albeit safely) when all of the following
904 	 * are true:
905 	 *
906 	 *	=> "pool" is something other than what the job was
907 	 *	   scheduled on.  This can legitimately occur if,
908 	 *	   for example, a job is percpu-scheduled on CPU0
909 	 *	   and then CPU1 attempts to cancel it without taking
910 	 *	   a remote pool reference.  (this might happen by
911 	 *	   "luck of the draw").
912 	 *
913 	 *	=> "job" is not yet running, but is assigned to the
914 	 *	   dispatcher.
915 	 *
916 	 * When this happens, this code makes the determination that
917 	 * the job is already running.  The failure mode is that the
918 	 * caller is told the job is running, and thus has to wait.
919 	 * The dispatcher will eventually get to it and the job will
920 	 * proceed as if it had been already running.
921 	 */
922 
923 	if (job->job_thread == NULL) {
924 		/* Nothing to do.  Guaranteed not running.  */
925 		return true;
926 	} else if (job->job_thread == &pool->tp_dispatcher) {
927 		/* Take it off the list to guarantee it won't run.  */
928 		job->job_thread = NULL;
929 		mutex_spin_enter(&pool->tp_lock);
930 		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
931 		mutex_spin_exit(&pool->tp_lock);
932 		threadpool_job_rele(job);
933 		return true;
934 	} else {
935 		/* Too late -- already running.  */
936 		return false;
937 	}
938 }
939 
940 void
941 threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
942 {
943 
944 	/*
945 	 * We may sleep here, but we can't ASSERT_SLEEPABLE() because
946 	 * the job lock (used to interlock the cv_wait()) may in fact
947 	 * legitimately be a spin lock, so the assertion would fire
948 	 * as a false-positive.
949 	 */
950 
951 	KASSERT(mutex_owned(job->job_lock));
952 
953 	if (threadpool_cancel_job_async(pool, job))
954 		return;
955 
956 	/* Already running.  Wait for it to complete.  */
957 	while (job->job_thread != NULL)
958 		cv_wait(&job->job_cv, job->job_lock);
959 }
960 
961 /* Thread pool dispatcher thread */
962 
963 static void __dead
964 threadpool_dispatcher_thread(void *arg)
965 {
966 	struct threadpool_thread *const dispatcher = arg;
967 	struct threadpool *const pool = dispatcher->tpt_pool;
968 	struct lwp *lwp = NULL;
969 	int ktflags;
970 	char suffix[16];
971 	int error;
972 
973 	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
974 	KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));
975 
976 	/* Wait until we're initialized.  */
977 	mutex_spin_enter(&pool->tp_lock);
978 	while (dispatcher->tpt_lwp == NULL)
979 		cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
980 
981 	SDT_PROBE1(sdt, kernel, threadpool, dispatcher__start,  pool);
982 
983 	for (;;) {
984 		/* Wait until there's a job.  */
985 		while (TAILQ_EMPTY(&pool->tp_jobs)) {
986 			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
987 				SDT_PROBE1(sdt, kernel, threadpool,
988 				    dispatcher__dying,  pool);
989 				break;
990 			}
991 			cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
992 		}
993 		if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
994 			break;
995 
996 		/* If there are no threads, we'll have to try to start one.  */
997 		if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
998 			SDT_PROBE1(sdt, kernel, threadpool, dispatcher__spawn,
999 			    pool);
1000 			threadpool_hold(pool);
1001 			mutex_spin_exit(&pool->tp_lock);
1002 
1003 			struct threadpool_thread *const thread =
1004 			    pool_cache_get(threadpool_thread_pc, PR_WAITOK);
1005 			thread->tpt_lwp = NULL;
1006 			thread->tpt_pool = pool;
1007 			thread->tpt_job = NULL;
1008 			cv_init(&thread->tpt_cv, "pooljob");
1009 
1010 			ktflags = 0;
1011 			ktflags |= KTHREAD_MPSAFE;
1012 			if (pool->tp_pri < PRI_KERNEL)
1013 				ktflags |= KTHREAD_TS;
1014 			threadnamesuffix(suffix, sizeof(suffix), pool->tp_cpu,
1015 			    pool->tp_pri);
1016 			error = kthread_create(pool->tp_pri, ktflags,
1017 			    pool->tp_cpu, &threadpool_thread, thread, &lwp,
1018 			    "poolthread%s", suffix);
1019 
1020 			mutex_spin_enter(&pool->tp_lock);
1021 			if (error) {
1022 				pool_cache_put(threadpool_thread_pc, thread);
1023 				threadpool_rele(pool);
1024 				/* XXX What to do to wait for memory?  */
1025 				(void)kpause("thrdplcr", false, hz,
1026 				    &pool->tp_lock);
1027 				continue;
1028 			}
1029 			/*
1030 			 * New kthread now owns the reference to the pool
1031 			 * taken above.
1032 			 */
1033 			KASSERT(lwp != NULL);
1034 			TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
1035 			    tpt_entry);
1036 			thread->tpt_lwp = lwp;
1037 			lwp = NULL;
1038 			cv_broadcast(&thread->tpt_cv);
1039 			continue;
1040 		}
1041 
1042 		/* There are idle threads, so try giving one a job.  */
1043 		struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
1044 
1045 		/*
1046 		 * Take an extra reference on the job temporarily so that
1047 		 * it won't disappear on us while we have both locks dropped.
1048 		 */
1049 		threadpool_job_hold(job);
1050 		mutex_spin_exit(&pool->tp_lock);
1051 
1052 		mutex_enter(job->job_lock);
1053 		/* If the job was cancelled, we'll no longer be its thread.  */
1054 		if (__predict_true(job->job_thread == dispatcher)) {
1055 			mutex_spin_enter(&pool->tp_lock);
1056 			TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
1057 			if (__predict_false(
1058 				    TAILQ_EMPTY(&pool->tp_idle_threads))) {
1059 				/*
1060 				 * Someone else snagged the thread
1061 				 * first.  We'll have to try again.
1062 				 */
1063 				SDT_PROBE2(sdt, kernel, threadpool,
1064 				    dispatcher__race,  pool, job);
1065 				TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
1066 				    job_entry);
1067 			} else {
1068 				/*
1069 				 * Assign the job to the thread and
1070 				 * wake the thread so it starts work.
1071 				 */
1072 				struct threadpool_thread *const thread =
1073 				    TAILQ_FIRST(&pool->tp_idle_threads);
1074 
1075 				SDT_PROBE3(sdt, kernel, threadpool,
1076 				    dispatcher__assign,  pool, job, thread->tpt_lwp);
1077 				KASSERT(thread->tpt_job == NULL);
1078 				TAILQ_REMOVE(&pool->tp_idle_threads, thread,
1079 				    tpt_entry);
1080 				thread->tpt_job = job;
1081 				job->job_thread = thread;
1082 				cv_broadcast(&thread->tpt_cv);
1083 			}
1084 			mutex_spin_exit(&pool->tp_lock);
1085 		}
1086 		threadpool_job_rele(job);
1087 		mutex_exit(job->job_lock);
1088 
1089 		mutex_spin_enter(&pool->tp_lock);
1090 	}
1091 	threadpool_rele(pool);
1092 	mutex_spin_exit(&pool->tp_lock);
1093 
1094 	SDT_PROBE1(sdt, kernel, threadpool, dispatcher__exit,  pool);
1095 
1096 	kthread_exit(0);
1097 }
1098 
1099 /* Thread pool thread */
1100 
1101 static void __dead
1102 threadpool_thread(void *arg)
1103 {
1104 	struct threadpool_thread *const thread = arg;
1105 	struct threadpool *const pool = thread->tpt_pool;
1106 
1107 	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
1108 	KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));
1109 
1110 	/* Wait until we're initialized and on the queue.  */
1111 	mutex_spin_enter(&pool->tp_lock);
1112 	while (thread->tpt_lwp == NULL)
1113 		cv_wait(&thread->tpt_cv, &pool->tp_lock);
1114 
1115 	SDT_PROBE1(sdt, kernel, threadpool, thread__start,  pool);
1116 
1117 	KASSERT(thread->tpt_lwp == curlwp);
1118 	for (;;) {
1119 		/* Wait until we are assigned a job.  */
1120 		while (thread->tpt_job == NULL) {
1121 			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
1122 				SDT_PROBE1(sdt, kernel, threadpool,
1123 				    thread__dying,  pool);
1124 				break;
1125 			}
1126 			if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
1127 				mstohz(threadpool_idle_time_ms)))
1128 				break;
1129 		}
1130 		if (__predict_false(thread->tpt_job == NULL)) {
1131 			TAILQ_REMOVE(&pool->tp_idle_threads, thread,
1132 			    tpt_entry);
1133 			break;
1134 		}
1135 
1136 		struct threadpool_job *const job = thread->tpt_job;
1137 		KASSERT(job != NULL);
1138 
1139 		/* Set our lwp name to reflect what job we're doing.  */
1140 		lwp_lock(curlwp);
1141 		char *const lwp_name __diagused = curlwp->l_name;
1142 		thread->tpt_lwp_savedname = curlwp->l_name;
1143 		curlwp->l_name = job->job_name;
1144 		lwp_unlock(curlwp);
1145 
1146 		mutex_spin_exit(&pool->tp_lock);
1147 
1148 		SDT_PROBE2(sdt, kernel, threadpool, thread__job,  pool, job);
1149 
1150 		/* Run the job.  */
1151 		(*job->job_fn)(job);
1152 
1153 		/* lwp name restored in threadpool_job_done(). */
1154 		KASSERTMSG((curlwp->l_name == lwp_name),
1155 		    "someone forgot to call threadpool_job_done()!");
1156 
1157 		/*
1158 		 * We can compare pointers, but we can no longer dereference
1159 		 * job after this because threadpool_job_done() drops the
1160 		 * last reference on the job while the job is locked.
1161 		 */
1162 
1163 		mutex_spin_enter(&pool->tp_lock);
1164 		KASSERT(thread->tpt_job == job);
1165 		thread->tpt_job = NULL;
1166 		TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
1167 	}
1168 	threadpool_rele(pool);
1169 	mutex_spin_exit(&pool->tp_lock);
1170 
1171 	SDT_PROBE1(sdt, kernel, threadpool, thread__exit,  pool);
1172 
1173 	KASSERT(!cv_has_waiters(&thread->tpt_cv));
1174 	cv_destroy(&thread->tpt_cv);
1175 	pool_cache_put(threadpool_thread_pc, thread);
1176 	kthread_exit(0);
1177 }
1178