1 /* $OpenBSD: kern_task.c,v 1.36 2025/01/13 03:21:10 mvs Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 #include <sys/proc.h> 26 #include <sys/witness.h> 27 28 #include "kcov.h" 29 #if NKCOV > 0 30 #include <sys/kcov.h> 31 #endif 32 33 #ifdef WITNESS 34 35 static struct lock_type taskq_lock_type = { 36 .lt_name = "taskq" 37 }; 38 39 #define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \ 40 (LO_CLASS_RWLOCK << LO_CLASSSHIFT) 41 42 #endif /* WITNESS */ 43 44 struct taskq_thread { 45 SLIST_ENTRY(taskq_thread) 46 tt_entry; 47 struct proc *tt_thread; 48 }; 49 SLIST_HEAD(taskq_threads, taskq_thread); 50 51 struct taskq { 52 enum { 53 TQ_S_CREATED, 54 TQ_S_RUNNING, 55 TQ_S_DESTROYED 56 } tq_state; 57 unsigned int tq_running; 58 unsigned int tq_nthreads; 59 unsigned int tq_flags; 60 const char *tq_name; 61 62 struct mutex tq_mtx; 63 struct task_list tq_worklist; 64 65 struct taskq_threads tq_threads; 66 unsigned int tq_barriers; 67 unsigned int tq_bgen; 68 unsigned int tq_bthreads; 69 70 #ifdef WITNESS 71 struct lock_object tq_lock_object; 72 #endif 73 }; 74 75 static const char taskq_sys_name[] = "systq"; 76 77 struct taskq taskq_sys = { 78 .tq_state = TQ_S_CREATED, 79 .tq_running = 0, 80 .tq_nthreads = 1, 81 .tq_flags = 0, 82 .tq_name = taskq_sys_name, 83 .tq_mtx = MUTEX_INITIALIZER_FLAGS(IPL_HIGH, 84 taskq_sys_name, 0), 85 .tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist), 86 87 .tq_threads = SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads), 88 .tq_barriers = 0, 89 .tq_bgen = 0, 90 .tq_bthreads = 0, 91 92 #ifdef WITNESS 93 .tq_lock_object = { 94 .lo_name = taskq_sys_name, 95 .lo_flags = TASKQ_LOCK_FLAGS, 96 }, 97 #endif 98 }; 99 100 static const char taskq_sys_mp_name[] = "systqmp"; 101 102 struct taskq taskq_sys_mp = { 103 .tq_state = TQ_S_CREATED, 104 .tq_running = 0, 105 .tq_nthreads = 1, 106 .tq_flags = TASKQ_MPSAFE, 107 .tq_name = taskq_sys_mp_name, 108 .tq_mtx = MUTEX_INITIALIZER_FLAGS(IPL_HIGH, 109 taskq_sys_mp_name, 0), 110 .tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist), 111 112 .tq_threads = SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads), 113 .tq_barriers = 0, 114 .tq_bgen = 0, 115 .tq_bthreads = 0, 116 117 #ifdef WITNESS 118 .tq_lock_object = { 119 .lo_name = taskq_sys_mp_name, 120 .lo_flags = TASKQ_LOCK_FLAGS, 121 }, 122 #endif 123 }; 124 125 struct taskq *const systq = &taskq_sys; 126 struct taskq *const systqmp = &taskq_sys_mp; 127 128 void taskq_init(void); /* called in init_main.c */ 129 void taskq_create_thread(void *); 130 void taskq_barrier_task(void *); 131 int taskq_next_work(struct taskq *, struct task *); 132 void taskq_thread(void *); 133 134 void 135 taskq_init(void) 136 { 137 WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type); 138 kthread_create_deferred(taskq_create_thread, systq); 139 WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type); 140 kthread_create_deferred(taskq_create_thread, systqmp); 141 } 142 143 struct taskq * 144 taskq_create(const char *name, unsigned int nthreads, int ipl, 145 unsigned int flags) 146 { 147 struct taskq *tq; 148 149 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 150 if (tq == NULL) 151 return (NULL); 152 153 tq->tq_state = TQ_S_CREATED; 154 tq->tq_running = 0; 155 tq->tq_nthreads = nthreads; 156 tq->tq_name = name; 157 tq->tq_flags = flags; 158 159 mtx_init_flags(&tq->tq_mtx, ipl, name, 0); 160 TAILQ_INIT(&tq->tq_worklist); 161 162 SLIST_INIT(&tq->tq_threads); 163 tq->tq_barriers = 0; 164 tq->tq_bgen = 0; 165 tq->tq_bthreads = 0; 166 167 #ifdef WITNESS 168 memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object)); 169 tq->tq_lock_object.lo_name = name; 170 tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS; 171 witness_init(&tq->tq_lock_object, &taskq_lock_type); 172 #endif 173 174 /* try to create a thread to guarantee that tasks will be serviced */ 175 kthread_create_deferred(taskq_create_thread, tq); 176 177 return (tq); 178 } 179 180 void 181 taskq_destroy(struct taskq *tq) 182 { 183 mtx_enter(&tq->tq_mtx); 184 switch (tq->tq_state) { 185 case TQ_S_CREATED: 186 /* tq is still referenced by taskq_create_thread */ 187 tq->tq_state = TQ_S_DESTROYED; 188 mtx_leave(&tq->tq_mtx); 189 return; 190 191 case TQ_S_RUNNING: 192 tq->tq_state = TQ_S_DESTROYED; 193 break; 194 195 default: 196 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 197 } 198 199 while (tq->tq_running > 0) { 200 wakeup(tq); 201 msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 202 INFSLP); 203 } 204 mtx_leave(&tq->tq_mtx); 205 206 free(tq, M_DEVBUF, sizeof(*tq)); 207 } 208 209 void 210 taskq_create_thread(void *arg) 211 { 212 struct taskq *tq = arg; 213 int rv; 214 215 mtx_enter(&tq->tq_mtx); 216 217 switch (tq->tq_state) { 218 case TQ_S_DESTROYED: 219 mtx_leave(&tq->tq_mtx); 220 free(tq, M_DEVBUF, sizeof(*tq)); 221 return; 222 223 case TQ_S_CREATED: 224 tq->tq_state = TQ_S_RUNNING; 225 break; 226 227 default: 228 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 229 } 230 231 do { 232 tq->tq_running++; 233 mtx_leave(&tq->tq_mtx); 234 235 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 236 237 mtx_enter(&tq->tq_mtx); 238 if (rv != 0) { 239 printf("unable to create thread for \"%s\" taskq\n", 240 tq->tq_name); 241 242 tq->tq_running--; 243 /* could have been destroyed during kthread_create */ 244 if (tq->tq_state == TQ_S_DESTROYED && 245 tq->tq_running == 0) 246 wakeup_one(&tq->tq_running); 247 break; 248 } 249 } while (tq->tq_running < tq->tq_nthreads); 250 251 mtx_leave(&tq->tq_mtx); 252 } 253 254 void 255 taskq_barrier_task(void *p) 256 { 257 struct taskq *tq = p; 258 unsigned int gen; 259 260 mtx_enter(&tq->tq_mtx); 261 tq->tq_bthreads++; 262 wakeup(&tq->tq_bthreads); 263 264 gen = tq->tq_bgen; 265 do { 266 msleep_nsec(&tq->tq_bgen, &tq->tq_mtx, 267 PWAIT, "tqbarend", INFSLP); 268 } while (gen == tq->tq_bgen); 269 mtx_leave(&tq->tq_mtx); 270 } 271 272 static void 273 taskq_do_barrier(struct taskq *tq) 274 { 275 struct task t = TASK_INITIALIZER(taskq_barrier_task, tq); 276 struct proc *thread = curproc; 277 struct taskq_thread *tt; 278 279 mtx_enter(&tq->tq_mtx); 280 tq->tq_barriers++; 281 282 /* is the barrier being run from a task inside the taskq? */ 283 SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) { 284 if (tt->tt_thread == thread) { 285 tq->tq_bthreads++; 286 wakeup(&tq->tq_bthreads); 287 break; 288 } 289 } 290 291 while (tq->tq_bthreads < tq->tq_nthreads) { 292 /* shove the task into the queue for a worker to pick up */ 293 SET(t.t_flags, TASK_ONQUEUE); 294 TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry); 295 wakeup_one(tq); 296 297 msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx, 298 PWAIT, "tqbar", INFSLP); 299 300 /* 301 * another thread running a barrier might have 302 * done this work for us. 303 */ 304 if (ISSET(t.t_flags, TASK_ONQUEUE)) 305 TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry); 306 } 307 308 if (--tq->tq_barriers == 0) { 309 /* we're the last one out */ 310 tq->tq_bgen++; 311 wakeup(&tq->tq_bgen); 312 tq->tq_bthreads = 0; 313 } else { 314 unsigned int gen = tq->tq_bgen; 315 do { 316 msleep_nsec(&tq->tq_bgen, &tq->tq_mtx, 317 PWAIT, "tqbarwait", INFSLP); 318 } while (gen == tq->tq_bgen); 319 } 320 mtx_leave(&tq->tq_mtx); 321 } 322 323 void 324 taskq_barrier(struct taskq *tq) 325 { 326 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 327 328 taskq_do_barrier(tq); 329 } 330 331 void 332 taskq_del_barrier(struct taskq *tq, struct task *t) 333 { 334 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 335 336 task_del(tq, t); 337 taskq_do_barrier(tq); 338 } 339 340 void 341 task_set(struct task *t, void (*fn)(void *), void *arg) 342 { 343 t->t_func = fn; 344 t->t_arg = arg; 345 t->t_flags = 0; 346 } 347 348 int 349 task_add(struct taskq *tq, struct task *w) 350 { 351 int rv = 0; 352 353 if (ISSET(w->t_flags, TASK_ONQUEUE)) 354 return (0); 355 356 mtx_enter(&tq->tq_mtx); 357 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 358 rv = 1; 359 SET(w->t_flags, TASK_ONQUEUE); 360 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 361 #if NKCOV > 0 362 if (!kcov_cold) 363 w->t_process = curproc->p_p; 364 #endif 365 } 366 mtx_leave(&tq->tq_mtx); 367 368 if (rv) 369 wakeup_one(tq); 370 371 return (rv); 372 } 373 374 int 375 task_del(struct taskq *tq, struct task *w) 376 { 377 int rv = 0; 378 379 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 380 return (0); 381 382 mtx_enter(&tq->tq_mtx); 383 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 384 rv = 1; 385 CLR(w->t_flags, TASK_ONQUEUE); 386 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 387 } 388 mtx_leave(&tq->tq_mtx); 389 390 return (rv); 391 } 392 393 int 394 taskq_next_work(struct taskq *tq, struct task *work) 395 { 396 struct task *next; 397 398 mtx_enter(&tq->tq_mtx); 399 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 400 if (tq->tq_state != TQ_S_RUNNING) { 401 mtx_leave(&tq->tq_mtx); 402 return (0); 403 } 404 405 msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP); 406 } 407 408 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 409 CLR(next->t_flags, TASK_ONQUEUE); 410 411 *work = *next; /* copy to caller to avoid races */ 412 413 next = TAILQ_FIRST(&tq->tq_worklist); 414 mtx_leave(&tq->tq_mtx); 415 416 if (next != NULL && tq->tq_nthreads > 1) 417 wakeup_one(tq); 418 419 return (1); 420 } 421 422 void 423 taskq_thread(void *xtq) 424 { 425 struct taskq_thread self = { .tt_thread = curproc }; 426 struct taskq *tq = xtq; 427 struct task work; 428 int last; 429 430 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 431 KERNEL_UNLOCK(); 432 433 mtx_enter(&tq->tq_mtx); 434 SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry); 435 mtx_leave(&tq->tq_mtx); 436 437 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 438 439 while (taskq_next_work(tq, &work)) { 440 WITNESS_LOCK(&tq->tq_lock_object, 0); 441 #if NKCOV > 0 442 kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process); 443 #endif 444 (*work.t_func)(work.t_arg); 445 #if NKCOV > 0 446 kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process); 447 #endif 448 WITNESS_UNLOCK(&tq->tq_lock_object, 0); 449 sched_pause(yield); 450 } 451 452 mtx_enter(&tq->tq_mtx); 453 SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry); 454 last = (--tq->tq_running == 0); 455 mtx_leave(&tq->tq_mtx); 456 457 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 458 KERNEL_LOCK(); 459 460 if (last) 461 wakeup_one(&tq->tq_running); 462 463 kthread_exit(0); 464 } 465