1789Sahrens /* 2789Sahrens * CDDL HEADER START 3789Sahrens * 4789Sahrens * The contents of this file are subject to the terms of the 54831Sgw25295 * Common Development and Distribution License (the "License"). 64831Sgw25295 * You may not use this file except in compliance with the License. 7789Sahrens * 8789Sahrens * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9789Sahrens * or http://www.opensolaris.org/os/licensing. 10789Sahrens * See the License for the specific language governing permissions 11789Sahrens * and limitations under the License. 12789Sahrens * 13789Sahrens * When distributing Covered Code, include this CDDL HEADER in each 14789Sahrens * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15789Sahrens * If applicable, add the following below this CDDL HEADER, with the 16789Sahrens * fields enclosed by brackets "[]" replaced with your own identifying 17789Sahrens * information: Portions Copyright [yyyy] [name of copyright owner] 18789Sahrens * 19789Sahrens * CDDL HEADER END 20789Sahrens */ 21789Sahrens /* 229515SJonathan.Adams@Sun.COM * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23789Sahrens * Use is subject to license terms. 24789Sahrens */ 25789Sahrens 26789Sahrens #include <sys/zfs_context.h> 27789Sahrens 28789Sahrens int taskq_now; 297837SMatthew.Ahrens@Sun.COM taskq_t *system_taskq; 30789Sahrens 31789Sahrens typedef struct task { 32789Sahrens struct task *task_next; 33789Sahrens struct task *task_prev; 34789Sahrens task_func_t *task_func; 35789Sahrens void *task_arg; 36789Sahrens } task_t; 37789Sahrens 38789Sahrens #define TASKQ_ACTIVE 0x00010000 39789Sahrens 40789Sahrens struct taskq { 41789Sahrens kmutex_t tq_lock; 42789Sahrens krwlock_t tq_threadlock; 43789Sahrens kcondvar_t tq_dispatch_cv; 44789Sahrens kcondvar_t tq_wait_cv; 45789Sahrens thread_t *tq_threadlist; 46789Sahrens int tq_flags; 47789Sahrens int tq_active; 48789Sahrens int tq_nthreads; 49789Sahrens int tq_nalloc; 50789Sahrens int tq_minalloc; 51789Sahrens int tq_maxalloc; 52789Sahrens task_t *tq_freelist; 53789Sahrens task_t tq_task; 54789Sahrens }; 55789Sahrens 56789Sahrens static task_t * 57789Sahrens task_alloc(taskq_t *tq, int tqflags) 58789Sahrens { 59789Sahrens task_t *t; 60789Sahrens 61789Sahrens if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 62789Sahrens tq->tq_freelist = t->task_next; 63789Sahrens } else { 64789Sahrens mutex_exit(&tq->tq_lock); 65789Sahrens if (tq->tq_nalloc >= tq->tq_maxalloc) { 66789Sahrens if (!(tqflags & KM_SLEEP)) { 67789Sahrens mutex_enter(&tq->tq_lock); 68789Sahrens return (NULL); 69789Sahrens } 70789Sahrens /* 71789Sahrens * We don't want to exceed tq_maxalloc, but we can't 72789Sahrens * wait for other tasks to complete (and thus free up 73789Sahrens * task structures) without risking deadlock with 74789Sahrens * the caller. So, we just delay for one second 75789Sahrens * to throttle the allocation rate. 76789Sahrens */ 77789Sahrens delay(hz); 78789Sahrens } 79789Sahrens t = kmem_alloc(sizeof (task_t), tqflags); 80789Sahrens mutex_enter(&tq->tq_lock); 81789Sahrens if (t != NULL) 82789Sahrens tq->tq_nalloc++; 83789Sahrens } 84789Sahrens return (t); 85789Sahrens } 86789Sahrens 87789Sahrens static void 88789Sahrens task_free(taskq_t *tq, task_t *t) 89789Sahrens { 90789Sahrens if (tq->tq_nalloc <= tq->tq_minalloc) { 91789Sahrens t->task_next = tq->tq_freelist; 92789Sahrens tq->tq_freelist = t; 93789Sahrens } else { 94789Sahrens tq->tq_nalloc--; 95789Sahrens mutex_exit(&tq->tq_lock); 96789Sahrens kmem_free(t, sizeof (task_t)); 97789Sahrens mutex_enter(&tq->tq_lock); 98789Sahrens } 99789Sahrens } 100789Sahrens 101789Sahrens taskqid_t 102789Sahrens taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 103789Sahrens { 104789Sahrens task_t *t; 105789Sahrens 106789Sahrens if (taskq_now) { 107789Sahrens func(arg); 108789Sahrens return (1); 109789Sahrens } 110789Sahrens 111789Sahrens mutex_enter(&tq->tq_lock); 112789Sahrens ASSERT(tq->tq_flags & TASKQ_ACTIVE); 113789Sahrens if ((t = task_alloc(tq, tqflags)) == NULL) { 114789Sahrens mutex_exit(&tq->tq_lock); 115789Sahrens return (0); 116789Sahrens } 117789Sahrens t->task_next = &tq->tq_task; 118789Sahrens t->task_prev = tq->tq_task.task_prev; 119789Sahrens t->task_next->task_prev = t; 120789Sahrens t->task_prev->task_next = t; 121789Sahrens t->task_func = func; 122789Sahrens t->task_arg = arg; 123789Sahrens cv_signal(&tq->tq_dispatch_cv); 124789Sahrens mutex_exit(&tq->tq_lock); 125789Sahrens return (1); 126789Sahrens } 127789Sahrens 128789Sahrens void 129789Sahrens taskq_wait(taskq_t *tq) 130789Sahrens { 131789Sahrens mutex_enter(&tq->tq_lock); 132789Sahrens while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0) 133789Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 134789Sahrens mutex_exit(&tq->tq_lock); 135789Sahrens } 136789Sahrens 137789Sahrens static void * 138789Sahrens taskq_thread(void *arg) 139789Sahrens { 140789Sahrens taskq_t *tq = arg; 141789Sahrens task_t *t; 142789Sahrens 143789Sahrens mutex_enter(&tq->tq_lock); 144789Sahrens while (tq->tq_flags & TASKQ_ACTIVE) { 145789Sahrens if ((t = tq->tq_task.task_next) == &tq->tq_task) { 146789Sahrens if (--tq->tq_active == 0) 147789Sahrens cv_broadcast(&tq->tq_wait_cv); 148789Sahrens cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 149789Sahrens tq->tq_active++; 150789Sahrens continue; 151789Sahrens } 152789Sahrens t->task_prev->task_next = t->task_next; 153789Sahrens t->task_next->task_prev = t->task_prev; 154789Sahrens mutex_exit(&tq->tq_lock); 155789Sahrens 156789Sahrens rw_enter(&tq->tq_threadlock, RW_READER); 157789Sahrens t->task_func(t->task_arg); 158789Sahrens rw_exit(&tq->tq_threadlock); 159789Sahrens 160789Sahrens mutex_enter(&tq->tq_lock); 161789Sahrens task_free(tq, t); 162789Sahrens } 163789Sahrens tq->tq_nthreads--; 164789Sahrens cv_broadcast(&tq->tq_wait_cv); 165789Sahrens mutex_exit(&tq->tq_lock); 166789Sahrens return (NULL); 167789Sahrens } 168789Sahrens 169789Sahrens /*ARGSUSED*/ 170789Sahrens taskq_t * 171789Sahrens taskq_create(const char *name, int nthreads, pri_t pri, 172789Sahrens int minalloc, int maxalloc, uint_t flags) 173789Sahrens { 174789Sahrens taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 175789Sahrens int t; 176789Sahrens 1779515SJonathan.Adams@Sun.COM if (flags & TASKQ_THREADS_CPU_PCT) { 1789515SJonathan.Adams@Sun.COM int pct; 1799515SJonathan.Adams@Sun.COM ASSERT3S(nthreads, >=, 0); 1809515SJonathan.Adams@Sun.COM ASSERT3S(nthreads, <=, 100); 1819515SJonathan.Adams@Sun.COM pct = MIN(nthreads, 100); 1829515SJonathan.Adams@Sun.COM pct = MAX(pct, 0); 1839515SJonathan.Adams@Sun.COM 1849515SJonathan.Adams@Sun.COM nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 1859515SJonathan.Adams@Sun.COM nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 1869515SJonathan.Adams@Sun.COM } else { 1879515SJonathan.Adams@Sun.COM ASSERT3S(nthreads, >=, 1); 1889515SJonathan.Adams@Sun.COM } 1899515SJonathan.Adams@Sun.COM 190789Sahrens rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 1914831Sgw25295 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 1924831Sgw25295 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 1934831Sgw25295 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 194789Sahrens tq->tq_flags = flags | TASKQ_ACTIVE; 195789Sahrens tq->tq_active = nthreads; 196789Sahrens tq->tq_nthreads = nthreads; 197789Sahrens tq->tq_minalloc = minalloc; 198789Sahrens tq->tq_maxalloc = maxalloc; 199789Sahrens tq->tq_task.task_next = &tq->tq_task; 200789Sahrens tq->tq_task.task_prev = &tq->tq_task; 201789Sahrens tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 202789Sahrens 203789Sahrens if (flags & TASKQ_PREPOPULATE) { 204789Sahrens mutex_enter(&tq->tq_lock); 205789Sahrens while (minalloc-- > 0) 206789Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 207789Sahrens mutex_exit(&tq->tq_lock); 208789Sahrens } 209789Sahrens 210789Sahrens for (t = 0; t < nthreads; t++) 211789Sahrens (void) thr_create(0, 0, taskq_thread, 212789Sahrens tq, THR_BOUND, &tq->tq_threadlist[t]); 213789Sahrens 214789Sahrens return (tq); 215789Sahrens } 216789Sahrens 217789Sahrens void 218789Sahrens taskq_destroy(taskq_t *tq) 219789Sahrens { 220789Sahrens int t; 221789Sahrens int nthreads = tq->tq_nthreads; 222789Sahrens 223789Sahrens taskq_wait(tq); 224789Sahrens 225789Sahrens mutex_enter(&tq->tq_lock); 226789Sahrens 227789Sahrens tq->tq_flags &= ~TASKQ_ACTIVE; 228789Sahrens cv_broadcast(&tq->tq_dispatch_cv); 229789Sahrens 230789Sahrens while (tq->tq_nthreads != 0) 231789Sahrens cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 232789Sahrens 233789Sahrens tq->tq_minalloc = 0; 234789Sahrens while (tq->tq_nalloc != 0) { 235789Sahrens ASSERT(tq->tq_freelist != NULL); 236789Sahrens task_free(tq, task_alloc(tq, KM_SLEEP)); 237789Sahrens } 238789Sahrens 239789Sahrens mutex_exit(&tq->tq_lock); 240789Sahrens 241789Sahrens for (t = 0; t < nthreads; t++) 242789Sahrens (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 243789Sahrens 244789Sahrens kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 245789Sahrens 246789Sahrens rw_destroy(&tq->tq_threadlock); 2474831Sgw25295 mutex_destroy(&tq->tq_lock); 2484831Sgw25295 cv_destroy(&tq->tq_dispatch_cv); 2494831Sgw25295 cv_destroy(&tq->tq_wait_cv); 250789Sahrens 251789Sahrens kmem_free(tq, sizeof (taskq_t)); 252789Sahrens } 253789Sahrens 254789Sahrens int 255789Sahrens taskq_member(taskq_t *tq, void *t) 256789Sahrens { 257789Sahrens int i; 258789Sahrens 259789Sahrens if (taskq_now) 260789Sahrens return (1); 261789Sahrens 262789Sahrens for (i = 0; i < tq->tq_nthreads; i++) 263789Sahrens if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 264789Sahrens return (1); 265789Sahrens 266789Sahrens return (0); 267789Sahrens } 2687837SMatthew.Ahrens@Sun.COM 2697837SMatthew.Ahrens@Sun.COM void 2707837SMatthew.Ahrens@Sun.COM system_taskq_init(void) 2717837SMatthew.Ahrens@Sun.COM { 2727837SMatthew.Ahrens@Sun.COM system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 2737837SMatthew.Ahrens@Sun.COM TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 2747837SMatthew.Ahrens@Sun.COM } 275*10612SRicardo.M.Correia@Sun.COM 276*10612SRicardo.M.Correia@Sun.COM void 277*10612SRicardo.M.Correia@Sun.COM system_taskq_fini(void) 278*10612SRicardo.M.Correia@Sun.COM { 279*10612SRicardo.M.Correia@Sun.COM taskq_destroy(system_taskq); 280*10612SRicardo.M.Correia@Sun.COM system_taskq = NULL; /* defensive */ 281*10612SRicardo.M.Correia@Sun.COM } 282