/*	$NetBSD: taskq.c,v 1.5 2015/04/11 16:32:07 riastradh Exp $	*/

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * Kernel task queues: general-purpose asynchronous task scheduling.
 *
 * A common problem in kernel programming is the need to schedule tasks
 * to be performed later, by another thread. There are several reasons
 * you may want or need to do this:
 *
 * (1) The task isn't time-critical, but your current code path is.
 *
 * (2) The task may require grabbing locks that you already hold.
 *
 * (3) The task may need to block (e.g. to wait for memory), but you
 *     cannot block in your current context.
 *
 * (4) Your code path can't complete because of some condition, but you can't
 *     sleep or fail, so you queue the task for later execution when the
 *     condition disappears.
 *
 * (5) You just want a simple way to launch multiple tasks in parallel.
 *
 * Task queues provide such a facility. In its simplest form (used when
 * performance is not a critical consideration) a task queue consists of a
 * single list of tasks, together with one or more threads to service the
 * list. There are some cases when this simple queue is not sufficient:
 *
 * (1) The task queues are very hot and there is a need to avoid data and lock
 *     contention over global resources.
 *
 * (2) Some tasks may depend on other tasks to complete, so they can't be put
 *     in the same list managed by the same thread.
 *
 * (3) Some tasks may block for a long time, and this should not block other
 *     tasks in the queue.
 *
 * To provide useful service in such cases we define a "dynamic task queue"
 * which has an individual thread for each of the tasks. These threads are
 * dynamically created as they are needed and destroyed when they are not in
 * use. The API for managing task pools is the same as for managing task
 * queues with the exception of a taskq creation flag TASKQ_DYNAMIC which
 * indicates that dynamic task pool behavior is desired.
 *
 * Dynamic task queues may also place tasks in the normal queue (called
 * "backing queue") when the task pool runs out of resources. Users of task
 * queues may disallow such queued scheduling by specifying TQ_NOQUEUE in the
 * dispatch flags.
 *
 * The backing task queue is also used for scheduling internal tasks needed
 * for dynamic task queue maintenance.
 *
 * INTERFACES:
 *
 * taskq_t *taskq_create(name, nthreads, pri_t pri, minalloc, maxalloc, flags);
 *
 *	Create a taskq with specified properties.
 *	Possible 'flags':
 *
 *	  TASKQ_DYNAMIC: Create task pool for task management. If this flag is
 *		specified, 'nthreads' specifies the maximum number of threads
 *		in the task queue. Task execution order for dynamic task
 *		queues is not predictable.
 *
 *		If this flag is not specified (default case) a
 *		single-list task queue is created with 'nthreads' threads
 *		servicing it. Entries in this queue are managed by
 *		taskq_ent_alloc() and taskq_ent_free() which try to keep the
 *		task population between 'minalloc' and 'maxalloc', but the
 *		latter limit is only advisory for TQ_SLEEP dispatches and the
 *		former limit is only advisory for TQ_NOALLOC dispatches. If
 *		TASKQ_PREPOPULATE is set in 'flags', the taskq will be
 *		prepopulated with 'minalloc' task structures.
 *
 *		Since non-DYNAMIC taskqs are queues, tasks are guaranteed to
 *		be executed in the order they are scheduled if nthreads == 1.
 *		If nthreads > 1, task execution order is not predictable.
 *
 *	  TASKQ_PREPOPULATE: Prepopulate task queue with threads.
 *		Also prepopulate the task queue with 'minalloc' task
 *		structures.
 *
 *	  TASKQ_CPR_SAFE: This flag specifies that users of the task queue
 *		will use their own protocol for handling CPR issues. This
 *		flag is not supported for DYNAMIC task queues.
 *
 *	The 'pri' field specifies the default priority for the threads that
 *	service all scheduled tasks.
 *
 * void taskq_destroy(tq):
 *
 *	Waits for any scheduled tasks to complete, then destroys the taskq.
 *	Caller should guarantee that no new tasks are scheduled in the closing
 *	taskq.
 *
 * taskqid_t taskq_dispatch(tq, func, arg, flags):
 *
 *	Dispatches the task "func(arg)" to taskq. The 'flags' indicates
 *	whether the caller is willing to block for memory.  The function
 *	returns an opaque value which is zero iff dispatch fails.  If flags
 *	is TQ_NOSLEEP or TQ_NOALLOC and the task can't be dispatched,
 *	taskq_dispatch() fails and returns (taskqid_t)0.
 *
 *	ASSUMES: func != NULL.
 *
 *	Possible flags:
 *	  TQ_NOSLEEP: Do not wait for resources; may fail.
 *
 *	  TQ_NOALLOC: Do not allocate memory; may fail.  May only be used
 *		with non-dynamic task queues.
 *
 *	  TQ_NOQUEUE: Do not enqueue the task if it can't be dispatched due
 *		to lack of available resources; fail instead.  If this flag
 *		is not set, and the task pool is exhausted, the task may be
 *		scheduled in the backing queue.  This flag may ONLY be used
 *		with dynamic task queues.
 *
 *		NOTE: This flag should always be used when a task queue is
 *		used for tasks that may depend on each other for completion.
 *		Enqueueing dependent tasks may create deadlocks.
 *
 *	  TQ_SLEEP: May block waiting for resources. May still fail for
 *		dynamic task queues if TQ_NOQUEUE is also specified,
 *		otherwise it always succeeds.
 *
 *	NOTE: Dynamic task queues are much more likely to fail in
 *		taskq_dispatch() (especially if TQ_NOQUEUE was specified), so
 *		it is important to have backup strategies handling such
 *		failures.
 *
 * void taskq_wait(tq):
 *
 *	Waits for all previously scheduled tasks to complete.
 *
 *	NOTE: It does not stop any new task dispatches.
 *	      Do NOT call taskq_wait() from a task: it will cause deadlock.
 *
 * void taskq_suspend(tq)
 *
 *	Suspend all task execution.
 *	Tasks already scheduled for a dynamic task queue will still be
 *	executed, but all new scheduled tasks will be suspended until
 *	taskq_resume() is called.
 *
 * int taskq_suspended(tq)
 *
 *	Returns 1 if taskq is suspended and 0 otherwise. It is intended to
 *	ASSERT that the task queue is suspended.
 *
 * void taskq_resume(tq)
 *
 *	Resume task queue execution.
 *
 * int taskq_member(tq, thread)
 *
 *	Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The
 *	intended use is to ASSERT that a given function is called in taskq
 *	context only.
 *
 * system_taskq
 *
 *	Global system-wide dynamic task queue for common uses. It may be used
 *	by any subsystem that needs to schedule tasks and does not need to
 *	manage its own task queues. It is initialized quite early during
 *	system boot.
 *
 * IMPLEMENTATION.
 *
 * This is a schematic representation of the task queue structures.
 *
 *   taskq:
 *   +-------------+
 *   |tq_lock      | +---< taskq_ent_free()
 *   +-------------+ |
 *   |...          | | tqent:                  tqent:
 *   +-------------+ | +------------+          +------------+
 *   | tq_freelist |-->| tqent_next |--> ... ->| tqent_next |
 *   +-------------+   +------------+          +------------+
 *   |...          |   | ...        |          | ...        |
 *   +-------------+   +------------+          +------------+
 *   | tq_task     |    |
 *   |             |    +-------------->taskq_ent_alloc()
 * +--------------------------------------------------------------------------+
 * | |                     |            tqent                   tqent         |
 * | +---------------------+     +--> +------------+     +--> +------------+  |
 * | | ...                 |     |    | func, arg  |     |    | func, arg  |  |
 * +>+---------------------+ <---|-+  +------------+ <---|-+  +------------+  |
 *   | tq_taskq.tqent_next | ----+ |  | tqent_next | --->+ |  | tqent_next |--+
 *   +---------------------+       |  +------------+   ^   |  +------------+
 * +-| tq_task.tqent_prev  |       +--| tqent_prev |   |   +--| tqent_prev |  ^
 * | +---------------------+          +------------+   |      +------------+  |
 * | |...                  |          | ...        |   |      | ...        |  |
 * | +---------------------+          +------------+   |      +------------+  |
 * |                                        ^           |                     |
 * |                                        |           |                     |
 * +----------------------------------------+-----------+    TQ_APPEND() -----+
 *   |             |                                    |
 *   |...          |                       taskq_thread()-----+
 *   +-------------+
 *   | tq_buckets  |--+-------> [ NULL ] (for regular task queues)
 *   +-------------+  |
 *                    |   DYNAMIC TASK QUEUES:
 *                    |
 *                    +-> taskq_bucket[nCPU]        taskq_bucket_dispatch()
 *                        +-------------------+                    ^
 *                   +--->| tqbucket_lock     |                    |
 *                   |    +-------------------+   +--------+      +--------+
 *                   |    | tqbucket_freelist |-->| tqent  |-->...| tqent  | ^
 *                   |    +-------------------+<--+--------+<--...+--------+ |
 *                   |    | ...               |   | thread |      | thread | |
 *                   |    +-------------------+   +--------+      +--------+ |
 *                   |    +-------------------+                              |
 * taskq_dispatch()--+--->| tqbucket_lock     |             TQ_APPEND()------+
 *      TQ_HASH()    |    +-------------------+   +--------+      +--------+
 *                   |    | tqbucket_freelist |-->| tqent  |-->...| tqent  |
 *                   |    +-------------------+<--+--------+<--...+--------+
 *                   |    | ...               |   | thread |      | thread |
 *                   |    +-------------------+   +--------+      +--------+
 *                   +---> ...
 *
 *
 * Task queues use the tq_task field to link new entries into the queue. The
 * queue is a circular doubly-linked list. Entries are put at the end of the
 * list with TQ_APPEND() and processed from the front of the list by
 * taskq_thread() in FIFO order.
 * Task queue entries are cached in the free list managed by the
 * taskq_ent_alloc() and taskq_ent_free() functions.
 *
 *	All threads used by task queues set the t_taskq field of the thread
 *	to point to the task queue.
 *
 * Dynamic Task Queues Implementation.
 *
 * For dynamic task queues there is a 1-to-1 mapping between a thread and a
 * taskq_ent_t structure. Each entry is serviced by its own thread and each
 * thread is controlled by a single entry.
 *
 * Entries are distributed over a set of buckets. To avoid using modulo
 * arithmetic the number of buckets is 2^n, determined by rounding the number
 * of CPUs in the system down to the nearest power of two. The tunable
 * variable 'taskq_maxbuckets' limits the maximum number of buckets. Each
 * entry is attached to a bucket for its lifetime and can't migrate to other
 * buckets.
 *
 * Entries that have scheduled tasks are not placed in any list. The dispatch
 * function sets their "func" and "arg" fields and signals the corresponding
 * thread to execute the task. Once the thread executes the task it clears the
 * "func" field and places the entry on the bucket cache of free entries
 * pointed to by the "tqbucket_freelist" field. ALL entries on the free list
 * should have the "func" field equal to NULL. The free list is a circular
 * doubly-linked list identical in structure to the tq_task list above, but
 * entries are taken from it in LIFO order - the last freed entry is the first
 * to be allocated. The taskq_bucket_dispatch() function gets the most
 * recently used entry from the free list, sets its "func" and "arg" fields
 * and signals a worker thread.
 *
 * After executing each task a per-entry thread taskq_d_thread() places its
 * entry on the bucket free list and goes to a timed sleep. If it wakes up
 * without getting a new task it removes the entry from the free list and
 * destroys itself. The thread sleep time is controlled by a tunable variable
 * `taskq_thread_timeout'.
 *
 * Various statistics are kept in the bucket which allow for later analysis of
 * taskq usage patterns. Also, a global copy of taskq creation and death
 * statistics is kept in the global taskq data structure. Since thread
 * creation and death happen rarely, updating such global data does not
 * present a performance problem.
 *
 * NOTE: Threads are not bound to any CPU and there is absolutely no
 *	association between the bucket and actual thread CPU, so buckets are
 *	used only to split resources and reduce resource contention. Having
 *	threads attached to the CPU denoted by a bucket may reduce the number
 *	of times a job switches between CPUs.
 *
 *	The current algorithm creates a thread whenever a bucket has no free
 *	entries. It would be nice to know how many threads are in the running
 *	state and not to create threads if all CPUs are busy with existing
 *	tasks, but it is unclear how such a strategy can be implemented.
 *
 *	Currently buckets are created statically as an array attached to the
 *	task queue. On a system with nCPUs < max_ncpus this may waste system
 *	memory. One solution may be allocation of buckets when they are first
 *	touched, but it is not clear how useful it is.
 *
 * SUSPEND/RESUME implementation.
 *
 *	Before executing a task taskq_thread() (executing non-dynamic task
 *	queues) obtains the taskq's thread lock as a reader.
 *	The taskq_suspend() function gets the same lock as a writer, blocking
 *	all non-dynamic task execution. The taskq_resume() function releases
 *	the lock, allowing taskq_thread() to continue execution.
 *
 *	For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by
 *	the taskq_suspend() function. After that taskq_bucket_dispatch()
 *	always fails, so that taskq_dispatch() will either enqueue tasks for
 *	a suspended backing queue or fail if TQ_NOQUEUE is specified in
 *	dispatch flags.
 *
 *	NOTE: taskq_suspend() does not immediately block any tasks already
 *	      scheduled for dynamic task queues. It only suspends new tasks
 *	      scheduled after taskq_suspend() was called.
 *
 *	The taskq_member() function works by comparing a thread's t_taskq
 *	pointer with the passed thread pointer.
 *
 * LOCKS and LOCK Hierarchy:
 *
 *   There are two locks used in task queues.
 *
 *   1) The task queue structure has a lock, protecting global task queue
 *      state.
 *
 *   2) Each per-CPU bucket has a lock for bucket management.
 *
 *   If both locks are needed, the task queue lock should be taken only after
 *   the bucket lock.
 *
 * DEBUG FACILITIES.
 *
 * For DEBUG kernels it is possible to induce random failures in the
 * taskq_dispatch() function when it is given the TQ_NOSLEEP argument. The
 * values of the taskq_dmtbf and taskq_smtbf tunables control the mean time
 * between induced failures for dynamic and static task queues respectively.
 *
 * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics.
 *
 * TUNABLES
 *
 *	system_taskq_size	- Size of the global system_taskq.
 *				  This value is multiplied by nCPUs to
 *				  determine actual size.
 *				  Default value: 64
 *
 *	taskq_thread_timeout	- Maximum idle time for taskq_d_thread()
 *				  Default value: 5 minutes
 *
 *	taskq_maxbuckets	- Maximum number of buckets in any task queue
 *				  Default value: 128
 *
 *	taskq_search_depth	- Maximum # of buckets searched for a free
 *				  entry
 *				  Default value: 4
 *
 *	taskq_dmtbf		- Mean time between induced dispatch failures
 *				  for dynamic task queues.
 *				  Default value: UINT_MAX (no induced failures)
 *
 *	taskq_smtbf		- Mean time between induced dispatch failures
 *				  for static task queues.
 *				  Default value: UINT_MAX (no induced failures)
 *
 * CONDITIONAL compilation.
 *
 *	TASKQ_STATISTIC	- If set, enables bucket statistics (default).
 *
 */
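
/*
 * Editorial usage sketch (not part of the original Sun comment above): a
 * minimal, hypothetical consumer of the documented interfaces. The names
 * my_task() and my_arg are illustrative only; the flags and the
 * TQ_NOSLEEP-then-TQ_SLEEP fallback follow the INTERFACES description:
 *
 *	static void my_task(void *arg);
 *
 *	taskq_t *tq = taskq_create("my_taskq", 1, minclsyspri, 4, 8,
 *	    TASKQ_PREPOPULATE);
 *
 *	if (taskq_dispatch(tq, my_task, my_arg, TQ_NOSLEEP) == (taskqid_t)0) {
 *		(void) taskq_dispatch(tq, my_task, my_arg, TQ_SLEEP);
 *	}
 *
 *	taskq_wait(tq);		-- waits for everything dispatched above
 *	taskq_destroy(tq);	-- only once no new dispatches can occur
 */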

#include <sys/kthread.h>
#include <sys/taskq_impl.h>
#include <sys/proc.h>
#include <sys/kmem.h>
#include <sys/callb.h>
#include <sys/systm.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/sysmacros.h>
#include <sys/sdt.h>
#include <sys/mutex.h>
#include <sys/kernel.h>
#include <sys/limits.h>

static kmem_cache_t *taskq_ent_cache, *taskq_cache;

/* Global system task queue for common use */
taskq_t	*system_taskq;

/*
 * Maximum number of entries in global system taskq is
 *	system_taskq_size * max_ncpus
 */
#define	SYSTEM_TASKQ_SIZE 1
int system_taskq_size = SYSTEM_TASKQ_SIZE;

#define	TASKQ_ACTIVE	0x00010000

/*
 * Dynamic task queue threads that don't get any work within
 * taskq_thread_timeout destroy themselves
 */
#define	TASKQ_THREAD_TIMEOUT (60 * 5)
int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT;

#define	TASKQ_MAXBUCKETS 128
int taskq_maxbuckets = TASKQ_MAXBUCKETS;

/*
 * When a bucket has no available entries other buckets are tried.
 * The taskq_search_depth parameter limits the number of buckets that we
 * search before failing. This is mostly useful in systems with many CPUs
 * where we may spend too much time scanning busy buckets.
 */
#define	TASKQ_SEARCH_DEPTH 4
int taskq_search_depth = TASKQ_SEARCH_DEPTH;

/*
 * Hashing function: mix various bits of x. May be pretty much anything.
 */
#define	TQ_HASH(x) ((x) ^ ((x) >> 11) ^ ((x) >> 17) ^ ((x) ^ 27))

/*
 * We do not create any new threads when the system is low on memory and start
 * throttling memory allocations. The following macro tries to estimate such
 * a condition.
 */
#define	ENOUGH_MEMORY() (freemem > throttlefree)

/*
 * Static functions.
 */
static taskq_t	*taskq_create_common(const char *, int, int, pri_t, int,
    int, uint_t);
static void taskq_thread(void *);
static int  taskq_constructor(void *, void *, int);
static void taskq_destructor(void *, void *);
static int  taskq_ent_constructor(void *, void *, int);
static void taskq_ent_destructor(void *, void *);
static taskq_ent_t *taskq_ent_alloc(taskq_t *, int);
static void taskq_ent_free(taskq_t *, taskq_ent_t *);

/*
 * Collect per-bucket statistics when TASKQ_STATISTIC is defined.
 */
#define	TASKQ_STATISTIC 1

#if TASKQ_STATISTIC
#define	TQ_STAT(b, x)	b->tqbucket_stat.x++
#else
#define	TQ_STAT(b, x)
#endif

/*
 * Random fault injection.
 */
uint_t taskq_random;
uint_t taskq_dmtbf = UINT_MAX;    /* mean time between injected failures */
uint_t taskq_smtbf = UINT_MAX;    /* mean time between injected failures */

/*
 * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail.
 *
 * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because
 * they could prepopulate the cache and make sure that they do not use more
 * than minalloc entries.  So, fault injection in this case ensures that
 * either TASKQ_PREPOPULATE is not set or there are more entries allocated
 * than is specified by minalloc.  TQ_NOALLOC dispatches are always allowed
 * to fail, but for simplicity we treat them identically to TQ_NOSLEEP
 * dispatches.
 */
#ifdef DEBUG
#define	TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)		\
	taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
	if ((flag & TQ_NOSLEEP) &&				\
	    taskq_random < 1771875 / taskq_dmtbf) {		\
		return (NULL);					\
	}

#define	TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)		\
	taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
	if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) &&		\
	    (!(tq->tq_flags & TASKQ_PREPOPULATE) ||		\
	    (tq->tq_nalloc > tq->tq_minalloc)) &&		\
	    (taskq_random < (1771875 / taskq_smtbf))) {		\
		mutex_exit(&tq->tq_lock);			\
		return ((taskqid_t)0);				\
	}
#else
#define	TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)
#define	TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)
#endif

#define	IS_EMPTY(l) (((l).tqent_prev == (l).tqent_next) &&	\
	((l).tqent_prev == &(l)))

/*
 * Append `tqe' at the end of the doubly-linked list denoted by l.
 */
#define	TQ_APPEND(l, tqe) {					\
	tqe->tqent_next = &l;					\
	tqe->tqent_prev = l.tqent_prev;				\
	tqe->tqent_next->tqent_prev = tqe;			\
	tqe->tqent_prev->tqent_next = tqe;			\
}

/*
 * Schedule a task specified by func and arg into the task queue entry tqe.
 */
#define	TQ_ENQUEUE(tq, tqe, func, arg) {			\
	ASSERT(MUTEX_HELD(&tq->tq_lock));			\
	TQ_APPEND(tq->tq_task, tqe);				\
	tqe->tqent_func = (func);				\
	tqe->tqent_arg = (arg);					\
	tq->tq_tasks++;						\
	if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks)	\
		tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed; \
	cv_signal(&tq->tq_dispatch_cv);				\
	DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \
}

/*
 * Do-nothing task which may be used to prepopulate thread caches.
 */
/*ARGSUSED*/
void
nulltask(void *unused)
{
}


/*ARGSUSED*/
static int
taskq_constructor(void *arg, void *obj, int kmflags)
{
	taskq_t *tq = obj;

	memset(tq, 0, sizeof (taskq_t));

	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);

	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;

	return (0);
}

/*ARGSUSED*/
static void
taskq_destructor(void *arg, void *obj)
{
	taskq_t *tq = obj;

	mutex_destroy(&tq->tq_lock);
	rw_destroy(&tq->tq_threadlock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
}

/*ARGSUSED*/
static int
taskq_ent_constructor(void *arg, void *obj, int kmflags)
{
	taskq_ent_t *tqe = obj;

	tqe->tqent_thread = NULL;
	cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL);

	return (0);
}

/*ARGSUSED*/
static void
taskq_ent_destructor(void *arg, void *obj)
{
	taskq_ent_t *tqe = obj;

	ASSERT(tqe->tqent_thread == NULL);
	cv_destroy(&tqe->tqent_cv);
}

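/*
 * Editorial note (not from the original source): the tq_task list head is
 * circular, so "empty" means the head points to itself, which is exactly how
 * taskq_constructor() above initializes it:
 *
 *	tq->tq_task.tqent_next = &tq->tq_task;
 *	tq->tq_task.tqent_prev = &tq->tq_task;	-- IS_EMPTY(tq->tq_task)
 *
 * TQ_APPEND() preserves this invariant, so taskq_thread() below can test for
 * pending work with tq->tq_task.tqent_next != &tq->tq_task.
 */
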
/*
 * Create global system dynamic task queue.
 */
void
system_taskq_init(void)
{
	system_taskq = taskq_create_common("system_taskq", 0,
	    system_taskq_size * max_ncpus, minclsyspri, 4, 512,
	    TASKQ_PREPOPULATE);
}

void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
}

void
taskq_init(void)
{
	taskq_ent_cache = kmem_cache_create("taskq_ent_cache",
	    sizeof (taskq_ent_t), 0, taskq_ent_constructor,
	    taskq_ent_destructor, NULL, NULL, NULL, 0);
	taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t),
	    0, taskq_constructor, taskq_destructor, NULL, NULL, NULL, 0);
	system_taskq_init();
}

void
taskq_fini(void)
{
	system_taskq_fini();
	kmem_cache_destroy(taskq_cache);
	kmem_cache_destroy(taskq_ent_cache);
}

/*
 * taskq_ent_alloc()
 *
 * Allocates a new taskq_ent_t structure either from the free list or from the
 * cache. Returns NULL if it can't be allocated.
 *
 * Assumes: tq->tq_lock is held.
 */
static taskq_ent_t *
taskq_ent_alloc(taskq_t *tq, int flags)
{
	int kmflags = KM_NOSLEEP;

	taskq_ent_t *tqe;

	ASSERT(MUTEX_HELD(&tq->tq_lock));

	/*
	 * TQ_NOALLOC allocations are allowed to use the freelist, even if
	 * we are below tq_minalloc.
	 */
	if ((tqe = tq->tq_freelist) != NULL &&
	    ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) {
		tq->tq_freelist = tqe->tqent_next;
	} else {
		if (flags & TQ_NOALLOC)
			return (NULL);

		mutex_exit(&tq->tq_lock);
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (kmflags & KM_NOSLEEP) {
				mutex_enter(&tq->tq_lock);
				return (NULL);
			}
			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate.
			 */
			xdelay(hz);
		}
		tqe = kmem_cache_alloc(taskq_ent_cache, kmflags);
		mutex_enter(&tq->tq_lock);
		if (tqe != NULL)
			tq->tq_nalloc++;
	}
	return (tqe);
}

/*
 * taskq_ent_free()
 *
 * Free taskq_ent_t structure by either putting it on the free list or freeing
 * it to the cache.
 *
 * Assumes: tq->tq_lock is held.
 */
static void
taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe)
{
	ASSERT(MUTEX_HELD(&tq->tq_lock));

	if (tq->tq_nalloc <= tq->tq_minalloc) {
		tqe->tqent_next = tq->tq_freelist;
		tq->tq_freelist = tqe;
	} else {
		tq->tq_nalloc--;
		mutex_exit(&tq->tq_lock);
		kmem_cache_free(taskq_ent_cache, tqe);
		mutex_enter(&tq->tq_lock);
	}
}

/*
 * Dispatch a task.
 *
 * Assumes: func != NULL
 *
 * Returns: NULL if dispatch failed.
 *	    non-NULL if task dispatched successfully.
 *	    Actual return value is the pointer to taskq entry that was used to
 *	    dispatch a task. This is useful for debugging.
 */
/* ARGSUSED */
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *tqe = NULL;

	ASSERT(tq != NULL);
	ASSERT(func != NULL);
	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

	/*
	 * TQ_NOQUEUE flag can't be used with non-dynamic task queues.
	 */
#ifdef __NetBSD__
	/*
	 * Dynamic task queues didn't seem to get imported.  Caller
	 * must be prepared to handle failure anyway, so just fail.
	 */
	if (flags & TQ_NOQUEUE)
		return ((taskqid_t)NULL);
#endif
	ASSERT(! (flags & TQ_NOQUEUE));

	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags);

	if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return ((taskqid_t)NULL);
	}
	TQ_ENQUEUE(tq, tqe, func, arg);
	mutex_exit(&tq->tq_lock);
	return ((taskqid_t)tqe);
}

/*
 * Wait for all pending tasks to complete.
 * Calling taskq_wait from a task will cause deadlock.
 */
void
taskq_wait(taskq_t *tq)
{

	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

/*
 * Suspend execution of tasks.
 *
 * Tasks in the queue part will be suspended immediately upon return from this
 * function. Pending tasks in the dynamic part will continue to execute, but
 * all new tasks will be suspended.
 */
void
taskq_suspend(taskq_t *tq)
{
	rw_enter(&tq->tq_threadlock, RW_WRITER);

	/*
	 * Mark task queue as being suspended. Needed for taskq_suspended().
	 */
	mutex_enter(&tq->tq_lock);
	ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED));
	tq->tq_flags |= TASKQ_SUSPENDED;
	mutex_exit(&tq->tq_lock);
}

/*
 * returns: 1 if tq is suspended, 0 otherwise.
 */
int
taskq_suspended(taskq_t *tq)
{
	return ((tq->tq_flags & TASKQ_SUSPENDED) != 0);
}

/*
 * Resume taskq execution.
 */
void
taskq_resume(taskq_t *tq)
{
	ASSERT(RW_WRITE_HELD(&tq->tq_threadlock));

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_SUSPENDED);
	tq->tq_flags &= ~TASKQ_SUSPENDED;
	mutex_exit(&tq->tq_lock);

	rw_exit(&tq->tq_threadlock);
}

int
taskq_member(taskq_t *tq, kthread_t *thread)
{
	if (tq->tq_nthreads == 1)
		return (tq->tq_thread == thread);
	else {
		int i, found = 0;

		mutex_enter(&tq->tq_lock);
		for (i = 0; i < tq->tq_nthreads; i++) {
			if (tq->tq_threadlist[i] == thread) {
				found = 1;
				break;
			}
		}
		mutex_exit(&tq->tq_lock);
		return (found);
	}
}

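/*
 * Editorial sketch (not from the original source): the intended use of
 * taskq_member() documented above is an assertion that a function only runs
 * in taskq context, e.g.
 *
 *	ASSERT(taskq_member(system_taskq, curthread));
 *
 * in a function that must only be called from a system_taskq task.  Here
 * "curthread" is the usual Solaris name for the current thread pointer; the
 * surrounding function is hypothetical.
 */
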
/*
 * Worker thread for processing task queue.
 */
static void
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *tqe;
	callb_cpr_t cprinfo;
	hrtime_t start, end;

	CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr, tq->tq_name);

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) {
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			if (tq->tq_flags & TASKQ_CPR_SAFE) {
				cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			} else {
				CALLB_CPR_SAFE_BEGIN(&cprinfo);
				cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
				CALLB_CPR_SAFE_END(&cprinfo, &tq->tq_lock);
			}
			tq->tq_active++;
			continue;
		}
		tqe->tqent_prev->tqent_next = tqe->tqent_next;
		tqe->tqent_next->tqent_prev = tqe->tqent_prev;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		start = gethrtime();
		DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq,
		    taskq_ent_t *, tqe);
		tqe->tqent_func(tqe->tqent_arg);
		DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq,
		    taskq_ent_t *, tqe);
		end = gethrtime();
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		tq->tq_totaltime += end - start;
		tq->tq_executed++;

		taskq_ent_free(tq, tqe);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE));
	CALLB_CPR_EXIT(&cprinfo);
	thread_exit();
}

/*
 * Taskq creation. May sleep for memory.
 * Always use automatically generated instances to avoid kstat name space
 * collisions.
 */

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
    int maxalloc, uint_t flags)
{
	return taskq_create_common(name, 0, nthreads, pri, minalloc,
	    maxalloc, flags | TASKQ_NOINSTANCE);
}

static taskq_t *
taskq_create_common(const char *name, int instance, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_NOSLEEP);
	uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);
	uint_t bsize;	/* # of buckets - always power of 2 */

	ASSERT(instance == 0);
	ASSERT(!ISSET(flags, TASKQ_CPR_SAFE));
	ASSERT(!ISSET(flags, TASKQ_DYNAMIC));

	/*
	 * TASKQ_CPR_SAFE and TASKQ_DYNAMIC flags are mutually exclusive.
	 */
	ASSERT((flags & (TASKQ_DYNAMIC | TASKQ_CPR_SAFE)) !=
	    ((TASKQ_DYNAMIC | TASKQ_CPR_SAFE)));

	ASSERT(tq->tq_buckets == NULL);

	bsize = 1 << (highbit(ncpus) - 1);
	ASSERT(bsize >= 1);
	bsize = MIN(bsize, taskq_maxbuckets);

	ASSERT(!(flags & TASKQ_DYNAMIC));
	if (flags & TASKQ_THREADS_CPU_PCT)
		/* nthreads is % of CPUs we want to use. */
		nthreads = (ncpus * nthreads) / 100;

	(void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1);
	tq->tq_name[TASKQ_NAMELEN] = '\0';
	/* Make sure the name conforms to the rules for C identifiers */
	strident_canon(tq->tq_name, TASKQ_NAMELEN);

	tq->tq_flags = flags | TASKQ_ACTIVE;
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_nbuckets = bsize;
	tq->tq_pri = pri;

	if (flags & TASKQ_PREPOPULATE) {
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	if (nthreads == 1) {
		tq->tq_thread = thread_create(NULL, 0, taskq_thread, tq,
		    0, NULL, TS_RUN, pri);
	} else {
		kthread_t **tpp = kmem_alloc(sizeof (kthread_t *) * nthreads,
		    KM_SLEEP);

		tq->tq_threadlist = tpp;

		mutex_enter(&tq->tq_lock);
		while (nthreads-- > 0) {
			*tpp = thread_create(NULL, 0, taskq_thread, tq,
			    0, NULL, TS_RUN, pri);
			tpp++;
		}
		mutex_exit(&tq->tq_lock);
	}

	return (tq);
}

/*
 * taskq_destroy().
 *
 * Assumes: by the time taskq_destroy is called no one will use this task
 * queue in any way and no one will try to dispatch entries to it.
 */
void
taskq_destroy(taskq_t *tq)
{
	taskq_bucket_t *b = tq->tq_buckets;
	int bid = 0;

	ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE));

	/*
	 * Wait for any pending entries to complete.
	 */
	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);
	ASSERT((tq->tq_task.tqent_next == &tq->tq_task) &&
	    (tq->tq_active == 0));

	if ((tq->tq_nthreads > 1) && (tq->tq_threadlist != NULL))
		kmem_free(tq->tq_threadlist, sizeof (kthread_t *) *
		    tq->tq_nthreads);

	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);
	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0)
		taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));

	mutex_exit(&tq->tq_lock);

	/*
	 * Mark each bucket as closing and wakeup all sleeping threads.
	 */
	for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
		taskq_ent_t *tqe;

		mutex_enter(&b->tqbucket_lock);

		b->tqbucket_flags |= TQBUCKET_CLOSE;
		/* Wakeup all sleeping threads */

		for (tqe = b->tqbucket_freelist.tqent_next;
		    tqe != &b->tqbucket_freelist; tqe = tqe->tqent_next)
			cv_signal(&tqe->tqent_cv);

		ASSERT(b->tqbucket_nalloc == 0);

		/*
		 * At this point we waited for all pending jobs to complete
		 * (in both the task queue and the bucket) and no new jobs
		 * should arrive.  Wait for all threads to die.
		 */
		while (b->tqbucket_nfree > 0)
			cv_wait(&b->tqbucket_cv, &b->tqbucket_lock);
		mutex_exit(&b->tqbucket_lock);
		mutex_destroy(&b->tqbucket_lock);
		cv_destroy(&b->tqbucket_cv);
	}

	if (tq->tq_buckets != NULL) {
		ASSERT(tq->tq_flags & TASKQ_DYNAMIC);
		kmem_free(tq->tq_buckets,
		    sizeof (taskq_bucket_t) * tq->tq_nbuckets);

		/* Cleanup fields before returning tq to the cache */
		tq->tq_buckets = NULL;
		tq->tq_tcreates = 0;
		tq->tq_tdeaths = 0;
	} else {
		ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
	}

	tq->tq_totaltime = 0;
	tq->tq_tasks = 0;
	tq->tq_maxtasks = 0;
	tq->tq_executed = 0;
	kmem_cache_free(taskq_cache, tq);
}