1 /* $NetBSD: taskq.c,v 1.11 2019/08/20 08:12:50 hannken Exp $ */ 2 3 /*- 4 * Copyright (c) 2019 The NetBSD Foundation, Inc. 5 * All rights reserved. 6 * 7 * This code is derived from software contributed to The NetBSD Foundation 8 * by Juergen Hannken-Illjes. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 1. Redistributions of source code must retain the above copyright 14 * notice, this list of conditions and the following disclaimer. 15 * 2. Redistributions in binary form must reproduce the above copyright 16 * notice, this list of conditions and the following disclaimer in the 17 * documentation and/or other materials provided with the distribution. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 * POSSIBILITY OF SUCH DAMAGE. 30 */ 31 32 #include <sys/types.h> 33 #include <sys/param.h> 34 #include <sys/kcondvar.h> 35 #include <sys/kernel.h> 36 #include <sys/kmem.h> 37 #include <sys/mutex.h> 38 #include <sys/proc.h> 39 #include <sys/threadpool.h> 40 41 #include <sys/taskq.h> 42 43 struct taskq_executor { 44 struct threadpool_job te_job; /* Threadpool job serving the queue. */ 45 taskq_t *te_self; /* Backpointer to the queue. */ 46 unsigned te_running:1; /* True if the job is running. */ 47 }; 48 49 struct taskq { 50 int tq_nthreads; /* # of threads serving queue. */ 51 pri_t tq_pri; /* Scheduling priority. */ 52 uint_t tq_flags; /* Saved flags from taskq_create. */ 53 int tq_active; /* # of tasks (queued or running). */ 54 int tq_running; /* # of jobs currently running. */ 55 int tq_waiting; /* # of jobs currently idle. */ 56 unsigned tq_destroyed:1; /* True if queue gets destroyed. */ 57 kmutex_t tq_lock; /* Queue and job lock. */ 58 kcondvar_t tq_cv; /* Queue condvar. */ 59 struct taskq_executor *tq_executor; /* Array of jobs. */ 60 struct threadpool *tq_threadpool; /* Pool backing the jobs. */ 61 SIMPLEQ_HEAD(, taskq_ent) tq_list; /* Queue of tasks waiting. */ 62 }; 63 64 taskq_t *system_taskq; /* General purpose task queue. */ 65 66 static specificdata_key_t taskq_lwp_key; /* Null or taskq this thread runs. */ 67 68 /* 69 * Threadpool job to service tasks from task queue. 70 * Runs until the task queue gets destroyed or the queue is empty for 10 secs. 71 */ 72 static void 73 task_executor(struct threadpool_job *job) 74 { 75 struct taskq_executor *state = (struct taskq_executor *)job; 76 taskq_t *tq = state->te_self; 77 taskq_ent_t *tqe; 78 bool is_dynamic; 79 int error; 80 81 lwp_setspecific(taskq_lwp_key, tq); 82 83 mutex_enter(&tq->tq_lock); 84 while (!tq->tq_destroyed) { 85 if (SIMPLEQ_EMPTY(&tq->tq_list)) { 86 if (ISSET(tq->tq_flags, TASKQ_DYNAMIC)) 87 break; 88 tq->tq_waiting++; 89 error = cv_timedwait(&tq->tq_cv, &tq->tq_lock, 90 mstohz(10000)); 91 tq->tq_waiting--; 92 if (SIMPLEQ_EMPTY(&tq->tq_list)) { 93 if (error) 94 break; 95 continue; 96 } 97 } 98 tqe = SIMPLEQ_FIRST(&tq->tq_list); 99 KASSERT(tqe != NULL); 100 SIMPLEQ_REMOVE_HEAD(&tq->tq_list, tqent_list); 101 is_dynamic = tqe->tqent_dynamic; 102 tqe->tqent_queued = 0; 103 mutex_exit(&tq->tq_lock); 104 105 (*tqe->tqent_func)(tqe->tqent_arg); 106 107 mutex_enter(&tq->tq_lock); 108 if (is_dynamic) 109 kmem_free(tqe, sizeof(*tqe)); 110 tq->tq_active--; 111 } 112 state->te_running = 0; 113 tq->tq_running--; 114 threadpool_job_done(job); 115 mutex_exit(&tq->tq_lock); 116 117 lwp_setspecific(taskq_lwp_key, NULL); 118 } 119 120 void 121 taskq_init(void) 122 { 123 124 lwp_specific_key_create(&taskq_lwp_key, NULL); 125 system_taskq = taskq_create("system_taskq", ncpu * 4, PRI_KERNEL, 126 4, 512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 127 KASSERT(system_taskq != NULL); 128 } 129 130 void 131 taskq_fini(void) 132 { 133 134 taskq_destroy(system_taskq); 135 lwp_specific_key_delete(taskq_lwp_key); 136 } 137 138 /* 139 * Dispatch a task entry creating executors as neeeded. 140 */ 141 static void 142 taskq_dispatch_common(taskq_t *tq, taskq_ent_t *tqe, uint_t flags) 143 { 144 int i; 145 146 KASSERT(mutex_owned(&tq->tq_lock)); 147 148 if (ISSET(flags, TQ_FRONT)) 149 SIMPLEQ_INSERT_HEAD(&tq->tq_list, tqe, tqent_list); 150 else 151 SIMPLEQ_INSERT_TAIL(&tq->tq_list, tqe, tqent_list); 152 tqe->tqent_queued = 1; 153 tq->tq_active++; 154 if (tq->tq_waiting) { 155 cv_signal(&tq->tq_cv); 156 mutex_exit(&tq->tq_lock); 157 return; 158 } 159 if (tq->tq_running < tq->tq_nthreads) { 160 for (i = 0; i < tq->tq_nthreads; i++) { 161 if (!tq->tq_executor[i].te_running) { 162 tq->tq_executor[i].te_running = 1; 163 tq->tq_running++; 164 threadpool_schedule_job(tq->tq_threadpool, 165 &tq->tq_executor[i].te_job); 166 break; 167 } 168 } 169 } 170 mutex_exit(&tq->tq_lock); 171 } 172 173 /* 174 * Allocate and dispatch a task entry. 175 */ 176 taskqid_t 177 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 178 { 179 taskq_ent_t *tqe; 180 181 KASSERT(!ISSET(flags, ~(TQ_SLEEP | TQ_NOSLEEP | TQ_NOQUEUE))); 182 KASSERT(ISSET(tq->tq_flags, TASKQ_DYNAMIC) || 183 !ISSET(flags, TQ_NOQUEUE)); 184 185 if (ISSET(flags, (TQ_SLEEP | TQ_NOSLEEP)) == TQ_NOSLEEP) 186 tqe = kmem_alloc(sizeof(*tqe), KM_NOSLEEP); 187 else 188 tqe = kmem_alloc(sizeof(*tqe), KM_SLEEP); 189 if (tqe == NULL) 190 return (taskqid_t) NULL; 191 192 mutex_enter(&tq->tq_lock); 193 if (ISSET(flags, TQ_NOQUEUE) && tq->tq_active == tq->tq_nthreads) { 194 mutex_exit(&tq->tq_lock); 195 kmem_free(tqe, sizeof(*tqe)); 196 return (taskqid_t) NULL; 197 } 198 tqe->tqent_dynamic = 1; 199 tqe->tqent_queued = 0; 200 tqe->tqent_func = func; 201 tqe->tqent_arg = arg; 202 taskq_dispatch_common(tq, tqe, flags); 203 204 return (taskqid_t) tqe; 205 } 206 207 /* 208 * Dispatch a preallocated task entry. 209 * Assume caller zeroed it. 210 */ 211 void 212 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 213 taskq_ent_t *tqe) 214 { 215 216 KASSERT(!ISSET(flags, ~(TQ_FRONT))); 217 218 tqe->tqent_func = func; 219 tqe->tqent_arg = arg; 220 mutex_enter(&tq->tq_lock); 221 taskq_dispatch_common(tq, tqe, flags); 222 } 223 224 /* 225 * Wait until all tasks have completed. 226 */ 227 void 228 taskq_wait(taskq_t *tq) 229 { 230 231 KASSERT(!taskq_member(tq, curlwp)); 232 233 mutex_enter(&tq->tq_lock); 234 while (tq->tq_active) 235 kpause("qwait", false, 1, &tq->tq_lock); 236 mutex_exit(&tq->tq_lock); 237 } 238 239 /* 240 * True if the current thread is an executor for this queue. 241 */ 242 int 243 taskq_member(taskq_t *tq, kthread_t *thread) 244 { 245 246 KASSERT(thread == curlwp); 247 248 return (lwp_getspecific(taskq_lwp_key) == tq); 249 } 250 251 /* 252 * Create a task queue. 253 * Allocation hints are ignored. 254 */ 255 taskq_t * 256 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, 257 int maxalloc, uint_t flags) 258 { 259 int i; 260 struct threadpool *threadpool; 261 taskq_t *tq; 262 263 KASSERT(!ISSET(flags, 264 ~(TASKQ_DYNAMIC | TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT))); 265 266 if (threadpool_get(&threadpool, pri) != 0) 267 return NULL; 268 269 if (ISSET(flags, TASKQ_THREADS_CPU_PCT)) 270 nthreads = MAX((ncpu * nthreads) / 100, 1); 271 272 tq = kmem_zalloc(sizeof(*tq), KM_SLEEP); 273 tq->tq_nthreads = nthreads; 274 tq->tq_pri = pri; 275 tq->tq_flags = flags; 276 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, IPL_NONE); 277 cv_init(&tq->tq_cv, NULL, CV_DEFAULT, NULL); 278 SIMPLEQ_INIT(&tq->tq_list); 279 tq->tq_executor = kmem_alloc(sizeof(*tq->tq_executor) * nthreads, 280 KM_SLEEP); 281 for (i = 0; i < nthreads; i++) { 282 threadpool_job_init(&tq->tq_executor[i].te_job, task_executor, 283 &tq->tq_lock, "%s/%d", name, i); 284 tq->tq_executor[i].te_self = tq; 285 tq->tq_executor[i].te_running = 0; 286 } 287 tq->tq_threadpool = threadpool; 288 289 return tq; 290 } 291 292 taskq_t * 293 taskq_create_proc(const char *name, int nthreads, pri_t pri, int minalloc, 294 int maxalloc, struct proc *proc, uint_t flags) 295 { 296 297 return taskq_create(name, nthreads, pri, minalloc, maxalloc, flags); 298 } 299 300 /* 301 * Destroy a task queue. 302 */ 303 void 304 taskq_destroy(taskq_t *tq) 305 { 306 int i; 307 taskq_ent_t *tqe; 308 309 KASSERT(!taskq_member(tq, curlwp)); 310 311 /* Wait for tasks to complete. */ 312 taskq_wait(tq); 313 314 /* Mark destroyed and ask running executors to quit. */ 315 mutex_enter(&tq->tq_lock); 316 tq->tq_destroyed = 1; 317 cv_broadcast(&tq->tq_cv); 318 319 /* Wait for all executors to quit. */ 320 while (tq->tq_running > 0) 321 kpause("tqdestroy", false, 1, &tq->tq_lock); 322 mutex_exit(&tq->tq_lock); 323 324 for (i = 0; i < tq->tq_nthreads; i++) 325 threadpool_job_destroy(&tq->tq_executor[i].te_job); 326 threadpool_put(tq->tq_threadpool, tq->tq_pri); 327 mutex_destroy(&tq->tq_lock); 328 cv_destroy(&tq->tq_cv); 329 kmem_free(tq->tq_executor, sizeof(*tq->tq_executor) * tq->tq_nthreads); 330 kmem_free(tq, sizeof(*tq)); 331 } 332