/*	$NetBSD: subr_workqueue.c,v 1.47 2023/08/09 08:24:18 riastradh Exp $	*/

/*-
 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.47 2023/08/09 08:24:18 riastradh Exp $");

#include <sys/param.h>

#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/kmem.h>
#include <sys/kthread.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/sdt.h>
#include <sys/systm.h>
#include <sys/workqueue.h>

typedef struct work_impl {
	SIMPLEQ_ENTRY(work_impl) wk_entry;
} work_impl_t;

SIMPLEQ_HEAD(workqhead, work_impl);

struct workqueue_queue {
	kmutex_t q_mutex;
	kcondvar_t q_cv;
	struct workqhead q_queue_pending;
	uint64_t q_gen;
	lwp_t *q_worker;
};

struct workqueue {
	void (*wq_func)(struct work *, void *);
	void *wq_arg;
	int wq_flags;

	char wq_name[MAXCOMLEN];
	pri_t wq_prio;
	void *wq_ptr;
};

#define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
#define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))

#define	POISON		0xaabbccdd

SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create,
    "struct workqueue *"/*wq*/,
    "const char *"/*name*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/,
    "pri_t"/*prio*/,
    "int"/*ipl*/,
    "int"/*flags*/);
SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy,
    "struct workqueue *"/*wq*/);

SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "struct cpu_info *"/*ci*/);
SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/);
SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__self,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__hit,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);

SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start,
    "struct workqueue *"/*wq*/);
SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done,
    "struct workqueue *"/*wq*/);

static size_t
workqueue_size(int flags)
{

	return WQ_SIZE
	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
	    + coherency_unit;
}

static struct workqueue_queue *
workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
{
	u_int idx = 0;

	if (wq->wq_flags & WQ_PERCPU) {
		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
	}

	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
}
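
/*
 * Note on layout (a summary of workqueue_size() and
 * workqueue_queue_lookup() above): workqueue_create() (below) makes one
 * kmem allocation holding the struct workqueue header followed by one
 * struct workqueue_queue slot per CPU (WQ_PERCPU), or a single slot
 * otherwise.  Both sizes are rounded up to coherency_unit, and an extra
 * coherency_unit of slack is allocated so the block can be realigned,
 * keeping the per-CPU queues on separate cache lines:
 *
 *	wq_ptr -> [slack for alignment]
 *	wq     -> [struct workqueue,       padded to WQ_SIZE]
 *	          [workqueue_queue slot 0, padded to WQ_QUEUE_SIZE]
 *	          [workqueue_queue slot 1, ...]   (WQ_PERCPU only)
 */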

static void
workqueue_runlist(struct workqueue *wq, struct workqhead *list)
{
	work_impl_t *wk;
	work_impl_t *next;

	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
		next = SIMPLEQ_NEXT(wk, wk_entry);
		SDT_PROBE4(sdt, kernel, workqueue, entry,
		    wq, wk, wq->wq_func, wq->wq_arg);
		(*wq->wq_func)((void *)wk, wq->wq_arg);
		SDT_PROBE4(sdt, kernel, workqueue, return,
		    wq, wk, wq->wq_func, wq->wq_arg);
	}
}

static void
workqueue_worker(void *cookie)
{
	struct workqueue *wq = cookie;
	struct workqueue_queue *q;
	int s, fpu = wq->wq_flags & WQ_FPU;

	/* find the workqueue of this kthread */
	q = workqueue_queue_lookup(wq, curlwp->l_cpu);

	if (fpu)
		s = kthread_fpu_enter();
	mutex_enter(&q->q_mutex);
	for (;;) {
		struct workqhead tmp;

		SIMPLEQ_INIT(&tmp);

		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
			cv_wait(&q->q_cv, &q->q_mutex);
		SIMPLEQ_CONCAT(&tmp, &q->q_queue_pending);
		SIMPLEQ_INIT(&q->q_queue_pending);

		/*
		 * Mark the queue as actively running a batch of work
		 * by setting the generation number odd.
		 */
		q->q_gen |= 1;
		mutex_exit(&q->q_mutex);

		workqueue_runlist(wq, &tmp);

		/*
		 * Notify workqueue_wait that we have completed a batch
		 * of work by incrementing the generation number.
		 */
		mutex_enter(&q->q_mutex);
		KASSERTMSG(q->q_gen & 1, "q=%p gen=%"PRIu64, q, q->q_gen);
		q->q_gen++;
		cv_broadcast(&q->q_cv);
	}
	mutex_exit(&q->q_mutex);
	if (fpu)
		kthread_fpu_exit(s);
}
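
/*
 * Summary of the q_gen handshake between workqueue_worker() above and
 * workqueue_q_wait() below: q_gen is even while the worker is idle and
 * is made odd for the duration of each batch taken off q_queue_pending.
 * Finishing a batch increments q_gen back to an even value and
 * broadcasts q_cv, so a waiter that observed an odd generation only has
 * to sleep until the value changes to know that the batch it may have
 * been racing with has drained.
 */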

static void
workqueue_init(struct workqueue *wq, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl)
{

	KASSERT(sizeof(wq->wq_name) > strlen(name));
	strncpy(wq->wq_name, name, sizeof(wq->wq_name));

	wq->wq_prio = prio;
	wq->wq_func = callback_func;
	wq->wq_arg = callback_arg;
}

static int
workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
    int ipl, struct cpu_info *ci)
{
	int error, ktf;

	KASSERT(q->q_worker == NULL);

	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
	cv_init(&q->q_cv, wq->wq_name);
	SIMPLEQ_INIT(&q->q_queue_pending);
	q->q_gen = 0;
	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
	if (wq->wq_prio < PRI_KERNEL)
		ktf |= KTHREAD_TS;
	if (ci) {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
	} else {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s", wq->wq_name);
	}
	if (error != 0) {
		mutex_destroy(&q->q_mutex);
		cv_destroy(&q->q_cv);
		KASSERT(q->q_worker == NULL);
	}
	return error;
}

struct workqueue_exitargs {
	work_impl_t wqe_wk;
	struct workqueue_queue *wqe_q;
};

static void
workqueue_exit(struct work *wk, void *arg)
{
	struct workqueue_exitargs *wqe = (void *)wk;
	struct workqueue_queue *q = wqe->wqe_q;

	/*
	 * only competition at this point is workqueue_finiqueue.
	 */

	KASSERT(q->q_worker == curlwp);
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	mutex_enter(&q->q_mutex);
	q->q_worker = NULL;
	cv_broadcast(&q->q_cv);
	mutex_exit(&q->q_mutex);
	kthread_exit(0);
}

static void
workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
{
	struct workqueue_exitargs wqe;

	KASSERT(wq->wq_func == workqueue_exit);

	wqe.wqe_q = q;
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	KASSERT(q->q_worker != NULL);
	mutex_enter(&q->q_mutex);
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
	cv_broadcast(&q->q_cv);
	while (q->q_worker != NULL) {
		cv_wait(&q->q_cv, &q->q_mutex);
	}
	mutex_exit(&q->q_mutex);
	mutex_destroy(&q->q_mutex);
	cv_destroy(&q->q_cv);
}
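
/*
 * Teardown handshake, tying together workqueue_exit() and
 * workqueue_finiqueue() above: workqueue_destroy() (below) first points
 * wq_func at workqueue_exit(), then workqueue_finiqueue() enqueues a
 * stack-allocated workqueue_exitargs on each queue that has a worker.
 * The worker processes it like ordinary work; workqueue_exit() clears
 * q_worker, wakes the waiter and calls kthread_exit(), after which
 * workqueue_finiqueue() can safely destroy the queue's mutex and condvar.
 */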

/* --- */

int
workqueue_create(struct workqueue **wqp, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl, int flags)
{
	struct workqueue *wq;
	struct workqueue_queue *q;
	void *ptr;
	int error = 0;

	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));

	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
	wq->wq_ptr = ptr;
	wq->wq_flags = flags;

	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);

	if (flags & WQ_PERCPU) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;

		/* create the work-queue for each CPU */
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			error = workqueue_initqueue(wq, q, ipl, ci);
			if (error) {
				break;
			}
		}
	} else {
		/* initialize a work-queue */
		q = workqueue_queue_lookup(wq, NULL);
		error = workqueue_initqueue(wq, q, ipl, NULL);
	}

	if (error != 0) {
		workqueue_destroy(wq);
	} else {
		*wqp = wq;
	}

	return error;
}

static bool
workqueue_q_wait(struct workqueue *wq, struct workqueue_queue *q,
    work_impl_t *wk_target)
{
	work_impl_t *wk;
	bool found = false;
	uint64_t gen;

	mutex_enter(&q->q_mutex);

	/*
	 * Avoid a deadlock scenario.  We can't guarantee that
	 * wk_target has completed at this point, but we can't wait for
	 * it either, so do nothing.
	 *
	 * XXX Are there use-cases that require this semantics?
	 */
	if (q->q_worker == curlwp) {
		SDT_PROBE2(sdt, kernel, workqueue, wait__self, wq, wk_target);
		goto out;
	}

	/*
	 * Wait until the target is no longer pending.  If we find it
	 * on this queue, the caller can stop looking in other queues.
	 * If we don't find it in this queue, however, we can't skip
	 * waiting -- it may be hidden in the running batch, which we
	 * have no access to.
	 */
again:
	SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
		if (wk == wk_target) {
			SDT_PROBE2(sdt, kernel, workqueue, wait__hit, wq, wk);
			found = true;
			cv_wait(&q->q_cv, &q->q_mutex);
			goto again;
		}
	}

	/*
	 * The target may be in the batch of work currently running,
	 * but we can't touch that queue.  So if there's anything
	 * running, wait until the generation changes.
	 */
	gen = q->q_gen;
	if (gen & 1) {
		do
			cv_wait(&q->q_cv, &q->q_mutex);
		while (gen == q->q_gen);
	}

out:
	mutex_exit(&q->q_mutex);

	return found;
}

/*
 * Wait for a specified work to finish.  The caller must ensure that no new
 * work will be enqueued before calling workqueue_wait.  Note that if the
 * workqueue is WQ_PERCPU, the caller may still enqueue new work on queues
 * other than the one being waited on.
 */
void
workqueue_wait(struct workqueue *wq, struct work *wk)
{
	struct workqueue_queue *q;
	bool found;

	ASSERT_SLEEPABLE();

	SDT_PROBE2(sdt, kernel, workqueue, wait__start, wq, wk);
	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			found = workqueue_q_wait(wq, q, (work_impl_t *)wk);
			if (found)
				break;
		}
	} else {
		q = workqueue_queue_lookup(wq, NULL);
		(void)workqueue_q_wait(wq, q, (work_impl_t *)wk);
	}
	SDT_PROBE2(sdt, kernel, workqueue, wait__done, wq, wk);
}

void
workqueue_destroy(struct workqueue *wq)
{
	struct workqueue_queue *q;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, workqueue, exit__start, wq);
	wq->wq_func = workqueue_exit;
	for (CPU_INFO_FOREACH(cii, ci)) {
		q = workqueue_queue_lookup(wq, ci);
		if (q->q_worker != NULL) {
			workqueue_finiqueue(wq, q);
		}
	}
	SDT_PROBE1(sdt, kernel, workqueue, exit__done, wq);
	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
}

#ifdef DEBUG
static void
workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk)
{
	work_impl_t *_wk;

	SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) {
		if (_wk == wk)
			panic("%s: tried to enqueue a queued work", __func__);
	}
}
#endif

void
workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
{
	struct workqueue_queue *q;
	work_impl_t *wk = (void *)wk0;

	SDT_PROBE3(sdt, kernel, workqueue, enqueue, wq, wk0, ci);

	KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
	q = workqueue_queue_lookup(wq, ci);

	mutex_enter(&q->q_mutex);
#ifdef DEBUG
	workqueue_check_duplication(q, wk);
#endif
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
	cv_broadcast(&q->q_cv);
	mutex_exit(&q->q_mutex);
}
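
#if 0
/*
 * Example only (not compiled): a minimal sketch of how a consumer might
 * drive the API defined above.  The "example_*" names and softc layout
 * are hypothetical; real callers choose their own priority, IPL and
 * flags.  The caller embeds a struct work in its own state, must not
 * enqueue it again while it is still pending, and must stop enqueueing
 * before calling workqueue_wait()/workqueue_destroy().
 */
struct example_softc {
	struct workqueue *sc_wq;
	struct work sc_wk;
	/* ... consumer state ... */
};

static void
example_work(struct work *wk, void *arg)
{
	struct example_softc *sc = arg;

	/* Runs in the worker kthread; process the deferred request. */
}

static int
example_attach(struct example_softc *sc)
{
	int error;

	error = workqueue_create(&sc->sc_wq, "examplewq", example_work, sc,
	    PRI_NONE, IPL_NONE, WQ_MPSAFE);
	if (error)
		return error;

	/*
	 * Later, whenever work needs to be deferred (at or below the IPL
	 * given to workqueue_create()):
	 */
	workqueue_enqueue(sc->sc_wq, &sc->sc_wk, NULL);
	return 0;
}

static void
example_detach(struct example_softc *sc)
{

	/* Quiesce enqueues first, then drain the pending work and destroy. */
	workqueue_wait(sc->sc_wq, &sc->sc_wk);
	workqueue_destroy(sc->sc_wq);
}
#endif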