/*	$OpenBSD: kern_task.c,v 1.30 2020/06/11 06:06:55 dlg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
	.lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

/*
 * Each worker registers itself on its taskq's thread list so that
 * taskq_do_barrier() can tell whether it is being called from a task
 * running inside the same taskq.
 */
struct taskq_thread {
	SLIST_ENTRY(taskq_thread)
				 tt_entry;
	struct proc		*tt_thread;
};
SLIST_HEAD(taskq_threads, taskq_thread);

struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;
	unsigned int		 tq_running;	/* worker threads alive */
	unsigned int		 tq_nthreads;	/* worker threads wanted */
	unsigned int		 tq_flags;	/* TASKQ_* flags */
	const char		*tq_name;

	struct mutex		 tq_mtx;	/* serialises this struct */
	struct task_list	 tq_worklist;	/* pending tasks */

	struct taskq_threads	 tq_threads;	/* registered workers */
	unsigned int		 tq_barriers;	/* barriers in progress */
	unsigned int		 tq_bgen;	/* barrier generation */
	unsigned int		 tq_bthreads;	/* threads at the barrier */

#ifdef WITNESS
	struct lock_object	 tq_lock_object;
#endif
};
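
/*
 * Wait channels used in this file (commentary, not part of the
 * original file; every sleep below holds tq_mtx via msleep_nsec()):
 *
 *	tq			workers waiting for tasks ("bored")
 *	&tq->tq_running		taskq_destroy() waiting for the last worker
 *	&tq->tq_bthreads	barrier callers waiting for workers to check in
 *	&tq->tq_bgen		threads waiting for the generation to advance
 */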

static const char taskq_sys_name[] = "systq";

/*
 * The system taskq: serviced by a single worker that runs its tasks
 * while holding the kernel lock.
 */
struct taskq taskq_sys = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= 0,
	.tq_name	= taskq_sys_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			      taskq_sys_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object	= {
		.lo_name	= taskq_sys_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

/*
 * The MP safe system taskq: its worker drops the kernel lock before
 * running tasks (TASKQ_MPSAFE).
 */
struct taskq taskq_sys_mp = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= TASKQ_MPSAFE,
	.tq_name	= taskq_sys_mp_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			      taskq_sys_mp_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object = {
		.lo_name	= taskq_sys_mp_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;
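
/*
 * Illustrative sketch (not part of the original file): deferring work
 * from an interrupt handler to the system taskq.  The mydrv_* and sc_*
 * names are hypothetical.
 *
 *	void
 *	mydrv_intr_bottom(void *arg)
 *	{
 *		struct mydrv_softc *sc = arg;
 *		// runs in a worker thread, kernel lock held (systq)
 *	}
 *
 *	task_set(&sc->sc_task, mydrv_intr_bottom, sc);
 *	task_add(systq, &sc->sc_task);
 */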

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

void
taskq_init(void)
{
	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systq);
	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

	SLIST_INIT(&tq->tq_threads);
	tq->tq_barriers = 0;
	tq->tq_bgen = 0;
	tq->tq_bthreads = 0;

#ifdef WITNESS
	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
	tq->tq_lock_object.lo_name = name;
	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
	witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}
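
/*
 * Illustrative sketch (not part of the original file): a driver
 * creating a private MP safe taskq at attach time.  sc_tq is
 * hypothetical driver state.
 *
 *	sc->sc_tq = taskq_create("mydrv", 1, IPL_NET, TASKQ_MPSAFE);
 *	if (sc->sc_tq == NULL)
 *		goto fail;
 *
 * The ipl argument initialises tq_mtx, which protects the work list,
 * so task_add() may be called from interrupt handlers at or below
 * that level.
 */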

void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
		    INFSLP);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}
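
/*
 * Note on teardown ordering (commentary, not part of the original
 * file): taskq_next_work() keeps handing out tasks until the work list
 * is empty, so tasks already queued when taskq_destroy() runs are
 * still executed before the workers exit.  A detach path that must not
 * see its task run afterwards should remove it first, e.g.
 *
 *	taskq_del_barrier(sc->sc_tq, &sc->sc_task);
 *	taskq_destroy(sc->sc_tq);
 *
 * where sc_tq and sc_task are hypothetical driver state.
 */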

/*
 * taskq_create_thread() runs via kthread_create_deferred() once the
 * scheduler is up.  It spawns workers until tq_running reaches
 * tq_nthreads, and frees the taskq if it was destroyed before this
 * callback got to run.
 */
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

/*
 * taskq_barrier_task() is queued by taskq_do_barrier().  Each worker
 * that runs it checks in by bumping tq_bthreads, then parks until the
 * barrier generation advances, which guarantees the worker cannot be
 * in the middle of an older task.
 */
void
taskq_barrier_task(void *p)
{
	struct taskq *tq = p;
	unsigned int gen;

	mtx_enter(&tq->tq_mtx);
	tq->tq_bthreads++;
	wakeup(&tq->tq_bthreads);

	gen = tq->tq_bgen;
	do {
		msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
		    PWAIT, "tqbarend", INFSLP);
	} while (gen == tq->tq_bgen);
	mtx_leave(&tq->tq_mtx);
}

static void
taskq_do_barrier(struct taskq *tq)
{
	struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
	struct proc *thread = curproc;
	struct taskq_thread *tt;

	mtx_enter(&tq->tq_mtx);
	tq->tq_barriers++;

	/* is the barrier being run from a task inside the taskq? */
	SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
		if (tt->tt_thread == thread) {
			tq->tq_bthreads++;
			wakeup(&tq->tq_bthreads);
			break;
		}
	}

	while (tq->tq_bthreads < tq->tq_nthreads) {
		/* shove the task into the queue for a worker to pick up */
		SET(t.t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
		wakeup_one(tq);

		msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
		    PWAIT, "tqbar", INFSLP);

		/*
		 * another thread running a barrier might have
		 * done this work for us.
		 */
		if (ISSET(t.t_flags, TASK_ONQUEUE))
			TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
	}

	if (--tq->tq_barriers == 0) {
		/* we're the last one out */
		tq->tq_bgen++;
		wakeup(&tq->tq_bgen);
		tq->tq_bthreads = 0;
	} else {
		unsigned int gen = tq->tq_bgen;
		do {
			msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
			    PWAIT, "tqbarwait", INFSLP);
		} while (gen == tq->tq_bgen);
	}
	mtx_leave(&tq->tq_mtx);
}
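
/*
 * How the barrier rendezvous works (commentary, not part of the
 * original file): every barrier caller queues taskq_barrier_task() and
 * waits until tq_bthreads reaches tq_nthreads, i.e. every worker has
 * checked in (a caller that is itself a worker checks in on its own
 * behalf first).  Concurrent barriers coalesce: tq_barriers counts
 * them, and the last caller to leave bumps the generation tq_bgen,
 * releasing both the parked workers and the other barrier callers.
 */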

void
taskq_barrier(struct taskq *tq)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	taskq_do_barrier(tq);
}

void
taskq_del_barrier(struct taskq *tq, struct task *t)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	if (task_del(tq, t))
		return;

	taskq_do_barrier(tq);
}
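
/*
 * Illustrative sketch (not part of the original file): using
 * taskq_del_barrier() in a hypothetical detach path.  If the task is
 * still queued it is simply removed; otherwise the barrier waits for
 * any execution already in progress to finish.
 *
 *	void
 *	mydrv_detach(struct mydrv_softc *sc)
 *	{
 *		taskq_del_barrier(sc->sc_tq, &sc->sc_task);
 *		// provided the task does not re-add itself, it is now
 *		// neither queued nor running, so its resources can go
 *	}
 */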

void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	/* unlocked fast path: already queued, nothing to do */
	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	/* recheck under the mutex before putting the task on the list */
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}
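
/*
 * Commentary (not part of the original file): task_add() is
 * idempotent, so it can be called repeatedly, e.g. from an interrupt
 * handler, without the task ever appearing on the list twice; it
 * returns 1 only for the call that actually queued the task.  A task
 * may re-add itself from its own function, because taskq_next_work()
 * clears TASK_ONQUEUE before the function runs.
 */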

int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}
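
/*
 * Commentary (not part of the original file): task_del() only pulls a
 * task off the pending list; it does not wait for an instance that a
 * worker has already dequeued.  Use taskq_del_barrier() when the
 * caller needs the task to be gone for certain.
 */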

int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}
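
/*
 * Commentary (not part of the original file): the dequeued task is
 * copied into the worker's stack before tq_mtx is dropped, so the
 * owner's struct task may be re-added (or even freed) while the task
 * function is still running, without racing against the worker's reads
 * of t_func and t_arg.
 */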

void
taskq_thread(void *xtq)
{
	struct taskq_thread self = { .tt_thread = curproc };
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	/* register with the taskq so barriers can find this thread */
	mtx_enter(&tq->tq_mtx);
	SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
	mtx_leave(&tq->tq_mtx);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	while (taskq_next_work(tq, &work)) {
		WITNESS_LOCK(&tq->tq_lock_object, 0);
		(*work.t_func)(work.t_arg);
		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	/* the last worker out lets taskq_destroy() proceed */
	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}