/*	$OpenBSD: kern_task.c,v 1.25 2019/04/28 04:20:40 dlg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
	.lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

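/*
 * A taskq is a work queue backed by kernel threads: tq_worklist holds
 * the pending tasks, tq_mtx protects the list and the state/thread
 * counters, and tq_nthreads worker threads pull tasks off the list and
 * execute them.
 */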
struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;
	unsigned int		 tq_running;
	unsigned int		 tq_nthreads;
	unsigned int		 tq_flags;
	const char		*tq_name;

	struct mutex		 tq_mtx;
	struct task_list	 tq_worklist;
#ifdef WITNESS
	struct lock_object	 tq_lock_object;
#endif
};

static const char taskq_sys_name[] = "systq";

struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	1,
	0,
	taskq_sys_name,
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),
#ifdef WITNESS
	{
		.lo_name = taskq_sys_name,
		.lo_flags = TASKQ_LOCK_FLAGS,
	},
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,
	1,
	TASKQ_MPSAFE,
	taskq_sys_mp_name,
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),
#ifdef WITNESS
	{
		.lo_name = taskq_sys_mp_name,
		.lo_flags = TASKQ_LOCK_FLAGS,
	},
#endif
};

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

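/*
 * Typical use of this API from elsewhere in the kernel looks like the
 * following sketch (example_task_fn and the softc "sc" are hypothetical
 * names, not part of this file):
 *
 *	task_set(&sc->sc_task, example_task_fn, sc);
 *	task_add(systq, &sc->sc_task);
 *	...
 *	task_del(systq, &sc->sc_task);
 *
 * Tasks added to systq run with the kernel lock held; systqmp runs
 * them without it (TASKQ_MPSAFE).
 */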
void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

void
taskq_init(void)
{
	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systq);
	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

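/*
 * taskq_create() allocates and initialises a queue but defers thread
 * creation to taskq_create_thread() via kthread_create_deferred(), so
 * it can be called before the kernel is able to spawn kthreads.  A
 * hypothetical caller wanting four MP-safe workers might do:
 *
 *	sc->sc_tq = taskq_create("exampletq", 4, IPL_NET, TASKQ_MPSAFE);
 *	if (sc->sc_tq == NULL)
 *		return (ENOMEM);
 */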
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

#ifdef WITNESS
	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
	tq->tq_lock_object.lo_name = name;
	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
	witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

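/*
 * taskq_destroy() flags the queue as destroyed, wakes the workers and
 * sleeps until the last one has exited before freeing the taskq.  If
 * the queue is still TQ_S_CREATED, taskq_create_thread() has not run
 * yet and still holds the reference, so it frees the memory instead.
 */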
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

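/*
 * Deferred continuation of taskq_create()/taskq_init(): spawn
 * tq_nthreads worker threads for the queue.  tq_mtx is dropped around
 * kthread_create() because it sleeps, so the DESTROYED state has to be
 * handled again on the error path.
 */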
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

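/*
 * taskq_barrier() queues a task behind the currently pending work and
 * sleeps until that task has run, so on a single-threaded queue every
 * task added before the call has completed by the time it returns.
 * WITNESS_CHECKORDER() flags callers holding a lock that the queue's
 * tasks might also take, which would deadlock here.  A common
 * (hypothetical) detach-time pattern:
 *
 *	task_del(sc->sc_tq, &sc->sc_task);
 *	taskq_barrier(sc->sc_tq);
 */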
void
taskq_barrier(struct taskq *tq)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	task_add(tq, &t);
	cond_wait(&c, "tqbar");
}

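/*
 * taskq_del_barrier() is the remove-and-wait variant: if task_del()
 * dequeues the task before it ran there is nothing to wait for,
 * otherwise the task may currently be executing and a barrier is
 * required.
 */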
void
taskq_del_barrier(struct taskq *tq, struct task *del)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	if (task_del(tq, del))
		return;

	task_add(tq, &t);
	cond_wait(&c, "tqbar");
}

void
taskq_barrier_task(void *p)
{
	struct cond *c = p;
	cond_signal(c);
}

void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

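/*
 * task_add() and task_del() check TASK_ONQUEUE without tq_mtx first as
 * a cheap fast path.  The flag is tested again under the mutex before
 * the worklist is modified, so the unlocked check can only cause an
 * early return, never a corrupted list.
 */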
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

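/*
 * taskq_next_work() is called by each worker to dequeue one task,
 * sleeping on the taskq while the list is empty.  The task is copied
 * to the worker's stack so its owner may free or re-add it as soon as
 * it is off the list; returning 0 (queue destroyed) tells the worker
 * to exit.
 */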
int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

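/*
 * taskq_thread() is the worker body.  Unless TASKQ_MPSAFE was set,
 * task functions run with the kernel lock held.  Workers yield between
 * tasks via sched_pause(), and once taskq_next_work() reports
 * destruction the last thread out wakes the sleeper in taskq_destroy().
 */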
void
taskq_thread(void *xtq)
{
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	while (taskq_next_work(tq, &work)) {
		WITNESS_LOCK(&tq->tq_lock_object, 0);
		(*work.t_func)(work.t_arg);
		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}