1789Sahrens /* 2789Sahrens * CDDL HEADER START 3789Sahrens * 4789Sahrens * The contents of this file are subject to the terms of the 54831Sgw25295 * Common Development and Distribution License (the "License"). 64831Sgw25295 * You may not use this file except in compliance with the License. 7789Sahrens * 8789Sahrens * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9789Sahrens * or http://www.opensolaris.org/os/licensing. 10789Sahrens * See the License for the specific language governing permissions 11789Sahrens * and limitations under the License. 12789Sahrens * 13789Sahrens * When distributing Covered Code, include this CDDL HEADER in each 14789Sahrens * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15789Sahrens * If applicable, add the following below this CDDL HEADER, with the 16789Sahrens * fields enclosed by brackets "[]" replaced with your own identifying 17789Sahrens * information: Portions Copyright [yyyy] [name of copyright owner] 18789Sahrens * 19789Sahrens * CDDL HEADER END 20789Sahrens */ 21789Sahrens /* 22*7837SMatthew.Ahrens@Sun.COM * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23789Sahrens * Use is subject to license terms. 24789Sahrens */ 25789Sahrens 26789Sahrens #include <sys/zfs_context.h> 27789Sahrens 28789Sahrens int taskq_now; 29*7837SMatthew.Ahrens@Sun.COM taskq_t *system_taskq; 30789Sahrens 31789Sahrens typedef struct task { 32789Sahrens struct task *task_next; 33789Sahrens struct task *task_prev; 34789Sahrens task_func_t *task_func; 35789Sahrens void *task_arg; 36789Sahrens } task_t; 37789Sahrens 38789Sahrens #define TASKQ_ACTIVE 0x00010000 39789Sahrens 40789Sahrens struct taskq { 41789Sahrens kmutex_t tq_lock; 42789Sahrens krwlock_t tq_threadlock; 43789Sahrens kcondvar_t tq_dispatch_cv; 44789Sahrens kcondvar_t tq_wait_cv; 45789Sahrens thread_t *tq_threadlist; 46789Sahrens int tq_flags; 47789Sahrens int tq_active; 48789Sahrens int tq_nthreads; 49789Sahrens int tq_nalloc; 50789Sahrens int tq_minalloc; 51789Sahrens int tq_maxalloc; 52789Sahrens task_t *tq_freelist; 53789Sahrens task_t tq_task; 54789Sahrens }; 55789Sahrens 56789Sahrens static task_t * 57789Sahrens task_alloc(taskq_t *tq, int tqflags) 58789Sahrens { 59789Sahrens task_t *t; 60789Sahrens 61789Sahrens if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 62789Sahrens tq->tq_freelist = t->task_next; 63789Sahrens } else { 64789Sahrens mutex_exit(&tq->tq_lock); 65789Sahrens if (tq->tq_nalloc >= tq->tq_maxalloc) { 66789Sahrens if (!(tqflags & KM_SLEEP)) { 67789Sahrens mutex_enter(&tq->tq_lock); 68789Sahrens return (NULL); 69789Sahrens } 70789Sahrens /* 71789Sahrens * We don't want to exceed tq_maxalloc, but we can't 72789Sahrens * wait for other tasks to complete (and thus free up 73789Sahrens * task structures) without risking deadlock with 74789Sahrens * the caller. So, we just delay for one second 75789Sahrens * to throttle the allocation rate. 76789Sahrens */ 77789Sahrens delay(hz); 78789Sahrens } 79789Sahrens t = kmem_alloc(sizeof (task_t), tqflags); 80789Sahrens mutex_enter(&tq->tq_lock); 81789Sahrens if (t != NULL) 82789Sahrens tq->tq_nalloc++; 83789Sahrens } 84789Sahrens return (t); 85789Sahrens } 86789Sahrens 87789Sahrens static void 88789Sahrens task_free(taskq_t *tq, task_t *t) 89789Sahrens { 90789Sahrens if (tq->tq_nalloc <= tq->tq_minalloc) { 91789Sahrens t->task_next = tq->tq_freelist; 92789Sahrens tq->tq_freelist = t; 93789Sahrens } else { 94789Sahrens tq->tq_nalloc--; 95789Sahrens mutex_exit(&tq->tq_lock); 96789Sahrens kmem_free(t, sizeof (task_t)); 97789Sahrens mutex_enter(&tq->tq_lock); 98789Sahrens } 99789Sahrens } 100789Sahrens 101789Sahrens taskqid_t 102789Sahrens taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 103789Sahrens { 104789Sahrens task_t *t; 105789Sahrens 106789Sahrens if (taskq_now) { 107789Sahrens func(arg); 108789Sahrens return (1); 109789Sahrens } 110789Sahrens 111789Sahrens mutex_enter(&tq->tq_lock); 112789Sahrens ASSERT(tq->tq_flags & TASKQ_ACTIVE); 113789Sahrens if ((t = task_alloc(tq, tqflags)) == NULL) { 114789Sahrens mutex_exit(&tq->tq_lock); 115789Sahrens return (0); 116789Sahrens } 117789Sahrens t->task_next = &tq->tq_task; 118789Sahrens t->task_prev = tq->tq_task.task_prev; 119789Sahrens t->task_next->task_prev = t; 120789Sahrens t->task_prev->task_next = t; 121789Sahrens t->task_func = func; 122789Sahrens t->task_arg = arg; 123789Sahrens cv_signal(&tq->tq_dispatch_cv); 124789Sahrens mutex_exit(&tq->tq_lock); 125789Sahrens return (1); 126789Sahrens } 127789Sahrens 128789Sahrens void 129789Sahrens taskq_wait(taskq_t *tq) 130789Sahrens { 131789Sahrens mutex_enter(&tq->tq_lock); 132789Sahrens while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0) 133789Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 134789Sahrens mutex_exit(&tq->tq_lock); 135789Sahrens } 136789Sahrens 137789Sahrens static void * 138789Sahrens taskq_thread(void *arg) 139789Sahrens { 140789Sahrens taskq_t *tq = arg; 141789Sahrens task_t *t; 142789Sahrens 143789Sahrens mutex_enter(&tq->tq_lock); 144789Sahrens while (tq->tq_flags & TASKQ_ACTIVE) { 145789Sahrens if ((t = tq->tq_task.task_next) == &tq->tq_task) { 146789Sahrens if (--tq->tq_active == 0) 147789Sahrens cv_broadcast(&tq->tq_wait_cv); 148789Sahrens cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 149789Sahrens tq->tq_active++; 150789Sahrens continue; 151789Sahrens } 152789Sahrens t->task_prev->task_next = t->task_next; 153789Sahrens t->task_next->task_prev = t->task_prev; 154789Sahrens mutex_exit(&tq->tq_lock); 155789Sahrens 156789Sahrens rw_enter(&tq->tq_threadlock, RW_READER); 157789Sahrens t->task_func(t->task_arg); 158789Sahrens rw_exit(&tq->tq_threadlock); 159789Sahrens 160789Sahrens mutex_enter(&tq->tq_lock); 161789Sahrens task_free(tq, t); 162789Sahrens } 163789Sahrens tq->tq_nthreads--; 164789Sahrens cv_broadcast(&tq->tq_wait_cv); 165789Sahrens mutex_exit(&tq->tq_lock); 166789Sahrens return (NULL); 167789Sahrens } 168789Sahrens 169789Sahrens /*ARGSUSED*/ 170789Sahrens taskq_t * 171789Sahrens taskq_create(const char *name, int nthreads, pri_t pri, 172789Sahrens int minalloc, int maxalloc, uint_t flags) 173789Sahrens { 174789Sahrens taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 175789Sahrens int t; 176789Sahrens 177789Sahrens rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 1784831Sgw25295 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 1794831Sgw25295 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 1804831Sgw25295 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 181789Sahrens tq->tq_flags = flags | TASKQ_ACTIVE; 182789Sahrens tq->tq_active = nthreads; 183789Sahrens tq->tq_nthreads = nthreads; 184789Sahrens tq->tq_minalloc = minalloc; 185789Sahrens tq->tq_maxalloc = maxalloc; 186789Sahrens tq->tq_task.task_next = &tq->tq_task; 187789Sahrens tq->tq_task.task_prev = &tq->tq_task; 188789Sahrens tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 189789Sahrens 190789Sahrens if (flags & TASKQ_PREPOPULATE) { 191789Sahrens mutex_enter(&tq->tq_lock); 192789Sahrens while (minalloc-- > 0) 193789Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 194789Sahrens mutex_exit(&tq->tq_lock); 195789Sahrens } 196789Sahrens 197789Sahrens for (t = 0; t < nthreads; t++) 198789Sahrens (void) thr_create(0, 0, taskq_thread, 199789Sahrens tq, THR_BOUND, &tq->tq_threadlist[t]); 200789Sahrens 201789Sahrens return (tq); 202789Sahrens } 203789Sahrens 204789Sahrens void 205789Sahrens taskq_destroy(taskq_t *tq) 206789Sahrens { 207789Sahrens int t; 208789Sahrens int nthreads = tq->tq_nthreads; 209789Sahrens 210789Sahrens taskq_wait(tq); 211789Sahrens 212789Sahrens mutex_enter(&tq->tq_lock); 213789Sahrens 214789Sahrens tq->tq_flags &= ~TASKQ_ACTIVE; 215789Sahrens cv_broadcast(&tq->tq_dispatch_cv); 216789Sahrens 217789Sahrens while (tq->tq_nthreads != 0) 218789Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 219789Sahrens 220789Sahrens tq->tq_minalloc = 0; 221789Sahrens while (tq->tq_nalloc != 0) { 222789Sahrens ASSERT(tq->tq_freelist != NULL); 223789Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 224789Sahrens } 225789Sahrens 226789Sahrens mutex_exit(&tq->tq_lock); 227789Sahrens 228789Sahrens for (t = 0; t < nthreads; t++) 229789Sahrens (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 230789Sahrens 231789Sahrens kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 232789Sahrens 233789Sahrens rw_destroy(&tq->tq_threadlock); 2344831Sgw25295 mutex_destroy(&tq->tq_lock); 2354831Sgw25295 cv_destroy(&tq->tq_dispatch_cv); 2364831Sgw25295 cv_destroy(&tq->tq_wait_cv); 237789Sahrens 238789Sahrens kmem_free(tq, sizeof (taskq_t)); 239789Sahrens } 240789Sahrens 241789Sahrens int 242789Sahrens taskq_member(taskq_t *tq, void *t) 243789Sahrens { 244789Sahrens int i; 245789Sahrens 246789Sahrens if (taskq_now) 247789Sahrens return (1); 248789Sahrens 249789Sahrens for (i = 0; i < tq->tq_nthreads; i++) 250789Sahrens if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 251789Sahrens return (1); 252789Sahrens 253789Sahrens return (0); 254789Sahrens } 255*7837SMatthew.Ahrens@Sun.COM 256*7837SMatthew.Ahrens@Sun.COM void 257*7837SMatthew.Ahrens@Sun.COM system_taskq_init(void) 258*7837SMatthew.Ahrens@Sun.COM { 259*7837SMatthew.Ahrens@Sun.COM system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 260*7837SMatthew.Ahrens@Sun.COM TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 261*7837SMatthew.Ahrens@Sun.COM } 262