/*	$NetBSD: subr_workqueue.c,v 1.48 2024/03/01 04:32:38 mrg Exp $	*/

/*-
 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.48 2024/03/01 04:32:38 mrg Exp $");

#include <sys/param.h>

#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/kmem.h>
#include <sys/kthread.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/sdt.h>
#include <sys/systm.h>
#include <sys/workqueue.h>

typedef struct work_impl {
	SIMPLEQ_ENTRY(work_impl) wk_entry;
} work_impl_t;

SIMPLEQ_HEAD(workqhead, work_impl);

struct workqueue_queue {
	kmutex_t q_mutex;
	kcondvar_t q_cv;
	struct workqhead q_queue_pending;
	uint64_t q_gen;
	lwp_t *q_worker;
};

struct workqueue {
	void (*wq_func)(struct work *, void *);
	void *wq_arg;
	int wq_flags;

	char wq_name[MAXCOMLEN];
	pri_t wq_prio;
	void *wq_ptr;
};

#define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
#define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))

#define	POISON		0xaabbccdd

SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create,
    "struct workqueue *"/*wq*/,
    "const char *"/*name*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/,
    "pri_t"/*prio*/,
    "int"/*ipl*/,
    "int"/*flags*/);
SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy,
    "struct workqueue *"/*wq*/);

SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "struct cpu_info *"/*ci*/);
SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/);
SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__self,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__hit,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);

SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start,
    "struct workqueue *"/*wq*/);
SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done,
    "struct workqueue *"/*wq*/);

static size_t
workqueue_size(int flags)
{

	return WQ_SIZE
	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
	    + coherency_unit;
}

static struct workqueue_queue *
workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
{
	u_int idx = 0;

	if (wq->wq_flags & WQ_PERCPU) {
		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
	}

	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
}

static void
workqueue_runlist(struct workqueue *wq, struct workqhead *list)
{
	work_impl_t *wk;
	work_impl_t *next;
	struct lwp *l = curlwp;

	KASSERTMSG(l->l_nopreempt == 0, "lwp %p nopreempt %d",
	    l, l->l_nopreempt);

	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
		next = SIMPLEQ_NEXT(wk, wk_entry);
		SDT_PROBE4(sdt, kernel, workqueue, entry,
		    wq, wk, wq->wq_func, wq->wq_arg);
		(*wq->wq_func)((void *)wk, wq->wq_arg);
		SDT_PROBE4(sdt, kernel, workqueue, return,
		    wq, wk, wq->wq_func, wq->wq_arg);
		KASSERTMSG(l->l_nopreempt == 0,
		    "lwp %p nopreempt %d func %p",
		    l, l->l_nopreempt, wq->wq_func);
	}
}

static void
workqueue_worker(void *cookie)
{
	struct workqueue *wq = cookie;
	struct workqueue_queue *q;
	int s, fpu = wq->wq_flags & WQ_FPU;

	/* find the workqueue of this kthread */
	q = workqueue_queue_lookup(wq, curlwp->l_cpu);

	if (fpu)
		s = kthread_fpu_enter();
	mutex_enter(&q->q_mutex);
	for (;;) {
		struct workqhead tmp;

		SIMPLEQ_INIT(&tmp);

		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
			cv_wait(&q->q_cv, &q->q_mutex);
		SIMPLEQ_CONCAT(&tmp, &q->q_queue_pending);
		SIMPLEQ_INIT(&q->q_queue_pending);

		/*
		 * Mark the queue as actively running a batch of work
		 * by setting the generation number odd.
		 */
		q->q_gen |= 1;
		mutex_exit(&q->q_mutex);

		workqueue_runlist(wq, &tmp);

		/*
		 * Notify workqueue_wait that we have completed a batch
		 * of work by incrementing the generation number.
		 */
		mutex_enter(&q->q_mutex);
		KASSERTMSG(q->q_gen & 1, "q=%p gen=%"PRIu64, q, q->q_gen);
		q->q_gen++;
		cv_broadcast(&q->q_cv);
	}
	mutex_exit(&q->q_mutex);
	if (fpu)
		kthread_fpu_exit(s);
}

static void
workqueue_init(struct workqueue *wq, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl)
{

	KASSERT(sizeof(wq->wq_name) > strlen(name));
	strncpy(wq->wq_name, name, sizeof(wq->wq_name));

	wq->wq_prio = prio;
	wq->wq_func = callback_func;
	wq->wq_arg = callback_arg;
}

static int
workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
    int ipl, struct cpu_info *ci)
{
	int error, ktf;

	KASSERT(q->q_worker == NULL);

	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
	cv_init(&q->q_cv, wq->wq_name);
	SIMPLEQ_INIT(&q->q_queue_pending);
	q->q_gen = 0;
	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
	if (wq->wq_prio < PRI_KERNEL)
		ktf |= KTHREAD_TS;
	if (ci) {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
	} else {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s", wq->wq_name);
	}
	if (error != 0) {
		mutex_destroy(&q->q_mutex);
		cv_destroy(&q->q_cv);
		KASSERT(q->q_worker == NULL);
	}
	return error;
}

struct workqueue_exitargs {
	work_impl_t wqe_wk;
	struct workqueue_queue *wqe_q;
};

static void
workqueue_exit(struct work *wk, void *arg)
{
	struct workqueue_exitargs *wqe = (void *)wk;
	struct workqueue_queue *q = wqe->wqe_q;

	/*
	 * only competition at this point is workqueue_finiqueue.
	 */

	KASSERT(q->q_worker == curlwp);
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	mutex_enter(&q->q_mutex);
	q->q_worker = NULL;
	cv_broadcast(&q->q_cv);
	mutex_exit(&q->q_mutex);
	kthread_exit(0);
}

static void
workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
{
	struct workqueue_exitargs wqe;

	KASSERT(wq->wq_func == workqueue_exit);

	wqe.wqe_q = q;
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	KASSERT(q->q_worker != NULL);
	mutex_enter(&q->q_mutex);
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
	cv_broadcast(&q->q_cv);
	while (q->q_worker != NULL) {
		cv_wait(&q->q_cv, &q->q_mutex);
	}
	mutex_exit(&q->q_mutex);
	mutex_destroy(&q->q_mutex);
	cv_destroy(&q->q_cv);
}

/* --- */

int
workqueue_create(struct workqueue **wqp, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl, int flags)
{
	struct workqueue *wq;
	struct workqueue_queue *q;
	void *ptr;
	int error = 0;

	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));

	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
	wq->wq_ptr = ptr;
	wq->wq_flags = flags;

	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);

	if (flags & WQ_PERCPU) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;

		/* create the work-queue for each CPU */
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			error = workqueue_initqueue(wq, q, ipl, ci);
			if (error) {
				break;
			}
		}
	} else {
		/* initialize a work-queue */
		q = workqueue_queue_lookup(wq, NULL);
		error = workqueue_initqueue(wq, q, ipl, NULL);
	}

	if (error != 0) {
		workqueue_destroy(wq);
	} else {
		*wqp = wq;
	}

	return error;
}
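/*
 * Illustrative sketch (not part of the original file): how a caller might
 * use workqueue_create() and workqueue_enqueue() as defined above.  The
 * "example_softc", "example_work_cb" and "example_attach" names are
 * hypothetical and exist only for this sketch.
 */
#if 0
struct example_softc {
	struct workqueue *sc_wq;
	struct work sc_wk;		/* work storage owned by the caller */
};

static void
example_work_cb(struct work *wk, void *arg)
{
	/* runs in the dedicated worker kthread, in thread context */
}

static int
example_attach(struct example_softc *sc)
{
	int error;

	/* one worker kthread, MP-safe callback (hypothetical example) */
	error = workqueue_create(&sc->sc_wq, "example", example_work_cb,
	    sc, PRI_NONE, IPL_NONE, WQ_MPSAFE);
	if (error)
		return error;

	/* hand the caller-provided struct work to the worker; ci == NULL
	 * because the queue was not created with WQ_PERCPU */
	workqueue_enqueue(sc->sc_wq, &sc->sc_wk, NULL);
	return 0;
}
#endif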
static bool
workqueue_q_wait(struct workqueue *wq, struct workqueue_queue *q,
    work_impl_t *wk_target)
{
	work_impl_t *wk;
	bool found = false;
	uint64_t gen;

	mutex_enter(&q->q_mutex);

	/*
	 * Avoid a deadlock scenario.  We can't guarantee that
	 * wk_target has completed at this point, but we can't wait for
	 * it either, so do nothing.
	 *
	 * XXX Are there use-cases that require this semantics?
	 */
	if (q->q_worker == curlwp) {
		SDT_PROBE2(sdt, kernel, workqueue, wait__self, wq, wk_target);
		goto out;
	}

	/*
	 * Wait until the target is no longer pending.  If we find it
	 * on this queue, the caller can stop looking in other queues.
	 * If we don't find it in this queue, however, we can't skip
	 * waiting -- it may be hidden in the running queue which we
	 * have no access to.
	 */
again:
	SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
		if (wk == wk_target) {
			SDT_PROBE2(sdt, kernel, workqueue, wait__hit, wq, wk);
			found = true;
			cv_wait(&q->q_cv, &q->q_mutex);
			goto again;
		}
	}

	/*
	 * The target may be in the batch of work currently running,
	 * but we can't touch that queue.  So if there's anything
	 * running, wait until the generation changes.
	 */
	gen = q->q_gen;
	if (gen & 1) {
		do
			cv_wait(&q->q_cv, &q->q_mutex);
		while (gen == q->q_gen);
	}

out:
	mutex_exit(&q->q_mutex);

	return found;
}

/*
 * Wait for a specified work to finish.  The caller must ensure that no new
 * work will be enqueued before calling workqueue_wait.  Note that if the
 * workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue
 * other than the waiting queue.
 */
void
workqueue_wait(struct workqueue *wq, struct work *wk)
{
	struct workqueue_queue *q;
	bool found;

	ASSERT_SLEEPABLE();

	SDT_PROBE2(sdt, kernel, workqueue, wait__start, wq, wk);
	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			found = workqueue_q_wait(wq, q, (work_impl_t *)wk);
			if (found)
				break;
		}
	} else {
		q = workqueue_queue_lookup(wq, NULL);
		(void)workqueue_q_wait(wq, q, (work_impl_t *)wk);
	}
	SDT_PROBE2(sdt, kernel, workqueue, wait__done, wq, wk);
}

void
workqueue_destroy(struct workqueue *wq)
{
	struct workqueue_queue *q;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, workqueue, exit__start, wq);
	wq->wq_func = workqueue_exit;
	for (CPU_INFO_FOREACH(cii, ci)) {
		q = workqueue_queue_lookup(wq, ci);
		if (q->q_worker != NULL) {
			workqueue_finiqueue(wq, q);
		}
	}
	SDT_PROBE1(sdt, kernel, workqueue, exit__done, wq);
	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
}

#ifdef DEBUG
static void
workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk)
{
	work_impl_t *_wk;

	SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) {
		if (_wk == wk)
			panic("%s: tried to enqueue a queued work", __func__);
	}
}
#endif

void
workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
{
	struct workqueue_queue *q;
	work_impl_t *wk = (void *)wk0;

	SDT_PROBE3(sdt, kernel, workqueue, enqueue, wq, wk0, ci);

	KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
	q = workqueue_queue_lookup(wq, ci);

	mutex_enter(&q->q_mutex);
#ifdef DEBUG
	workqueue_check_duplication(q, wk);
#endif
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
	cv_broadcast(&q->q_cv);
	mutex_exit(&q->q_mutex);
}
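/*
 * Illustrative teardown sketch (not part of the original file).  Per the
 * comment above workqueue_wait(), the caller must first make sure nothing
 * can enqueue the work again, then wait for it, and only then destroy the
 * queue.  The "example_softc" and "example_detach" names are hypothetical,
 * continuing the sketch after workqueue_create() above.
 */
#if 0
static void
example_detach(struct example_softc *sc)
{

	/* caller has already arranged that nothing re-enqueues sc->sc_wk */
	workqueue_wait(sc->sc_wq, &sc->sc_wk);
	workqueue_destroy(sc->sc_wq);
	sc->sc_wq = NULL;
}
#endif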