1 /* $OpenBSD: kern_task.c,v 1.8 2013/12/11 22:39:49 kettenis Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 26 #define TASK_ONQUEUE 1 27 28 struct taskq { 29 enum { 30 TQ_S_CREATED, 31 TQ_S_RUNNING, 32 TQ_S_DESTROYED 33 } tq_state; 34 unsigned int tq_running; 35 unsigned int tq_nthreads; 36 unsigned int tq_unlocked; 37 const char *tq_name; 38 39 struct mutex tq_mtx; 40 TAILQ_HEAD(, task) tq_worklist; 41 }; 42 43 struct taskq taskq_sys = { 44 TQ_S_CREATED, 45 0, 46 1, 47 0, 48 "systq", 49 MUTEX_INITIALIZER(IPL_HIGH), 50 TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist) 51 }; 52 53 struct taskq *const systq = &taskq_sys; 54 55 void taskq_init(void); /* called in init_main.c */ 56 void taskq_create_thread(void *); 57 int taskq_next_work(struct taskq *, struct task *); 58 void taskq_thread(void *); 59 60 void 61 taskq_init(void) 62 { 63 kthread_create_deferred(taskq_create_thread, systq); 64 } 65 66 struct taskq * 67 taskq_create(const char *name, unsigned int nthreads, int ipl) 68 { 69 struct taskq *tq; 70 71 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 72 if (tq == NULL) 73 return (NULL); 74 75 tq->tq_state = TQ_S_CREATED; 76 tq->tq_running = 0; 77 tq->tq_nthreads = nthreads; 78 tq->tq_name = name; 79 80 if (ipl & IPL_MPSAFE) 81 tq->tq_unlocked = 1; 82 else 83 tq->tq_unlocked = 0; 84 ipl &= ~IPL_MPSAFE; 85 86 mtx_init(&tq->tq_mtx, ipl); 87 TAILQ_INIT(&tq->tq_worklist); 88 89 /* try to create a thread to guarantee that tasks will be serviced */ 90 kthread_create_deferred(taskq_create_thread, tq); 91 92 return (tq); 93 } 94 95 void 96 taskq_destroy(struct taskq *tq) 97 { 98 mtx_enter(&tq->tq_mtx); 99 switch (tq->tq_state) { 100 case TQ_S_CREATED: 101 /* tq is still referenced by taskq_create_thread */ 102 tq->tq_state = TQ_S_DESTROYED; 103 mtx_leave(&tq->tq_mtx); 104 return; 105 106 case TQ_S_RUNNING: 107 tq->tq_state = TQ_S_DESTROYED; 108 break; 109 110 default: 111 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 112 } 113 114 while (tq->tq_running > 0) { 115 wakeup(tq); 116 msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); 117 } 118 mtx_leave(&tq->tq_mtx); 119 120 free(tq, M_DEVBUF); 121 } 122 123 void 124 taskq_create_thread(void *arg) 125 { 126 struct taskq *tq = arg; 127 int rv; 128 129 mtx_enter(&tq->tq_mtx); 130 131 switch (tq->tq_state) { 132 case TQ_S_DESTROYED: 133 mtx_leave(&tq->tq_mtx); 134 free(tq, M_DEVBUF); 135 return; 136 137 case TQ_S_CREATED: 138 tq->tq_state = TQ_S_RUNNING; 139 break; 140 141 default: 142 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 143 } 144 145 do { 146 tq->tq_running++; 147 mtx_leave(&tq->tq_mtx); 148 149 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 150 151 mtx_enter(&tq->tq_mtx); 152 if (rv != 0) { 153 printf("unable to create thread for \"%s\" taskq\n", 154 tq->tq_name); 155 156 tq->tq_running--; 157 /* could have been destroyed during kthread_create */ 158 if (tq->tq_state == TQ_S_DESTROYED && 159 tq->tq_running == 0) 160 wakeup_one(&tq->tq_running); 161 break; 162 } 163 } while (tq->tq_running < tq->tq_nthreads); 164 165 mtx_leave(&tq->tq_mtx); 166 } 167 168 void 169 task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2) 170 { 171 t->t_func = fn; 172 t->t_arg1 = arg1; 173 t->t_arg2 = arg2; 174 175 t->t_flags = 0; 176 } 177 178 int 179 task_add(struct taskq *tq, struct task *w) 180 { 181 int rv = 0; 182 183 mtx_enter(&tq->tq_mtx); 184 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 185 rv = 1; 186 SET(w->t_flags, TASK_ONQUEUE); 187 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 188 } 189 mtx_leave(&tq->tq_mtx); 190 191 if (rv) 192 wakeup_one(tq); 193 194 return (rv); 195 } 196 197 int 198 task_del(struct taskq *tq, struct task *w) 199 { 200 int rv = 0; 201 202 mtx_enter(&tq->tq_mtx); 203 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 204 rv = 1; 205 CLR(w->t_flags, TASK_ONQUEUE); 206 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 207 } 208 mtx_leave(&tq->tq_mtx); 209 210 return (rv); 211 } 212 213 int 214 taskq_next_work(struct taskq *tq, struct task *work) 215 { 216 struct task *next; 217 218 mtx_enter(&tq->tq_mtx); 219 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 220 if (tq->tq_state != TQ_S_RUNNING) { 221 mtx_leave(&tq->tq_mtx); 222 return (0); 223 } 224 225 msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); 226 } 227 228 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 229 CLR(next->t_flags, TASK_ONQUEUE); 230 231 *work = *next; /* copy to caller to avoid races */ 232 233 next = TAILQ_FIRST(&tq->tq_worklist); 234 mtx_leave(&tq->tq_mtx); 235 236 if (next != NULL) 237 wakeup_one(tq); 238 239 return (1); 240 } 241 242 void 243 taskq_thread(void *xtq) 244 { 245 struct taskq *tq = xtq; 246 struct task work; 247 int last; 248 249 if (tq->tq_unlocked) 250 KERNEL_UNLOCK(); 251 252 while (taskq_next_work(tq, &work)) 253 (*work.t_func)(work.t_arg1, work.t_arg2); 254 255 mtx_enter(&tq->tq_mtx); 256 last = (--tq->tq_running == 0); 257 mtx_leave(&tq->tq_mtx); 258 259 if (tq->tq_unlocked) 260 KERNEL_LOCK(); 261 262 if (last) 263 wakeup_one(&tq->tq_running); 264 265 kthread_exit(0); 266 } 267