/* $NetBSD: subr_workqueue.c,v 1.38 2020/08/01 02:14:43 riastradh Exp $ */

/*-
 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
27 */ 28 29 #include <sys/cdefs.h> 30 __KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.38 2020/08/01 02:14:43 riastradh Exp $"); 31 32 #include <sys/param.h> 33 #include <sys/cpu.h> 34 #include <sys/systm.h> 35 #include <sys/kthread.h> 36 #include <sys/kmem.h> 37 #include <sys/proc.h> 38 #include <sys/workqueue.h> 39 #include <sys/mutex.h> 40 #include <sys/condvar.h> 41 #include <sys/queue.h> 42 43 typedef struct work_impl { 44 SIMPLEQ_ENTRY(work_impl) wk_entry; 45 } work_impl_t; 46 47 SIMPLEQ_HEAD(workqhead, work_impl); 48 49 struct workqueue_queue { 50 kmutex_t q_mutex; 51 kcondvar_t q_cv; 52 struct workqhead q_queue_pending; 53 struct workqhead q_queue_running; 54 lwp_t *q_worker; 55 work_impl_t *q_waiter; 56 }; 57 58 struct workqueue { 59 void (*wq_func)(struct work *, void *); 60 void *wq_arg; 61 int wq_flags; 62 63 char wq_name[MAXCOMLEN]; 64 pri_t wq_prio; 65 void *wq_ptr; 66 }; 67 68 #define WQ_SIZE (roundup2(sizeof(struct workqueue), coherency_unit)) 69 #define WQ_QUEUE_SIZE (roundup2(sizeof(struct workqueue_queue), coherency_unit)) 70 71 #define POISON 0xaabbccdd 72 73 static size_t 74 workqueue_size(int flags) 75 { 76 77 return WQ_SIZE 78 + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE 79 + coherency_unit; 80 } 81 82 static struct workqueue_queue * 83 workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci) 84 { 85 u_int idx = 0; 86 87 if (wq->wq_flags & WQ_PERCPU) { 88 idx = ci ? cpu_index(ci) : cpu_index(curcpu()); 89 } 90 91 return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE)); 92 } 93 94 static void 95 workqueue_runlist(struct workqueue *wq, struct workqhead *list) 96 { 97 work_impl_t *wk; 98 work_impl_t *next; 99 100 /* 101 * note that "list" is not a complete SIMPLEQ. 
102 */ 103 104 for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) { 105 next = SIMPLEQ_NEXT(wk, wk_entry); 106 (*wq->wq_func)((void *)wk, wq->wq_arg); 107 } 108 } 109 110 static void 111 workqueue_worker(void *cookie) 112 { 113 struct workqueue *wq = cookie; 114 struct workqueue_queue *q; 115 int s; 116 117 /* find the workqueue of this kthread */ 118 q = workqueue_queue_lookup(wq, curlwp->l_cpu); 119 120 if (wq->wq_flags & WQ_FPU) 121 s = kthread_fpu_enter(); 122 for (;;) { 123 /* 124 * we violate abstraction of SIMPLEQ. 125 */ 126 127 mutex_enter(&q->q_mutex); 128 while (SIMPLEQ_EMPTY(&q->q_queue_pending)) 129 cv_wait(&q->q_cv, &q->q_mutex); 130 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running)); 131 q->q_queue_running.sqh_first = 132 q->q_queue_pending.sqh_first; /* XXX */ 133 SIMPLEQ_INIT(&q->q_queue_pending); 134 mutex_exit(&q->q_mutex); 135 136 workqueue_runlist(wq, &q->q_queue_running); 137 138 mutex_enter(&q->q_mutex); 139 KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running)); 140 SIMPLEQ_INIT(&q->q_queue_running); 141 if (__predict_false(q->q_waiter != NULL)) { 142 /* Wake up workqueue_wait */ 143 cv_signal(&q->q_cv); 144 } 145 mutex_exit(&q->q_mutex); 146 } 147 if (wq->wq_flags & WQ_FPU) 148 kthread_fpu_exit(s); 149 } 150 151 static void 152 workqueue_init(struct workqueue *wq, const char *name, 153 void (*callback_func)(struct work *, void *), void *callback_arg, 154 pri_t prio, int ipl) 155 { 156 157 KASSERT(sizeof(wq->wq_name) > strlen(name)); 158 strncpy(wq->wq_name, name, sizeof(wq->wq_name)); 159 160 wq->wq_prio = prio; 161 wq->wq_func = callback_func; 162 wq->wq_arg = callback_arg; 163 } 164 165 static int 166 workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q, 167 int ipl, struct cpu_info *ci) 168 { 169 int error, ktf; 170 171 KASSERT(q->q_worker == NULL); 172 173 mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl); 174 cv_init(&q->q_cv, wq->wq_name); 175 SIMPLEQ_INIT(&q->q_queue_pending); 176 SIMPLEQ_INIT(&q->q_queue_running); 177 ktf = 
((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0); 178 if (wq->wq_prio < PRI_KERNEL) 179 ktf |= KTHREAD_TS; 180 if (ci) { 181 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker, 182 wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index); 183 } else { 184 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker, 185 wq, &q->q_worker, "%s", wq->wq_name); 186 } 187 if (error != 0) { 188 mutex_destroy(&q->q_mutex); 189 cv_destroy(&q->q_cv); 190 KASSERT(q->q_worker == NULL); 191 } 192 return error; 193 } 194 195 struct workqueue_exitargs { 196 work_impl_t wqe_wk; 197 struct workqueue_queue *wqe_q; 198 }; 199 200 static void 201 workqueue_exit(struct work *wk, void *arg) 202 { 203 struct workqueue_exitargs *wqe = (void *)wk; 204 struct workqueue_queue *q = wqe->wqe_q; 205 206 /* 207 * only competition at this point is workqueue_finiqueue. 208 */ 209 210 KASSERT(q->q_worker == curlwp); 211 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); 212 mutex_enter(&q->q_mutex); 213 q->q_worker = NULL; 214 cv_signal(&q->q_cv); 215 mutex_exit(&q->q_mutex); 216 kthread_exit(0); 217 } 218 219 static void 220 workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q) 221 { 222 struct workqueue_exitargs wqe; 223 224 KASSERT(wq->wq_func == workqueue_exit); 225 226 wqe.wqe_q = q; 227 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); 228 KASSERT(q->q_worker != NULL); 229 mutex_enter(&q->q_mutex); 230 SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry); 231 cv_signal(&q->q_cv); 232 while (q->q_worker != NULL) { 233 cv_wait(&q->q_cv, &q->q_mutex); 234 } 235 mutex_exit(&q->q_mutex); 236 mutex_destroy(&q->q_mutex); 237 cv_destroy(&q->q_cv); 238 } 239 240 /* --- */ 241 242 int 243 workqueue_create(struct workqueue **wqp, const char *name, 244 void (*callback_func)(struct work *, void *), void *callback_arg, 245 pri_t prio, int ipl, int flags) 246 { 247 struct workqueue *wq; 248 struct workqueue_queue *q; 249 void *ptr; 250 int error = 0; 251 252 
CTASSERT(sizeof(work_impl_t) <= sizeof(struct work)); 253 254 ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP); 255 wq = (void *)roundup2((uintptr_t)ptr, coherency_unit); 256 wq->wq_ptr = ptr; 257 wq->wq_flags = flags; 258 259 workqueue_init(wq, name, callback_func, callback_arg, prio, ipl); 260 261 if (flags & WQ_PERCPU) { 262 struct cpu_info *ci; 263 CPU_INFO_ITERATOR cii; 264 265 /* create the work-queue for each CPU */ 266 for (CPU_INFO_FOREACH(cii, ci)) { 267 q = workqueue_queue_lookup(wq, ci); 268 error = workqueue_initqueue(wq, q, ipl, ci); 269 if (error) { 270 break; 271 } 272 } 273 } else { 274 /* initialize a work-queue */ 275 q = workqueue_queue_lookup(wq, NULL); 276 error = workqueue_initqueue(wq, q, ipl, NULL); 277 } 278 279 if (error != 0) { 280 workqueue_destroy(wq); 281 } else { 282 *wqp = wq; 283 } 284 285 return error; 286 } 287 288 static bool 289 workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target) 290 { 291 work_impl_t *wk; 292 bool found = false; 293 294 mutex_enter(&q->q_mutex); 295 if (q->q_worker == curlwp) 296 goto out; 297 again: 298 SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) { 299 if (wk == wk_target) 300 goto found; 301 } 302 SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) { 303 if (wk == wk_target) 304 goto found; 305 } 306 found: 307 if (wk != NULL) { 308 found = true; 309 KASSERT(q->q_waiter == NULL); 310 q->q_waiter = wk; 311 cv_wait(&q->q_cv, &q->q_mutex); 312 goto again; 313 } 314 if (q->q_waiter != NULL) 315 q->q_waiter = NULL; 316 out: 317 mutex_exit(&q->q_mutex); 318 319 return found; 320 } 321 322 /* 323 * Wait for a specified work to finish. The caller must ensure that no new 324 * work will be enqueued before calling workqueue_wait. Note that if the 325 * workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue 326 * other than the waiting queue. 
327 */ 328 void 329 workqueue_wait(struct workqueue *wq, struct work *wk) 330 { 331 struct workqueue_queue *q; 332 bool found; 333 334 if (ISSET(wq->wq_flags, WQ_PERCPU)) { 335 struct cpu_info *ci; 336 CPU_INFO_ITERATOR cii; 337 for (CPU_INFO_FOREACH(cii, ci)) { 338 q = workqueue_queue_lookup(wq, ci); 339 found = workqueue_q_wait(q, (work_impl_t *)wk); 340 if (found) 341 break; 342 } 343 } else { 344 q = workqueue_queue_lookup(wq, NULL); 345 (void) workqueue_q_wait(q, (work_impl_t *)wk); 346 } 347 } 348 349 void 350 workqueue_destroy(struct workqueue *wq) 351 { 352 struct workqueue_queue *q; 353 struct cpu_info *ci; 354 CPU_INFO_ITERATOR cii; 355 356 wq->wq_func = workqueue_exit; 357 for (CPU_INFO_FOREACH(cii, ci)) { 358 q = workqueue_queue_lookup(wq, ci); 359 if (q->q_worker != NULL) { 360 workqueue_finiqueue(wq, q); 361 } 362 } 363 kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags)); 364 } 365 366 #ifdef DEBUG 367 static void 368 workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk) 369 { 370 work_impl_t *_wk; 371 372 SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) { 373 if (_wk == wk) 374 panic("%s: tried to enqueue a queued work", __func__); 375 } 376 } 377 #endif 378 379 void 380 workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci) 381 { 382 struct workqueue_queue *q; 383 work_impl_t *wk = (void *)wk0; 384 385 KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL); 386 q = workqueue_queue_lookup(wq, ci); 387 388 mutex_enter(&q->q_mutex); 389 KASSERT(q->q_waiter == NULL); 390 #ifdef DEBUG 391 workqueue_check_duplication(q, wk); 392 #endif 393 SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry); 394 cv_signal(&q->q_cv); 395 mutex_exit(&q->q_mutex); 396 } 397