/*	$OpenBSD: kern_task.c,v 1.26 2019/06/23 12:56:10 kettenis Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
	.lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

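/*
 * A taskq is a list of pending tasks serviced by one or more worker
 * threads.  tq_state tracks the queue's life cycle: it starts out
 * TQ_S_CREATED, moves to TQ_S_RUNNING once a worker thread has been
 * set up, and becomes TQ_S_DESTROYED when taskq_destroy() is called.
 * The counters and the work list are protected by tq_mtx.
 */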
struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;
	unsigned int		 tq_running;
	unsigned int		 tq_waiting;
	unsigned int		 tq_nthreads;
	unsigned int		 tq_flags;
	const char		*tq_name;

	struct mutex		 tq_mtx;
	struct task_list	 tq_worklist;
#ifdef WITNESS
	struct lock_object	 tq_lock_object;
#endif
};

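/*
 * The two system task queues are statically initialised so that
 * task_add() works on them even before taskq_init() has arranged for
 * their worker threads to be created.  systq runs its tasks with the
 * kernel lock held; systqmp is marked TASKQ_MPSAFE and runs its tasks
 * without it.
 */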
static const char taskq_sys_name[] = "systq";

struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	0,
	1,
	0,
	taskq_sys_name,
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),
#ifdef WITNESS
	{
		.lo_name = taskq_sys_name,
		.lo_flags = TASKQ_LOCK_FLAGS,
	},
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,
	0,
	1,
	TASKQ_MPSAFE,
	taskq_sys_mp_name,
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),
#ifdef WITNESS
	{
		.lo_name = taskq_sys_mp_name,
		.lo_flags = TASKQ_LOCK_FLAGS,
	},
#endif
};

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

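/*
 * taskq_init() is called from init_main.c.  It registers the system
 * task queues with WITNESS and defers creation of their worker threads
 * until it is possible to fork kernel threads.
 */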
void
taskq_init(void)
{
	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systq);
	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

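/*
 * taskq_create() allocates a private task queue with nthreads worker
 * threads, a mutex initialised at the given ipl, and the given TASKQ_*
 * flags.  Worker thread creation is deferred, so a queue can be created
 * before it is possible to fork kernel threads; tasks added in the
 * meantime simply wait on the work list until a worker picks them up.
 *
 * A minimal, hypothetical consumer (all names invented for
 * illustration) might look like:
 *
 *	struct taskq *tq;
 *	struct task t;
 *
 *	tq = taskq_create("mydrv", 1, IPL_NET, 0);
 *	task_set(&t, mydrv_dowork, sc);
 *	task_add(tq, &t);		/* e.g. from an interrupt handler */
 *	...
 *	taskq_destroy(tq);
 */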
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_waiting = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

#ifdef WITNESS
	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
	tq->tq_lock_object.lo_name = name;
	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
	witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

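/*
 * taskq_destroy() marks the queue destroyed, wakes its workers so they
 * notice and exit, and waits for the last of them to go away before
 * freeing the queue.  If no worker has been created yet, the memory is
 * instead freed later by taskq_create_thread() when it sees the
 * TQ_S_DESTROYED state.
 */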
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

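/*
 * taskq_create_thread() runs once kernel threads can be forked.  It
 * spawns the worker threads for a queue, or frees the queue if it was
 * destroyed before any worker could be created.
 */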
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

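/*
 * taskq_barrier() guarantees that any task that was running on tq when
 * the barrier was added has finished by the time this returns.  It is
 * implemented as a TASK_BARRIER task that every worker thread must
 * rendezvous on; the caller sleeps on a cond until that task runs.
 * Calling it from a task running on tq itself would deadlock.
 */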
void
taskq_barrier(struct taskq *tq)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	SET(t.t_flags, TASK_BARRIER);
	task_add(tq, &t);
	cond_wait(&c, "tqbar");
}

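/*
 * taskq_del_barrier() removes a task from the queue and then acts like
 * taskq_barrier().  If task_del() succeeds the task was still pending
 * and therefore cannot be running, so no barrier is needed.
 */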
void
taskq_del_barrier(struct taskq *tq, struct task *del)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	if (task_del(tq, del))
		return;

	SET(t.t_flags, TASK_BARRIER);
	task_add(tq, &t);
	cond_wait(&c, "tqbar");
}

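/* Body of the barrier task: wake up whoever is waiting in cond_wait(). */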
void
taskq_barrier_task(void *p)
{
	struct cond *c = p;
	cond_signal(c);
}

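/*
 * task_set() initialises a task structure with the function to call and
 * its argument.  t_flags is cleared, so a task should not be
 * reinitialised while it is still on a queue.
 */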
void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

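/*
 * task_add() schedules a task on a queue.  TASK_ONQUEUE is checked once
 * without the mutex as a cheap way out for tasks that are already
 * pending, and again under tq_mtx before the task is actually linked
 * onto the work list.  Returns 1 if the task was added, 0 if it was
 * already queued.
 */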
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

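/*
 * task_del() removes a pending task from a queue using the same
 * unlocked-then-locked check as task_add().  Returns 1 if the task was
 * removed before it could run, 0 if it was not on the queue.  A return
 * of 0 says nothing about whether the task is currently running; use
 * taskq_del_barrier() to wait for that.
 */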
int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

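/*
 * taskq_next_work() is the per-thread work fetch loop.  It sleeps until
 * a task is available, copies it into the caller-provided storage and
 * dequeues it.  Barrier tasks are only run once every other worker
 * thread on the queue is asleep, which is how taskq_barrier() knows no
 * task can still be running.  Returns 0 when the queue is being
 * destroyed and the thread should exit.
 */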
int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
retry:
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		tq->tq_waiting++;
		msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
		tq->tq_waiting--;
	}

	if (ISSET(next->t_flags, TASK_BARRIER)) {
		/*
		 * Make sure all other threads are sleeping before we
		 * proceed and run the barrier task.
		 */
		if (++tq->tq_waiting == tq->tq_nthreads) {
			tq->tq_waiting--;
		} else {
			msleep(tq, &tq->tq_mtx, PWAIT, "tqblk", 0);
			tq->tq_waiting--;
			goto retry;
		}
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

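/*
 * taskq_thread() is the body of every worker thread.  MP safe queues
 * drop the kernel lock for the lifetime of the thread, so their tasks
 * run without it; tasks on other queues run with the kernel lock held.
 * The last thread to exit wakes up taskq_destroy(), which is sleeping
 * on tq_running.
 */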
void
taskq_thread(void *xtq)
{
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	while (taskq_next_work(tq, &work)) {
		WITNESS_LOCK(&tq->tq_lock_object, 0);
		(*work.t_func)(work.t_arg);
		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}