1 /* $OpenBSD: kern_task.c,v 1.30 2020/06/11 06:06:55 dlg Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 #include <sys/proc.h> 26 #include <sys/witness.h> 27 28 #ifdef WITNESS 29 30 static struct lock_type taskq_lock_type = { 31 .lt_name = "taskq" 32 }; 33 34 #define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \ 35 (LO_CLASS_RWLOCK << LO_CLASSSHIFT) 36 37 #endif /* WITNESS */ 38 39 struct taskq_thread { 40 SLIST_ENTRY(taskq_thread) 41 tt_entry; 42 struct proc *tt_thread; 43 }; 44 SLIST_HEAD(taskq_threads, taskq_thread); 45 46 struct taskq { 47 enum { 48 TQ_S_CREATED, 49 TQ_S_RUNNING, 50 TQ_S_DESTROYED 51 } tq_state; 52 unsigned int tq_running; 53 unsigned int tq_nthreads; 54 unsigned int tq_flags; 55 const char *tq_name; 56 57 struct mutex tq_mtx; 58 struct task_list tq_worklist; 59 60 struct taskq_threads tq_threads; 61 unsigned int tq_barriers; 62 unsigned int tq_bgen; 63 unsigned int tq_bthreads; 64 65 #ifdef WITNESS 66 struct lock_object tq_lock_object; 67 #endif 68 }; 69 70 static const char taskq_sys_name[] = "systq"; 71 72 struct taskq taskq_sys = { 73 .tq_state = TQ_S_CREATED, 74 .tq_running = 0, 75 .tq_nthreads = 1, 76 .tq_flags = 0, 77 .tq_name = taskq_sys_name, 78 .tq_mtx = MUTEX_INITIALIZER_FLAGS(IPL_HIGH, 79 taskq_sys_name, 0), 80 .tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist), 81 82 .tq_threads = SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads), 83 .tq_barriers = 0, 84 .tq_bgen = 0, 85 .tq_bthreads = 0, 86 87 #ifdef WITNESS 88 .tq_lock_object = { 89 .lo_name = taskq_sys_name, 90 .lo_flags = TASKQ_LOCK_FLAGS, 91 }, 92 #endif 93 }; 94 95 static const char taskq_sys_mp_name[] = "systqmp"; 96 97 struct taskq taskq_sys_mp = { 98 .tq_state = TQ_S_CREATED, 99 .tq_running = 0, 100 .tq_nthreads = 1, 101 .tq_flags = TASKQ_MPSAFE, 102 .tq_name = taskq_sys_mp_name, 103 .tq_mtx = MUTEX_INITIALIZER_FLAGS(IPL_HIGH, 104 taskq_sys_mp_name, 0), 105 .tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist), 106 107 .tq_threads = SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads), 108 .tq_barriers = 0, 109 .tq_bgen = 0, 110 .tq_bthreads = 0, 111 112 #ifdef WITNESS 113 .tq_lock_object = { 114 .lo_name = taskq_sys_mp_name, 115 .lo_flags = TASKQ_LOCK_FLAGS, 116 }, 117 #endif 118 }; 119 120 struct taskq *const systq = &taskq_sys; 121 struct taskq *const systqmp = &taskq_sys_mp; 122 123 void taskq_init(void); /* called in init_main.c */ 124 void taskq_create_thread(void *); 125 void taskq_barrier_task(void *); 126 int taskq_sleep(const volatile void *, struct mutex *, int, 127 const char *, int); 128 int taskq_next_work(struct taskq *, struct task *); 129 void taskq_thread(void *); 130 131 void 132 taskq_init(void) 133 { 134 WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type); 135 kthread_create_deferred(taskq_create_thread, systq); 136 WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type); 137 kthread_create_deferred(taskq_create_thread, systqmp); 138 } 139 140 struct taskq * 141 taskq_create(const char *name, unsigned int nthreads, int ipl, 142 unsigned int flags) 143 { 144 struct taskq *tq; 145 146 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 147 if (tq == NULL) 148 return (NULL); 149 150 tq->tq_state = TQ_S_CREATED; 151 tq->tq_running = 0; 152 tq->tq_nthreads = nthreads; 153 tq->tq_name = name; 154 tq->tq_flags = flags; 155 156 mtx_init_flags(&tq->tq_mtx, ipl, name, 0); 157 TAILQ_INIT(&tq->tq_worklist); 158 159 SLIST_INIT(&tq->tq_threads); 160 tq->tq_barriers = 0; 161 tq->tq_bgen = 0; 162 tq->tq_bthreads = 0; 163 164 #ifdef WITNESS 165 memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object)); 166 tq->tq_lock_object.lo_name = name; 167 tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS; 168 witness_init(&tq->tq_lock_object, &taskq_lock_type); 169 #endif 170 171 /* try to create a thread to guarantee that tasks will be serviced */ 172 kthread_create_deferred(taskq_create_thread, tq); 173 174 return (tq); 175 } 176 177 void 178 taskq_destroy(struct taskq *tq) 179 { 180 mtx_enter(&tq->tq_mtx); 181 switch (tq->tq_state) { 182 case TQ_S_CREATED: 183 /* tq is still referenced by taskq_create_thread */ 184 tq->tq_state = TQ_S_DESTROYED; 185 mtx_leave(&tq->tq_mtx); 186 return; 187 188 case TQ_S_RUNNING: 189 tq->tq_state = TQ_S_DESTROYED; 190 break; 191 192 default: 193 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 194 } 195 196 while (tq->tq_running > 0) { 197 wakeup(tq); 198 msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 199 INFSLP); 200 } 201 mtx_leave(&tq->tq_mtx); 202 203 free(tq, M_DEVBUF, sizeof(*tq)); 204 } 205 206 void 207 taskq_create_thread(void *arg) 208 { 209 struct taskq *tq = arg; 210 int rv; 211 212 mtx_enter(&tq->tq_mtx); 213 214 switch (tq->tq_state) { 215 case TQ_S_DESTROYED: 216 mtx_leave(&tq->tq_mtx); 217 free(tq, M_DEVBUF, sizeof(*tq)); 218 return; 219 220 case TQ_S_CREATED: 221 tq->tq_state = TQ_S_RUNNING; 222 break; 223 224 default: 225 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 226 } 227 228 do { 229 tq->tq_running++; 230 mtx_leave(&tq->tq_mtx); 231 232 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 233 234 mtx_enter(&tq->tq_mtx); 235 if (rv != 0) { 236 printf("unable to create thread for \"%s\" taskq\n", 237 tq->tq_name); 238 239 tq->tq_running--; 240 /* could have been destroyed during kthread_create */ 241 if (tq->tq_state == TQ_S_DESTROYED && 242 tq->tq_running == 0) 243 wakeup_one(&tq->tq_running); 244 break; 245 } 246 } while (tq->tq_running < tq->tq_nthreads); 247 248 mtx_leave(&tq->tq_mtx); 249 } 250 251 void 252 taskq_barrier_task(void *p) 253 { 254 struct taskq *tq = p; 255 unsigned int gen; 256 257 mtx_enter(&tq->tq_mtx); 258 tq->tq_bthreads++; 259 wakeup(&tq->tq_bthreads); 260 261 gen = tq->tq_bgen; 262 do { 263 msleep_nsec(&tq->tq_bgen, &tq->tq_mtx, 264 PWAIT, "tqbarend", INFSLP); 265 } while (gen == tq->tq_bgen); 266 mtx_leave(&tq->tq_mtx); 267 } 268 269 static void 270 taskq_do_barrier(struct taskq *tq) 271 { 272 struct task t = TASK_INITIALIZER(taskq_barrier_task, tq); 273 struct proc *thread = curproc; 274 struct taskq_thread *tt; 275 276 mtx_enter(&tq->tq_mtx); 277 tq->tq_barriers++; 278 279 /* is the barrier being run from a task inside the taskq? */ 280 SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) { 281 if (tt->tt_thread == thread) { 282 tq->tq_bthreads++; 283 wakeup(&tq->tq_bthreads); 284 break; 285 } 286 } 287 288 while (tq->tq_bthreads < tq->tq_nthreads) { 289 /* shove the task into the queue for a worker to pick up */ 290 SET(t.t_flags, TASK_ONQUEUE); 291 TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry); 292 wakeup_one(tq); 293 294 msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx, 295 PWAIT, "tqbar", INFSLP); 296 297 /* 298 * another thread running a barrier might have 299 * done this work for us. 300 */ 301 if (ISSET(t.t_flags, TASK_ONQUEUE)) 302 TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry); 303 } 304 305 if (--tq->tq_barriers == 0) { 306 /* we're the last one out */ 307 tq->tq_bgen++; 308 wakeup(&tq->tq_bgen); 309 tq->tq_bthreads = 0; 310 } else { 311 unsigned int gen = tq->tq_bgen; 312 do { 313 msleep_nsec(&tq->tq_bgen, &tq->tq_mtx, 314 PWAIT, "tqbarwait", INFSLP); 315 } while (gen == tq->tq_bgen); 316 } 317 mtx_leave(&tq->tq_mtx); 318 } 319 320 void 321 taskq_barrier(struct taskq *tq) 322 { 323 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 324 325 taskq_do_barrier(tq); 326 } 327 328 void 329 taskq_del_barrier(struct taskq *tq, struct task *t) 330 { 331 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 332 333 if (task_del(tq, t)) 334 return; 335 336 taskq_do_barrier(tq); 337 } 338 339 void 340 task_set(struct task *t, void (*fn)(void *), void *arg) 341 { 342 t->t_func = fn; 343 t->t_arg = arg; 344 t->t_flags = 0; 345 } 346 347 int 348 task_add(struct taskq *tq, struct task *w) 349 { 350 int rv = 0; 351 352 if (ISSET(w->t_flags, TASK_ONQUEUE)) 353 return (0); 354 355 mtx_enter(&tq->tq_mtx); 356 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 357 rv = 1; 358 SET(w->t_flags, TASK_ONQUEUE); 359 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 360 } 361 mtx_leave(&tq->tq_mtx); 362 363 if (rv) 364 wakeup_one(tq); 365 366 return (rv); 367 } 368 369 int 370 task_del(struct taskq *tq, struct task *w) 371 { 372 int rv = 0; 373 374 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 375 return (0); 376 377 mtx_enter(&tq->tq_mtx); 378 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 379 rv = 1; 380 CLR(w->t_flags, TASK_ONQUEUE); 381 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 382 } 383 mtx_leave(&tq->tq_mtx); 384 385 return (rv); 386 } 387 388 int 389 taskq_next_work(struct taskq *tq, struct task *work) 390 { 391 struct task *next; 392 393 mtx_enter(&tq->tq_mtx); 394 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 395 if (tq->tq_state != TQ_S_RUNNING) { 396 mtx_leave(&tq->tq_mtx); 397 return (0); 398 } 399 400 msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP); 401 } 402 403 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 404 CLR(next->t_flags, TASK_ONQUEUE); 405 406 *work = *next; /* copy to caller to avoid races */ 407 408 next = TAILQ_FIRST(&tq->tq_worklist); 409 mtx_leave(&tq->tq_mtx); 410 411 if (next != NULL && tq->tq_nthreads > 1) 412 wakeup_one(tq); 413 414 return (1); 415 } 416 417 void 418 taskq_thread(void *xtq) 419 { 420 struct taskq_thread self = { .tt_thread = curproc }; 421 struct taskq *tq = xtq; 422 struct task work; 423 int last; 424 425 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 426 KERNEL_UNLOCK(); 427 428 mtx_enter(&tq->tq_mtx); 429 SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry); 430 mtx_leave(&tq->tq_mtx); 431 432 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 433 434 while (taskq_next_work(tq, &work)) { 435 WITNESS_LOCK(&tq->tq_lock_object, 0); 436 (*work.t_func)(work.t_arg); 437 WITNESS_UNLOCK(&tq->tq_lock_object, 0); 438 sched_pause(yield); 439 } 440 441 mtx_enter(&tq->tq_mtx); 442 SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry); 443 last = (--tq->tq_running == 0); 444 mtx_leave(&tq->tq_mtx); 445 446 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 447 KERNEL_LOCK(); 448 449 if (last) 450 wakeup_one(&tq->tq_running); 451 452 kthread_exit(0); 453 } 454