/*
 * Copyright (c) 2009 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * Copyright (c) 2012 Spectra Logic Corporation. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/queue.h>
#include <sys/taskq.h>
#include <sys/taskqueue.h>
#include <sys/zfs_context.h>

#if defined(__i386__) || defined(__amd64__) || defined(__aarch64__)
#include <machine/pcb.h>
#endif

#include <vm/uma.h>
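/*
 * Per-thread TSD slot recording which taskq the current worker thread
 * belongs to (set by taskq_tsd_set(), read by taskq_of_curthread()), and
 * the UMA zone from which dynamically dispatched taskq_ent_t entries are
 * allocated.
 */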
static uint_t taskq_tsd;
static uma_zone_t taskq_zone;

/*
 * Global system-wide dynamic task queue available for all consumers. This
 * taskq is not intended for long-running tasks; instead, a dedicated taskq
 * should be created.
 */
taskq_t *system_taskq = NULL;
taskq_t *system_delay_taskq = NULL;
taskq_t *dynamic_taskq = NULL;

proc_t *system_proc;

static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures");

static LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl;
static unsigned long tqenthash;
static unsigned long tqenthashlock;
static struct sx *tqenthashtbl_lock;

static taskqid_t tqidnext;

#define	TQIDHASH(tqid)		(&tqenthashtbl[(tqid) & tqenthash])
#define	TQIDHASHLOCK(tqid)	(&tqenthashtbl_lock[((tqid) & tqenthashlock)])

#define	NORMAL_TASK	0
#define	TIMEOUT_TASK	1

static void
system_taskq_init(void *arg)
{
	int i;

	tsd_create(&taskq_tsd, NULL);
	tqenthashtbl = hashinit(mp_ncpus * 8, M_TASKQ, &tqenthash);
	tqenthashlock = (tqenthash + 1) / 8;
	if (tqenthashlock > 0)
		tqenthashlock--;
	tqenthashtbl_lock =
	    malloc(sizeof (*tqenthashtbl_lock) * (tqenthashlock + 1),
	    M_TASKQ, M_WAITOK | M_ZERO);
	for (i = 0; i < tqenthashlock + 1; i++)
		sx_init_flags(&tqenthashtbl_lock[i], "tqenthash", SX_DUPOK);
	taskq_zone = uma_zcreate("taskq_zone", sizeof (taskq_ent_t),
	    NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	system_taskq = taskq_create("system_taskq", mp_ncpus, minclsyspri,
	    0, 0, 0);
	system_delay_taskq = taskq_create("system_delay_taskq", mp_ncpus,
	    minclsyspri, 0, 0, 0);
}
SYSINIT(system_taskq_init, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_init,
    NULL);

static void
system_taskq_fini(void *arg)
{
	int i;

	taskq_destroy(system_delay_taskq);
	taskq_destroy(system_taskq);
	uma_zdestroy(taskq_zone);
	tsd_destroy(&taskq_tsd);
	for (i = 0; i < tqenthashlock + 1; i++)
		sx_destroy(&tqenthashtbl_lock[i]);
	for (i = 0; i < tqenthash + 1; i++)
		VERIFY(LIST_EMPTY(&tqenthashtbl[i]));
	free(tqenthashtbl_lock, M_TASKQ);
	free(tqenthashtbl, M_TASKQ);
}
SYSUNINIT(system_taskq_fini, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_fini,
    NULL);
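/*
 * Dynamically dispatched tasks are assigned a non-zero id from tqidnext and
 * hashed into tqenthashtbl so that taskq_cancel_id() and taskq_wait_id() can
 * later find the corresponding taskq_ent_t. Entries are reference counted
 * via tqent_rc and freed by taskq_free() once the last reference is dropped.
 */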
#ifdef __LP64__
static taskqid_t
__taskq_genid(void)
{
	taskqid_t tqid;

	/*
	 * Assume a 64-bit counter will not wrap in practice.
	 */
	tqid = atomic_add_64_nv(&tqidnext, 1);
	VERIFY(tqid);
	return (tqid);
}
#else
static taskqid_t
__taskq_genid(void)
{
	taskqid_t tqid;

	for (;;) {
		tqid = atomic_add_32_nv(&tqidnext, 1);
		if (__predict_true(tqid != 0))
			break;
	}
	VERIFY(tqid);
	return (tqid);
}
#endif

static taskq_ent_t *
taskq_lookup(taskqid_t tqid)
{
	taskq_ent_t *ent = NULL;

	if (tqid == 0)
		return (NULL);
	sx_slock(TQIDHASHLOCK(tqid));
	LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) {
		if (ent->tqent_id == tqid)
			break;
	}
	if (ent != NULL)
		refcount_acquire(&ent->tqent_rc);
	sx_sunlock(TQIDHASHLOCK(tqid));
	return (ent);
}

static taskqid_t
taskq_insert(taskq_ent_t *ent)
{
	taskqid_t tqid = __taskq_genid();

	ent->tqent_id = tqid;
	sx_xlock(TQIDHASHLOCK(tqid));
	LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash);
	sx_xunlock(TQIDHASHLOCK(tqid));
	return (tqid);
}

static void
taskq_remove(taskq_ent_t *ent)
{
	taskqid_t tqid = ent->tqent_id;

	if (tqid == 0)
		return;
	sx_xlock(TQIDHASHLOCK(tqid));
	if (ent->tqent_id != 0) {
		LIST_REMOVE(ent, tqent_hash);
		ent->tqent_id = 0;
	}
	sx_xunlock(TQIDHASHLOCK(tqid));
}

static void
taskq_tsd_set(void *context)
{
	taskq_t *tq = context;

#if defined(__amd64__) || defined(__aarch64__)
	if (context != NULL && tsd_get(taskq_tsd) == NULL)
		fpu_kern_thread(FPU_KERN_NORMAL);
#endif
	tsd_set(taskq_tsd, tq);
}

static taskq_t *
taskq_create_impl(const char *name, int nthreads, pri_t pri,
    proc_t *proc __maybe_unused, uint_t flags)
{
	taskq_t *tq;

	if ((flags & TASKQ_THREADS_CPU_PCT) != 0)
		nthreads = MAX((mp_ncpus * nthreads) / 100, 1);

	tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
	tq->tq_nthreads = nthreads;
	tq->tq_queue = taskqueue_create(name, M_WAITOK,
	    taskqueue_thread_enqueue, &tq->tq_queue);
	taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
	    taskq_tsd_set, tq);
	taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
	    taskq_tsd_set, NULL);
	(void) taskqueue_start_threads_in_proc(&tq->tq_queue, nthreads, pri,
	    proc, "%s", name);

	return ((taskq_t *)tq);
}

taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri, int minalloc __unused,
    int maxalloc __unused, uint_t flags)
{
	return (taskq_create_impl(name, nthreads, pri, system_proc, flags));
}

taskq_t *
taskq_create_proc(const char *name, int nthreads, pri_t pri,
    int minalloc __unused, int maxalloc __unused, proc_t *proc, uint_t flags)
{
	return (taskq_create_impl(name, nthreads, pri, proc, flags));
}

void
taskq_destroy(taskq_t *tq)
{

	taskqueue_free(tq->tq_queue);
	kmem_free(tq, sizeof (*tq));
}

static void taskq_sync_assign(void *arg);

typedef struct taskq_sync_arg {
	kthread_t	*tqa_thread;
	kcondvar_t	tqa_cv;
	kmutex_t	tqa_lock;
	int		tqa_ready;
} taskq_sync_arg_t;

static void
taskq_sync_assign(void *arg)
{
	taskq_sync_arg_t *tqa = arg;

	mutex_enter(&tqa->tqa_lock);
	tqa->tqa_thread = curthread;
	tqa->tqa_ready = 1;
	cv_signal(&tqa->tqa_cv);
	while (tqa->tqa_ready == 1)
		cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
	mutex_exit(&tqa->tqa_lock);
}
/*
 * Create a taskq with a specified number of pool threads. Allocate
 * and return an array of nthreads kthread_t pointers, one for each
 * thread in the pool. The array is not ordered and must be freed
 * by the caller.
 */
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
	taskq_t *tq;
	taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
	    KM_SLEEP);

	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
	    flags | TASKQ_PREPOPULATE);
	VERIFY(tq != NULL);
	VERIFY(tq->tq_nthreads == nthreads);

	/* spawn all syncthreads */
	for (int i = 0; i < nthreads; i++) {
		cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
		mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
		(void) taskq_dispatch(tq, taskq_sync_assign,
		    &tqs[i], TQ_FRONT);
	}

	/* wait on all syncthreads to start */
	for (int i = 0; i < nthreads; i++) {
		mutex_enter(&tqs[i].tqa_lock);
		while (tqs[i].tqa_ready == 0)
			cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
		mutex_exit(&tqs[i].tqa_lock);
	}

	/* let all syncthreads resume, finish */
	for (int i = 0; i < nthreads; i++) {
		mutex_enter(&tqs[i].tqa_lock);
		tqs[i].tqa_ready = 2;
		cv_broadcast(&tqs[i].tqa_cv);
		mutex_exit(&tqs[i].tqa_lock);
	}
	taskq_wait(tq);

	for (int i = 0; i < nthreads; i++) {
		kthreads[i] = tqs[i].tqa_thread;
		mutex_destroy(&tqs[i].tqa_lock);
		cv_destroy(&tqs[i].tqa_cv);
	}
	kmem_free(tqs, sizeof (*tqs) * nthreads);

	*ktpp = kthreads;
	return (tq);
}

int
taskq_member(taskq_t *tq, kthread_t *thread)
{

	return (taskqueue_member(tq->tq_queue, thread));
}

taskq_t *
taskq_of_curthread(void)
{
	return (tsd_get(taskq_tsd));
}

static void
taskq_free(taskq_ent_t *task)
{
	taskq_remove(task);
	if (refcount_release(&task->tqent_rc))
		uma_zfree(taskq_zone, task);
}

int
taskq_cancel_id(taskq_t *tq, taskqid_t tid)
{
	uint32_t pend;
	int rc;
	taskq_ent_t *ent;

	if ((ent = taskq_lookup(tid)) == NULL)
		return (ENOENT);

	if (ent->tqent_type == NORMAL_TASK) {
		rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend);
		if (rc == EBUSY)
			taskqueue_drain(tq->tq_queue, &ent->tqent_task);
	} else {
		rc = taskqueue_cancel_timeout(tq->tq_queue,
		    &ent->tqent_timeout_task, &pend);
		if (rc == EBUSY) {
			taskqueue_drain_timeout(tq->tq_queue,
			    &ent->tqent_timeout_task);
		}
	}
	if (pend) {
		/*
		 * Tasks normally free themselves when run, but here the task
		 * was cancelled so it did not free itself.
		 */
		taskq_free(ent);
	}
	/* Free the extra reference we added with taskq_lookup. */
	taskq_free(ent);
	return (pend ? 0 : ENOENT);
}
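/*
 * taskqueue(9) handler for dynamically dispatched tasks: run the consumer's
 * function, then drop the reference taken when the entry was dispatched.
 */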
static void
taskq_run(void *arg, int pending)
{
	taskq_ent_t *task = arg;

	if (pending == 0)
		return;
	task->tqent_func(task->tqent_arg);
	taskq_free(task);
}

taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
    uint_t flags, clock_t expire_time)
{
	taskq_ent_t *task;
	taskqid_t tqid;
	clock_t timo;
	int mflag;

	timo = expire_time - ddi_get_lbolt();
	if (timo <= 0)
		return (taskq_dispatch(tq, func, arg, flags));

	if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
		mflag = M_WAITOK;
	else
		mflag = M_NOWAIT;

	task = uma_zalloc(taskq_zone, mflag);
	if (task == NULL)
		return (0);
	task->tqent_func = func;
	task->tqent_arg = arg;
	task->tqent_type = TIMEOUT_TASK;
	refcount_init(&task->tqent_rc, 1);
	tqid = taskq_insert(task);
	TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0,
	    taskq_run, task);

	taskqueue_enqueue_timeout(tq->tq_queue, &task->tqent_timeout_task,
	    timo);
	return (tqid);
}

taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *task;
	int mflag, prio;
	taskqid_t tqid;

	if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
		mflag = M_WAITOK;
	else
		mflag = M_NOWAIT;
	/*
	 * If TQ_FRONT is given, we want higher priority for this task, so it
	 * can go at the front of the queue.
	 */
	prio = !!(flags & TQ_FRONT);

	task = uma_zalloc(taskq_zone, mflag);
	if (task == NULL)
		return (0);
	refcount_init(&task->tqent_rc, 1);
	task->tqent_func = func;
	task->tqent_arg = arg;
	task->tqent_type = NORMAL_TASK;
	tqid = taskq_insert(task);
	TASK_INIT(&task->tqent_task, prio, taskq_run, task);
	taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
	return (tqid);
}

static void
taskq_run_ent(void *arg, int pending)
{
	taskq_ent_t *task = arg;

	if (pending == 0)
		return;
	task->tqent_func(task->tqent_arg);
}

void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags,
    taskq_ent_t *task)
{
	/*
	 * If TQ_FRONT is given, we want higher priority for this task, so it
	 * can go at the front of the queue.
	 */
	task->tqent_task.ta_priority = !!(flags & TQ_FRONT);
	task->tqent_func = func;
	task->tqent_arg = arg;
	taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
}

void
taskq_init_ent(taskq_ent_t *task)
{
	TASK_INIT(&task->tqent_task, 0, taskq_run_ent, task);
	task->tqent_func = NULL;
	task->tqent_arg = NULL;
	task->tqent_id = 0;
	task->tqent_type = NORMAL_TASK;
	task->tqent_rc = 0;
}

int
taskq_empty_ent(taskq_ent_t *task)
{
	return (task->tqent_task.ta_pending == 0);
}

void
taskq_wait(taskq_t *tq)
{
	taskqueue_quiesce(tq->tq_queue);
}

void
taskq_wait_id(taskq_t *tq, taskqid_t tid)
{
	taskq_ent_t *ent;

	if ((ent = taskq_lookup(tid)) == NULL)
		return;

	if (ent->tqent_type == NORMAL_TASK)
		taskqueue_drain(tq->tq_queue, &ent->tqent_task);
	else
		taskqueue_drain_timeout(tq->tq_queue, &ent->tqent_timeout_task);
	taskq_free(ent);
}

void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused)
{
	taskqueue_drain_all(tq->tq_queue);
}