/*	$NetBSD: subr_workqueue.c,v 1.37 2018/06/13 05:26:12 ozaki-r Exp $	*/

/*-
 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.37 2018/06/13 05:26:12 ozaki-r Exp $");

#include <sys/param.h>
#include <sys/cpu.h>
#include <sys/systm.h>
#include <sys/kthread.h>
#include <sys/kmem.h>
#include <sys/proc.h>
#include <sys/workqueue.h>
#include <sys/mutex.h>
#include <sys/condvar.h>
#include <sys/queue.h>

typedef struct work_impl {
	SIMPLEQ_ENTRY(work_impl) wk_entry;
} work_impl_t;

SIMPLEQ_HEAD(workqhead, work_impl);

struct workqueue_queue {
	kmutex_t q_mutex;
	kcondvar_t q_cv;
	struct workqhead q_queue_pending;
	struct workqhead q_queue_running;
	lwp_t *q_worker;
	work_impl_t *q_waiter;
};

struct workqueue {
	void (*wq_func)(struct work *, void *);
	void *wq_arg;
	int wq_flags;

	char wq_name[MAXCOMLEN];
	pri_t wq_prio;
	void *wq_ptr;
};

#define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
#define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))

#define	POISON	0xaabbccdd

static size_t
workqueue_size(int flags)
{

	return WQ_SIZE
	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
	    + coherency_unit;
}

static struct workqueue_queue *
workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
{
	u_int idx = 0;

	if (wq->wq_flags & WQ_PERCPU) {
		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
	}

	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
}
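/*
 * Memory layout: the workqueue and its queue structures live in a single
 * kmem allocation (see workqueue_create below).  Each piece is padded to
 * coherency_unit to avoid false sharing between CPUs, and the extra
 * coherency_unit in workqueue_size() pays for aligning wq itself.
 * Roughly:
 *
 *	wq_ptr                 wq (aligned up)
 *	|<-- alignment ... --->|
 *	+----------------------+-----------------+------------------+--
 *	| struct workqueue     | workqueue_queue | workqueue_queue  | ...
 *	|                      | (CPU index 0)   | (WQ_PERCPU only) |
 *	+----------------------+-----------------+------------------+--
 *	        WQ_SIZE          WQ_QUEUE_SIZE     WQ_QUEUE_SIZE
 *
 * workqueue_queue_lookup() maps a cpu_index() to the offset of the
 * corresponding queue; a non-WQ_PERCPU workqueue has exactly one queue.
 */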
static void
workqueue_runlist(struct workqueue *wq, struct workqhead *list)
{
	work_impl_t *wk;
	work_impl_t *next;

	/*
	 * Note that "list" is not a complete SIMPLEQ: only its sqh_first
	 * pointer is valid (see workqueue_worker), so walk it with
	 * SIMPLEQ_FIRST/SIMPLEQ_NEXT only.
	 */

	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
		/* fetch the next pointer first; the callback may reuse wk */
		next = SIMPLEQ_NEXT(wk, wk_entry);
		(*wq->wq_func)((void *)wk, wq->wq_arg);
	}
}

static void
workqueue_worker(void *cookie)
{
	struct workqueue *wq = cookie;
	struct workqueue_queue *q;

	/* find the workqueue of this kthread */
	q = workqueue_queue_lookup(wq, curlwp->l_cpu);

	for (;;) {
		/*
		 * We violate the abstraction of SIMPLEQ here: grab the
		 * whole pending list in one go by copying its first
		 * pointer and reinitializing the head, instead of
		 * dequeueing the items one by one.
		 */

		mutex_enter(&q->q_mutex);
		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
			cv_wait(&q->q_cv, &q->q_mutex);
		KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
		q->q_queue_running.sqh_first =
		    q->q_queue_pending.sqh_first; /* XXX */
		SIMPLEQ_INIT(&q->q_queue_pending);
		mutex_exit(&q->q_mutex);

		workqueue_runlist(wq, &q->q_queue_running);

		mutex_enter(&q->q_mutex);
		KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
		SIMPLEQ_INIT(&q->q_queue_running);
		if (__predict_false(q->q_waiter != NULL)) {
			/* Wake up workqueue_wait */
			cv_signal(&q->q_cv);
		}
		mutex_exit(&q->q_mutex);
	}
}

static void
workqueue_init(struct workqueue *wq, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl)
{

	KASSERT(sizeof(wq->wq_name) > strlen(name));
	strncpy(wq->wq_name, name, sizeof(wq->wq_name));

	wq->wq_prio = prio;
	wq->wq_func = callback_func;
	wq->wq_arg = callback_arg;
}

static int
workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
    int ipl, struct cpu_info *ci)
{
	int error, ktf;

	KASSERT(q->q_worker == NULL);

	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
	cv_init(&q->q_cv, wq->wq_name);
	SIMPLEQ_INIT(&q->q_queue_pending);
	SIMPLEQ_INIT(&q->q_queue_running);
	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
	if (wq->wq_prio < PRI_KERNEL)
		ktf |= KTHREAD_TS;
	if (ci) {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
	} else {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s", wq->wq_name);
	}
	if (error != 0) {
		mutex_destroy(&q->q_mutex);
		cv_destroy(&q->q_cv);
		KASSERT(q->q_worker == NULL);
	}
	return error;
}

struct workqueue_exitargs {
	work_impl_t wqe_wk;
	struct workqueue_queue *wqe_q;
};

static void
workqueue_exit(struct work *wk, void *arg)
{
	struct workqueue_exitargs *wqe = (void *)wk;
	struct workqueue_queue *q = wqe->wqe_q;

	/*
	 * The only competition at this point is workqueue_finiqueue(),
	 * which sleeps until q_worker is cleared.
	 */

	KASSERT(q->q_worker == curlwp);
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	mutex_enter(&q->q_mutex);
	q->q_worker = NULL;
	cv_signal(&q->q_cv);
	mutex_exit(&q->q_mutex);
	kthread_exit(0);
}

static void
workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
{
	struct workqueue_exitargs wqe;

	KASSERT(wq->wq_func == workqueue_exit);

	wqe.wqe_q = q;
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
	KASSERT(q->q_worker != NULL);
	mutex_enter(&q->q_mutex);
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
	cv_signal(&q->q_cv);
	while (q->q_worker != NULL) {
		cv_wait(&q->q_cv, &q->q_mutex);
	}
	mutex_exit(&q->q_mutex);
	mutex_destroy(&q->q_mutex);
	cv_destroy(&q->q_cv);
}

/* --- */
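/*
 * Example usage of the public API below, as a minimal sketch.  The
 * mydev_* names are hypothetical; PRI_NONE/IPL_NONE/WQ_MPSAFE are a
 * common choice for an MP-safe, thread-context consumer:
 *
 *	struct mydev_softc {
 *		struct workqueue *sc_wq;
 *		struct work sc_wk;
 *	} *sc;
 *
 *	static void
 *	mydev_work(struct work *wk, void *arg)
 *	{
 *		struct mydev_softc *sc = arg;
 *
 *		(runs in the worker kthread)
 *	}
 *
 *	error = workqueue_create(&sc->sc_wq, "mydevwq", mydev_work, sc,
 *	    PRI_NONE, IPL_NONE, WQ_MPSAFE);
 *	...
 *	workqueue_enqueue(sc->sc_wq, &sc->sc_wk, NULL);
 *	...
 *	workqueue_wait(sc->sc_wq, &sc->sc_wk);	(e.g. before detach)
 *	workqueue_destroy(sc->sc_wq);
 *
 * A work item must not be re-enqueued until its callback has been
 * invoked (workqueue_check_duplication panics on this under DEBUG);
 * workqueue_runlist fetches the next pointer before making the call,
 * so a callback may safely re-enqueue its own item.
 */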
int
workqueue_create(struct workqueue **wqp, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl, int flags)
{
	struct workqueue *wq;
	struct workqueue_queue *q;
	void *ptr;
	int error = 0;

	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));

	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
	wq->wq_ptr = ptr;
	wq->wq_flags = flags;

	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);

	if (flags & WQ_PERCPU) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;

		/* create the work-queue for each CPU */
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			error = workqueue_initqueue(wq, q, ipl, ci);
			if (error) {
				break;
			}
		}
	} else {
		/* initialize a work-queue */
		q = workqueue_queue_lookup(wq, NULL);
		error = workqueue_initqueue(wq, q, ipl, NULL);
	}

	if (error != 0) {
		workqueue_destroy(wq);
	} else {
		*wqp = wq;
	}

	return error;
}

static bool
workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target)
{
	work_impl_t *wk;
	bool found = false;

	mutex_enter(&q->q_mutex);
	if (q->q_worker == curlwp)
		goto out;
again:
	SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
		if (wk == wk_target)
			goto found;
	}
	SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) {
		if (wk == wk_target)
			goto found;
	}
found:
	/* the loops above leave wk == NULL when the target is not queued */
	if (wk != NULL) {
		found = true;
		KASSERT(q->q_waiter == NULL);
		q->q_waiter = wk;
		cv_wait(&q->q_cv, &q->q_mutex);
		goto again;
	}
	if (q->q_waiter != NULL)
		q->q_waiter = NULL;
out:
	mutex_exit(&q->q_mutex);

	return found;
}

/*
 * Wait for a specified work to finish.  The caller must ensure that no new
 * work will be enqueued before calling workqueue_wait.  Note that if the
 * workqueue is WQ_PERCPU, new work can still be enqueued on queues other
 * than the one being waited on.
 */
void
workqueue_wait(struct workqueue *wq, struct work *wk)
{
	struct workqueue_queue *q;
	bool found;

	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			found = workqueue_q_wait(q, (work_impl_t *)wk);
			if (found)
				break;
		}
	} else {
		q = workqueue_queue_lookup(wq, NULL);
		(void) workqueue_q_wait(q, (work_impl_t *)wk);
	}
}
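/*
 * Tear-down protocol for workqueue_destroy() below: wq_func is repointed
 * at workqueue_exit() and workqueue_finiqueue() enqueues one
 * stack-allocated workqueue_exitargs on each queue.  The worker thread
 * processes it like any other work item; workqueue_exit() then clears
 * q_worker, wakes the sleeping workqueue_finiqueue(), and calls
 * kthread_exit().  The caller must guarantee that no workqueue_enqueue()
 * can race with this.
 */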
void
workqueue_destroy(struct workqueue *wq)
{
	struct workqueue_queue *q;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	wq->wq_func = workqueue_exit;
	for (CPU_INFO_FOREACH(cii, ci)) {
		q = workqueue_queue_lookup(wq, ci);
		if (q->q_worker != NULL) {
			workqueue_finiqueue(wq, q);
		}
	}
	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
}

#ifdef DEBUG
static void
workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk)
{
	work_impl_t *_wk;

	SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) {
		if (_wk == wk)
			panic("%s: tried to enqueue a queued work", __func__);
	}
}
#endif

void
workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
{
	struct workqueue_queue *q;
	work_impl_t *wk = (void *)wk0;

	KASSERT((wq->wq_flags & WQ_PERCPU) || ci == NULL);
	q = workqueue_queue_lookup(wq, ci);

	mutex_enter(&q->q_mutex);
	KASSERT(q->q_waiter == NULL);
#ifdef DEBUG
	workqueue_check_duplication(q, wk);
#endif
	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
	cv_signal(&q->q_cv);
	mutex_exit(&q->q_mutex);
}
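/*
 * Note on the ci argument to workqueue_enqueue() above: it must be NULL
 * unless the workqueue was created with WQ_PERCPU.  For a WQ_PERCPU
 * workqueue, an explicit ci pins the work to that CPU's worker, while
 * ci == NULL selects the current CPU's queue (see
 * workqueue_queue_lookup()).  A sketch, where mydev_intr and sc are
 * hypothetical:
 *
 *	static int
 *	mydev_intr(void *arg)
 *	{
 *		struct mydev_softc *sc = arg;
 *
 *		workqueue_enqueue(sc->sc_wq, &sc->sc_wk, curcpu());
 *		return 1;
 *	}
 */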