1 /* $OpenBSD: kern_task.c,v 1.23 2018/12/16 03:36:02 dlg Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 #include <sys/proc.h> 26 27 struct taskq { 28 enum { 29 TQ_S_CREATED, 30 TQ_S_RUNNING, 31 TQ_S_DESTROYED 32 } tq_state; 33 unsigned int tq_running; 34 unsigned int tq_nthreads; 35 unsigned int tq_flags; 36 const char *tq_name; 37 38 struct mutex tq_mtx; 39 struct task_list tq_worklist; 40 }; 41 42 struct taskq taskq_sys = { 43 TQ_S_CREATED, 44 0, 45 1, 46 0, 47 "systq", 48 MUTEX_INITIALIZER(IPL_HIGH), 49 TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist) 50 }; 51 52 struct taskq taskq_sys_mp = { 53 TQ_S_CREATED, 54 0, 55 1, 56 TASKQ_MPSAFE, 57 "systqmp", 58 MUTEX_INITIALIZER(IPL_HIGH), 59 TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist) 60 }; 61 62 typedef int (*sleepfn)(const volatile void *, struct mutex *, int, 63 const char *, int); 64 65 struct taskq *const systq = &taskq_sys; 66 struct taskq *const systqmp = &taskq_sys_mp; 67 68 void taskq_init(void); /* called in init_main.c */ 69 void taskq_create_thread(void *); 70 void taskq_barrier_task(void *); 71 int taskq_sleep(const volatile void *, struct mutex *, int, 72 const char *, int); 73 int taskq_next_work(struct taskq *, struct task *, sleepfn); 74 void taskq_thread(void *); 75 76 void 77 taskq_init(void) 78 { 79 kthread_create_deferred(taskq_create_thread, systq); 80 kthread_create_deferred(taskq_create_thread, systqmp); 81 } 82 83 struct taskq * 84 taskq_create(const char *name, unsigned int nthreads, int ipl, 85 unsigned int flags) 86 { 87 struct taskq *tq; 88 89 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 90 if (tq == NULL) 91 return (NULL); 92 93 tq->tq_state = TQ_S_CREATED; 94 tq->tq_running = 0; 95 tq->tq_nthreads = nthreads; 96 tq->tq_name = name; 97 tq->tq_flags = flags; 98 99 mtx_init_flags(&tq->tq_mtx, ipl, name, 0); 100 TAILQ_INIT(&tq->tq_worklist); 101 102 /* try to create a thread to guarantee that tasks will be serviced */ 103 kthread_create_deferred(taskq_create_thread, tq); 104 105 return (tq); 106 } 107 108 void 109 taskq_destroy(struct taskq *tq) 110 { 111 mtx_enter(&tq->tq_mtx); 112 switch (tq->tq_state) { 113 case TQ_S_CREATED: 114 /* tq is still referenced by taskq_create_thread */ 115 tq->tq_state = TQ_S_DESTROYED; 116 mtx_leave(&tq->tq_mtx); 117 return; 118 119 case TQ_S_RUNNING: 120 tq->tq_state = TQ_S_DESTROYED; 121 break; 122 123 default: 124 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 125 } 126 127 while (tq->tq_running > 0) { 128 wakeup(tq); 129 msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); 130 } 131 mtx_leave(&tq->tq_mtx); 132 133 free(tq, M_DEVBUF, sizeof(*tq)); 134 } 135 136 void 137 taskq_create_thread(void *arg) 138 { 139 struct taskq *tq = arg; 140 int rv; 141 142 mtx_enter(&tq->tq_mtx); 143 144 switch (tq->tq_state) { 145 case TQ_S_DESTROYED: 146 mtx_leave(&tq->tq_mtx); 147 free(tq, M_DEVBUF, sizeof(*tq)); 148 return; 149 150 case TQ_S_CREATED: 151 tq->tq_state = TQ_S_RUNNING; 152 break; 153 154 default: 155 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 156 } 157 158 do { 159 tq->tq_running++; 160 mtx_leave(&tq->tq_mtx); 161 162 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 163 164 mtx_enter(&tq->tq_mtx); 165 if (rv != 0) { 166 printf("unable to create thread for \"%s\" taskq\n", 167 tq->tq_name); 168 169 tq->tq_running--; 170 /* could have been destroyed during kthread_create */ 171 if (tq->tq_state == TQ_S_DESTROYED && 172 tq->tq_running == 0) 173 wakeup_one(&tq->tq_running); 174 break; 175 } 176 } while (tq->tq_running < tq->tq_nthreads); 177 178 mtx_leave(&tq->tq_mtx); 179 } 180 181 void 182 taskq_barrier(struct taskq *tq) 183 { 184 struct cond c = COND_INITIALIZER(); 185 struct task t = TASK_INITIALIZER(taskq_barrier_task, &c); 186 187 task_add(tq, &t); 188 189 cond_wait(&c, "tqbar"); 190 } 191 192 void 193 taskq_barrier_task(void *p) 194 { 195 struct cond *c = p; 196 cond_signal(c); 197 } 198 199 void 200 task_set(struct task *t, void (*fn)(void *), void *arg) 201 { 202 t->t_func = fn; 203 t->t_arg = arg; 204 t->t_flags = 0; 205 } 206 207 int 208 task_add(struct taskq *tq, struct task *w) 209 { 210 int rv = 0; 211 212 if (ISSET(w->t_flags, TASK_ONQUEUE)) 213 return (0); 214 215 mtx_enter(&tq->tq_mtx); 216 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 217 rv = 1; 218 SET(w->t_flags, TASK_ONQUEUE); 219 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 220 } 221 mtx_leave(&tq->tq_mtx); 222 223 if (rv) 224 wakeup_one(tq); 225 226 return (rv); 227 } 228 229 int 230 task_del(struct taskq *tq, struct task *w) 231 { 232 int rv = 0; 233 234 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 235 return (0); 236 237 mtx_enter(&tq->tq_mtx); 238 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 239 rv = 1; 240 CLR(w->t_flags, TASK_ONQUEUE); 241 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 242 } 243 mtx_leave(&tq->tq_mtx); 244 245 return (rv); 246 } 247 248 int 249 taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority, 250 const char *wmesg, int tmo) 251 { 252 u_int *flags = &curproc->p_flag; 253 int rv; 254 255 atomic_clearbits_int(flags, P_CANTSLEEP); 256 rv = msleep(ident, mtx, priority, wmesg, tmo); 257 atomic_setbits_int(flags, P_CANTSLEEP); 258 259 return (tmo); 260 } 261 262 int 263 taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep) 264 { 265 struct task *next; 266 267 mtx_enter(&tq->tq_mtx); 268 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 269 if (tq->tq_state != TQ_S_RUNNING) { 270 mtx_leave(&tq->tq_mtx); 271 return (0); 272 } 273 274 tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); 275 } 276 277 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 278 CLR(next->t_flags, TASK_ONQUEUE); 279 280 *work = *next; /* copy to caller to avoid races */ 281 282 next = TAILQ_FIRST(&tq->tq_worklist); 283 mtx_leave(&tq->tq_mtx); 284 285 if (next != NULL && tq->tq_nthreads > 1) 286 wakeup_one(tq); 287 288 return (1); 289 } 290 291 void 292 taskq_thread(void *xtq) 293 { 294 sleepfn tqsleep = msleep; 295 struct taskq *tq = xtq; 296 struct task work; 297 int last; 298 299 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 300 KERNEL_UNLOCK(); 301 302 if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) { 303 tqsleep = taskq_sleep; 304 atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP); 305 } 306 307 while (taskq_next_work(tq, &work, tqsleep)) { 308 (*work.t_func)(work.t_arg); 309 sched_pause(yield); 310 } 311 312 mtx_enter(&tq->tq_mtx); 313 last = (--tq->tq_running == 0); 314 mtx_leave(&tq->tq_mtx); 315 316 if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) 317 atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP); 318 319 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 320 KERNEL_LOCK(); 321 322 if (last) 323 wakeup_one(&tq->tq_running); 324 325 kthread_exit(0); 326 } 327