1 /* $NetBSD: kern_threadpool.c,v 1.15 2019/01/17 10:18:52 hannken Exp $ */ 2 3 /*- 4 * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc. 5 * All rights reserved. 6 * 7 * This code is derived from software contributed to The NetBSD Foundation 8 * by Taylor R. Campbell and Jason R. Thorpe. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 1. Redistributions of source code must retain the above copyright 14 * notice, this list of conditions and the following disclaimer. 15 * 2. Redistributions in binary form must reproduce the above copyright 16 * notice, this list of conditions and the following disclaimer in the 17 * documentation and/or other materials provided with the distribution. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 * POSSIBILITY OF SUCH DAMAGE. 30 */ 31 32 /* 33 * Thread pools. 34 * 35 * A thread pool is a collection of worker threads idle or running 36 * jobs, together with an overseer thread that does not run jobs but 37 * can be given jobs to assign to a worker thread. Scheduling a job in 38 * a thread pool does not allocate or even sleep at all, except perhaps 39 * on an adaptive lock, unlike kthread_create. Jobs reuse threads, so 40 * they do not incur the expense of creating and destroying kthreads 41 * unless there is not much work to be done. 42 * 43 * A per-CPU thread pool (threadpool_percpu) is a collection of thread 44 * pools, one per CPU bound to that CPU. For each priority level in 45 * use, there is one shared unbound thread pool (i.e., pool of threads 46 * not bound to any CPU) and one shared per-CPU thread pool. 47 * 48 * To use the unbound thread pool at priority pri, call 49 * threadpool_get(&pool, pri). When you're done, call 50 * threadpool_put(pool, pri). 51 * 52 * To use the per-CPU thread pools at priority pri, call 53 * threadpool_percpu_get(&pool_percpu, pri), and then use the thread 54 * pool returned by threadpool_percpu_ref(pool_percpu) for the current 55 * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another 56 * CPU. When you're done, call threadpool_percpu_put(pool_percpu, 57 * pri). 58 * 59 * +--MACHINE-----------------------------------------------+ 60 * | +--CPU 0-------+ +--CPU 1-------+ +--CPU n-------+ | 61 * | | <overseer 0> | | <overseer 1> | ... | <overseer n> | | 62 * | | <idle 0a> | | <running 1a> | ... | <idle na> | | 63 * | | <running 0b> | | <running 1b> | ... | <idle nb> | | 64 * | | . | | . | ... | . | | 65 * | | . | | . | ... | . | | 66 * | | . | | . | ... | . 
| | 67 * | +--------------+ +--------------+ +--------------+ | 68 * | +--unbound---------+ | 69 * | | <overseer n+1> | | 70 * | | <idle (n+1)a> | | 71 * | | <running (n+1)b> | | 72 * | +------------------+ | 73 * +--------------------------------------------------------+ 74 * 75 * XXX Why one overseer per CPU? I did that originally to avoid 76 * touching remote CPUs' memory when scheduling a job, but that still 77 * requires interprocessor synchronization. Perhaps we could get by 78 * with a single overseer thread, at the expense of another pointer in 79 * struct threadpool_job to identify the CPU on which it must run 80 * in order for the overseer to schedule it correctly. 81 */ 82 83 #include <sys/cdefs.h> 84 __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.15 2019/01/17 10:18:52 hannken Exp $"); 85 86 #include <sys/types.h> 87 #include <sys/param.h> 88 #include <sys/atomic.h> 89 #include <sys/condvar.h> 90 #include <sys/cpu.h> 91 #include <sys/kernel.h> 92 #include <sys/kmem.h> 93 #include <sys/kthread.h> 94 #include <sys/mutex.h> 95 #include <sys/once.h> 96 #include <sys/percpu.h> 97 #include <sys/pool.h> 98 #include <sys/proc.h> 99 #include <sys/queue.h> 100 #include <sys/systm.h> 101 #include <sys/sysctl.h> 102 #include <sys/threadpool.h> 103 104 /* Data structures */ 105 106 TAILQ_HEAD(job_head, threadpool_job); 107 TAILQ_HEAD(thread_head, threadpool_thread); 108 109 struct threadpool_thread { 110 struct lwp *tpt_lwp; 111 char *tpt_lwp_savedname; 112 struct threadpool *tpt_pool; 113 struct threadpool_job *tpt_job; 114 kcondvar_t tpt_cv; 115 TAILQ_ENTRY(threadpool_thread) tpt_entry; 116 }; 117 118 struct threadpool { 119 kmutex_t tp_lock; 120 struct threadpool_thread tp_overseer; 121 struct job_head tp_jobs; 122 struct thread_head tp_idle_threads; 123 uint64_t tp_refcnt; 124 int tp_flags; 125 #define THREADPOOL_DYING 0x01 126 struct cpu_info *tp_cpu; 127 pri_t tp_pri; 128 }; 129 130 static void threadpool_hold(struct threadpool *); 131 static void threadpool_rele(struct threadpool *); 132 133 static int threadpool_percpu_create(struct threadpool_percpu **, pri_t); 134 static void threadpool_percpu_destroy(struct threadpool_percpu *); 135 136 static threadpool_job_fn_t threadpool_job_dead; 137 138 static void threadpool_job_hold(struct threadpool_job *); 139 static void threadpool_job_rele(struct threadpool_job *); 140 141 static void threadpool_overseer_thread(void *) __dead; 142 static void threadpool_thread(void *) __dead; 143 144 static pool_cache_t threadpool_thread_pc __read_mostly; 145 146 static kmutex_t threadpools_lock __cacheline_aligned; 147 148 /* Default to 30 second idle timeout for pool threads. 
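 * An idle worker that receives no job within this interval exits and
 * returns its descriptor to the pool cache; the value can be tuned at
 * run time through the kern.threadpool.idle_ms sysctl set up below.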
*/ 149 static int threadpool_idle_time_ms = 30 * 1000; 150 151 struct threadpool_unbound { 152 struct threadpool tpu_pool; 153 154 /* protected by threadpools_lock */ 155 LIST_ENTRY(threadpool_unbound) tpu_link; 156 uint64_t tpu_refcnt; 157 }; 158 159 static LIST_HEAD(, threadpool_unbound) unbound_threadpools; 160 161 static struct threadpool_unbound * 162 threadpool_lookup_unbound(pri_t pri) 163 { 164 struct threadpool_unbound *tpu; 165 166 LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) { 167 if (tpu->tpu_pool.tp_pri == pri) 168 return tpu; 169 } 170 return NULL; 171 } 172 173 static void 174 threadpool_insert_unbound(struct threadpool_unbound *tpu) 175 { 176 KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL); 177 LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link); 178 } 179 180 static void 181 threadpool_remove_unbound(struct threadpool_unbound *tpu) 182 { 183 KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu); 184 LIST_REMOVE(tpu, tpu_link); 185 } 186 187 struct threadpool_percpu { 188 percpu_t * tpp_percpu; 189 pri_t tpp_pri; 190 191 /* protected by threadpools_lock */ 192 LIST_ENTRY(threadpool_percpu) tpp_link; 193 uint64_t tpp_refcnt; 194 }; 195 196 static LIST_HEAD(, threadpool_percpu) percpu_threadpools; 197 198 static struct threadpool_percpu * 199 threadpool_lookup_percpu(pri_t pri) 200 { 201 struct threadpool_percpu *tpp; 202 203 LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) { 204 if (tpp->tpp_pri == pri) 205 return tpp; 206 } 207 return NULL; 208 } 209 210 static void 211 threadpool_insert_percpu(struct threadpool_percpu *tpp) 212 { 213 KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL); 214 LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link); 215 } 216 217 static void 218 threadpool_remove_percpu(struct threadpool_percpu *tpp) 219 { 220 KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp); 221 LIST_REMOVE(tpp, tpp_link); 222 } 223 224 #ifdef THREADPOOL_VERBOSE 225 #define TP_LOG(x) printf x 226 #else 227 #define TP_LOG(x) /* nothing */ 228 #endif /* THREADPOOL_VERBOSE */ 229 230 static int 231 sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS) 232 { 233 struct sysctlnode node; 234 int val, error; 235 236 node = *rnode; 237 238 val = threadpool_idle_time_ms; 239 node.sysctl_data = &val; 240 error = sysctl_lookup(SYSCTLFN_CALL(&node)); 241 if (error == 0 && newp != NULL) { 242 /* Disallow negative values and 0 (forever). 
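 * (A timeout of zero would make cv_timedwait() in threadpool_thread()
 * sleep with no time limit, so idle threads would never be reclaimed.)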
*/ 243 if (val < 1) 244 error = EINVAL; 245 else 246 threadpool_idle_time_ms = val; 247 } 248 249 return error; 250 } 251 252 SYSCTL_SETUP_PROTO(sysctl_threadpool_setup); 253 254 SYSCTL_SETUP(sysctl_threadpool_setup, 255 "sysctl kern.threadpool subtree setup") 256 { 257 const struct sysctlnode *rnode, *cnode; 258 int error __diagused; 259 260 error = sysctl_createv(clog, 0, NULL, &rnode, 261 CTLFLAG_PERMANENT, 262 CTLTYPE_NODE, "threadpool", 263 SYSCTL_DESCR("threadpool subsystem options"), 264 NULL, 0, NULL, 0, 265 CTL_KERN, CTL_CREATE, CTL_EOL); 266 KASSERT(error == 0); 267 268 error = sysctl_createv(clog, 0, &rnode, &cnode, 269 CTLFLAG_PERMANENT | CTLFLAG_READWRITE, 270 CTLTYPE_INT, "idle_ms", 271 SYSCTL_DESCR("idle thread timeout in ms"), 272 sysctl_kern_threadpool_idle_ms, 0, NULL, 0, 273 CTL_CREATE, CTL_EOL); 274 KASSERT(error == 0); 275 } 276 277 void 278 threadpools_init(void) 279 { 280 281 threadpool_thread_pc = 282 pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0, 283 "thplthrd", NULL, IPL_NONE, NULL, NULL, NULL); 284 285 LIST_INIT(&unbound_threadpools); 286 LIST_INIT(&percpu_threadpools); 287 mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE); 288 } 289 290 /* Thread pool creation */ 291 292 static bool 293 threadpool_pri_is_valid(pri_t pri) 294 { 295 return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT)); 296 } 297 298 static int 299 threadpool_create(struct threadpool *const pool, struct cpu_info *ci, 300 pri_t pri) 301 { 302 struct lwp *lwp; 303 int ktflags; 304 int error; 305 306 KASSERT(threadpool_pri_is_valid(pri)); 307 308 mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM); 309 /* XXX overseer */ 310 TAILQ_INIT(&pool->tp_jobs); 311 TAILQ_INIT(&pool->tp_idle_threads); 312 pool->tp_refcnt = 1; /* overseer's reference */ 313 pool->tp_flags = 0; 314 pool->tp_cpu = ci; 315 pool->tp_pri = pri; 316 317 pool->tp_overseer.tpt_lwp = NULL; 318 pool->tp_overseer.tpt_pool = pool; 319 pool->tp_overseer.tpt_job = NULL; 320 cv_init(&pool->tp_overseer.tpt_cv, "poolover"); 321 322 ktflags = 0; 323 ktflags |= KTHREAD_MPSAFE; 324 if (pri < PRI_KERNEL) 325 ktflags |= KTHREAD_TS; 326 error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread, 327 &pool->tp_overseer, &lwp, 328 "pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri); 329 if (error) 330 goto fail0; 331 332 mutex_spin_enter(&pool->tp_lock); 333 pool->tp_overseer.tpt_lwp = lwp; 334 cv_broadcast(&pool->tp_overseer.tpt_cv); 335 mutex_spin_exit(&pool->tp_lock); 336 337 return 0; 338 339 fail0: KASSERT(error); 340 KASSERT(pool->tp_overseer.tpt_job == NULL); 341 KASSERT(pool->tp_overseer.tpt_pool == pool); 342 KASSERT(pool->tp_flags == 0); 343 KASSERT(pool->tp_refcnt == 0); 344 KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads)); 345 KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 346 KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv)); 347 cv_destroy(&pool->tp_overseer.tpt_cv); 348 mutex_destroy(&pool->tp_lock); 349 return error; 350 } 351 352 /* Thread pool destruction */ 353 354 static void 355 threadpool_destroy(struct threadpool *pool) 356 { 357 struct threadpool_thread *thread; 358 359 /* Mark the pool dying and wait for threads to commit suicide. 
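 * The overseer and any idle worker threads see the flag, exit, and
 * drop their pool references as they go; threadpool_rele() broadcasts
 * on the overseer's condvar when the last reference is dropped, which
 * ends the wait loop below.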
*/ 360 mutex_spin_enter(&pool->tp_lock); 361 KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 362 pool->tp_flags |= THREADPOOL_DYING; 363 cv_broadcast(&pool->tp_overseer.tpt_cv); 364 TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry) 365 cv_broadcast(&thread->tpt_cv); 366 while (0 < pool->tp_refcnt) { 367 TP_LOG(("%s: draining %" PRIu64 " references...\n", __func__, 368 pool->tp_refcnt)); 369 cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock); 370 } 371 mutex_spin_exit(&pool->tp_lock); 372 373 KASSERT(pool->tp_overseer.tpt_job == NULL); 374 KASSERT(pool->tp_overseer.tpt_pool == pool); 375 KASSERT(pool->tp_flags == THREADPOOL_DYING); 376 KASSERT(pool->tp_refcnt == 0); 377 KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads)); 378 KASSERT(TAILQ_EMPTY(&pool->tp_jobs)); 379 KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv)); 380 cv_destroy(&pool->tp_overseer.tpt_cv); 381 mutex_destroy(&pool->tp_lock); 382 } 383 384 static void 385 threadpool_hold(struct threadpool *pool) 386 { 387 388 KASSERT(mutex_owned(&pool->tp_lock)); 389 pool->tp_refcnt++; 390 KASSERT(pool->tp_refcnt != 0); 391 } 392 393 static void 394 threadpool_rele(struct threadpool *pool) 395 { 396 397 KASSERT(mutex_owned(&pool->tp_lock)); 398 KASSERT(0 < pool->tp_refcnt); 399 if (--pool->tp_refcnt == 0) 400 cv_broadcast(&pool->tp_overseer.tpt_cv); 401 } 402 403 /* Unbound thread pools */ 404 405 int 406 threadpool_get(struct threadpool **poolp, pri_t pri) 407 { 408 struct threadpool_unbound *tpu, *tmp = NULL; 409 int error; 410 411 ASSERT_SLEEPABLE(); 412 413 if (! threadpool_pri_is_valid(pri)) 414 return EINVAL; 415 416 mutex_enter(&threadpools_lock); 417 tpu = threadpool_lookup_unbound(pri); 418 if (tpu == NULL) { 419 mutex_exit(&threadpools_lock); 420 TP_LOG(("%s: No pool for pri=%d, creating one.\n", 421 __func__, (int)pri)); 422 tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP); 423 error = threadpool_create(&tmp->tpu_pool, NULL, pri); 424 if (error) { 425 kmem_free(tmp, sizeof(*tmp)); 426 return error; 427 } 428 mutex_enter(&threadpools_lock); 429 tpu = threadpool_lookup_unbound(pri); 430 if (tpu == NULL) { 431 TP_LOG(("%s: Won the creation race for pri=%d.\n", 432 __func__, (int)pri)); 433 tpu = tmp; 434 tmp = NULL; 435 threadpool_insert_unbound(tpu); 436 } 437 } 438 KASSERT(tpu != NULL); 439 tpu->tpu_refcnt++; 440 KASSERT(tpu->tpu_refcnt != 0); 441 mutex_exit(&threadpools_lock); 442 443 if (tmp != NULL) { 444 threadpool_destroy(&tmp->tpu_pool); 445 kmem_free(tmp, sizeof(*tmp)); 446 } 447 KASSERT(tpu != NULL); 448 *poolp = &tpu->tpu_pool; 449 return 0; 450 } 451 452 void 453 threadpool_put(struct threadpool *pool, pri_t pri) 454 { 455 struct threadpool_unbound *tpu = 456 container_of(pool, struct threadpool_unbound, tpu_pool); 457 458 ASSERT_SLEEPABLE(); 459 460 KASSERT(threadpool_pri_is_valid(pri)); 461 462 mutex_enter(&threadpools_lock); 463 KASSERT(tpu == threadpool_lookup_unbound(pri)); 464 KASSERT(0 < tpu->tpu_refcnt); 465 if (--tpu->tpu_refcnt == 0) { 466 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n", 467 __func__, (int)pri)); 468 threadpool_remove_unbound(tpu); 469 } else { 470 tpu = NULL; 471 } 472 mutex_exit(&threadpools_lock); 473 474 if (tpu) { 475 threadpool_destroy(&tpu->tpu_pool); 476 kmem_free(tpu, sizeof(*tpu)); 477 } 478 } 479 480 /* Per-CPU thread pools */ 481 482 int 483 threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri) 484 { 485 struct threadpool_percpu *pool_percpu, *tmp = NULL; 486 int error; 487 488 ASSERT_SLEEPABLE(); 489 490 if (! 
threadpool_pri_is_valid(pri)) 491 return EINVAL; 492 493 mutex_enter(&threadpools_lock); 494 pool_percpu = threadpool_lookup_percpu(pri); 495 if (pool_percpu == NULL) { 496 mutex_exit(&threadpools_lock); 497 TP_LOG(("%s: No pool for pri=%d, creating one.\n", 498 __func__, (int)pri)); 499 error = threadpool_percpu_create(&tmp, pri); 500 if (error) 501 return error; 502 KASSERT(tmp != NULL); 503 mutex_enter(&threadpools_lock); 504 pool_percpu = threadpool_lookup_percpu(pri); 505 if (pool_percpu == NULL) { 506 TP_LOG(("%s: Won the creation race for pri=%d.\n", 507 __func__, (int)pri)); 508 pool_percpu = tmp; 509 tmp = NULL; 510 threadpool_insert_percpu(pool_percpu); 511 } 512 } 513 KASSERT(pool_percpu != NULL); 514 pool_percpu->tpp_refcnt++; 515 KASSERT(pool_percpu->tpp_refcnt != 0); 516 mutex_exit(&threadpools_lock); 517 518 if (tmp != NULL) 519 threadpool_percpu_destroy(tmp); 520 KASSERT(pool_percpu != NULL); 521 *pool_percpup = pool_percpu; 522 return 0; 523 } 524 525 void 526 threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri) 527 { 528 529 ASSERT_SLEEPABLE(); 530 531 KASSERT(threadpool_pri_is_valid(pri)); 532 533 mutex_enter(&threadpools_lock); 534 KASSERT(pool_percpu == threadpool_lookup_percpu(pri)); 535 KASSERT(0 < pool_percpu->tpp_refcnt); 536 if (--pool_percpu->tpp_refcnt == 0) { 537 TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n", 538 __func__, (int)pri)); 539 threadpool_remove_percpu(pool_percpu); 540 } else { 541 pool_percpu = NULL; 542 } 543 mutex_exit(&threadpools_lock); 544 545 if (pool_percpu) 546 threadpool_percpu_destroy(pool_percpu); 547 } 548 549 struct threadpool * 550 threadpool_percpu_ref(struct threadpool_percpu *pool_percpu) 551 { 552 struct threadpool **poolp, *pool; 553 554 poolp = percpu_getref(pool_percpu->tpp_percpu); 555 pool = *poolp; 556 percpu_putref(pool_percpu->tpp_percpu); 557 558 return pool; 559 } 560 561 struct threadpool * 562 threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu, 563 struct cpu_info *ci) 564 { 565 struct threadpool **poolp, *pool; 566 567 percpu_traverse_enter(); 568 poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci); 569 pool = *poolp; 570 percpu_traverse_exit(); 571 572 return pool; 573 } 574 575 static int 576 threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri) 577 { 578 struct threadpool_percpu *pool_percpu; 579 struct cpu_info *ci; 580 CPU_INFO_ITERATOR cii; 581 unsigned int i, j; 582 int error; 583 584 pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP); 585 if (pool_percpu == NULL) { 586 error = ENOMEM; 587 goto fail0; 588 } 589 pool_percpu->tpp_pri = pri; 590 591 pool_percpu->tpp_percpu = percpu_alloc(sizeof(struct threadpool *)); 592 if (pool_percpu->tpp_percpu == NULL) { 593 error = ENOMEM; 594 goto fail1; 595 } 596 597 for (i = 0, CPU_INFO_FOREACH(cii, ci), i++) { 598 struct threadpool *pool; 599 600 pool = kmem_zalloc(sizeof(*pool), KM_SLEEP); 601 error = threadpool_create(pool, ci, pri); 602 if (error) { 603 kmem_free(pool, sizeof(*pool)); 604 goto fail2; 605 } 606 percpu_traverse_enter(); 607 struct threadpool **const poolp = 608 percpu_getptr_remote(pool_percpu->tpp_percpu, ci); 609 *poolp = pool; 610 percpu_traverse_exit(); 611 } 612 613 /* Success! 
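 * Every CPU now has a pool bound to it; the caller publishes the
 * new threadpool_percpu under threadpools_lock.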
 */
614 *pool_percpup = (struct threadpool_percpu *)pool_percpu;
615 return 0;
616
617 fail2: for (j = 0, CPU_INFO_FOREACH(cii, ci), j++) {
618 if (i <= j)
619 break;
620 percpu_traverse_enter();
621 struct threadpool **const poolp =
622 percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
623 struct threadpool *const pool = *poolp;
624 percpu_traverse_exit();
625 threadpool_destroy(pool);
626 kmem_free(pool, sizeof(*pool));
627 }
628 percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
629 fail1: kmem_free(pool_percpu, sizeof(*pool_percpu));
630 fail0: return error;
631 }
632
633 static void
634 threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
635 {
636 struct cpu_info *ci;
637 CPU_INFO_ITERATOR cii;
638
639 for (CPU_INFO_FOREACH(cii, ci)) {
640 percpu_traverse_enter();
641 struct threadpool **const poolp =
642 percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
643 struct threadpool *const pool = *poolp;
644 percpu_traverse_exit();
645 threadpool_destroy(pool);
646 kmem_free(pool, sizeof(*pool));
647 }
648
649 percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
650 kmem_free(pool_percpu, sizeof(*pool_percpu));
651 }
652
653 /* Thread pool jobs */
654
655 void __printflike(4,5)
656 threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
657 kmutex_t *lock, const char *fmt, ...)
658 {
659 va_list ap;
660
661 va_start(ap, fmt);
662 (void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
663 va_end(ap);
664
665 job->job_lock = lock;
666 job->job_thread = NULL;
667 job->job_refcnt = 0;
668 cv_init(&job->job_cv, job->job_name);
669 job->job_fn = fn;
670 }
671
672 static void
673 threadpool_job_dead(struct threadpool_job *job)
674 {
675
676 panic("threadpool job %p ran after destruction", job);
677 }
678
679 void
680 threadpool_job_destroy(struct threadpool_job *job)
681 {
682
683 ASSERT_SLEEPABLE();
684
685 KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
686
687 mutex_enter(job->job_lock);
688 while (0 < job->job_refcnt)
689 cv_wait(&job->job_cv, job->job_lock);
690 mutex_exit(job->job_lock);
691
692 job->job_lock = NULL;
693 KASSERT(job->job_thread == NULL);
694 KASSERT(job->job_refcnt == 0);
695 KASSERT(!cv_has_waiters(&job->job_cv));
696 cv_destroy(&job->job_cv);
697 job->job_fn = threadpool_job_dead;
698 (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
699 }
700
701 static void
702 threadpool_job_hold(struct threadpool_job *job)
703 {
704 unsigned int refcnt;
705
706 do {
707 refcnt = job->job_refcnt;
708 KASSERT(refcnt != UINT_MAX);
709 } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
710 != refcnt);
711 }
712
713 static void
714 threadpool_job_rele(struct threadpool_job *job)
715 {
716 unsigned int refcnt;
717
718 KASSERT(mutex_owned(job->job_lock));
719
720 do {
721 refcnt = job->job_refcnt;
722 KASSERT(0 < refcnt);
723 if (refcnt == 1) {
724 refcnt = atomic_dec_uint_nv(&job->job_refcnt);
725 KASSERT(refcnt != UINT_MAX);
726 if (refcnt == 0)
727 cv_broadcast(&job->job_cv);
728 return;
729 }
730 } while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt - 1))
731 != refcnt);
732 }
733
734 void
735 threadpool_job_done(struct threadpool_job *job)
736 {
737
738 KASSERT(mutex_owned(job->job_lock));
739 KASSERT(job->job_thread != NULL);
740 KASSERT(job->job_thread->tpt_lwp == curlwp);
741
742 /*
743 * We can safely read this field; it's only modified right before
744 * we call the job work function, and we are only preserving it
745 * to use here; no one cares if it
contains junk afterward.
746 */
747 lwp_lock(curlwp);
748 curlwp->l_name = job->job_thread->tpt_lwp_savedname;
749 lwp_unlock(curlwp);
750
751 /*
752 * Inline the work of threadpool_job_rele(); the job is already
753 * locked, the most likely scenario (XXXJRT only scenario?) is
754 * that we're dropping the last reference (the one taken in
755 * threadpool_schedule_job()), and we always do the cv_broadcast()
756 * anyway.
757 */
758 KASSERT(0 < job->job_refcnt);
759 unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
760 KASSERT(refcnt != UINT_MAX);
761 cv_broadcast(&job->job_cv);
762 job->job_thread = NULL;
763 }
764
765 void
766 threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
767 {
768
769 KASSERT(mutex_owned(job->job_lock));
770
771 /*
772 * If the job's already running, let it keep running. The job
773 * is guaranteed by the interlock not to end early -- if it had
774 * ended early, threadpool_job_done would have set job_thread
775 * to NULL under the interlock.
776 */
777 if (__predict_true(job->job_thread != NULL)) {
778 TP_LOG(("%s: job '%s' already running.\n",
779 __func__, job->job_name));
780 return;
781 }
782
783 threadpool_job_hold(job);
784
785 /* Otherwise, try to assign a thread to the job. */
786 mutex_spin_enter(&pool->tp_lock);
787 if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
788 /* Nobody's idle. Give it to the overseer. */
789 TP_LOG(("%s: giving job '%s' to overseer.\n",
790 __func__, job->job_name));
791 job->job_thread = &pool->tp_overseer;
792 TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
793 } else {
794 /* Assign it to the first idle thread. */
795 job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
796 TP_LOG(("%s: giving job '%s' to idle thread %p.\n",
797 __func__, job->job_name, job->job_thread));
798 TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
799 tpt_entry);
800 job->job_thread->tpt_job = job;
801 }
802
803 /* Notify whomever we gave it to, overseer or idle thread. */
804 KASSERT(job->job_thread != NULL);
805 cv_broadcast(&job->job_thread->tpt_cv);
806 mutex_spin_exit(&pool->tp_lock);
807 }
808
809 bool
810 threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
811 {
812
813 KASSERT(mutex_owned(job->job_lock));
814
815 /*
816 * XXXJRT This fails (albeit safely) when all of the following
817 * are true:
818 *
819 * => "pool" is something other than what the job was
820 * scheduled on. This can legitimately occur if,
821 * for example, a job is percpu-scheduled on CPU0
822 * and then CPU1 attempts to cancel it without taking
823 * a remote pool reference. (This might happen by
824 * "luck of the draw").
825 *
826 * => "job" is not yet running, but is assigned to the
827 * overseer.
828 *
829 * When this happens, this code makes the determination that
830 * the job is already running. The failure mode is that the
831 * caller is told the job is running, and thus has to wait.
832 * The overseer will eventually get to it and the job will
833 * proceed as if it had been already running.
834 */
835
836 if (job->job_thread == NULL) {
837 /* Nothing to do. Guaranteed not running. */
838 return true;
839 } else if (job->job_thread == &pool->tp_overseer) {
840 /* Take it off the list to guarantee it won't run. */
841 job->job_thread = NULL;
842 mutex_spin_enter(&pool->tp_lock);
843 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
844 mutex_spin_exit(&pool->tp_lock);
845 threadpool_job_rele(job);
846 return true;
847 } else {
848 /* Too late -- already running.
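 * The caller must wait for the job to finish on its own;
 * threadpool_cancel_job() below does that by sleeping on job_cv
 * until threadpool_job_done() clears job_thread.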
*/ 849 return false; 850 } 851 } 852 853 void 854 threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job) 855 { 856 857 ASSERT_SLEEPABLE(); 858 859 KASSERT(mutex_owned(job->job_lock)); 860 861 if (threadpool_cancel_job_async(pool, job)) 862 return; 863 864 /* Already running. Wait for it to complete. */ 865 while (job->job_thread != NULL) 866 cv_wait(&job->job_cv, job->job_lock); 867 } 868 869 /* Thread pool overseer thread */ 870 871 static void __dead 872 threadpool_overseer_thread(void *arg) 873 { 874 struct threadpool_thread *const overseer = arg; 875 struct threadpool *const pool = overseer->tpt_pool; 876 struct lwp *lwp = NULL; 877 int ktflags; 878 int error; 879 880 KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu())); 881 882 /* Wait until we're initialized. */ 883 mutex_spin_enter(&pool->tp_lock); 884 while (overseer->tpt_lwp == NULL) 885 cv_wait(&overseer->tpt_cv, &pool->tp_lock); 886 887 TP_LOG(("%s: starting.\n", __func__)); 888 889 for (;;) { 890 /* Wait until there's a job. */ 891 while (TAILQ_EMPTY(&pool->tp_jobs)) { 892 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) { 893 TP_LOG(("%s: THREADPOOL_DYING\n", 894 __func__)); 895 break; 896 } 897 cv_wait(&overseer->tpt_cv, &pool->tp_lock); 898 } 899 if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs))) 900 break; 901 902 /* If there are no threads, we'll have to try to start one. */ 903 if (TAILQ_EMPTY(&pool->tp_idle_threads)) { 904 TP_LOG(("%s: Got a job, need to create a thread.\n", 905 __func__)); 906 threadpool_hold(pool); 907 mutex_spin_exit(&pool->tp_lock); 908 909 struct threadpool_thread *const thread = 910 pool_cache_get(threadpool_thread_pc, PR_WAITOK); 911 thread->tpt_lwp = NULL; 912 thread->tpt_pool = pool; 913 thread->tpt_job = NULL; 914 cv_init(&thread->tpt_cv, "poolthrd"); 915 916 ktflags = 0; 917 ktflags |= KTHREAD_MPSAFE; 918 if (pool->tp_pri < PRI_KERNEL) 919 ktflags |= KTHREAD_TS; 920 error = kthread_create(pool->tp_pri, ktflags, 921 pool->tp_cpu, &threadpool_thread, thread, &lwp, 922 "poolthread/%d@%d", 923 (pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1), 924 (int)pool->tp_pri); 925 926 mutex_spin_enter(&pool->tp_lock); 927 if (error) { 928 pool_cache_put(threadpool_thread_pc, thread); 929 threadpool_rele(pool); 930 /* XXX What to do to wait for memory? */ 931 (void)kpause("thrdplcr", false, hz, 932 &pool->tp_lock); 933 continue; 934 } 935 /* 936 * New kthread now owns the reference to the pool 937 * taken above. 938 */ 939 KASSERT(lwp != NULL); 940 TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, 941 tpt_entry); 942 thread->tpt_lwp = lwp; 943 lwp = NULL; 944 cv_broadcast(&thread->tpt_cv); 945 continue; 946 } 947 948 /* There are idle threads, so try giving one a job. */ 949 struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs); 950 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry); 951 /* 952 * Take an extra reference on the job temporarily so that 953 * it won't disappear on us while we have both locks dropped. 954 */ 955 threadpool_job_hold(job); 956 mutex_spin_exit(&pool->tp_lock); 957 958 mutex_enter(job->job_lock); 959 /* If the job was cancelled, we'll no longer be its thread. */ 960 if (__predict_true(job->job_thread == overseer)) { 961 mutex_spin_enter(&pool->tp_lock); 962 if (__predict_false( 963 TAILQ_EMPTY(&pool->tp_idle_threads))) { 964 /* 965 * Someone else snagged the thread 966 * first. We'll have to try again. 
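 * Put the job back at the head of the queue and loop; the
 * reference taken above keeps the job alive in the meantime.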
967 */
968 TP_LOG(("%s: '%s' lost race to use idle thread.\n",
969 __func__, job->job_name));
970 TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
971 job_entry);
972 } else {
973 /*
974 * Assign the job to the thread and
975 * wake the thread so it starts work.
976 */
977 struct threadpool_thread *const thread =
978 TAILQ_FIRST(&pool->tp_idle_threads);
979
980 TP_LOG(("%s: '%s' gets thread %p\n",
981 __func__, job->job_name, thread));
982 KASSERT(thread->tpt_job == NULL);
983 TAILQ_REMOVE(&pool->tp_idle_threads, thread,
984 tpt_entry);
985 thread->tpt_job = job;
986 job->job_thread = thread;
987 cv_broadcast(&thread->tpt_cv);
988 }
989 mutex_spin_exit(&pool->tp_lock);
990 }
991 threadpool_job_rele(job);
992 mutex_exit(job->job_lock);
993
994 mutex_spin_enter(&pool->tp_lock);
995 }
996 threadpool_rele(pool);
997 mutex_spin_exit(&pool->tp_lock);
998
999 TP_LOG(("%s: exiting.\n", __func__));
1000
1001 kthread_exit(0);
1002 }
1003
1004 /* Thread pool thread */
1005
1006 static void __dead
1007 threadpool_thread(void *arg)
1008 {
1009 struct threadpool_thread *const thread = arg;
1010 struct threadpool *const pool = thread->tpt_pool;
1011
1012 KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
1013
1014 /* Wait until we're initialized and on the queue. */
1015 mutex_spin_enter(&pool->tp_lock);
1016 while (thread->tpt_lwp == NULL)
1017 cv_wait(&thread->tpt_cv, &pool->tp_lock);
1018
1019 TP_LOG(("%s: starting.\n", __func__));
1020
1021 KASSERT(thread->tpt_lwp == curlwp);
1022 for (;;) {
1023 /* Wait until we are assigned a job. */
1024 while (thread->tpt_job == NULL) {
1025 if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
1026 TP_LOG(("%s: THREADPOOL_DYING\n",
1027 __func__));
1028 break;
1029 }
1030 if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
1031 mstohz(threadpool_idle_time_ms)))
1032 break;
1033 }
1034 if (__predict_false(thread->tpt_job == NULL)) {
1035 TAILQ_REMOVE(&pool->tp_idle_threads, thread,
1036 tpt_entry);
1037 break;
1038 }
1039
1040 struct threadpool_job *const job = thread->tpt_job;
1041 KASSERT(job != NULL);
1042
1043 /* Set our lwp name to reflect what job we're doing. */
1044 lwp_lock(curlwp);
1045 char *const lwp_name __diagused = curlwp->l_name;
1046 thread->tpt_lwp_savedname = curlwp->l_name;
1047 curlwp->l_name = job->job_name;
1048 lwp_unlock(curlwp);
1049
1050 mutex_spin_exit(&pool->tp_lock);
1051
1052 TP_LOG(("%s: running job '%s' on thread %p.\n",
1053 __func__, job->job_name, thread));
1054
1055 /* Run the job. */
1056 (*job->job_fn)(job);
1057
1058 /* lwp name restored in threadpool_job_done(). */
1059 KASSERTMSG((curlwp->l_name == lwp_name),
1060 "someone forgot to call threadpool_job_done()!");
1061
1062 /*
1063 * We can compare pointers, but we can no longer dereference
1064 * job after this because threadpool_job_done() drops the
1065 * last reference on the job while the job is locked.
1066 */
1067
1068 mutex_spin_enter(&pool->tp_lock);
1069 KASSERT(thread->tpt_job == job);
1070 thread->tpt_job = NULL;
1071 TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
1072 }
1073 threadpool_rele(pool);
1074 mutex_spin_exit(&pool->tp_lock);
1075
1076 TP_LOG(("%s: thread %p exiting.\n", __func__, thread));
1077
1078 KASSERT(!cv_has_waiters(&thread->tpt_cv));
1079 cv_destroy(&thread->tpt_cv);
1080 pool_cache_put(threadpool_thread_pc, thread);
1081 kthread_exit(0);
1082 }
1083
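/*
 * Example usage (illustrative sketch only, kept in a comment and not
 * compiled): a driver that defers work to the shared unbound pool at
 * PRI_NONE might use this interface roughly as follows, per the
 * protocol described at the top of this file.  The foo_softc,
 * foo_work(), foo_attach(), foo_kick(), and foo_detach() names are
 * hypothetical; only the threadpool(9) calls are real.
 *
 *	struct foo_softc {
 *		kmutex_t		sc_lock;
 *		struct threadpool	*sc_pool;
 *		struct threadpool_job	sc_job;
 *	};
 *
 *	static void
 *	foo_work(struct threadpool_job *job)
 *	{
 *		struct foo_softc *sc =
 *		    container_of(job, struct foo_softc, sc_job);
 *
 *		... do the deferred work, with no locks held ...
 *
 *		mutex_enter(&sc->sc_lock);
 *		threadpool_job_done(job);
 *		mutex_exit(&sc->sc_lock);
 *	}
 *
 *	static int
 *	foo_attach(struct foo_softc *sc)
 *	{
 *		int error;
 *
 *		mutex_init(&sc->sc_lock, MUTEX_DEFAULT, IPL_NONE);
 *		error = threadpool_get(&sc->sc_pool, PRI_NONE);
 *		if (error) {
 *			mutex_destroy(&sc->sc_lock);
 *			return error;
 *		}
 *		threadpool_job_init(&sc->sc_job, foo_work, &sc->sc_lock,
 *		    "foowork");
 *		return 0;
 *	}
 *
 *	static void
 *	foo_kick(struct foo_softc *sc)
 *	{
 *
 *		mutex_enter(&sc->sc_lock);
 *		threadpool_schedule_job(sc->sc_pool, &sc->sc_job);
 *		mutex_exit(&sc->sc_lock);
 *	}
 *
 *	static void
 *	foo_detach(struct foo_softc *sc)
 *	{
 *
 *		mutex_enter(&sc->sc_lock);
 *		threadpool_cancel_job(sc->sc_pool, &sc->sc_job);
 *		mutex_exit(&sc->sc_lock);
 *		threadpool_job_destroy(&sc->sc_job);
 *		threadpool_put(sc->sc_pool, PRI_NONE);
 *		mutex_destroy(&sc->sc_lock);
 *	}
 *
 * The per-CPU pools are used the same way, except that the pool passed
 * to threadpool_schedule_job() comes from threadpool_percpu_ref() (or
 * threadpool_percpu_ref_remote()) after threadpool_percpu_get(), and
 * the reference is released with threadpool_percpu_put().
 */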