/*	$NetBSD: kern_threadpool.c,v 1.18 2020/04/25 17:43:23 thorpej Exp $	*/

/*-
 * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell and Jason R. Thorpe.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Thread pools.
 *
 * A thread pool is a collection of worker threads, either idle or
 * running jobs, together with an overseer thread that does not run
 * jobs but can be given jobs to assign to a worker thread.  Unlike
 * kthread_create(9), scheduling a job on a thread pool does not
 * allocate memory or sleep at all, except perhaps on an adaptive
 * lock.  Jobs reuse threads, so they do not incur the expense of
 * creating and destroying kthreads unless there is little work to be
 * done.
 *
 * A per-CPU thread pool (threadpool_percpu) is a collection of thread
 * pools, one per CPU, each bound to that CPU.  For each priority
 * level in use, there is one shared unbound thread pool (i.e., pool
 * of threads not bound to any CPU) and one shared per-CPU thread
 * pool.
 *
 * To use the unbound thread pool at priority pri, call
 * threadpool_get(&pool, pri).  When you're done, call
 * threadpool_put(pool, pri).
 *
 * To use the per-CPU thread pools at priority pri, call
 * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
 * pool returned by threadpool_percpu_ref(pool_percpu) for the current
 * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
 * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
 * pri).
 *
 *	+--MACHINE-----------------------------------------------+
 *	| +--CPU 0-------+ +--CPU 1-------+     +--CPU n-------+ |
 *	| | <overseer 0> | | <overseer 1> | ... | <overseer n> | |
 *	| | <idle 0a>    | | <running 1a> | ... | <idle na>    | |
 *	| | <running 0b> | | <running 1b> | ... | <idle nb>    | |
 *	| |      .       | |      .       | ... |      .       | |
 *	| |      .       | |      .       | ... |      .       | |
 *	| |      .       | |      .       | ... |      .       | |
 *	| +--------------+ +--------------+     +--------------+ |
 *	|            +--unbound---------+                        |
 *	|            | <overseer n+1>   |                        |
 *	|            | <idle (n+1)a>    |                        |
 *	|            | <running (n+1)b> |                        |
 *	|            +------------------+                        |
 *	+--------------------------------------------------------+
 *
 * XXX Why one overseer per CPU?  I did that originally to avoid
 * touching remote CPUs' memory when scheduling a job, but that still
 * requires interprocessor synchronization.  Perhaps we could get by
 * with a single overseer thread, at the expense of another pointer in
 * struct threadpool_job to identify the CPU on which it must run
 * in order for the overseer to schedule it correctly.
 */
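
/*
 * Example: typical use of an unbound pool.  This is an illustrative
 * sketch only; "example_softc", "example_job_fn", "do_work", and the
 * surrounding locking scheme are hypothetical, not part of this file.
 *
 *	struct example_softc {
 *		kmutex_t		sc_lock;
 *		struct threadpool	*sc_pool;
 *		struct threadpool_job	sc_job;
 *	};
 *
 *	static void
 *	example_job_fn(struct threadpool_job *job)
 *	{
 *		struct example_softc *sc =
 *		    container_of(job, struct example_softc, sc_job);
 *
 *		do_work(sc);
 *
 *		mutex_enter(&sc->sc_lock);
 *		threadpool_job_done(job);	(every job must do this)
 *		mutex_exit(&sc->sc_lock);
 *	}
 *
 * Setup, in a sleepable context:
 *
 *	error = threadpool_get(&sc->sc_pool, PRI_NONE);
 *	if (error) ...
 *	threadpool_job_init(&sc->sc_job, example_job_fn, &sc->sc_lock,
 *	    "example");
 *
 * To run the job (a no-op if it is already scheduled or running):
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(sc->sc_pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *
 * Teardown, in a sleepable context:
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_cancel_job(sc->sc_pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *	threadpool_job_destroy(&sc->sc_job);
 *	threadpool_put(sc->sc_pool, PRI_NONE);
 */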

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.18 2020/04/25 17:43:23 thorpej Exp $");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/atomic.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/kthread.h>
#include <sys/mutex.h>
#include <sys/once.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/sdt.h>
#include <sys/sysctl.h>
#include <sys/systm.h>
#include <sys/threadpool.h>

/* Probes */

SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__create,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__race,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put,
    "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put__destroy,
    "struct threadpool *"/*pool*/, "pri_t"/*pri*/);

SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__create,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__race,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put,
    "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put__destroy,
    "struct threadpool *"/*pool*/, "pri_t"/*pri*/);

SDT_PROBE_DEFINE2(sdt, kernel, threadpool, create,
    "struct cpu_info *"/*ci*/, "pri_t"/*pri*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__success,
    "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__failure,
    "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "int"/*error*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, destroy,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, destroy__wait,
    "struct threadpool *"/*pool*/, "uint64_t"/*refcnt*/);

SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job,
    "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running,
    "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__overseer,
    "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread,
    "struct threadpool *"/*pool*/,
    "struct threadpool_job *"/*job*/,
    "struct lwp *"/*thread*/);

SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__start,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__dying,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__spawn,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, overseer__race,
    "struct threadpool *"/*pool*/,
    "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, overseer__assign,
    "struct threadpool *"/*pool*/,
    "struct threadpool_job *"/*job*/,
    "struct lwp *"/*thread*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__exit,
    "struct threadpool *"/*pool*/);

SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__dying,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, thread__job,
    "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__exit,
    "struct threadpool *"/*pool*/);

/* Data structures */

TAILQ_HEAD(job_head, threadpool_job);
TAILQ_HEAD(thread_head, threadpool_thread);

struct threadpool_thread {
	struct lwp			*tpt_lwp;
	char				*tpt_lwp_savedname;
	struct threadpool		*tpt_pool;
	struct threadpool_job		*tpt_job;
	kcondvar_t			tpt_cv;
	TAILQ_ENTRY(threadpool_thread)	tpt_entry;
};

struct threadpool {
	kmutex_t			tp_lock;
	struct threadpool_thread	tp_overseer;
	struct job_head			tp_jobs;
	struct thread_head		tp_idle_threads;
	uint64_t			tp_refcnt;
	int				tp_flags;
#define	THREADPOOL_DYING	0x01
	struct cpu_info			*tp_cpu;
	pri_t				tp_pri;
};

static void	threadpool_hold(struct threadpool *);
static void	threadpool_rele(struct threadpool *);

static int	threadpool_percpu_create(struct threadpool_percpu **, pri_t);
static void	threadpool_percpu_destroy(struct threadpool_percpu *);
static void	threadpool_percpu_init(void *, void *, struct cpu_info *);
static void	threadpool_percpu_ok(void *, void *, struct cpu_info *);
static void	threadpool_percpu_fini(void *, void *, struct cpu_info *);

static threadpool_job_fn_t threadpool_job_dead;

static void	threadpool_job_hold(struct threadpool_job *);
static void	threadpool_job_rele(struct threadpool_job *);

static void	threadpool_overseer_thread(void *) __dead;
static void	threadpool_thread(void *) __dead;

static pool_cache_t	threadpool_thread_pc	__read_mostly;

static kmutex_t		threadpools_lock	__cacheline_aligned;

/* Default to 30 second idle timeout for pool threads. */
static int threadpool_idle_time_ms = 30 * 1000;

struct threadpool_unbound {
	struct threadpool		tpu_pool;

	/* protected by threadpools_lock */
	LIST_ENTRY(threadpool_unbound)	tpu_link;
	uint64_t			tpu_refcnt;
};

static LIST_HEAD(, threadpool_unbound) unbound_threadpools;

static struct threadpool_unbound *
threadpool_lookup_unbound(pri_t pri)
{
	struct threadpool_unbound *tpu;

	LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
		if (tpu->tpu_pool.tp_pri == pri)
			return tpu;
	}
	return NULL;
}

static void
threadpool_insert_unbound(struct threadpool_unbound *tpu)
{
	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
	LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
}

static void
threadpool_remove_unbound(struct threadpool_unbound *tpu)
{
	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
	LIST_REMOVE(tpu, tpu_link);
}

struct threadpool_percpu {
	percpu_t *			tpp_percpu;
	pri_t				tpp_pri;

	/* protected by threadpools_lock */
	LIST_ENTRY(threadpool_percpu)	tpp_link;
	uint64_t			tpp_refcnt;
};

static LIST_HEAD(, threadpool_percpu) percpu_threadpools;

static struct threadpool_percpu *
threadpool_lookup_percpu(pri_t pri)
{
	struct threadpool_percpu *tpp;

	LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
		if (tpp->tpp_pri == pri)
			return tpp;
	}
	return NULL;
}

static void
threadpool_insert_percpu(struct threadpool_percpu *tpp)
{
	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
	LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
}

static void
threadpool_remove_percpu(struct threadpool_percpu *tpp)
{
	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
	LIST_REMOVE(tpp, tpp_link);
}

static int
sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)
{
	struct sysctlnode node;
	int val, error;

	node = *rnode;

	val = threadpool_idle_time_ms;
	node.sysctl_data = &val;
	error = sysctl_lookup(SYSCTLFN_CALL(&node));
	if (error == 0 && newp != NULL) {
		/* Disallow negative values and 0 (forever). */
		if (val < 1)
			error = EINVAL;
		else
			threadpool_idle_time_ms = val;
	}

	return error;
}

SYSCTL_SETUP_PROTO(sysctl_threadpool_setup);

SYSCTL_SETUP(sysctl_threadpool_setup,
    "sysctl kern.threadpool subtree setup")
{
	const struct sysctlnode *rnode, *cnode;
	int error __diagused;

	error = sysctl_createv(clog, 0, NULL, &rnode,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_NODE, "threadpool",
	    SYSCTL_DESCR("threadpool subsystem options"),
	    NULL, 0, NULL, 0,
	    CTL_KERN, CTL_CREATE, CTL_EOL);
	KASSERT(error == 0);

	error = sysctl_createv(clog, 0, &rnode, &cnode,
	    CTLFLAG_PERMANENT | CTLFLAG_READWRITE,
	    CTLTYPE_INT, "idle_ms",
	    SYSCTL_DESCR("idle thread timeout in ms"),
	    sysctl_kern_threadpool_idle_ms, 0, NULL, 0,
	    CTL_CREATE, CTL_EOL);
	KASSERT(error == 0);
}

void
threadpools_init(void)
{

	threadpool_thread_pc =
	    pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
		"thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);

	LIST_INIT(&unbound_threadpools);
	LIST_INIT(&percpu_threadpools);
	mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
}

/* Thread pool creation */

static bool
threadpool_pri_is_valid(pri_t pri)
{
	return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
}

static int
threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
    pri_t pri)
{
	struct lwp *lwp;
	int ktflags;
	int error;

	KASSERT(threadpool_pri_is_valid(pri));

	SDT_PROBE2(sdt, kernel, threadpool, create, ci, pri);

	mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
	/* XXX overseer */
	TAILQ_INIT(&pool->tp_jobs);
	TAILQ_INIT(&pool->tp_idle_threads);
	pool->tp_refcnt = 1;		/* overseer's reference */
	pool->tp_flags = 0;
	pool->tp_cpu = ci;
	pool->tp_pri = pri;

	pool->tp_overseer.tpt_lwp = NULL;
	pool->tp_overseer.tpt_pool = pool;
	pool->tp_overseer.tpt_job = NULL;
	cv_init(&pool->tp_overseer.tpt_cv, "poolover");

	ktflags = 0;
	ktflags |= KTHREAD_MPSAFE;
	if (pri < PRI_KERNEL)
		ktflags |= KTHREAD_TS;
	error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread,
	    &pool->tp_overseer, &lwp,
	    "pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri);
	if (error)
		goto fail0;

	mutex_spin_enter(&pool->tp_lock);
	pool->tp_overseer.tpt_lwp = lwp;
	cv_broadcast(&pool->tp_overseer.tpt_cv);
	mutex_spin_exit(&pool->tp_lock);

	SDT_PROBE3(sdt, kernel, threadpool, create__success, ci, pri, pool);
	return 0;

fail0:	KASSERT(error);
	KASSERT(pool->tp_overseer.tpt_job == NULL);
	KASSERT(pool->tp_overseer.tpt_pool == pool);
	KASSERT(pool->tp_flags == 0);
	KASSERT(pool->tp_refcnt == 0);
	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
	cv_destroy(&pool->tp_overseer.tpt_cv);
	mutex_destroy(&pool->tp_lock);
	SDT_PROBE3(sdt, kernel, threadpool, create__failure, ci, pri, error);
	return error;
}

/* Thread pool destruction */

static void
threadpool_destroy(struct threadpool *pool)
{
	struct threadpool_thread *thread;

	SDT_PROBE1(sdt, kernel, threadpool, destroy, pool);

	/* Mark the pool dying and wait for threads to commit suicide. */
	mutex_spin_enter(&pool->tp_lock);
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	pool->tp_flags |= THREADPOOL_DYING;
	cv_broadcast(&pool->tp_overseer.tpt_cv);
	TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
		cv_broadcast(&thread->tpt_cv);
	while (0 < pool->tp_refcnt) {
		SDT_PROBE2(sdt, kernel, threadpool, destroy__wait,
		    pool, pool->tp_refcnt);
		cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock);
	}
	mutex_spin_exit(&pool->tp_lock);

	KASSERT(pool->tp_overseer.tpt_job == NULL);
	KASSERT(pool->tp_overseer.tpt_pool == pool);
	KASSERT(pool->tp_flags == THREADPOOL_DYING);
	KASSERT(pool->tp_refcnt == 0);
	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
	cv_destroy(&pool->tp_overseer.tpt_cv);
	mutex_destroy(&pool->tp_lock);
}

static void
threadpool_hold(struct threadpool *pool)
{

	KASSERT(mutex_owned(&pool->tp_lock));
	pool->tp_refcnt++;
	KASSERT(pool->tp_refcnt != 0);
}

static void
threadpool_rele(struct threadpool *pool)
{

	KASSERT(mutex_owned(&pool->tp_lock));
	KASSERT(0 < pool->tp_refcnt);
	if (--pool->tp_refcnt == 0)
		cv_broadcast(&pool->tp_overseer.tpt_cv);
}

/* Unbound thread pools */

int
threadpool_get(struct threadpool **poolp, pri_t pri)
{
	struct threadpool_unbound *tpu, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, threadpool, get, pri);

	if (! threadpool_pri_is_valid(pri))
		return EINVAL;

	mutex_enter(&threadpools_lock);
	tpu = threadpool_lookup_unbound(pri);
	if (tpu == NULL) {
		mutex_exit(&threadpools_lock);
		SDT_PROBE1(sdt, kernel, threadpool, get__create, pri);
		tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
		error = threadpool_create(&tmp->tpu_pool, NULL, pri);
		if (error) {
			kmem_free(tmp, sizeof(*tmp));
			return error;
		}
		mutex_enter(&threadpools_lock);
		tpu = threadpool_lookup_unbound(pri);
		if (tpu == NULL) {
			tpu = tmp;
			tmp = NULL;
			threadpool_insert_unbound(tpu);
		} else {
			SDT_PROBE1(sdt, kernel, threadpool, get__race, pri);
		}
	}
	KASSERT(tpu != NULL);
	tpu->tpu_refcnt++;
	KASSERT(tpu->tpu_refcnt != 0);
	mutex_exit(&threadpools_lock);

	if (tmp != NULL) {
		threadpool_destroy(&tmp->tpu_pool);
		kmem_free(tmp, sizeof(*tmp));
	}
	KASSERT(tpu != NULL);
	*poolp = &tpu->tpu_pool;
	return 0;
}

void
threadpool_put(struct threadpool *pool, pri_t pri)
{
	struct threadpool_unbound *tpu =
	    container_of(pool, struct threadpool_unbound, tpu_pool);

	ASSERT_SLEEPABLE();
	KASSERT(threadpool_pri_is_valid(pri));

	SDT_PROBE2(sdt, kernel, threadpool, put, pool, pri);

	mutex_enter(&threadpools_lock);
	KASSERT(tpu == threadpool_lookup_unbound(pri));
	KASSERT(0 < tpu->tpu_refcnt);
	if (--tpu->tpu_refcnt == 0) {
		SDT_PROBE2(sdt, kernel, threadpool, put__destroy, pool, pri);
		threadpool_remove_unbound(tpu);
	} else {
		tpu = NULL;
	}
	mutex_exit(&threadpools_lock);

	if (tpu) {
		threadpool_destroy(&tpu->tpu_pool);
		kmem_free(tpu, sizeof(*tpu));
	}
}

/* Per-CPU thread pools */

int
threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
{
	struct threadpool_percpu *pool_percpu, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, threadpool, percpu__get, pri);

	if (! threadpool_pri_is_valid(pri))
		return EINVAL;

	mutex_enter(&threadpools_lock);
	pool_percpu = threadpool_lookup_percpu(pri);
	if (pool_percpu == NULL) {
		mutex_exit(&threadpools_lock);
		SDT_PROBE1(sdt, kernel, threadpool, percpu__get__create, pri);
		error = threadpool_percpu_create(&tmp, pri);
		if (error)
			return error;
		KASSERT(tmp != NULL);
		mutex_enter(&threadpools_lock);
		pool_percpu = threadpool_lookup_percpu(pri);
		if (pool_percpu == NULL) {
			pool_percpu = tmp;
			tmp = NULL;
			threadpool_insert_percpu(pool_percpu);
		} else {
			SDT_PROBE1(sdt, kernel, threadpool, percpu__get__race,
			    pri);
		}
	}
	KASSERT(pool_percpu != NULL);
	pool_percpu->tpp_refcnt++;
	KASSERT(pool_percpu->tpp_refcnt != 0);
	mutex_exit(&threadpools_lock);

	if (tmp != NULL)
		threadpool_percpu_destroy(tmp);
	KASSERT(pool_percpu != NULL);
	*pool_percpup = pool_percpu;
	return 0;
}

void
threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
{

	ASSERT_SLEEPABLE();

	KASSERT(threadpool_pri_is_valid(pri));

	SDT_PROBE2(sdt, kernel, threadpool, percpu__put, pool_percpu, pri);

	mutex_enter(&threadpools_lock);
	KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
	KASSERT(0 < pool_percpu->tpp_refcnt);
	if (--pool_percpu->tpp_refcnt == 0) {
		SDT_PROBE2(sdt, kernel, threadpool, percpu__put__destroy,
		    pool_percpu, pri);
		threadpool_remove_percpu(pool_percpu);
	} else {
		pool_percpu = NULL;
	}
	mutex_exit(&threadpools_lock);

	if (pool_percpu)
		threadpool_percpu_destroy(pool_percpu);
}

struct threadpool *
threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
{
	struct threadpool **poolp, *pool;

	poolp = percpu_getref(pool_percpu->tpp_percpu);
	pool = *poolp;
	percpu_putref(pool_percpu->tpp_percpu);

	return pool;
}

struct threadpool *
threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
    struct cpu_info *ci)
{
	struct threadpool **poolp, *pool;

	percpu_traverse_enter();
	poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
	pool = *poolp;
	percpu_traverse_exit();

	return pool;
}
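
/*
 * Example: using the per-CPU pools.  Illustrative sketch only; "sc" and
 * its members are hypothetical, as in the example near the top of this
 * file.  A pool obtained this way is bound to its CPU: jobs scheduled
 * on it run there, regardless of which CPU did the scheduling.
 *
 *	struct threadpool_percpu *pool_percpu;
 *	struct threadpool *pool;
 *
 *	error = threadpool_percpu_get(&pool_percpu, PRI_NONE);
 *	if (error) ...
 *
 *	pool = threadpool_percpu_ref(pool_percpu);		(current CPU)
 *	    or
 *	pool = threadpool_percpu_ref_remote(pool_percpu, ci);	(CPU ci)
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *
 * and, when finished with the jobs and the pool:
 *
 *	threadpool_percpu_put(pool_percpu, PRI_NONE);
 */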

static int
threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
{
	struct threadpool_percpu *pool_percpu;
	bool ok = true;

	pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
	pool_percpu->tpp_pri = pri;
	pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *),
	    threadpool_percpu_init, threadpool_percpu_fini,
	    (void *)(intptr_t)pri);

	/*
	 * Verify that all of the CPUs were initialized.
	 *
	 * XXX What to do if we add CPU hotplug?
	 */
	percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok);
	if (!ok)
		goto fail;

	/* Success! */
	*pool_percpup = (struct threadpool_percpu *)pool_percpu;
	return 0;

fail:	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
	kmem_free(pool_percpu, sizeof(*pool_percpu));
	return ENOMEM;
}

static void
threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
{

	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
	kmem_free(pool_percpu, sizeof(*pool_percpu));
}

static void
threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci)
{
	struct threadpool **const poolp = vpoolp;
	pri_t pri = (intptr_t)(void *)vpri;
	int error;

	*poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP);
	error = threadpool_create(*poolp, ci, pri);
	if (error) {
		KASSERT(error == ENOMEM);
		kmem_free(*poolp, sizeof(**poolp));
		*poolp = NULL;
	}
}

static void
threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci)
{
	struct threadpool **const poolp = vpoolp;
	bool *okp = vokp;

	if (*poolp == NULL)
		atomic_store_relaxed(okp, false);
}

static void
threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci)
{
	struct threadpool **const poolp = vpoolp;

	if (*poolp == NULL)	/* initialization failed */
		return;
	threadpool_destroy(*poolp);
	kmem_free(*poolp, sizeof(**poolp));
}

/* Thread pool jobs */

void __printflike(4,5)
threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
    kmutex_t *lock, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	(void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
	va_end(ap);

	job->job_lock = lock;
	job->job_thread = NULL;
	job->job_refcnt = 0;
	cv_init(&job->job_cv, job->job_name);
	job->job_fn = fn;
}

static void
threadpool_job_dead(struct threadpool_job *job)
{

	panic("threadpool job %p ran after destruction", job);
}

void
threadpool_job_destroy(struct threadpool_job *job)
{

	ASSERT_SLEEPABLE();

	KASSERTMSG((job->job_thread == NULL), "job %p still running", job);

	mutex_enter(job->job_lock);
	while (0 < job->job_refcnt)
		cv_wait(&job->job_cv, job->job_lock);
	mutex_exit(job->job_lock);

	job->job_lock = NULL;
	KASSERT(job->job_thread == NULL);
	KASSERT(job->job_refcnt == 0);
	KASSERT(!cv_has_waiters(&job->job_cv));
	cv_destroy(&job->job_cv);
	job->job_fn = threadpool_job_dead;
	(void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
}

static void
threadpool_job_hold(struct threadpool_job *job)
{
	unsigned int refcnt;

	do {
		refcnt = job->job_refcnt;
		KASSERT(refcnt != UINT_MAX);
	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
	    != refcnt);
}

static void
threadpool_job_rele(struct threadpool_job *job)
{
	unsigned int refcnt;

	KASSERT(mutex_owned(job->job_lock));

	do {
		refcnt = job->job_refcnt;
		KASSERT(0 < refcnt);
		if (refcnt == 1) {
			refcnt = atomic_dec_uint_nv(&job->job_refcnt);
			KASSERT(refcnt != UINT_MAX);
			if (refcnt == 0)
				cv_broadcast(&job->job_cv);
			return;
		}
	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt - 1))
	    != refcnt);
}

void
threadpool_job_done(struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));
	KASSERT(job->job_thread != NULL);
	KASSERT(job->job_thread->tpt_lwp == curlwp);

	/*
	 * We can safely read this field; it's only modified right before
	 * we call the job work function, and we are only preserving it
	 * to use here; no one cares if it contains junk afterward.
	 */
	lwp_lock(curlwp);
	curlwp->l_name = job->job_thread->tpt_lwp_savedname;
	lwp_unlock(curlwp);

	/*
	 * Inline the work of threadpool_job_rele(); the job is already
	 * locked, the most likely scenario (XXXJRT only scenario?) is
	 * that we're dropping the last reference (the one taken in
	 * threadpool_schedule_job()), and we always do the cv_broadcast()
	 * anyway.
	 */
	KASSERT(0 < job->job_refcnt);
	unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
	KASSERT(refcnt != UINT_MAX);
	cv_broadcast(&job->job_cv);
	job->job_thread = NULL;
}
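
/*
 * A job function is called with no locks held.  Before it returns it
 * must take the job lock (the one passed to threadpool_job_init()) and
 * call threadpool_job_done(), which also restores the thread's name;
 * e.g., the tail of the hypothetical job function sketched near the
 * top of this file:
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_job_done(job);
 *	mutex_exit(&sc->sc_lock);
 *
 * Forgetting this trips the KASSERTMSG in threadpool_thread() below
 * and leaves the job marked running forever.
 */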

void
threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));

	SDT_PROBE2(sdt, kernel, threadpool, schedule__job, pool, job);

	/*
	 * If the job's already running, let it keep running.  The job
	 * is guaranteed by the interlock not to end early -- if it had
	 * ended early, threadpool_job_done would have set job_thread
	 * to NULL under the interlock.
	 */
	if (__predict_true(job->job_thread != NULL)) {
		SDT_PROBE2(sdt, kernel, threadpool, schedule__job__running,
		    pool, job);
		return;
	}

	threadpool_job_hold(job);

	/* Otherwise, try to assign a thread to the job. */
	mutex_spin_enter(&pool->tp_lock);
	if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
		/* Nobody's idle.  Give it to the overseer. */
		SDT_PROBE2(sdt, kernel, threadpool, schedule__job__overseer,
		    pool, job);
		job->job_thread = &pool->tp_overseer;
		TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
	} else {
		/* Assign it to the first idle thread. */
		job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
		SDT_PROBE3(sdt, kernel, threadpool, schedule__job__thread,
		    pool, job, job->job_thread->tpt_lwp);
		TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
		    tpt_entry);
		job->job_thread->tpt_job = job;
	}

	/* Notify whomever we gave it to, overseer or idle thread. */
	KASSERT(job->job_thread != NULL);
	cv_broadcast(&job->job_thread->tpt_cv);
	mutex_spin_exit(&pool->tp_lock);
}

bool
threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));

	/*
	 * XXXJRT This fails (albeit safely) when all of the following
	 * are true:
	 *
	 *	=> "pool" is something other than what the job was
	 *	   scheduled on.  This can legitimately occur if,
	 *	   for example, a job is percpu-scheduled on CPU0
	 *	   and then CPU1 attempts to cancel it without taking
	 *	   a remote pool reference.  (this might happen by
	 *	   "luck of the draw").
	 *
	 *	=> "job" is not yet running, but is assigned to the
	 *	   overseer.
	 *
	 * When this happens, this code makes the determination that
	 * the job is already running.  The failure mode is that the
	 * caller is told the job is running, and thus has to wait.
	 * The overseer will eventually get to it and the job will
	 * proceed as if it had been already running.
	 */

	if (job->job_thread == NULL) {
		/* Nothing to do.  Guaranteed not running. */
		return true;
	} else if (job->job_thread == &pool->tp_overseer) {
		/* Take it off the list to guarantee it won't run. */
		job->job_thread = NULL;
		mutex_spin_enter(&pool->tp_lock);
		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
		mutex_spin_exit(&pool->tp_lock);
		threadpool_job_rele(job);
		return true;
	} else {
		/* Too late -- already running. */
		return false;
	}
}

void
threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
{

	/*
	 * We may sleep here, but we can't ASSERT_SLEEPABLE() because
	 * the job lock (used to interlock the cv_wait()) may in fact
	 * legitimately be a spin lock, so the assertion would fire
	 * as a false-positive.
	 */

	KASSERT(mutex_owned(job->job_lock));

	if (threadpool_cancel_job_async(pool, job))
		return;

	/* Already running.  Wait for it to complete. */
	while (job->job_thread != NULL)
		cv_wait(&job->job_cv, job->job_lock);
}
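
/*
 * To avoid the spurious "already running" case described in
 * threadpool_cancel_job_async() above, cancel a job using the same
 * pool it was scheduled on.  For a percpu-scheduled job that means
 * taking a remote reference for the CPU it was scheduled on, e.g.
 * (sketch; "sc" and its sc_ci member recording the scheduling CPU are
 * hypothetical):
 *
 *	struct threadpool *pool;
 *
 *	pool = threadpool_percpu_ref_remote(pool_percpu, sc->sc_ci);
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_cancel_job(pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 */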

/* Thread pool overseer thread */

static void __dead
threadpool_overseer_thread(void *arg)
{
	struct threadpool_thread *const overseer = arg;
	struct threadpool *const pool = overseer->tpt_pool;
	struct lwp *lwp = NULL;
	int ktflags;
	int error;

	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
	KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));

	/* Wait until we're initialized. */
	mutex_spin_enter(&pool->tp_lock);
	while (overseer->tpt_lwp == NULL)
		cv_wait(&overseer->tpt_cv, &pool->tp_lock);

	SDT_PROBE1(sdt, kernel, threadpool, overseer__start, pool);

	for (;;) {
		/* Wait until there's a job. */
		while (TAILQ_EMPTY(&pool->tp_jobs)) {
			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
				SDT_PROBE1(sdt, kernel, threadpool,
				    overseer__dying, pool);
				break;
			}
			cv_wait(&overseer->tpt_cv, &pool->tp_lock);
		}
		if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
			break;

		/* If there are no threads, we'll have to try to start one. */
		if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
			SDT_PROBE1(sdt, kernel, threadpool, overseer__spawn,
			    pool);
			threadpool_hold(pool);
			mutex_spin_exit(&pool->tp_lock);

			struct threadpool_thread *const thread =
			    pool_cache_get(threadpool_thread_pc, PR_WAITOK);
			thread->tpt_lwp = NULL;
			thread->tpt_pool = pool;
			thread->tpt_job = NULL;
			cv_init(&thread->tpt_cv, "poolthrd");

			ktflags = 0;
			ktflags |= KTHREAD_MPSAFE;
			if (pool->tp_pri < PRI_KERNEL)
				ktflags |= KTHREAD_TS;
			error = kthread_create(pool->tp_pri, ktflags,
			    pool->tp_cpu, &threadpool_thread, thread, &lwp,
			    "poolthread/%d@%d",
			    (pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1),
			    (int)pool->tp_pri);

			mutex_spin_enter(&pool->tp_lock);
			if (error) {
				pool_cache_put(threadpool_thread_pc, thread);
				threadpool_rele(pool);
				/* XXX What to do to wait for memory? */
				(void)kpause("thrdplcr", false, hz,
				    &pool->tp_lock);
				continue;
			}
			/*
			 * New kthread now owns the reference to the pool
			 * taken above.
			 */
			KASSERT(lwp != NULL);
			TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
			    tpt_entry);
			thread->tpt_lwp = lwp;
			lwp = NULL;
			cv_broadcast(&thread->tpt_cv);
			continue;
		}

		/* There are idle threads, so try giving one a job. */
		struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
		/*
		 * Take an extra reference on the job temporarily so that
		 * it won't disappear on us while we have both locks dropped.
		 */
		threadpool_job_hold(job);
		mutex_spin_exit(&pool->tp_lock);

		mutex_enter(job->job_lock);
		/* If the job was cancelled, we'll no longer be its thread. */
		if (__predict_true(job->job_thread == overseer)) {
			mutex_spin_enter(&pool->tp_lock);
			if (__predict_false(
				    TAILQ_EMPTY(&pool->tp_idle_threads))) {
				/*
				 * Someone else snagged the thread
				 * first.  We'll have to try again.
				 */
				SDT_PROBE2(sdt, kernel, threadpool,
				    overseer__race, pool, job);
				TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
				    job_entry);
			} else {
				/*
				 * Assign the job to the thread and
				 * wake the thread so it starts work.
				 */
				struct threadpool_thread *const thread =
				    TAILQ_FIRST(&pool->tp_idle_threads);

				SDT_PROBE3(sdt, kernel, threadpool,
				    overseer__assign, pool, job,
				    thread->tpt_lwp);
				KASSERT(thread->tpt_job == NULL);
				TAILQ_REMOVE(&pool->tp_idle_threads, thread,
				    tpt_entry);
				thread->tpt_job = job;
				job->job_thread = thread;
				cv_broadcast(&thread->tpt_cv);
			}
			mutex_spin_exit(&pool->tp_lock);
		}
		threadpool_job_rele(job);
		mutex_exit(job->job_lock);

		mutex_spin_enter(&pool->tp_lock);
	}
	threadpool_rele(pool);
	mutex_spin_exit(&pool->tp_lock);

	SDT_PROBE1(sdt, kernel, threadpool, overseer__exit, pool);

	kthread_exit(0);
}

/* Thread pool thread */

static void __dead
threadpool_thread(void *arg)
{
	struct threadpool_thread *const thread = arg;
	struct threadpool *const pool = thread->tpt_pool;

	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
	KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));

	/* Wait until we're initialized and on the queue. */
	mutex_spin_enter(&pool->tp_lock);
	while (thread->tpt_lwp == NULL)
		cv_wait(&thread->tpt_cv, &pool->tp_lock);

	SDT_PROBE1(sdt, kernel, threadpool, thread__start, pool);

	KASSERT(thread->tpt_lwp == curlwp);
	for (;;) {
		/* Wait until we are assigned a job. */
		while (thread->tpt_job == NULL) {
			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
				SDT_PROBE1(sdt, kernel, threadpool,
				    thread__dying, pool);
				break;
			}
			if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
				mstohz(threadpool_idle_time_ms)))
				break;
		}
		if (__predict_false(thread->tpt_job == NULL)) {
			TAILQ_REMOVE(&pool->tp_idle_threads, thread,
			    tpt_entry);
			break;
		}

		struct threadpool_job *const job = thread->tpt_job;
		KASSERT(job != NULL);

		/* Set our lwp name to reflect what job we're doing. */
		lwp_lock(curlwp);
		char *const lwp_name __diagused = curlwp->l_name;
		thread->tpt_lwp_savedname = curlwp->l_name;
		curlwp->l_name = job->job_name;
		lwp_unlock(curlwp);

		mutex_spin_exit(&pool->tp_lock);

		SDT_PROBE2(sdt, kernel, threadpool, thread__job, pool, job);

		/* Run the job. */
		(*job->job_fn)(job);

		/* lwp name restored in threadpool_job_done(). */
		KASSERTMSG((curlwp->l_name == lwp_name),
		    "someone forgot to call threadpool_job_done()!");

		/*
		 * We can compare pointers, but we can no longer dereference
		 * job after this because threadpool_job_done() drops the
		 * last reference on the job while the job is locked.
		 */

		mutex_spin_enter(&pool->tp_lock);
		KASSERT(thread->tpt_job == job);
		thread->tpt_job = NULL;
		TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
	}
	threadpool_rele(pool);
	mutex_spin_exit(&pool->tp_lock);

	SDT_PROBE1(sdt, kernel, threadpool, thread__exit, pool);

	KASSERT(!cv_has_waiters(&thread->tpt_cv));
	cv_destroy(&thread->tpt_cv);
	pool_cache_put(threadpool_thread_pc, thread);
	kthread_exit(0);
}