xref: /openbsd-src/sys/kern/kern_task.c (revision ae3cb403620ab940fbaabb3055fac045a63d56b7)
1 /*	$OpenBSD: kern_task.c,v 1.22 2017/12/14 00:45:16 dlg Exp $ */
2 
3 /*
4  * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/param.h>
20 #include <sys/systm.h>
21 #include <sys/malloc.h>
22 #include <sys/mutex.h>
23 #include <sys/kthread.h>
24 #include <sys/task.h>
25 #include <sys/proc.h>
26 
/* t_flags bit: the task is currently linked on some taskq's worklist */
#define TASK_ONQUEUE	1

struct taskq {
	enum {
		TQ_S_CREATED,	/* allocated; worker threads not started yet */
		TQ_S_RUNNING,	/* taskq_create_thread() has started workers */
		TQ_S_DESTROYED	/* taskq_destroy() called; workers draining */
	}			 tq_state;
	unsigned int		 tq_running;	/* worker threads currently accounted for */
	unsigned int		 tq_nthreads;	/* worker threads requested at create time */
	unsigned int		 tq_flags;	/* TASKQ_* flags (e.g. TASKQ_MPSAFE) */
	const char		*tq_name;	/* thread name and mutex name */

	struct mutex		 tq_mtx;	/* protects all fields above and the list below */
	struct task_list	 tq_worklist;	/* FIFO of pending tasks */
};
43 
/*
 * The two statically allocated system taskqs, usable before malloc and
 * kthreads are up.  Positional initializers match the field order of
 * struct taskq: state, running, nthreads, flags, name, mutex, worklist.
 * Each runs a single worker thread; "systqmp" is MP-safe (its worker
 * drops the kernel lock), "systq" is not.
 */
struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	1,
	0,
	"systq",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
};

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,
	1,
	TASKQ_MPSAFE,
	"systqmp",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist)
};
63 
64 typedef int (*sleepfn)(const volatile void *, struct mutex *, int,
65     const char *, int);
66 
67 struct taskq *const systq = &taskq_sys;
68 struct taskq *const systqmp = &taskq_sys_mp;
69 
70 void	taskq_init(void); /* called in init_main.c */
71 void	taskq_create_thread(void *);
72 void	taskq_barrier_task(void *);
73 int	taskq_sleep(const volatile void *, struct mutex *, int,
74 	    const char *, int);
75 int	taskq_next_work(struct taskq *, struct task *, sleepfn);
76 void	taskq_thread(void *);
77 
/*
 * System bootstrap hook (called from init_main.c): defer creation of
 * the worker threads for the two system taskqs until the kernel is far
 * enough along to spawn kthreads.
 */
void
taskq_init(void)
{
	kthread_create_deferred(taskq_create_thread, systq);
	kthread_create_deferred(taskq_create_thread, systqmp);
}
84 
85 struct taskq *
86 taskq_create(const char *name, unsigned int nthreads, int ipl,
87     unsigned int flags)
88 {
89 	struct taskq *tq;
90 
91 	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
92 	if (tq == NULL)
93 		return (NULL);
94 
95 	tq->tq_state = TQ_S_CREATED;
96 	tq->tq_running = 0;
97 	tq->tq_nthreads = nthreads;
98 	tq->tq_name = name;
99 	tq->tq_flags = flags;
100 
101 	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
102 	TAILQ_INIT(&tq->tq_worklist);
103 
104 	/* try to create a thread to guarantee that tasks will be serviced */
105 	kthread_create_deferred(taskq_create_thread, tq);
106 
107 	return (tq);
108 }
109 
/*
 * Tear down a taskq made by taskq_create().
 *
 * If the deferred thread-creation hook has not run yet (TQ_S_CREATED),
 * only mark the queue destroyed: taskq_create_thread() still holds a
 * reference and will free it when it runs.  Otherwise mark the queue
 * destroyed, wake the workers out of their "bored" sleep, and wait for
 * the last one to drop tq_running to zero before freeing.
 *
 * NOTE(review): the final free() assumes tq came from taskq_create()'s
 * malloc — this must not be called on the static system taskqs.
 */
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	/* wake every worker, then sleep until the last one has exited */
	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}
