xref: /openbsd-src/sys/kern/kern_task.c (revision ea6088e7d368d53c49ebfdf4520275cec2f78f5b)
1 /*	$OpenBSD: kern_task.c,v 1.21 2017/11/13 23:52:49 dlg Exp $ */
2 
3 /*
4  * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/param.h>
20 #include <sys/systm.h>
21 #include <sys/malloc.h>
22 #include <sys/mutex.h>
23 #include <sys/kthread.h>
24 #include <sys/task.h>
25 #include <sys/proc.h>
26 
/* set in t_flags while a task is linked on a taskq's worklist */
#define TASK_ONQUEUE	1

struct taskq {
	enum {
		TQ_S_CREATED,	/* allocated; worker threads not started yet */
		TQ_S_RUNNING,	/* taskq_create_thread() has attached */
		TQ_S_DESTROYED	/* taskq_destroy() has been called */
	}			 tq_state;	/* protected by tq_mtx */
	unsigned int		 tq_running;	/* live worker threads; protected by tq_mtx */
	unsigned int		 tq_nthreads;	/* requested thread count; set once at creation */
	unsigned int		 tq_flags;	/* TASKQ_* flags; set once at creation */
	const char		*tq_name;	/* used for kthread name and diagnostics */

	struct mutex		 tq_mtx;	/* serializes state, counters and worklist */
	struct task_list	 tq_worklist;	/* FIFO of pending tasks; protected by tq_mtx */
};
43 
/*
 * Statically initialized system task queues.  Both start in
 * TQ_S_CREATED; taskq_init() later schedules creation of their single
 * worker thread each via the deferred-kthread mechanism.
 */
struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,				/* no threads running yet */
	1,				/* one worker thread */
	0,				/* workers keep the kernel lock */
	"systq",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
};

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,				/* no threads running yet */
	1,				/* one worker thread */
	TASKQ_MPSAFE,			/* workers drop the kernel lock */
	"systqmp",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist)
};

/* signature shared by msleep() and taskq_sleep(), so workers can pick one */
typedef int (*sleepfn)(const volatile void *, struct mutex *, int,
    const char *, int);

/* public handles to the system task queues */
struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;
69 
/* forward declarations for this file's internal machinery */
void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);	/* deferred-kthread callback */
void	taskq_barrier_task(void *);	/* marker task used by taskq_barrier() */
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);		/* msleep() wrapper for TASKQ_CANTSLEEP */
int	taskq_next_work(struct taskq *, struct task *, sleepfn);
void	taskq_thread(void *);		/* worker thread main loop */
77 
78 void
79 taskq_init(void)
80 {
81 	kthread_create_deferred(taskq_create_thread, systq);
82 	kthread_create_deferred(taskq_create_thread, systqmp);
83 }
84 
85 struct taskq *
86 taskq_create(const char *name, unsigned int nthreads, int ipl,
87     unsigned int flags)
88 {
89 	struct taskq *tq;
90 
91 	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
92 	if (tq == NULL)
93 		return (NULL);
94 
95 	tq->tq_state = TQ_S_CREATED;
96 	tq->tq_running = 0;
97 	tq->tq_nthreads = nthreads;
98 	tq->tq_name = name;
99 	tq->tq_flags = flags;
100 
101 	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
102 	TAILQ_INIT(&tq->tq_worklist);
103 
104 	/* try to create a thread to guarantee that tasks will be serviced */
105 	kthread_create_deferred(taskq_create_thread, tq);
106 
107 	return (tq);
108 }
109 
/*
 * Tear down a taskq created by taskq_create().
 *
 * If no worker thread has attached yet (TQ_S_CREATED), only mark the
 * queue destroyed: taskq_create_thread() still holds a reference and
 * will free the structure when it runs.  Otherwise mark the queue
 * destroyed, wake the workers out of their "bored" sleep, and wait for
 * the last one to exit before freeing.
 */
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	/* each exiting worker decrements tq_running and the last one wakes us */
	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}
137 
/*
 * Deferred-kthread callback that spawns the worker threads for a
 * taskq.  Runs once per queue, scheduled by taskq_create() or
 * taskq_init().  If the queue was destroyed before this ran, this
 * holds the last reference and frees the structure instead.
 */
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		/* taskq_destroy() ran first; free on its behalf */
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		/* count the thread before dropping the mutex to create it */
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			/* roll back the optimistic increment above */
			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}
