1 /* $OpenBSD: kern_task.c,v 1.12 2014/11/01 23:58:28 tedu Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 26 #define TASK_ONQUEUE 1 27 28 struct taskq { 29 enum { 30 TQ_S_CREATED, 31 TQ_S_RUNNING, 32 TQ_S_DESTROYED 33 } tq_state; 34 unsigned int tq_running; 35 unsigned int tq_nthreads; 36 unsigned int tq_unlocked; 37 const char *tq_name; 38 39 struct mutex tq_mtx; 40 TAILQ_HEAD(, task) tq_worklist; 41 }; 42 43 struct taskq taskq_sys = { 44 TQ_S_CREATED, 45 0, 46 1, 47 0, 48 "systq", 49 MUTEX_INITIALIZER(IPL_HIGH), 50 TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist) 51 }; 52 53 struct taskq taskq_sys_mp = { 54 TQ_S_CREATED, 55 0, 56 1, 57 1, 58 "systqmp", 59 MUTEX_INITIALIZER(IPL_HIGH), 60 TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist) 61 }; 62 63 struct taskq *const systq = &taskq_sys; 64 struct taskq *const systqmp = &taskq_sys_mp; 65 66 void taskq_init(void); /* called in init_main.c */ 67 void taskq_create_thread(void *); 68 int taskq_next_work(struct taskq *, struct task *); 69 void taskq_thread(void *); 70 71 void 72 taskq_init(void) 73 { 74 kthread_create_deferred(taskq_create_thread, systq); 75 kthread_create_deferred(taskq_create_thread, systqmp); 76 } 77 78 struct taskq * 79 taskq_create(const char *name, unsigned int nthreads, int ipl) 80 { 81 struct taskq *tq; 82 83 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 84 if (tq == NULL) 85 return (NULL); 86 87 tq->tq_state = TQ_S_CREATED; 88 tq->tq_running = 0; 89 tq->tq_nthreads = nthreads; 90 tq->tq_name = name; 91 92 if (ipl & IPL_MPSAFE) 93 tq->tq_unlocked = 1; 94 else 95 tq->tq_unlocked = 0; 96 ipl &= ~IPL_MPSAFE; 97 98 mtx_init(&tq->tq_mtx, ipl); 99 TAILQ_INIT(&tq->tq_worklist); 100 101 /* try to create a thread to guarantee that tasks will be serviced */ 102 kthread_create_deferred(taskq_create_thread, tq); 103 104 return (tq); 105 } 106 107 void 108 taskq_destroy(struct taskq *tq) 109 { 110 mtx_enter(&tq->tq_mtx); 111 switch (tq->tq_state) { 112 case TQ_S_CREATED: 113 /* tq is still referenced by taskq_create_thread */ 114 tq->tq_state = TQ_S_DESTROYED; 115 mtx_leave(&tq->tq_mtx); 116 return; 117 118 case TQ_S_RUNNING: 119 tq->tq_state = TQ_S_DESTROYED; 120 break; 121 122 default: 123 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 124 } 125 126 while (tq->tq_running > 0) { 127 wakeup(tq); 128 msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); 129 } 130 mtx_leave(&tq->tq_mtx); 131 132 free(tq, M_DEVBUF, sizeof(*tq)); 133 } 134 135 void 136 taskq_create_thread(void *arg) 137 { 138 struct taskq *tq = arg; 139 int rv; 140 141 mtx_enter(&tq->tq_mtx); 142 143 switch (tq->tq_state) { 144 case TQ_S_DESTROYED: 145 mtx_leave(&tq->tq_mtx); 146 free(tq, M_DEVBUF, sizeof(*tq)); 147 return; 148 149 case TQ_S_CREATED: 150 tq->tq_state = TQ_S_RUNNING; 151 break; 152 153 default: 154 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 155 } 156 157 do { 158 tq->tq_running++; 159 mtx_leave(&tq->tq_mtx); 160 161 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 162 163 mtx_enter(&tq->tq_mtx); 164 if (rv != 0) { 165 printf("unable to create thread for \"%s\" taskq\n", 166 tq->tq_name); 167 168 tq->tq_running--; 169 /* could have been destroyed during kthread_create */ 170 if (tq->tq_state == TQ_S_DESTROYED && 171 tq->tq_running == 0) 172 wakeup_one(&tq->tq_running); 173 break; 174 } 175 } while (tq->tq_running < tq->tq_nthreads); 176 177 mtx_leave(&tq->tq_mtx); 178 } 179 180 void 181 task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2) 182 { 183 t->t_func = fn; 184 t->t_arg1 = arg1; 185 t->t_arg2 = arg2; 186 187 t->t_flags = 0; 188 } 189 190 int 191 task_add(struct taskq *tq, struct task *w) 192 { 193 int rv = 0; 194 195 mtx_enter(&tq->tq_mtx); 196 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 197 rv = 1; 198 SET(w->t_flags, TASK_ONQUEUE); 199 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 200 } 201 mtx_leave(&tq->tq_mtx); 202 203 if (rv) 204 wakeup_one(tq); 205 206 return (rv); 207 } 208 209 int 210 task_del(struct taskq *tq, struct task *w) 211 { 212 int rv = 0; 213 214 mtx_enter(&tq->tq_mtx); 215 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 216 rv = 1; 217 CLR(w->t_flags, TASK_ONQUEUE); 218 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 219 } 220 mtx_leave(&tq->tq_mtx); 221 222 return (rv); 223 } 224 225 int 226 taskq_next_work(struct taskq *tq, struct task *work) 227 { 228 struct task *next; 229 230 mtx_enter(&tq->tq_mtx); 231 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 232 if (tq->tq_state != TQ_S_RUNNING) { 233 mtx_leave(&tq->tq_mtx); 234 return (0); 235 } 236 237 msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); 238 } 239 240 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 241 CLR(next->t_flags, TASK_ONQUEUE); 242 243 *work = *next; /* copy to caller to avoid races */ 244 245 next = TAILQ_FIRST(&tq->tq_worklist); 246 mtx_leave(&tq->tq_mtx); 247 248 if (next != NULL) 249 wakeup_one(tq); 250 251 return (1); 252 } 253 254 void 255 taskq_thread(void *xtq) 256 { 257 struct taskq *tq = xtq; 258 struct task work; 259 int last; 260 261 if (tq->tq_unlocked) 262 KERNEL_UNLOCK(); 263 264 while (taskq_next_work(tq, &work)) { 265 (*work.t_func)(work.t_arg1, work.t_arg2); 266 sched_pause(); 267 } 268 269 mtx_enter(&tq->tq_mtx); 270 last = (--tq->tq_running == 0); 271 mtx_leave(&tq->tq_mtx); 272 273 if (tq->tq_unlocked) 274 KERNEL_LOCK(); 275 276 if (last) 277 wakeup_one(&tq->tq_running); 278 279 kthread_exit(0); 280 } 281