1 /* $NetBSD: subr_workqueue.c,v 1.35 2018/01/30 11:03:06 ozaki-r Exp $ */ 2 3 /*- 4 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi, 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 __KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.35 2018/01/30 11:03:06 ozaki-r Exp $"); 31 32 #include <sys/param.h> 33 #include <sys/cpu.h> 34 #include <sys/systm.h> 35 #include <sys/kthread.h> 36 #include <sys/kmem.h> 37 #include <sys/proc.h> 38 #include <sys/workqueue.h> 39 #include <sys/mutex.h> 40 #include <sys/condvar.h> 41 #include <sys/queue.h> 42 43 typedef struct work_impl { 44 SIMPLEQ_ENTRY(work_impl) wk_entry; 45 } work_impl_t; 46 47 SIMPLEQ_HEAD(workqhead, work_impl); 48 49 struct workqueue_queue { 50 kmutex_t q_mutex; 51 kcondvar_t q_cv; 52 struct workqhead q_queue_pending; 53 struct workqhead q_queue_running; 54 lwp_t *q_worker; 55 work_impl_t *q_waiter; 56 }; 57 58 struct workqueue { 59 void (*wq_func)(struct work *, void *); 60 void *wq_arg; 61 int wq_flags; 62 63 char wq_name[MAXCOMLEN]; 64 pri_t wq_prio; 65 void *wq_ptr; 66 }; 67 68 #define WQ_SIZE (roundup2(sizeof(struct workqueue), coherency_unit)) 69 #define WQ_QUEUE_SIZE (roundup2(sizeof(struct workqueue_queue), coherency_unit)) 70 71 #define POISON 0xaabbccdd 72 73 static size_t 74 workqueue_size(int flags) 75 { 76 77 return WQ_SIZE 78 + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE 79 + coherency_unit; 80 } 81 82 static struct workqueue_queue * 83 workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci) 84 { 85 u_int idx = 0; 86 87 if (wq->wq_flags & WQ_PERCPU) { 88 idx = ci ? cpu_index(ci) : cpu_index(curcpu()); 89 } 90 91 return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE)); 92 } 93 94 static void 95 workqueue_runlist(struct workqueue *wq, struct workqhead *list) 96 { 97 work_impl_t *wk; 98 work_impl_t *next; 99 100 /* 101 * note that "list" is not a complete SIMPLEQ. 102 */ 103 104 for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) { 105 next = SIMPLEQ_NEXT(wk, wk_entry); 106 (*wq->wq_func)((void *)wk, wq->wq_arg); 107 } 108 } 109 110 static void 111 workqueue_worker(void *cookie) 112 { 113 struct workqueue *wq = cookie; 114 struct workqueue_queue *q; 115 116 /* find the workqueue of this kthread */ 117 q = workqueue_queue_lookup(wq, curlwp->l_cpu); 118 119 for (;;) { 120 /* 121 * we violate abstraction of SIMPLEQ. 122 */ 123 124 mutex_enter(&q->q_mutex); 125 while (SIMPLEQ_EMPTY(&q->q_queue_pending)) 126 cv_wait(&q->q_cv, &q->q_mutex); 127 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running)); 128 q->q_queue_running.sqh_first = 129 q->q_queue_pending.sqh_first; /* XXX */ 130 SIMPLEQ_INIT(&q->q_queue_pending); 131 mutex_exit(&q->q_mutex); 132 133 workqueue_runlist(wq, &q->q_queue_running); 134 135 mutex_enter(&q->q_mutex); 136 KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running)); 137 SIMPLEQ_INIT(&q->q_queue_running); 138 if (__predict_false(q->q_waiter != NULL)) { 139 /* Wake up workqueue_wait */ 140 cv_signal(&q->q_cv); 141 } 142 mutex_exit(&q->q_mutex); 143 } 144 } 145 146 static void 147 workqueue_init(struct workqueue *wq, const char *name, 148 void (*callback_func)(struct work *, void *), void *callback_arg, 149 pri_t prio, int ipl) 150 { 151 152 strncpy(wq->wq_name, name, sizeof(wq->wq_name)); 153 154 wq->wq_prio = prio; 155 wq->wq_func = callback_func; 156 wq->wq_arg = callback_arg; 157 } 158 159 static int 160 workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q, 161 int ipl, struct cpu_info *ci) 162 { 163 int error, ktf; 164 165 KASSERT(q->q_worker == NULL); 166 167 mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl); 168 cv_init(&q->q_cv, wq->wq_name); 169 SIMPLEQ_INIT(&q->q_queue_pending); 170 SIMPLEQ_INIT(&q->q_queue_running); 171 ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0); 172 if (wq->wq_prio < PRI_KERNEL) 173 ktf |= KTHREAD_TS; 174 if (ci) { 175 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker, 176 wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index); 177 } else { 178 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker, 179 wq, &q->q_worker, "%s", wq->wq_name); 180 } 181 if (error != 0) { 182 mutex_destroy(&q->q_mutex); 183 cv_destroy(&q->q_cv); 184 KASSERT(q->q_worker == NULL); 185 } 186 return error; 187 } 188 189 struct workqueue_exitargs { 190 work_impl_t wqe_wk; 191 struct workqueue_queue *wqe_q; 192 }; 193 194 static void 195 workqueue_exit(struct work *wk, void *arg) 196 { 197 struct workqueue_exitargs *wqe = (void *)wk; 198 struct workqueue_queue *q = wqe->wqe_q; 199 200 /* 201 * only competition at this point is workqueue_finiqueue. 202 */ 203 204 KASSERT(q->q_worker == curlwp); 205 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); 206 mutex_enter(&q->q_mutex); 207 q->q_worker = NULL; 208 cv_signal(&q->q_cv); 209 mutex_exit(&q->q_mutex); 210 kthread_exit(0); 211 } 212 213 static void 214 workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q) 215 { 216 struct workqueue_exitargs wqe; 217 218 KASSERT(wq->wq_func == workqueue_exit); 219 220 wqe.wqe_q = q; 221 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); 222 KASSERT(q->q_worker != NULL); 223 mutex_enter(&q->q_mutex); 224 SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry); 225 cv_signal(&q->q_cv); 226 while (q->q_worker != NULL) { 227 cv_wait(&q->q_cv, &q->q_mutex); 228 } 229 mutex_exit(&q->q_mutex); 230 mutex_destroy(&q->q_mutex); 231 cv_destroy(&q->q_cv); 232 } 233 234 /* --- */ 235 236 int 237 workqueue_create(struct workqueue **wqp, const char *name, 238 void (*callback_func)(struct work *, void *), void *callback_arg, 239 pri_t prio, int ipl, int flags) 240 { 241 struct workqueue *wq; 242 struct workqueue_queue *q; 243 void *ptr; 244 int error = 0; 245 246 CTASSERT(sizeof(work_impl_t) <= sizeof(struct work)); 247 248 ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP); 249 wq = (void *)roundup2((uintptr_t)ptr, coherency_unit); 250 wq->wq_ptr = ptr; 251 wq->wq_flags = flags; 252 253 workqueue_init(wq, name, callback_func, callback_arg, prio, ipl); 254 255 if (flags & WQ_PERCPU) { 256 struct cpu_info *ci; 257 CPU_INFO_ITERATOR cii; 258 259 /* create the work-queue for each CPU */ 260 for (CPU_INFO_FOREACH(cii, ci)) { 261 q = workqueue_queue_lookup(wq, ci); 262 error = workqueue_initqueue(wq, q, ipl, ci); 263 if (error) { 264 break; 265 } 266 } 267 } else { 268 /* initialize a work-queue */ 269 q = workqueue_queue_lookup(wq, NULL); 270 error = workqueue_initqueue(wq, q, ipl, NULL); 271 } 272 273 if (error != 0) { 274 workqueue_destroy(wq); 275 } else { 276 *wqp = wq; 277 } 278 279 return error; 280 } 281 282 static bool 283 workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target) 284 { 285 work_impl_t *wk; 286 bool found = false; 287 288 mutex_enter(&q->q_mutex); 289 again: 290 SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) { 291 if (wk == wk_target) 292 goto found; 293 } 294 SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) { 295 if (wk == wk_target) 296 goto found; 297 } 298 found: 299 if (wk != NULL) { 300 found = true; 301 KASSERT(q->q_waiter == NULL); 302 q->q_waiter = wk; 303 cv_wait(&q->q_cv, &q->q_mutex); 304 goto again; 305 } 306 if (q->q_waiter != NULL) 307 q->q_waiter = NULL; 308 mutex_exit(&q->q_mutex); 309 310 return found; 311 } 312 313 /* 314 * Wait for a specified work to finish. The caller must ensure that no new 315 * work will be enqueued before calling workqueue_wait. Note that if the 316 * workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue 317 * other than the waiting queue. 318 */ 319 void 320 workqueue_wait(struct workqueue *wq, struct work *wk) 321 { 322 struct workqueue_queue *q; 323 bool found; 324 325 if (ISSET(wq->wq_flags, WQ_PERCPU)) { 326 struct cpu_info *ci; 327 CPU_INFO_ITERATOR cii; 328 for (CPU_INFO_FOREACH(cii, ci)) { 329 q = workqueue_queue_lookup(wq, ci); 330 found = workqueue_q_wait(q, (work_impl_t *)wk); 331 if (found) 332 break; 333 } 334 } else { 335 q = workqueue_queue_lookup(wq, NULL); 336 (void) workqueue_q_wait(q, (work_impl_t *)wk); 337 } 338 } 339 340 void 341 workqueue_destroy(struct workqueue *wq) 342 { 343 struct workqueue_queue *q; 344 struct cpu_info *ci; 345 CPU_INFO_ITERATOR cii; 346 347 wq->wq_func = workqueue_exit; 348 for (CPU_INFO_FOREACH(cii, ci)) { 349 q = workqueue_queue_lookup(wq, ci); 350 if (q->q_worker != NULL) { 351 workqueue_finiqueue(wq, q); 352 } 353 } 354 kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags)); 355 } 356 357 #ifdef DEBUG 358 static void 359 workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk) 360 { 361 work_impl_t *_wk; 362 363 SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) { 364 if (_wk == wk) 365 panic("%s: tried to enqueue a queued work", __func__); 366 } 367 } 368 #endif 369 370 void 371 workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci) 372 { 373 struct workqueue_queue *q; 374 work_impl_t *wk = (void *)wk0; 375 376 KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL); 377 q = workqueue_queue_lookup(wq, ci); 378 379 mutex_enter(&q->q_mutex); 380 KASSERT(q->q_waiter == NULL); 381 #ifdef DEBUG 382 workqueue_check_duplication(q, wk); 383 #endif 384 SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry); 385 cv_signal(&q->q_cv); 386 mutex_exit(&q->q_mutex); 387 } 388