1 /* $OpenBSD: kern_task.c,v 1.26 2019/06/23 12:56:10 kettenis Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 #include <sys/proc.h> 26 #include <sys/witness.h> 27 28 #ifdef WITNESS 29 30 static struct lock_type taskq_lock_type = { 31 .lt_name = "taskq" 32 }; 33 34 #define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \ 35 (LO_CLASS_RWLOCK << LO_CLASSSHIFT) 36 37 #endif /* WITNESS */ 38 39 struct taskq { 40 enum { 41 TQ_S_CREATED, 42 TQ_S_RUNNING, 43 TQ_S_DESTROYED 44 } tq_state; 45 unsigned int tq_running; 46 unsigned int tq_waiting; 47 unsigned int tq_nthreads; 48 unsigned int tq_flags; 49 const char *tq_name; 50 51 struct mutex tq_mtx; 52 struct task_list tq_worklist; 53 #ifdef WITNESS 54 struct lock_object tq_lock_object; 55 #endif 56 }; 57 58 static const char taskq_sys_name[] = "systq"; 59 60 struct taskq taskq_sys = { 61 TQ_S_CREATED, 62 0, 63 0, 64 1, 65 0, 66 taskq_sys_name, 67 MUTEX_INITIALIZER(IPL_HIGH), 68 TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist), 69 #ifdef WITNESS 70 { 71 .lo_name = taskq_sys_name, 72 .lo_flags = TASKQ_LOCK_FLAGS, 73 }, 74 #endif 75 }; 76 77 static const char taskq_sys_mp_name[] = "systqmp"; 78 79 struct taskq taskq_sys_mp = { 80 TQ_S_CREATED, 81 0, 82 0, 83 1, 84 TASKQ_MPSAFE, 85 taskq_sys_mp_name, 86 MUTEX_INITIALIZER(IPL_HIGH), 87 TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist), 88 #ifdef WITNESS 89 { 90 .lo_name = taskq_sys_mp_name, 91 .lo_flags = TASKQ_LOCK_FLAGS, 92 }, 93 #endif 94 }; 95 96 struct taskq *const systq = &taskq_sys; 97 struct taskq *const systqmp = &taskq_sys_mp; 98 99 void taskq_init(void); /* called in init_main.c */ 100 void taskq_create_thread(void *); 101 void taskq_barrier_task(void *); 102 int taskq_sleep(const volatile void *, struct mutex *, int, 103 const char *, int); 104 int taskq_next_work(struct taskq *, struct task *); 105 void taskq_thread(void *); 106 107 void 108 taskq_init(void) 109 { 110 WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type); 111 kthread_create_deferred(taskq_create_thread, systq); 112 WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type); 113 kthread_create_deferred(taskq_create_thread, systqmp); 114 } 115 116 struct taskq * 117 taskq_create(const char *name, unsigned int nthreads, int ipl, 118 unsigned int flags) 119 { 120 struct taskq *tq; 121 122 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 123 if (tq == NULL) 124 return (NULL); 125 126 tq->tq_state = TQ_S_CREATED; 127 tq->tq_running = 0; 128 tq->tq_waiting = 0; 129 tq->tq_nthreads = nthreads; 130 tq->tq_name = name; 131 tq->tq_flags = flags; 132 133 mtx_init_flags(&tq->tq_mtx, ipl, name, 0); 134 TAILQ_INIT(&tq->tq_worklist); 135 136 #ifdef WITNESS 137 memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object)); 138 tq->tq_lock_object.lo_name = name; 139 tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS; 140 witness_init(&tq->tq_lock_object, &taskq_lock_type); 141 #endif 142 143 /* try to create a thread to guarantee that tasks will be serviced */ 144 kthread_create_deferred(taskq_create_thread, tq); 145 146 return (tq); 147 } 148 149 void 150 taskq_destroy(struct taskq *tq) 151 { 152 mtx_enter(&tq->tq_mtx); 153 switch (tq->tq_state) { 154 case TQ_S_CREATED: 155 /* tq is still referenced by taskq_create_thread */ 156 tq->tq_state = TQ_S_DESTROYED; 157 mtx_leave(&tq->tq_mtx); 158 return; 159 160 case TQ_S_RUNNING: 161 tq->tq_state = TQ_S_DESTROYED; 162 break; 163 164 default: 165 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 166 } 167 168 while (tq->tq_running > 0) { 169 wakeup(tq); 170 msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); 171 } 172 mtx_leave(&tq->tq_mtx); 173 174 free(tq, M_DEVBUF, sizeof(*tq)); 175 } 176 177 void 178 taskq_create_thread(void *arg) 179 { 180 struct taskq *tq = arg; 181 int rv; 182 183 mtx_enter(&tq->tq_mtx); 184 185 switch (tq->tq_state) { 186 case TQ_S_DESTROYED: 187 mtx_leave(&tq->tq_mtx); 188 free(tq, M_DEVBUF, sizeof(*tq)); 189 return; 190 191 case TQ_S_CREATED: 192 tq->tq_state = TQ_S_RUNNING; 193 break; 194 195 default: 196 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 197 } 198 199 do { 200 tq->tq_running++; 201 mtx_leave(&tq->tq_mtx); 202 203 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 204 205 mtx_enter(&tq->tq_mtx); 206 if (rv != 0) { 207 printf("unable to create thread for \"%s\" taskq\n", 208 tq->tq_name); 209 210 tq->tq_running--; 211 /* could have been destroyed during kthread_create */ 212 if (tq->tq_state == TQ_S_DESTROYED && 213 tq->tq_running == 0) 214 wakeup_one(&tq->tq_running); 215 break; 216 } 217 } while (tq->tq_running < tq->tq_nthreads); 218 219 mtx_leave(&tq->tq_mtx); 220 } 221 222 void 223 taskq_barrier(struct taskq *tq) 224 { 225 struct cond c = COND_INITIALIZER(); 226 struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); 227 228 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 229 230 SET(t.t_flags, TASK_BARRIER); 231 task_add(tq, &t); 232 cond_wait(&c, "tqbar"); 233 } 234 235 void 236 taskq_del_barrier(struct taskq *tq, struct task *del) 237 { 238 struct cond c = COND_INITIALIZER(); 239 struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); 240 241 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 242 243 if (task_del(tq, del)) 244 return; 245 246 SET(t.t_flags, TASK_BARRIER); 247 task_add(tq, &t); 248 cond_wait(&c, "tqbar"); 249 } 250 251 void 252 taskq_barrier_task(void *p) 253 { 254 struct cond *c = p; 255 cond_signal(c); 256 } 257 258 void 259 task_set(struct task *t, void (*fn)(void *), void *arg) 260 { 261 t->t_func = fn; 262 t->t_arg = arg; 263 t->t_flags = 0; 264 } 265 266 int 267 task_add(struct taskq *tq, struct task *w) 268 { 269 int rv = 0; 270 271 if (ISSET(w->t_flags, TASK_ONQUEUE)) 272 return (0); 273 274 mtx_enter(&tq->tq_mtx); 275 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 276 rv = 1; 277 SET(w->t_flags, TASK_ONQUEUE); 278 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 279 } 280 mtx_leave(&tq->tq_mtx); 281 282 if (rv) 283 wakeup_one(tq); 284 285 return (rv); 286 } 287 288 int 289 task_del(struct taskq *tq, struct task *w) 290 { 291 int rv = 0; 292 293 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 294 return (0); 295 296 mtx_enter(&tq->tq_mtx); 297 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 298 rv = 1; 299 CLR(w->t_flags, TASK_ONQUEUE); 300 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 301 } 302 mtx_leave(&tq->tq_mtx); 303 304 return (rv); 305 } 306 307 int 308 taskq_next_work(struct taskq *tq, struct task *work) 309 { 310 struct task *next; 311 312 mtx_enter(&tq->tq_mtx); 313 retry: 314 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 315 if (tq->tq_state != TQ_S_RUNNING) { 316 mtx_leave(&tq->tq_mtx); 317 return (0); 318 } 319 320 tq->tq_waiting++; 321 msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); 322 tq->tq_waiting--; 323 } 324 325 if (ISSET(next->t_flags, TASK_BARRIER)) { 326 /* 327 * Make sure all other threads are sleeping before we 328 * proceed and run the barrier task. 329 */ 330 if (++tq->tq_waiting == tq->tq_nthreads) { 331 tq->tq_waiting--; 332 } else { 333 msleep(tq, &tq->tq_mtx, PWAIT, "tqblk", 0); 334 tq->tq_waiting--; 335 goto retry; 336 } 337 } 338 339 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 340 CLR(next->t_flags, TASK_ONQUEUE); 341 342 *work = *next; /* copy to caller to avoid races */ 343 344 next = TAILQ_FIRST(&tq->tq_worklist); 345 mtx_leave(&tq->tq_mtx); 346 347 if (next != NULL && tq->tq_nthreads > 1) 348 wakeup_one(tq); 349 350 return (1); 351 } 352 353 void 354 taskq_thread(void *xtq) 355 { 356 struct taskq *tq = xtq; 357 struct task work; 358 int last; 359 360 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 361 KERNEL_UNLOCK(); 362 363 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 364 365 while (taskq_next_work(tq, &work)) { 366 WITNESS_LOCK(&tq->tq_lock_object, 0); 367 (*work.t_func)(work.t_arg); 368 WITNESS_UNLOCK(&tq->tq_lock_object, 0); 369 sched_pause(yield); 370 } 371 372 mtx_enter(&tq->tq_mtx); 373 last = (--tq->tq_running == 0); 374 mtx_leave(&tq->tq_mtx); 375 376 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 377 KERNEL_LOCK(); 378 379 if (last) 380 wakeup_one(&tq->tq_running); 381 382 kthread_exit(0); 383 } 384