1 /* $OpenBSD: kern_task.c,v 1.25 2019/04/28 04:20:40 dlg Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 #include <sys/proc.h> 26 #include <sys/witness.h> 27 28 #ifdef WITNESS 29 30 static struct lock_type taskq_lock_type = { 31 .lt_name = "taskq" 32 }; 33 34 #define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \ 35 (LO_CLASS_RWLOCK << LO_CLASSSHIFT) 36 37 #endif /* WITNESS */ 38 39 struct taskq { 40 enum { 41 TQ_S_CREATED, 42 TQ_S_RUNNING, 43 TQ_S_DESTROYED 44 } tq_state; 45 unsigned int tq_running; 46 unsigned int tq_nthreads; 47 unsigned int tq_flags; 48 const char *tq_name; 49 50 struct mutex tq_mtx; 51 struct task_list tq_worklist; 52 #ifdef WITNESS 53 struct lock_object tq_lock_object; 54 #endif 55 }; 56 57 static const char taskq_sys_name[] = "systq"; 58 59 struct taskq taskq_sys = { 60 TQ_S_CREATED, 61 0, 62 1, 63 0, 64 taskq_sys_name, 65 MUTEX_INITIALIZER(IPL_HIGH), 66 TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist), 67 #ifdef WITNESS 68 { 69 .lo_name = taskq_sys_name, 70 .lo_flags = TASKQ_LOCK_FLAGS, 71 }, 72 #endif 73 }; 74 75 static const char taskq_sys_mp_name[] = "systqmp"; 76 77 struct taskq taskq_sys_mp = { 78 TQ_S_CREATED, 79 0, 80 1, 81 TASKQ_MPSAFE, 82 taskq_sys_mp_name, 83 MUTEX_INITIALIZER(IPL_HIGH), 84 TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist), 85 #ifdef WITNESS 86 { 87 .lo_name = taskq_sys_mp_name, 88 .lo_flags = TASKQ_LOCK_FLAGS, 89 }, 90 #endif 91 }; 92 93 struct taskq *const systq = &taskq_sys; 94 struct taskq *const systqmp = &taskq_sys_mp; 95 96 void taskq_init(void); /* called in init_main.c */ 97 void taskq_create_thread(void *); 98 void taskq_barrier_task(void *); 99 int taskq_sleep(const volatile void *, struct mutex *, int, 100 const char *, int); 101 int taskq_next_work(struct taskq *, struct task *); 102 void taskq_thread(void *); 103 104 void 105 taskq_init(void) 106 { 107 WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type); 108 kthread_create_deferred(taskq_create_thread, systq); 109 WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type); 110 kthread_create_deferred(taskq_create_thread, systqmp); 111 } 112 113 struct taskq * 114 taskq_create(const char *name, unsigned int nthreads, int ipl, 115 unsigned int flags) 116 { 117 struct taskq *tq; 118 119 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 120 if (tq == NULL) 121 return (NULL); 122 123 tq->tq_state = TQ_S_CREATED; 124 tq->tq_running = 0; 125 tq->tq_nthreads = nthreads; 126 tq->tq_name = name; 127 tq->tq_flags = flags; 128 129 mtx_init_flags(&tq->tq_mtx, ipl, name, 0); 130 TAILQ_INIT(&tq->tq_worklist); 131 132 #ifdef WITNESS 133 memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object)); 134 tq->tq_lock_object.lo_name = name; 135 tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS; 136 witness_init(&tq->tq_lock_object, &taskq_lock_type); 137 #endif 138 139 /* try to create a thread to guarantee that tasks will be serviced */ 140 kthread_create_deferred(taskq_create_thread, tq); 141 142 return (tq); 143 } 144 145 void 146 taskq_destroy(struct taskq *tq) 147 { 148 mtx_enter(&tq->tq_mtx); 149 switch (tq->tq_state) { 150 case TQ_S_CREATED: 151 /* tq is still referenced by taskq_create_thread */ 152 tq->tq_state = TQ_S_DESTROYED; 153 mtx_leave(&tq->tq_mtx); 154 return; 155 156 case TQ_S_RUNNING: 157 tq->tq_state = TQ_S_DESTROYED; 158 break; 159 160 default: 161 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 162 } 163 164 while (tq->tq_running > 0) { 165 wakeup(tq); 166 msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); 167 } 168 mtx_leave(&tq->tq_mtx); 169 170 free(tq, M_DEVBUF, sizeof(*tq)); 171 } 172 173 void 174 taskq_create_thread(void *arg) 175 { 176 struct taskq *tq = arg; 177 int rv; 178 179 mtx_enter(&tq->tq_mtx); 180 181 switch (tq->tq_state) { 182 case TQ_S_DESTROYED: 183 mtx_leave(&tq->tq_mtx); 184 free(tq, M_DEVBUF, sizeof(*tq)); 185 return; 186 187 case TQ_S_CREATED: 188 tq->tq_state = TQ_S_RUNNING; 189 break; 190 191 default: 192 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 193 } 194 195 do { 196 tq->tq_running++; 197 mtx_leave(&tq->tq_mtx); 198 199 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 200 201 mtx_enter(&tq->tq_mtx); 202 if (rv != 0) { 203 printf("unable to create thread for \"%s\" taskq\n", 204 tq->tq_name); 205 206 tq->tq_running--; 207 /* could have been destroyed during kthread_create */ 208 if (tq->tq_state == TQ_S_DESTROYED && 209 tq->tq_running == 0) 210 wakeup_one(&tq->tq_running); 211 break; 212 } 213 } while (tq->tq_running < tq->tq_nthreads); 214 215 mtx_leave(&tq->tq_mtx); 216 } 217 218 void 219 taskq_barrier(struct taskq *tq) 220 { 221 struct cond c = COND_INITIALIZER(); 222 struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); 223 224 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 225 226 task_add(tq, &t); 227 cond_wait(&c, "tqbar"); 228 } 229 230 void 231 taskq_del_barrier(struct taskq *tq, struct task *del) 232 { 233 struct cond c = COND_INITIALIZER(); 234 struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); 235 236 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 237 238 if (task_del(tq, del)) 239 return; 240 241 task_add(tq, &t); 242 cond_wait(&c, "tqbar"); 243 } 244 245 void 246 taskq_barrier_task(void *p) 247 { 248 struct cond *c = p; 249 cond_signal(c); 250 } 251 252 void 253 task_set(struct task *t, void (*fn)(void *), void *arg) 254 { 255 t->t_func = fn; 256 t->t_arg = arg; 257 t->t_flags = 0; 258 } 259 260 int 261 task_add(struct taskq *tq, struct task *w) 262 { 263 int rv = 0; 264 265 if (ISSET(w->t_flags, TASK_ONQUEUE)) 266 return (0); 267 268 mtx_enter(&tq->tq_mtx); 269 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 270 rv = 1; 271 SET(w->t_flags, TASK_ONQUEUE); 272 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 273 } 274 mtx_leave(&tq->tq_mtx); 275 276 if (rv) 277 wakeup_one(tq); 278 279 return (rv); 280 } 281 282 int 283 task_del(struct taskq *tq, struct task *w) 284 { 285 int rv = 0; 286 287 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 288 return (0); 289 290 mtx_enter(&tq->tq_mtx); 291 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 292 rv = 1; 293 CLR(w->t_flags, TASK_ONQUEUE); 294 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 295 } 296 mtx_leave(&tq->tq_mtx); 297 298 return (rv); 299 } 300 301 int 302 taskq_next_work(struct taskq *tq, struct task *work) 303 { 304 struct task *next; 305 306 mtx_enter(&tq->tq_mtx); 307 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 308 if (tq->tq_state != TQ_S_RUNNING) { 309 mtx_leave(&tq->tq_mtx); 310 return (0); 311 } 312 313 msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); 314 } 315 316 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 317 CLR(next->t_flags, TASK_ONQUEUE); 318 319 *work = *next; /* copy to caller to avoid races */ 320 321 next = TAILQ_FIRST(&tq->tq_worklist); 322 mtx_leave(&tq->tq_mtx); 323 324 if (next != NULL && tq->tq_nthreads > 1) 325 wakeup_one(tq); 326 327 return (1); 328 } 329 330 void 331 taskq_thread(void *xtq) 332 { 333 struct taskq *tq = xtq; 334 struct task work; 335 int last; 336 337 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 338 KERNEL_UNLOCK(); 339 340 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 341 342 while (taskq_next_work(tq, &work)) { 343 WITNESS_LOCK(&tq->tq_lock_object, 0); 344 (*work.t_func)(work.t_arg); 345 WITNESS_UNLOCK(&tq->tq_lock_object, 0); 346 sched_pause(yield); 347 } 348 349 mtx_enter(&tq->tq_mtx); 350 last = (--tq->tq_running == 0); 351 mtx_leave(&tq->tq_mtx); 352 353 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 354 KERNEL_LOCK(); 355 356 if (last) 357 wakeup_one(&tq->tq_running); 358 359 kthread_exit(0); 360 } 361