1 /* $OpenBSD: kern_task.c,v 1.22 2017/12/14 00:45:16 dlg Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 #include <sys/proc.h> 26 27 #define TASK_ONQUEUE 1 28 29 struct taskq { 30 enum { 31 TQ_S_CREATED, 32 TQ_S_RUNNING, 33 TQ_S_DESTROYED 34 } tq_state; 35 unsigned int tq_running; 36 unsigned int tq_nthreads; 37 unsigned int tq_flags; 38 const char *tq_name; 39 40 struct mutex tq_mtx; 41 struct task_list tq_worklist; 42 }; 43 44 struct taskq taskq_sys = { 45 TQ_S_CREATED, 46 0, 47 1, 48 0, 49 "systq", 50 MUTEX_INITIALIZER(IPL_HIGH), 51 TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist) 52 }; 53 54 struct taskq taskq_sys_mp = { 55 TQ_S_CREATED, 56 0, 57 1, 58 TASKQ_MPSAFE, 59 "systqmp", 60 MUTEX_INITIALIZER(IPL_HIGH), 61 TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist) 62 }; 63 64 typedef int (*sleepfn)(const volatile void *, struct mutex *, int, 65 const char *, int); 66 67 struct taskq *const systq = &taskq_sys; 68 struct taskq *const systqmp = &taskq_sys_mp; 69 70 void taskq_init(void); /* called in init_main.c */ 71 void taskq_create_thread(void *); 72 void taskq_barrier_task(void *); 73 int taskq_sleep(const volatile void *, struct mutex *, int, 74 const char *, int); 75 int taskq_next_work(struct taskq *, struct task *, sleepfn); 76 void taskq_thread(void *); 77 78 void 79 taskq_init(void) 80 { 81 kthread_create_deferred(taskq_create_thread, systq); 82 kthread_create_deferred(taskq_create_thread, systqmp); 83 } 84 85 struct taskq * 86 taskq_create(const char *name, unsigned int nthreads, int ipl, 87 unsigned int flags) 88 { 89 struct taskq *tq; 90 91 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 92 if (tq == NULL) 93 return (NULL); 94 95 tq->tq_state = TQ_S_CREATED; 96 tq->tq_running = 0; 97 tq->tq_nthreads = nthreads; 98 tq->tq_name = name; 99 tq->tq_flags = flags; 100 101 mtx_init_flags(&tq->tq_mtx, ipl, name, 0); 102 TAILQ_INIT(&tq->tq_worklist); 103 104 /* try to create a thread to guarantee that tasks will be serviced */ 105 kthread_create_deferred(taskq_create_thread, tq); 106 107 return (tq); 108 } 109 110 void 111 taskq_destroy(struct taskq *tq) 112 { 113 mtx_enter(&tq->tq_mtx); 114 switch (tq->tq_state) { 115 case TQ_S_CREATED: 116 /* tq is still referenced by taskq_create_thread */ 117 tq->tq_state = TQ_S_DESTROYED; 118 mtx_leave(&tq->tq_mtx); 119 return; 120 121 case TQ_S_RUNNING: 122 tq->tq_state = TQ_S_DESTROYED; 123 break; 124 125 default: 126 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 127 } 128 129 while (tq->tq_running > 0) { 130 wakeup(tq); 131 msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); 132 } 133 mtx_leave(&tq->tq_mtx); 134 135 free(tq, M_DEVBUF, sizeof(*tq)); 136 } 137 138 void 139 taskq_create_thread(void *arg) 140 { 141 struct taskq *tq = arg; 142 int rv; 143 144 mtx_enter(&tq->tq_mtx); 145 146 switch (tq->tq_state) { 147 case TQ_S_DESTROYED: 148 mtx_leave(&tq->tq_mtx); 149 free(tq, M_DEVBUF, sizeof(*tq)); 150 return; 151 152 case TQ_S_CREATED: 153 tq->tq_state = TQ_S_RUNNING; 154 break; 155 156 default: 157 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 158 } 159 160 do { 161 tq->tq_running++; 162 mtx_leave(&tq->tq_mtx); 163 164 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 165 166 mtx_enter(&tq->tq_mtx); 167 if (rv != 0) { 168 printf("unable to create thread for \"%s\" taskq\n", 169 tq->tq_name); 170 171 tq->tq_running--; 172 /* could have been destroyed during kthread_create */ 173 if (tq->tq_state == TQ_S_DESTROYED && 174 tq->tq_running == 0) 175 wakeup_one(&tq->tq_running); 176 break; 177 } 178 } while (tq->tq_running < tq->tq_nthreads); 179 180 mtx_leave(&tq->tq_mtx); 181 } 182 183 void 184 taskq_barrier(struct taskq *tq) 185 { 186 struct cond c = COND_INITIALIZER(); 187 struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); 188 189 task_add(tq, &t); 190 191 cond_wait(&c, "tqbar"); 192 } 193 194 void 195 taskq_barrier_task(void *p) 196 { 197 struct cond *c = p; 198 cond_signal(c); 199 } 200 201 void 202 task_set(struct task *t, void (*fn)(void *), void *arg) 203 { 204 t->t_func = fn; 205 t->t_arg = arg; 206 t->t_flags = 0; 207 } 208 209 int 210 task_add(struct taskq *tq, struct task *w) 211 { 212 int rv = 0; 213 214 if (ISSET(w->t_flags, TASK_ONQUEUE)) 215 return (0); 216 217 mtx_enter(&tq->tq_mtx); 218 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 219 rv = 1; 220 SET(w->t_flags, TASK_ONQUEUE); 221 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 222 } 223 mtx_leave(&tq->tq_mtx); 224 225 if (rv) 226 wakeup_one(tq); 227 228 return (rv); 229 } 230 231 int 232 task_del(struct taskq *tq, struct task *w) 233 { 234 int rv = 0; 235 236 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 237 return (0); 238 239 mtx_enter(&tq->tq_mtx); 240 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 241 rv = 1; 242 CLR(w->t_flags, TASK_ONQUEUE); 243 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 244 } 245 mtx_leave(&tq->tq_mtx); 246 247 return (rv); 248 } 249 250 int 251 taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority, 252 const char *wmesg, int tmo) 253 { 254 u_int *flags = &curproc->p_flag; 255 int rv; 256 257 atomic_clearbits_int(flags, P_CANTSLEEP); 258 rv = msleep(ident, mtx, priority, wmesg, tmo); 259 atomic_setbits_int(flags, P_CANTSLEEP); 260 261 return (tmo); 262 } 263 264 int 265 taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep) 266 { 267 struct task *next; 268 269 mtx_enter(&tq->tq_mtx); 270 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 271 if (tq->tq_state != TQ_S_RUNNING) { 272 mtx_leave(&tq->tq_mtx); 273 return (0); 274 } 275 276 tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); 277 } 278 279 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 280 CLR(next->t_flags, TASK_ONQUEUE); 281 282 *work = *next; /* copy to caller to avoid races */ 283 284 next = TAILQ_FIRST(&tq->tq_worklist); 285 mtx_leave(&tq->tq_mtx); 286 287 if (next != NULL && tq->tq_nthreads > 1) 288 wakeup_one(tq); 289 290 return (1); 291 } 292 293 void 294 taskq_thread(void *xtq) 295 { 296 sleepfn tqsleep = msleep; 297 struct taskq *tq = xtq; 298 struct task work; 299 int last; 300 301 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 302 KERNEL_UNLOCK(); 303 304 if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) { 305 tqsleep = taskq_sleep; 306 atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP); 307 } 308 309 while (taskq_next_work(tq, &work, tqsleep)) { 310 (*work.t_func)(work.t_arg); 311 sched_pause(yield); 312 } 313 314 mtx_enter(&tq->tq_mtx); 315 last = (--tq->tq_running == 0); 316 mtx_leave(&tq->tq_mtx); 317 318 if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) 319 atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP); 320 321 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 322 KERNEL_LOCK(); 323 324 if (last) 325 wakeup_one(&tq->tq_running); 326 327 kthread_exit(0); 328 } 329