/*	$NetBSD: subr_workqueue.c,v 1.41 2022/10/29 11:41:00 riastradh Exp $	*/

/*-
 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.41 2022/10/29 11:41:00 riastradh Exp $");

#include <sys/param.h>
#include <sys/cpu.h>
#include <sys/systm.h>
#include <sys/kthread.h>
#include <sys/kmem.h>
#include <sys/proc.h>
#include <sys/workqueue.h>
#include <sys/mutex.h>
#include <sys/condvar.h>
#include <sys/sdt.h>
#include <sys/queue.h>

typedef struct work_impl {
	SIMPLEQ_ENTRY(work_impl) wk_entry;
} work_impl_t;

SIMPLEQ_HEAD(workqhead, work_impl);

struct workqueue_queue {
	kmutex_t q_mutex;
	kcondvar_t q_cv;
	struct workqhead q_queue_pending;
	struct workqhead q_queue_running;
	lwp_t *q_worker;
};

struct workqueue {
	void (*wq_func)(struct work *, void *);
	void *wq_arg;
	int wq_flags;

	char wq_name[MAXCOMLEN];
	pri_t wq_prio;
	void *wq_ptr;
};

#define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
#define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))

#define	POISON	0xaabbccdd

SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create,
    "struct workqueue *"/*wq*/,
    "const char *"/*name*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/,
    "pri_t"/*prio*/,
    "int"/*ipl*/,
    "int"/*flags*/);
SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy,
    "struct workqueue *"/*wq*/);

SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "struct cpu_info *"/*ci*/);
SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/);
SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/,
    "void (*)(struct work *, void *)"/*func*/,
    "void *"/*arg*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);
SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done,
    "struct workqueue *"/*wq*/,
    "struct work *"/*wk*/);

SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start,
    "struct workqueue *"/*wq*/);
SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done,
    "struct workqueue *"/*wq*/);
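
/*
 * A workqueue is a single allocation laid out as a cache-line-aligned
 * struct workqueue immediately followed by one struct workqueue_queue
 * (or, with WQ_PERCPU, one per CPU), roughly:
 *
 *	wq_ptr -> | pad | struct workqueue | queue 0 | queue 1 | ...
 *	                ^ roundup2(ptr, coherency_unit)
 *
 * workqueue_size() below sizes this allocation (including alignment slop)
 * and workqueue_queue_lookup() indexes into it by cpu_index(ci).
 */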

static size_t
workqueue_size(int flags)
{

	return WQ_SIZE
	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
	    + coherency_unit;
}

static struct workqueue_queue *
workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
{
	u_int idx = 0;

	if (wq->wq_flags & WQ_PERCPU) {
		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
	}

	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
}

static void
workqueue_runlist(struct workqueue *wq, struct workqhead *list)
{
	work_impl_t *wk;
	work_impl_t *next;

	/*
	 * note that "list" is not a complete SIMPLEQ.
	 */

	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
		next = SIMPLEQ_NEXT(wk, wk_entry);
		SDT_PROBE4(sdt, kernel, workqueue, entry,
		    wq, wk, wq->wq_func, wq->wq_arg);
		(*wq->wq_func)((void *)wk, wq->wq_arg);
		SDT_PROBE4(sdt, kernel, workqueue, return,
		    wq, wk, wq->wq_func, wq->wq_arg);
	}
}
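
/*
 * Each worker thread takes the whole pending list in one go: under q_mutex
 * it moves q_queue_pending onto q_queue_running, drops the lock while the
 * callbacks run, then empties q_queue_running and broadcasts q_cv so that
 * workqueue_wait() can observe completion.
 */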

static void
workqueue_worker(void *cookie)
{
	struct workqueue *wq = cookie;
	struct workqueue_queue *q;
	int s;

	/* find the workqueue of this kthread */
	q = workqueue_queue_lookup(wq, curlwp->l_cpu);

	if (wq->wq_flags & WQ_FPU)
		s = kthread_fpu_enter();
	for (;;) {
		/*
		 * we violate abstraction of SIMPLEQ.
		 */

		mutex_enter(&q->q_mutex);
		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
			cv_wait(&q->q_cv, &q->q_mutex);
		KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
		q->q_queue_running.sqh_first =
		    q->q_queue_pending.sqh_first; /* XXX */
		SIMPLEQ_INIT(&q->q_queue_pending);
		mutex_exit(&q->q_mutex);

		workqueue_runlist(wq, &q->q_queue_running);

		mutex_enter(&q->q_mutex);
		KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
		SIMPLEQ_INIT(&q->q_queue_running);
		/* Wake up workqueue_wait */
		cv_broadcast(&q->q_cv);
		mutex_exit(&q->q_mutex);
	}
	if (wq->wq_flags & WQ_FPU)
		kthread_fpu_exit(s);
}

static void
workqueue_init(struct workqueue *wq, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl)
{

	KASSERT(sizeof(wq->wq_name) > strlen(name));
	strncpy(wq->wq_name, name, sizeof(wq->wq_name));

	wq->wq_prio = prio;
	wq->wq_func = callback_func;
	wq->wq_arg = callback_arg;
}

static int
workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
    int ipl, struct cpu_info *ci)
{
	int error, ktf;

	KASSERT(q->q_worker == NULL);

	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
	cv_init(&q->q_cv, wq->wq_name);
	SIMPLEQ_INIT(&q->q_queue_pending);
	SIMPLEQ_INIT(&q->q_queue_running);
	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
	if (wq->wq_prio < PRI_KERNEL)
		ktf |= KTHREAD_TS;
	if (ci) {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
	} else {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s", wq->wq_name);
	}
	if (error != 0) {
		mutex_destroy(&q->q_mutex);
		cv_destroy(&q->q_cv);
		KASSERT(q->q_worker == NULL);
	}
	return error;
}

struct workqueue_exitargs {
	work_impl_t wqe_wk;
	struct workqueue_queue *wqe_q;
};

static void
workqueue_exit(struct work *wk, void *arg)
{
	struct workqueue_exitargs *wqe = (void *)wk;
	struct workqueue_queue *q = wqe->wqe_q;

	/*
	 * only competition at this point is workqueue_finiqueue.
	 */

	KASSERT(q->q_worker == curlwp);
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	mutex_enter(&q->q_mutex);
	q->q_worker = NULL;
	cv_broadcast(&q->q_cv);
	mutex_exit(&q->q_mutex);
	kthread_exit(0);
}

static void
workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
{
	struct workqueue_exitargs wqe;

	KASSERT(wq->wq_func == workqueue_exit);

	wqe.wqe_q = q;
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	KASSERT(q->q_worker != NULL);
	mutex_enter(&q->q_mutex);
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
	cv_broadcast(&q->q_cv);
	while (q->q_worker != NULL) {
		cv_wait(&q->q_cv, &q->q_mutex);
	}
	mutex_exit(&q->q_mutex);
	mutex_destroy(&q->q_mutex);
	cv_destroy(&q->q_cv);
}

/* --- */

int
workqueue_create(struct workqueue **wqp, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl, int flags)
{
	struct workqueue *wq;
	struct workqueue_queue *q;
	void *ptr;
	int error = 0;

	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));

	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
	wq->wq_ptr = ptr;
	wq->wq_flags = flags;

	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);

	if (flags & WQ_PERCPU) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;

		/* create the work-queue for each CPU */
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			error = workqueue_initqueue(wq, q, ipl, ci);
			if (error) {
				break;
			}
		}
	} else {
		/* initialize a work-queue */
		q = workqueue_queue_lookup(wq, NULL);
		error = workqueue_initqueue(wq, q, ipl, NULL);
	}

	if (error != 0) {
		workqueue_destroy(wq);
	} else {
		*wqp = wq;
	}

	return error;
}

static bool
workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target)
{
	work_impl_t *wk;
	bool found = false;

	mutex_enter(&q->q_mutex);
	if (q->q_worker == curlwp)
		goto out;
 again:
	SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
		if (wk == wk_target)
			goto found;
	}
	SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) {
		if (wk == wk_target)
			goto found;
	}
 found:
	if (wk != NULL) {
		found = true;
		cv_wait(&q->q_cv, &q->q_mutex);
		goto again;
	}
 out:
	mutex_exit(&q->q_mutex);

	return found;
}

/*
 * Wait for a specified work to finish.  The caller must ensure that no new
 * work will be enqueued before calling workqueue_wait.  Note that if the
 * workqueue is WQ_PERCPU, the caller can enqueue new work to a queue other
 * than the one being waited on.
 */
void
workqueue_wait(struct workqueue *wq, struct work *wk)
{
	struct workqueue_queue *q;
	bool found;

	ASSERT_SLEEPABLE();

	SDT_PROBE2(sdt, kernel, workqueue, wait__start, wq, wk);
	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			found = workqueue_q_wait(q, (work_impl_t *)wk);
			if (found)
				break;
		}
	} else {
		q = workqueue_queue_lookup(wq, NULL);
		(void) workqueue_q_wait(q, (work_impl_t *)wk);
	}
	SDT_PROBE2(sdt, kernel, workqueue, wait__done, wq, wk);
}

void
workqueue_destroy(struct workqueue *wq)
{
	struct workqueue_queue *q;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, workqueue, exit__start, wq);
	wq->wq_func = workqueue_exit;
	for (CPU_INFO_FOREACH(cii, ci)) {
		q = workqueue_queue_lookup(wq, ci);
		if (q->q_worker != NULL) {
			workqueue_finiqueue(wq, q);
		}
	}
	SDT_PROBE1(sdt, kernel, workqueue, exit__done, wq);
	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
}

#ifdef DEBUG
static void
workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk)
{
	work_impl_t *_wk;

	SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) {
		if (_wk == wk)
			panic("%s: tried to enqueue a queued work", __func__);
	}
}
#endif

void
workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
{
	struct workqueue_queue *q;
	work_impl_t *wk = (void *)wk0;

	SDT_PROBE3(sdt, kernel, workqueue, enqueue, wq, wk0, ci);

	KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
	q = workqueue_queue_lookup(wq, ci);

	mutex_enter(&q->q_mutex);
#ifdef DEBUG
	workqueue_check_duplication(q, wk);
#endif
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
	cv_broadcast(&q->q_cv);
	mutex_exit(&q->q_mutex);
}
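
/*
 * Minimal usage sketch (illustrative only; "example_softc", "example_work"
 * and the surrounding driver are hypothetical, not part of this file): a
 * driver creates one MP-safe workqueue, defers an event to it with
 * workqueue_enqueue(), and drains it before tearing down.
 *
 *	struct example_softc {
 *		struct workqueue	*sc_wq;
 *		struct work		sc_wk;
 *	};
 *
 *	static void
 *	example_work(struct work *wk, void *arg)
 *	{
 *		struct example_softc *sc = arg;
 *
 *		// process the deferred event in thread context
 *	}
 *
 *	// at attach time
 *	error = workqueue_create(&sc->sc_wq, "examplewq", example_work, sc,
 *	    PRI_NONE, IPL_NET, WQ_MPSAFE);
 *
 *	// from the interrupt handler: hand off to the worker thread; a
 *	// given struct work must not be enqueued twice (see
 *	// workqueue_check_duplication above)
 *	workqueue_enqueue(sc->sc_wq, &sc->sc_wk, NULL);
 *
 *	// at detach time: wait for the pending work, then destroy the queue
 *	workqueue_wait(sc->sc_wq, &sc->sc_wk);
 *	workqueue_destroy(sc->sc_wq);
 */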