182 
/*
 * Wait until a worker thread of "tq" has run a marker task queued by
 * this call.  Since task_add() appends at the tail of the worklist,
 * the tasks ahead of the marker have been dequeued by the time the
 * marker runs.  Sleeps until taskq_barrier_task() clears "notdone".
 */
void
taskq_barrier(struct taskq *tq)
{
	struct sleep_state sls;
	unsigned int notdone = 1;
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &notdone);

	task_add(tq, &t);

	/* sleep_setup/sleep_finish re-check notdone to avoid a lost wakeup */
	while (notdone) {
		sleep_setup(&sls, &notdone, PWAIT, "tqbar");
		sleep_finish(&sls, notdone);
	}
}
197 
/*
 * Marker task run by a worker thread on behalf of taskq_barrier():
 * clear the barrier's "notdone" word and wake the sleeping caller.
 */
void
taskq_barrier_task(void *p)
{
	unsigned int *donep = p;

	*donep = 0;
	wakeup_one(donep);
}
206 
207 void
208 task_set(struct task *t, void (*fn)(void *), void *arg)
209 {
210 	t->t_func = fn;
211 	t->t_arg = arg;
212 	t->t_flags = 0;
213 }
214 
215 int
216 task_add(struct taskq *tq, struct task *w)
217 {
218 	int rv = 0;
219 
220 	if (ISSET(w->t_flags, TASK_ONQUEUE))
221 		return (0);
222 
223 	mtx_enter(&tq->tq_mtx);
224 	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
225 		rv = 1;
226 		SET(w->t_flags, TASK_ONQUEUE);
227 		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
228 	}
229 	mtx_leave(&tq->tq_mtx);
230 
231 	if (rv)
232 		wakeup_one(tq);
233 
234 	return (rv);
235 }
236 
237 int
238 task_del(struct taskq *tq, struct task *w)
239 {
240 	int rv = 0;
241 
242 	if (!ISSET(w->t_flags, TASK_ONQUEUE))
243 		return (0);
244 
245 	mtx_enter(&tq->tq_mtx);
246 	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
247 		rv = 1;
248 		CLR(w->t_flags, TASK_ONQUEUE);
249 		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
250 	}
251 	mtx_leave(&tq->tq_mtx);
252 
253 	return (rv);
254 }
255 
256 int
257 taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority,
258     const char *wmesg, int tmo)
259 {
260 	u_int *flags = &curproc->p_flag;
261 	int rv;
262 
263 	atomic_clearbits_int(flags, P_CANTSLEEP);
264 	rv = msleep(ident, mtx, priority, wmesg, tmo);
265 	atomic_setbits_int(flags, P_CANTSLEEP);
266 
267 	return (tmo);
268 }
269 
/*
 * Dequeue the next task for a worker thread, sleeping while the
 * worklist is empty.  The sleep goes through "tqsleep" so that
 * TASKQ_CANTSLEEP queues can substitute taskq_sleep().
 *
 * Returns 1 with *work filled in, or 0 when the queue has left the
 * running state and the worker should exit.
 */
int
taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			/* queue is being torn down */
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	/* more work pending: kick a sibling (tq_nthreads is set once) */
	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}
298 
/*
 * Main loop of a taskq worker thread.
 *
 * TASKQ_MPSAFE queues run their tasks without the kernel lock;
 * TASKQ_CANTSLEEP queues mark the thread P_CANTSLEEP and sleep via
 * taskq_sleep().  Tasks are executed until taskq_next_work() reports
 * the queue is being destroyed, at which point the last thread out
 * wakes taskq_destroy().
 */
void
taskq_thread(void *xtq)
{
	sleepfn tqsleep = msleep;
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) {
		tqsleep = taskq_sleep;
		atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP);
	}

	/* run the private copy of each task so task_del can't race us */
	while (taskq_next_work(tq, &work, tqsleep)) {
		(*work.t_func)(work.t_arg);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP))
		atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP);

	/* re-take the kernel lock before exiting an MPSAFE worker */
	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	/* the last worker out notifies taskq_destroy() */
	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}
335