1 /* $NetBSD: subr_workqueue.c,v 1.39 2020/09/08 17:02:18 riastradh Exp $ */ 2 3 /*- 4 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi, 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 __KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.39 2020/09/08 17:02:18 riastradh Exp $"); 31 32 #include <sys/param.h> 33 #include <sys/cpu.h> 34 #include <sys/systm.h> 35 #include <sys/kthread.h> 36 #include <sys/kmem.h> 37 #include <sys/proc.h> 38 #include <sys/workqueue.h> 39 #include <sys/mutex.h> 40 #include <sys/condvar.h> 41 #include <sys/queue.h> 42 43 typedef struct work_impl { 44 SIMPLEQ_ENTRY(work_impl) wk_entry; 45 } work_impl_t; 46 47 SIMPLEQ_HEAD(workqhead, work_impl); 48 49 struct workqueue_queue { 50 kmutex_t q_mutex; 51 kcondvar_t q_cv; 52 struct workqhead q_queue_pending; 53 struct workqhead q_queue_running; 54 lwp_t *q_worker; 55 }; 56 57 struct workqueue { 58 void (*wq_func)(struct work *, void *); 59 void *wq_arg; 60 int wq_flags; 61 62 char wq_name[MAXCOMLEN]; 63 pri_t wq_prio; 64 void *wq_ptr; 65 }; 66 67 #define WQ_SIZE (roundup2(sizeof(struct workqueue), coherency_unit)) 68 #define WQ_QUEUE_SIZE (roundup2(sizeof(struct workqueue_queue), coherency_unit)) 69 70 #define POISON 0xaabbccdd 71 72 static size_t 73 workqueue_size(int flags) 74 { 75 76 return WQ_SIZE 77 + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE 78 + coherency_unit; 79 } 80 81 static struct workqueue_queue * 82 workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci) 83 { 84 u_int idx = 0; 85 86 if (wq->wq_flags & WQ_PERCPU) { 87 idx = ci ? cpu_index(ci) : cpu_index(curcpu()); 88 } 89 90 return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE)); 91 } 92 93 static void 94 workqueue_runlist(struct workqueue *wq, struct workqhead *list) 95 { 96 work_impl_t *wk; 97 work_impl_t *next; 98 99 /* 100 * note that "list" is not a complete SIMPLEQ. 101 */ 102 103 for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) { 104 next = SIMPLEQ_NEXT(wk, wk_entry); 105 (*wq->wq_func)((void *)wk, wq->wq_arg); 106 } 107 } 108 109 static void 110 workqueue_worker(void *cookie) 111 { 112 struct workqueue *wq = cookie; 113 struct workqueue_queue *q; 114 int s; 115 116 /* find the workqueue of this kthread */ 117 q = workqueue_queue_lookup(wq, curlwp->l_cpu); 118 119 if (wq->wq_flags & WQ_FPU) 120 s = kthread_fpu_enter(); 121 for (;;) { 122 /* 123 * we violate abstraction of SIMPLEQ. 124 */ 125 126 mutex_enter(&q->q_mutex); 127 while (SIMPLEQ_EMPTY(&q->q_queue_pending)) 128 cv_wait(&q->q_cv, &q->q_mutex); 129 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running)); 130 q->q_queue_running.sqh_first = 131 q->q_queue_pending.sqh_first; /* XXX */ 132 SIMPLEQ_INIT(&q->q_queue_pending); 133 mutex_exit(&q->q_mutex); 134 135 workqueue_runlist(wq, &q->q_queue_running); 136 137 mutex_enter(&q->q_mutex); 138 KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running)); 139 SIMPLEQ_INIT(&q->q_queue_running); 140 /* Wake up workqueue_wait */ 141 cv_broadcast(&q->q_cv); 142 mutex_exit(&q->q_mutex); 143 } 144 if (wq->wq_flags & WQ_FPU) 145 kthread_fpu_exit(s); 146 } 147 148 static void 149 workqueue_init(struct workqueue *wq, const char *name, 150 void (*callback_func)(struct work *, void *), void *callback_arg, 151 pri_t prio, int ipl) 152 { 153 154 KASSERT(sizeof(wq->wq_name) > strlen(name)); 155 strncpy(wq->wq_name, name, sizeof(wq->wq_name)); 156 157 wq->wq_prio = prio; 158 wq->wq_func = callback_func; 159 wq->wq_arg = callback_arg; 160 } 161 162 static int 163 workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q, 164 int ipl, struct cpu_info *ci) 165 { 166 int error, ktf; 167 168 KASSERT(q->q_worker == NULL); 169 170 mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl); 171 cv_init(&q->q_cv, wq->wq_name); 172 SIMPLEQ_INIT(&q->q_queue_pending); 173 SIMPLEQ_INIT(&q->q_queue_running); 174 ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0); 175 if (wq->wq_prio < PRI_KERNEL) 176 ktf |= KTHREAD_TS; 177 if (ci) { 178 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker, 179 wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index); 180 } else { 181 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker, 182 wq, &q->q_worker, "%s", wq->wq_name); 183 } 184 if (error != 0) { 185 mutex_destroy(&q->q_mutex); 186 cv_destroy(&q->q_cv); 187 KASSERT(q->q_worker == NULL); 188 } 189 return error; 190 } 191 192 struct workqueue_exitargs { 193 work_impl_t wqe_wk; 194 struct workqueue_queue *wqe_q; 195 }; 196 197 static void 198 workqueue_exit(struct work *wk, void *arg) 199 { 200 struct workqueue_exitargs *wqe = (void *)wk; 201 struct workqueue_queue *q = wqe->wqe_q; 202 203 /* 204 * only competition at this point is workqueue_finiqueue. 205 */ 206 207 KASSERT(q->q_worker == curlwp); 208 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); 209 mutex_enter(&q->q_mutex); 210 q->q_worker = NULL; 211 cv_broadcast(&q->q_cv); 212 mutex_exit(&q->q_mutex); 213 kthread_exit(0); 214 } 215 216 static void 217 workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q) 218 { 219 struct workqueue_exitargs wqe; 220 221 KASSERT(wq->wq_func == workqueue_exit); 222 223 wqe.wqe_q = q; 224 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); 225 KASSERT(q->q_worker != NULL); 226 mutex_enter(&q->q_mutex); 227 SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry); 228 cv_broadcast(&q->q_cv); 229 while (q->q_worker != NULL) { 230 cv_wait(&q->q_cv, &q->q_mutex); 231 } 232 mutex_exit(&q->q_mutex); 233 mutex_destroy(&q->q_mutex); 234 cv_destroy(&q->q_cv); 235 } 236 237 /* --- */ 238 239 int 240 workqueue_create(struct workqueue **wqp, const char *name, 241 void (*callback_func)(struct work *, void *), void *callback_arg, 242 pri_t prio, int ipl, int flags) 243 { 244 struct workqueue *wq; 245 struct workqueue_queue *q; 246 void *ptr; 247 int error = 0; 248 249 CTASSERT(sizeof(work_impl_t) <= sizeof(struct work)); 250 251 ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP); 252 wq = (void *)roundup2((uintptr_t)ptr, coherency_unit); 253 wq->wq_ptr = ptr; 254 wq->wq_flags = flags; 255 256 workqueue_init(wq, name, callback_func, callback_arg, prio, ipl); 257 258 if (flags & WQ_PERCPU) { 259 struct cpu_info *ci; 260 CPU_INFO_ITERATOR cii; 261 262 /* create the work-queue for each CPU */ 263 for (CPU_INFO_FOREACH(cii, ci)) { 264 q = workqueue_queue_lookup(wq, ci); 265 error = workqueue_initqueue(wq, q, ipl, ci); 266 if (error) { 267 break; 268 } 269 } 270 } else { 271 /* initialize a work-queue */ 272 q = workqueue_queue_lookup(wq, NULL); 273 error = workqueue_initqueue(wq, q, ipl, NULL); 274 } 275 276 if (error != 0) { 277 workqueue_destroy(wq); 278 } else { 279 *wqp = wq; 280 } 281 282 return error; 283 } 284 285 static bool 286 workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target) 287 { 288 work_impl_t *wk; 289 bool found = false; 290 291 mutex_enter(&q->q_mutex); 292 if (q->q_worker == curlwp) 293 goto out; 294 again: 295 SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) { 296 if (wk == wk_target) 297 goto found; 298 } 299 SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) { 300 if (wk == wk_target) 301 goto found; 302 } 303 found: 304 if (wk != NULL) { 305 found = true; 306 cv_wait(&q->q_cv, &q->q_mutex); 307 goto again; 308 } 309 out: 310 mutex_exit(&q->q_mutex); 311 312 return found; 313 } 314 315 /* 316 * Wait for a specified work to finish. The caller must ensure that no new 317 * work will be enqueued before calling workqueue_wait. Note that if the 318 * workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue 319 * other than the waiting queue. 320 */ 321 void 322 workqueue_wait(struct workqueue *wq, struct work *wk) 323 { 324 struct workqueue_queue *q; 325 bool found; 326 327 if (ISSET(wq->wq_flags, WQ_PERCPU)) { 328 struct cpu_info *ci; 329 CPU_INFO_ITERATOR cii; 330 for (CPU_INFO_FOREACH(cii, ci)) { 331 q = workqueue_queue_lookup(wq, ci); 332 found = workqueue_q_wait(q, (work_impl_t *)wk); 333 if (found) 334 break; 335 } 336 } else { 337 q = workqueue_queue_lookup(wq, NULL); 338 (void) workqueue_q_wait(q, (work_impl_t *)wk); 339 } 340 } 341 342 void 343 workqueue_destroy(struct workqueue *wq) 344 { 345 struct workqueue_queue *q; 346 struct cpu_info *ci; 347 CPU_INFO_ITERATOR cii; 348 349 wq->wq_func = workqueue_exit; 350 for (CPU_INFO_FOREACH(cii, ci)) { 351 q = workqueue_queue_lookup(wq, ci); 352 if (q->q_worker != NULL) { 353 workqueue_finiqueue(wq, q); 354 } 355 } 356 kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags)); 357 } 358 359 #ifdef DEBUG 360 static void 361 workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk) 362 { 363 work_impl_t *_wk; 364 365 SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) { 366 if (_wk == wk) 367 panic("%s: tried to enqueue a queued work", __func__); 368 } 369 } 370 #endif 371 372 void 373 workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci) 374 { 375 struct workqueue_queue *q; 376 work_impl_t *wk = (void *)wk0; 377 378 KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL); 379 q = workqueue_queue_lookup(wq, ci); 380 381 mutex_enter(&q->q_mutex); 382 #ifdef DEBUG 383 workqueue_check_duplication(q, wk); 384 #endif 385 SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry); 386 cv_broadcast(&q->q_cv); 387 mutex_exit(&q->q_mutex); 388 } 389