137 
/*
 * Deferred kthread bootstrap: spawn the worker threads for a taskq.
 *
 * Runs once per taskq (queued via kthread_create_deferred).  If the
 * taskq was destroyed before this ran, this holds the last reference
 * and frees it.  Otherwise it moves the queue to TQ_S_RUNNING and
 * creates up to tq_nthreads workers, incrementing tq_running for each
 * one *before* dropping the mutex around the potentially sleeping
 * kthread_create() call, so taskq_destroy() always waits for it.
 */
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		/* taskq_destroy() ran first; we hold the last reference */
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		/* account for the worker before it exists (see above) */
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			/* undo the optimistic accounting */
			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}
182 
/*
 * Block the caller until a worker thread on tq has reached and run a
 * freshly queued barrier task, i.e. until the queue has made progress
 * past the point of this call.
 *
 * NOTE(review): calling this from a task running on tq itself would
 * deadlock on a single-threaded queue (the worker would be waiting
 * for its own barrier task) — confirm callers avoid that.
 */
void
taskq_barrier(struct taskq *tq)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	task_add(tq, &t);

	cond_wait(&c, "tqbar");
}
193 
/*
 * Task body used by taskq_barrier(): wake the waiter sleeping in
 * cond_wait().  The argument is the condition variable living on the
 * waiter's stack.
 */
void
taskq_barrier_task(void *p)
{
	cond_signal(p);
}
200 
201 void
202 task_set(struct task *t, void (*fn)(void *), void *arg)
203 {
204 	t->t_func = fn;
205 	t->t_arg = arg;
206 	t->t_flags = 0;
207 }
208 
/*
 * Queue a task on tq.  Returns 1 if the task was newly added, 0 if it
 * was already pending (tasks are single-shot: a task already on a
 * queue is not queued a second time).
 *
 * The first TASK_ONQUEUE test is an unlocked fast path: if the flag
 * appears set we can bail out without taking the mutex.  The flag is
 * re-checked under tq_mtx before the task is actually linked, so the
 * unlocked read only ever skips work, never corrupts the list.
 */
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	/* only wake a worker if we actually added work */
	if (rv)
		wakeup_one(tq);

	return (rv);
}
230 
/*
 * Remove a pending task from tq before it runs.  Returns 1 if the task
 * was removed, 0 if it was not on the queue (e.g. already dequeued by
 * a worker in taskq_next_work()).
 *
 * Mirror image of task_add(): an unlocked fast-path test of
 * TASK_ONQUEUE, then a re-check under tq_mtx before unlinking.
 * Note that returning 1 does not mean the task never ran, only that
 * this particular queuing of it was cancelled.
 */
int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}
249 
250 int
251 taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority,
252     const char *wmesg, int tmo)
253 {
254 	u_int *flags = &curproc->p_flag;
255 	int rv;
256 
257 	atomic_clearbits_int(flags, P_CANTSLEEP);
258 	rv = msleep(ident, mtx, priority, wmesg, tmo);
259 	atomic_setbits_int(flags, P_CANTSLEEP);
260 
261 	return (tmo);
262 }
263 
/*
 * Worker-thread helper: dequeue the next task from tq into *work.
 *
 * Sleeps via tqsleep while the worklist is empty and the queue is
 * still running.  Returns 0 once the queue leaves TQ_S_RUNNING (the
 * caller should exit), or 1 with *work filled in.
 *
 * The task is copied out under the mutex so the caller can execute it
 * without holding any lock, and so the task structure itself may be
 * immediately reused or re-added by its owner.
 */
int
taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	/* peek at the list before unlocking to see if more work remains */
	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	/*
	 * Kick another worker for the remaining work.  tq_nthreads is
	 * only written in taskq_create(), so the unlocked read is safe.
	 */
	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}
292 
/*
 * Main loop for taskq worker threads (entry point for kthread_create).
 *
 * TASKQ_MPSAFE queues drop the kernel lock for the thread's lifetime;
 * TASKQ_CANTSLEEP queues run with P_CANTSLEEP set and substitute
 * taskq_sleep (which briefly clears the flag) for msleep when waiting
 * for work.  The loop pulls copied-out tasks with taskq_next_work()
 * until the queue is destroyed; the last worker to exit wakes
 * taskq_destroy(), which may then free the taskq.
 */
void
taskq_thread(void *xtq)
{
	sleepfn tqsleep = msleep;
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) {
		tqsleep = taskq_sleep;
		atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP);
	}

	while (taskq_next_work(tq, &work, tqsleep)) {
		(*work.t_func)(work.t_arg);
		/* let other threads run between tasks */
		sched_pause(yield);
	}

	/* note whether we are the last worker out; tq_mtx guards tq_running */
	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP))
		atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	/* wake taskq_destroy() after we no longer touch tq's fields */
	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}
329