1 /* $OpenBSD: kern_task.c,v 1.27 2019/12/19 17:40:11 mpi Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 #include <sys/proc.h> 26 #include <sys/witness.h> 27 28 #ifdef WITNESS 29 30 static struct lock_type taskq_lock_type = { 31 .lt_name = "taskq" 32 }; 33 34 #define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \ 35 (LO_CLASS_RWLOCK << LO_CLASSSHIFT) 36 37 #endif /* WITNESS */ 38 39 struct taskq { 40 enum { 41 TQ_S_CREATED, 42 TQ_S_RUNNING, 43 TQ_S_DESTROYED 44 } tq_state; 45 unsigned int tq_running; 46 unsigned int tq_waiting; 47 unsigned int tq_nthreads; 48 unsigned int tq_flags; 49 const char *tq_name; 50 51 struct mutex tq_mtx; 52 struct task_list tq_worklist; 53 #ifdef WITNESS 54 struct lock_object tq_lock_object; 55 #endif 56 }; 57 58 static const char taskq_sys_name[] = "systq"; 59 60 struct taskq taskq_sys = { 61 TQ_S_CREATED, 62 0, 63 0, 64 1, 65 0, 66 taskq_sys_name, 67 MUTEX_INITIALIZER(IPL_HIGH), 68 TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist), 69 #ifdef WITNESS 70 { 71 .lo_name = taskq_sys_name, 72 .lo_flags = TASKQ_LOCK_FLAGS, 73 }, 74 #endif 75 }; 76 77 static const char taskq_sys_mp_name[] = "systqmp"; 78 79 struct taskq taskq_sys_mp = { 80 TQ_S_CREATED, 81 0, 82 0, 83 1, 84 TASKQ_MPSAFE, 85 taskq_sys_mp_name, 86 MUTEX_INITIALIZER(IPL_HIGH), 87 TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist), 88 #ifdef WITNESS 89 { 90 .lo_name = taskq_sys_mp_name, 91 .lo_flags = TASKQ_LOCK_FLAGS, 92 }, 93 #endif 94 }; 95 96 struct taskq *const systq = &taskq_sys; 97 struct taskq *const systqmp = &taskq_sys_mp; 98 99 void taskq_init(void); /* called in init_main.c */ 100 void taskq_create_thread(void *); 101 void taskq_barrier_task(void *); 102 int taskq_sleep(const volatile void *, struct mutex *, int, 103 const char *, int); 104 int taskq_next_work(struct taskq *, struct task *); 105 void taskq_thread(void *); 106 107 void 108 taskq_init(void) 109 { 110 WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type); 111 kthread_create_deferred(taskq_create_thread, systq); 112 WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type); 113 kthread_create_deferred(taskq_create_thread, systqmp); 114 } 115 116 struct taskq * 117 taskq_create(const char *name, unsigned int nthreads, int ipl, 118 unsigned int flags) 119 { 120 struct taskq *tq; 121 122 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 123 if (tq == NULL) 124 return (NULL); 125 126 tq->tq_state = TQ_S_CREATED; 127 tq->tq_running = 0; 128 tq->tq_waiting = 0; 129 tq->tq_nthreads = nthreads; 130 tq->tq_name = name; 131 tq->tq_flags = flags; 132 133 mtx_init_flags(&tq->tq_mtx, ipl, name, 0); 134 TAILQ_INIT(&tq->tq_worklist); 135 136 #ifdef WITNESS 137 memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object)); 138 tq->tq_lock_object.lo_name = name; 139 tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS; 140 witness_init(&tq->tq_lock_object, &taskq_lock_type); 141 #endif 142 143 /* try to create a thread to guarantee that tasks will be serviced */ 144 kthread_create_deferred(taskq_create_thread, tq); 145 146 return (tq); 147 } 148 149 void 150 taskq_destroy(struct taskq *tq) 151 { 152 mtx_enter(&tq->tq_mtx); 153 switch (tq->tq_state) { 154 case TQ_S_CREATED: 155 /* tq is still referenced by taskq_create_thread */ 156 tq->tq_state = TQ_S_DESTROYED; 157 mtx_leave(&tq->tq_mtx); 158 return; 159 160 case TQ_S_RUNNING: 161 tq->tq_state = TQ_S_DESTROYED; 162 break; 163 164 default: 165 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 166 } 167 168 while (tq->tq_running > 0) { 169 wakeup(tq); 170 msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 171 INFSLP); 172 } 173 mtx_leave(&tq->tq_mtx); 174 175 free(tq, M_DEVBUF, sizeof(*tq)); 176 } 177 178 void 179 taskq_create_thread(void *arg) 180 { 181 struct taskq *tq = arg; 182 int rv; 183 184 mtx_enter(&tq->tq_mtx); 185 186 switch (tq->tq_state) { 187 case TQ_S_DESTROYED: 188 mtx_leave(&tq->tq_mtx); 189 free(tq, M_DEVBUF, sizeof(*tq)); 190 return; 191 192 case TQ_S_CREATED: 193 tq->tq_state = TQ_S_RUNNING; 194 break; 195 196 default: 197 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 198 } 199 200 do { 201 tq->tq_running++; 202 mtx_leave(&tq->tq_mtx); 203 204 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 205 206 mtx_enter(&tq->tq_mtx); 207 if (rv != 0) { 208 printf("unable to create thread for \"%s\" taskq\n", 209 tq->tq_name); 210 211 tq->tq_running--; 212 /* could have been destroyed during kthread_create */ 213 if (tq->tq_state == TQ_S_DESTROYED && 214 tq->tq_running == 0) 215 wakeup_one(&tq->tq_running); 216 break; 217 } 218 } while (tq->tq_running < tq->tq_nthreads); 219 220 mtx_leave(&tq->tq_mtx); 221 } 222 223 void 224 taskq_barrier(struct taskq *tq) 225 { 226 struct cond c = COND_INITIALIZER(); 227 struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); 228 229 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 230 231 SET(t.t_flags, TASK_BARRIER); 232 task_add(tq, &t); 233 cond_wait(&c, "tqbar"); 234 } 235 236 void 237 taskq_del_barrier(struct taskq *tq, struct task *del) 238 { 239 struct cond c = COND_INITIALIZER(); 240 struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); 241 242 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 243 244 if (task_del(tq, del)) 245 return; 246 247 SET(t.t_flags, TASK_BARRIER); 248 task_add(tq, &t); 249 cond_wait(&c, "tqbar"); 250 } 251 252 void 253 taskq_barrier_task(void *p) 254 { 255 struct cond *c = p; 256 cond_signal(c); 257 } 258 259 void 260 task_set(struct task *t, void (*fn)(void *), void *arg) 261 { 262 t->t_func = fn; 263 t->t_arg = arg; 264 t->t_flags = 0; 265 } 266 267 int 268 task_add(struct taskq *tq, struct task *w) 269 { 270 int rv = 0; 271 272 if (ISSET(w->t_flags, TASK_ONQUEUE)) 273 return (0); 274 275 mtx_enter(&tq->tq_mtx); 276 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 277 rv = 1; 278 SET(w->t_flags, TASK_ONQUEUE); 279 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 280 } 281 mtx_leave(&tq->tq_mtx); 282 283 if (rv) 284 wakeup_one(tq); 285 286 return (rv); 287 } 288 289 int 290 task_del(struct taskq *tq, struct task *w) 291 { 292 int rv = 0; 293 294 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 295 return (0); 296 297 mtx_enter(&tq->tq_mtx); 298 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 299 rv = 1; 300 CLR(w->t_flags, TASK_ONQUEUE); 301 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 302 } 303 mtx_leave(&tq->tq_mtx); 304 305 return (rv); 306 } 307 308 int 309 taskq_next_work(struct taskq *tq, struct task *work) 310 { 311 struct task *next; 312 313 mtx_enter(&tq->tq_mtx); 314 retry: 315 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 316 if (tq->tq_state != TQ_S_RUNNING) { 317 mtx_leave(&tq->tq_mtx); 318 return (0); 319 } 320 321 tq->tq_waiting++; 322 msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP); 323 tq->tq_waiting--; 324 } 325 326 if (ISSET(next->t_flags, TASK_BARRIER)) { 327 /* 328 * Make sure all other threads are sleeping before we 329 * proceed and run the barrier task. 330 */ 331 if (++tq->tq_waiting == tq->tq_nthreads) { 332 tq->tq_waiting--; 333 } else { 334 msleep_nsec(tq, &tq->tq_mtx, PWAIT, "tqblk", INFSLP); 335 tq->tq_waiting--; 336 goto retry; 337 } 338 } 339 340 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 341 CLR(next->t_flags, TASK_ONQUEUE); 342 343 *work = *next; /* copy to caller to avoid races */ 344 345 next = TAILQ_FIRST(&tq->tq_worklist); 346 mtx_leave(&tq->tq_mtx); 347 348 if (next != NULL && tq->tq_nthreads > 1) 349 wakeup_one(tq); 350 351 return (1); 352 } 353 354 void 355 taskq_thread(void *xtq) 356 { 357 struct taskq *tq = xtq; 358 struct task work; 359 int last; 360 361 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 362 KERNEL_UNLOCK(); 363 364 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 365 366 while (taskq_next_work(tq, &work)) { 367 WITNESS_LOCK(&tq->tq_lock_object, 0); 368 (*work.t_func)(work.t_arg); 369 WITNESS_UNLOCK(&tq->tq_lock_object, 0); 370 sched_pause(yield); 371 } 372 373 mtx_enter(&tq->tq_mtx); 374 last = (--tq->tq_running == 0); 375 mtx_leave(&tq->tq_mtx); 376 377 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 378 KERNEL_LOCK(); 379 380 if (last) 381 wakeup_one(&tq->tq_running); 382 383 kthread_exit(0); 384 } 385