/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
25*789Sahrens */ 26*789Sahrens 27*789Sahrens #pragma ident "%Z%%M% %I% %E% SMI" 28*789Sahrens 29*789Sahrens #include <sys/zfs_context.h> 30*789Sahrens 31*789Sahrens int taskq_now; 32*789Sahrens 33*789Sahrens typedef struct task { 34*789Sahrens struct task *task_next; 35*789Sahrens struct task *task_prev; 36*789Sahrens task_func_t *task_func; 37*789Sahrens void *task_arg; 38*789Sahrens } task_t; 39*789Sahrens 40*789Sahrens #define TASKQ_ACTIVE 0x00010000 41*789Sahrens 42*789Sahrens struct taskq { 43*789Sahrens kmutex_t tq_lock; 44*789Sahrens krwlock_t tq_threadlock; 45*789Sahrens kcondvar_t tq_dispatch_cv; 46*789Sahrens kcondvar_t tq_wait_cv; 47*789Sahrens thread_t *tq_threadlist; 48*789Sahrens int tq_flags; 49*789Sahrens int tq_active; 50*789Sahrens int tq_nthreads; 51*789Sahrens int tq_nalloc; 52*789Sahrens int tq_minalloc; 53*789Sahrens int tq_maxalloc; 54*789Sahrens task_t *tq_freelist; 55*789Sahrens task_t tq_task; 56*789Sahrens }; 57*789Sahrens 58*789Sahrens static task_t * 59*789Sahrens task_alloc(taskq_t *tq, int tqflags) 60*789Sahrens { 61*789Sahrens task_t *t; 62*789Sahrens 63*789Sahrens if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 64*789Sahrens tq->tq_freelist = t->task_next; 65*789Sahrens } else { 66*789Sahrens mutex_exit(&tq->tq_lock); 67*789Sahrens if (tq->tq_nalloc >= tq->tq_maxalloc) { 68*789Sahrens if (!(tqflags & KM_SLEEP)) { 69*789Sahrens mutex_enter(&tq->tq_lock); 70*789Sahrens return (NULL); 71*789Sahrens } 72*789Sahrens /* 73*789Sahrens * We don't want to exceed tq_maxalloc, but we can't 74*789Sahrens * wait for other tasks to complete (and thus free up 75*789Sahrens * task structures) without risking deadlock with 76*789Sahrens * the caller. So, we just delay for one second 77*789Sahrens * to throttle the allocation rate. 
78*789Sahrens */ 79*789Sahrens delay(hz); 80*789Sahrens } 81*789Sahrens t = kmem_alloc(sizeof (task_t), tqflags); 82*789Sahrens mutex_enter(&tq->tq_lock); 83*789Sahrens if (t != NULL) 84*789Sahrens tq->tq_nalloc++; 85*789Sahrens } 86*789Sahrens return (t); 87*789Sahrens } 88*789Sahrens 89*789Sahrens static void 90*789Sahrens task_free(taskq_t *tq, task_t *t) 91*789Sahrens { 92*789Sahrens if (tq->tq_nalloc <= tq->tq_minalloc) { 93*789Sahrens t->task_next = tq->tq_freelist; 94*789Sahrens tq->tq_freelist = t; 95*789Sahrens } else { 96*789Sahrens tq->tq_nalloc--; 97*789Sahrens mutex_exit(&tq->tq_lock); 98*789Sahrens kmem_free(t, sizeof (task_t)); 99*789Sahrens mutex_enter(&tq->tq_lock); 100*789Sahrens } 101*789Sahrens } 102*789Sahrens 103*789Sahrens taskqid_t 104*789Sahrens taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 105*789Sahrens { 106*789Sahrens task_t *t; 107*789Sahrens 108*789Sahrens if (taskq_now) { 109*789Sahrens func(arg); 110*789Sahrens return (1); 111*789Sahrens } 112*789Sahrens 113*789Sahrens mutex_enter(&tq->tq_lock); 114*789Sahrens ASSERT(tq->tq_flags & TASKQ_ACTIVE); 115*789Sahrens if ((t = task_alloc(tq, tqflags)) == NULL) { 116*789Sahrens mutex_exit(&tq->tq_lock); 117*789Sahrens return (0); 118*789Sahrens } 119*789Sahrens t->task_next = &tq->tq_task; 120*789Sahrens t->task_prev = tq->tq_task.task_prev; 121*789Sahrens t->task_next->task_prev = t; 122*789Sahrens t->task_prev->task_next = t; 123*789Sahrens t->task_func = func; 124*789Sahrens t->task_arg = arg; 125*789Sahrens cv_signal(&tq->tq_dispatch_cv); 126*789Sahrens mutex_exit(&tq->tq_lock); 127*789Sahrens return (1); 128*789Sahrens } 129*789Sahrens 130*789Sahrens void 131*789Sahrens taskq_wait(taskq_t *tq) 132*789Sahrens { 133*789Sahrens mutex_enter(&tq->tq_lock); 134*789Sahrens while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0) 135*789Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 136*789Sahrens mutex_exit(&tq->tq_lock); 137*789Sahrens } 138*789Sahrens 
139*789Sahrens static void * 140*789Sahrens taskq_thread(void *arg) 141*789Sahrens { 142*789Sahrens taskq_t *tq = arg; 143*789Sahrens task_t *t; 144*789Sahrens 145*789Sahrens mutex_enter(&tq->tq_lock); 146*789Sahrens while (tq->tq_flags & TASKQ_ACTIVE) { 147*789Sahrens if ((t = tq->tq_task.task_next) == &tq->tq_task) { 148*789Sahrens if (--tq->tq_active == 0) 149*789Sahrens cv_broadcast(&tq->tq_wait_cv); 150*789Sahrens cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 151*789Sahrens tq->tq_active++; 152*789Sahrens continue; 153*789Sahrens } 154*789Sahrens t->task_prev->task_next = t->task_next; 155*789Sahrens t->task_next->task_prev = t->task_prev; 156*789Sahrens mutex_exit(&tq->tq_lock); 157*789Sahrens 158*789Sahrens rw_enter(&tq->tq_threadlock, RW_READER); 159*789Sahrens t->task_func(t->task_arg); 160*789Sahrens rw_exit(&tq->tq_threadlock); 161*789Sahrens 162*789Sahrens mutex_enter(&tq->tq_lock); 163*789Sahrens task_free(tq, t); 164*789Sahrens } 165*789Sahrens tq->tq_nthreads--; 166*789Sahrens cv_broadcast(&tq->tq_wait_cv); 167*789Sahrens mutex_exit(&tq->tq_lock); 168*789Sahrens return (NULL); 169*789Sahrens } 170*789Sahrens 171*789Sahrens /*ARGSUSED*/ 172*789Sahrens taskq_t * 173*789Sahrens taskq_create(const char *name, int nthreads, pri_t pri, 174*789Sahrens int minalloc, int maxalloc, uint_t flags) 175*789Sahrens { 176*789Sahrens taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 177*789Sahrens int t; 178*789Sahrens 179*789Sahrens rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 180*789Sahrens tq->tq_flags = flags | TASKQ_ACTIVE; 181*789Sahrens tq->tq_active = nthreads; 182*789Sahrens tq->tq_nthreads = nthreads; 183*789Sahrens tq->tq_minalloc = minalloc; 184*789Sahrens tq->tq_maxalloc = maxalloc; 185*789Sahrens tq->tq_task.task_next = &tq->tq_task; 186*789Sahrens tq->tq_task.task_prev = &tq->tq_task; 187*789Sahrens tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 188*789Sahrens 189*789Sahrens if (flags & TASKQ_PREPOPULATE) { 
190*789Sahrens mutex_enter(&tq->tq_lock); 191*789Sahrens while (minalloc-- > 0) 192*789Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 193*789Sahrens mutex_exit(&tq->tq_lock); 194*789Sahrens } 195*789Sahrens 196*789Sahrens for (t = 0; t < nthreads; t++) 197*789Sahrens (void) thr_create(0, 0, taskq_thread, 198*789Sahrens tq, THR_BOUND, &tq->tq_threadlist[t]); 199*789Sahrens 200*789Sahrens return (tq); 201*789Sahrens } 202*789Sahrens 203*789Sahrens void 204*789Sahrens taskq_destroy(taskq_t *tq) 205*789Sahrens { 206*789Sahrens int t; 207*789Sahrens int nthreads = tq->tq_nthreads; 208*789Sahrens 209*789Sahrens taskq_wait(tq); 210*789Sahrens 211*789Sahrens mutex_enter(&tq->tq_lock); 212*789Sahrens 213*789Sahrens tq->tq_flags &= ~TASKQ_ACTIVE; 214*789Sahrens cv_broadcast(&tq->tq_dispatch_cv); 215*789Sahrens 216*789Sahrens while (tq->tq_nthreads != 0) 217*789Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 218*789Sahrens 219*789Sahrens tq->tq_minalloc = 0; 220*789Sahrens while (tq->tq_nalloc != 0) { 221*789Sahrens ASSERT(tq->tq_freelist != NULL); 222*789Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 223*789Sahrens } 224*789Sahrens 225*789Sahrens mutex_exit(&tq->tq_lock); 226*789Sahrens 227*789Sahrens for (t = 0; t < nthreads; t++) 228*789Sahrens (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 229*789Sahrens 230*789Sahrens kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 231*789Sahrens 232*789Sahrens rw_destroy(&tq->tq_threadlock); 233*789Sahrens 234*789Sahrens kmem_free(tq, sizeof (taskq_t)); 235*789Sahrens } 236*789Sahrens 237*789Sahrens int 238*789Sahrens taskq_member(taskq_t *tq, void *t) 239*789Sahrens { 240*789Sahrens int i; 241*789Sahrens 242*789Sahrens if (taskq_now) 243*789Sahrens return (1); 244*789Sahrens 245*789Sahrens for (i = 0; i < tq->tq_nthreads; i++) 246*789Sahrens if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 247*789Sahrens return (1); 248*789Sahrens 249*789Sahrens return (0); 250*789Sahrens } 251