/*	$OpenBSD: kern_task.c,v 1.27 2019/12/19 17:40:11 mpi Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
	.lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;
	unsigned int		 tq_running;
	unsigned int		 tq_waiting;
	unsigned int		 tq_nthreads;
	unsigned int		 tq_flags;
	const char		*tq_name;

	struct mutex		 tq_mtx;
	struct task_list	 tq_worklist;
#ifdef WITNESS
	struct lock_object	 tq_lock_object;
#endif
};

static const char taskq_sys_name[] = "systq";

struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	0,
	1,
	0,
	taskq_sys_name,
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),
#ifdef WITNESS
	{
		.lo_name = taskq_sys_name,
		.lo_flags = TASKQ_LOCK_FLAGS,
	},
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,
	0,
	1,
	TASKQ_MPSAFE,
	taskq_sys_mp_name,
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),
#ifdef WITNESS
	{
		.lo_name = taskq_sys_mp_name,
		.lo_flags = TASKQ_LOCK_FLAGS,
	},
#endif
};

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

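/*
 * Tasks on systq run with the kernel lock held, while taskq_sys_mp is
 * created with TASKQ_MPSAFE, so its worker thread drops the kernel lock
 * before servicing work (see taskq_thread() below).
 */
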
void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

void
taskq_init(void)
{
	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systq);
	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_waiting = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

#ifdef WITNESS
	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
	tq->tq_lock_object.lo_name = name;
	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
	witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

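/*
 * Illustrative sketch (names are hypothetical, not part of this file): a
 * typical caller creates a taskq at attach time and destroys it on detach.
 *
 *	sc->sc_taskq = taskq_create("exampletq", 1, IPL_NONE, 0);
 *	if (sc->sc_taskq == NULL)
 *		return (ENOMEM);
 *	...
 *	taskq_destroy(sc->sc_taskq);
 */
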
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
		    INFSLP);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

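/*
 * taskq_destroy() sleeps until every worker thread has exited, so it must
 * be called from process context and never from a task running on the
 * taskq that is being destroyed.
 */
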
/*
 * Runs via kthread_create_deferred(): creates the worker threads for a
 * taskq once kthreads can be started, or frees the taskq if it was
 * destroyed before that point.
 */
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

/*
 * Sleep until every task that was queued or running on tq ahead of the
 * barrier has finished.
 */
void
taskq_barrier(struct taskq *tq)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	SET(t.t_flags, TASK_BARRIER);
	task_add(tq, &t);
	cond_wait(&c, "tqbar");
}

/*
 * Remove del from tq if it is still queued; if it is not (it may already
 * be running), use a barrier to wait until it can no longer be executing.
 */
void
taskq_del_barrier(struct taskq *tq, struct task *del)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	if (task_del(tq, del))
		return;

	SET(t.t_flags, TASK_BARRIER);
	task_add(tq, &t);
	cond_wait(&c, "tqbar");
}

/* The barrier task itself: wake the thread waiting in cond_wait(). */
void
taskq_barrier_task(void *p)
{
	struct cond *c = p;
	cond_signal(c);
}

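/*
 * Example (names are hypothetical): before freeing a structure that embeds
 * a task, a caller can use taskq_del_barrier() to make sure the task is no
 * longer queued and no longer running on the queue.
 *
 *	taskq_del_barrier(sc->sc_taskq, &sc->sc_task);
 *	free(sc, M_DEVBUF, sizeof(*sc));
 */
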
void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

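/*
 * Example (names are hypothetical): a task is normally embedded in a
 * softc, initialised once with task_set(), and then queued from interrupt
 * or process context with task_add().  task_add() and task_del() return 1
 * only when the call actually changed the queue.
 *
 *	task_set(&sc->sc_task, example_taskfn, sc);
 *	...
 *	task_add(systq, &sc->sc_task);
 */
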
/*
 * Dequeue the next task for a worker thread into *work.  Sleeps while the
 * queue is empty and returns 0 once the taskq is being destroyed.
 */
int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
retry:
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		tq->tq_waiting++;
		msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
		tq->tq_waiting--;
	}

	if (ISSET(next->t_flags, TASK_BARRIER)) {
		/*
		 * Make sure all other threads are sleeping before we
		 * proceed and run the barrier task.
		 */
		if (++tq->tq_waiting == tq->tq_nthreads) {
			tq->tq_waiting--;
		} else {
			msleep_nsec(tq, &tq->tq_mtx, PWAIT, "tqblk", INFSLP);
			tq->tq_waiting--;
			goto retry;
		}
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

/*
 * Main loop of a taskq worker thread: run tasks until taskq_destroy()
 * tears the queue down, then exit.
 */
void
taskq_thread(void *xtq)
{
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	while (taskq_next_work(tq, &work)) {
		WITNESS_LOCK(&tq->tq_lock_object, 0);
		(*work.t_func)(work.t_arg);
		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}
385