/*	$OpenBSD: kern_task.c,v 1.23 2018/12/16 03:36:02 dlg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>

struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;	/* serialised by tq_mtx */
	unsigned int		 tq_running;	/* live threads; serialised by tq_mtx */
	unsigned int		 tq_nthreads;	/* requested threads; set at creation */
	unsigned int		 tq_flags;	/* TASKQ_* flags; set at creation */
	const char		*tq_name;

	struct mutex		 tq_mtx;
	struct task_list	 tq_worklist;	/* pending tasks; serialised by tq_mtx */
};

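/*
 * The built-in system task queues. systq runs its tasks under the
 * kernel lock, while systqmp is TASKQ_MPSAFE and runs without it.
 * Both are served by a single thread and raise to IPL_HIGH while
 * manipulating their worklists.
 */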
struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	1,
	0,
	"systq",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
};

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,
	1,
	TASKQ_MPSAFE,
	"systqmp",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist)
};

typedef int (*sleepfn)(const volatile void *, struct mutex *, int,
    const char *, int);

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);
int	taskq_next_work(struct taskq *, struct task *, sleepfn);
void	taskq_thread(void *);

void
taskq_init(void)
{
	kthread_create_deferred(taskq_create_thread, systq);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

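/*
 * A minimal usage sketch, assuming a hypothetical driver with a
 * softc "sc", a work function "mydrv_work", and fields sc_tq and
 * sc_task (invented names, for illustration only):
 *
 *	sc->sc_tq = taskq_create("mydrv", 1, IPL_NET, 0);
 *	task_set(&sc->sc_task, mydrv_work, sc);
 *	task_add(sc->sc_tq, &sc->sc_task);
 *
 * and later, at detach time:
 *
 *	taskq_destroy(sc->sc_tq);
 */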
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	/* wake the workers and wait for the last one to exit */
	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

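/*
 * taskq_barrier() queues a task and sleeps until that task has run.
 * On a single-threaded queue this means every task added before the
 * barrier has finished by the time it returns, so a common pattern
 * is to pair it with task_del() at detach time:
 *
 *	task_del(tq, &sc->sc_task);
 *	taskq_barrier(tq);
 *
 * (sc_task is an invented name for illustration.) Note that calling
 * this from a task running on the same single-threaded queue would
 * deadlock, as the barrier task could never be serviced.
 */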
void
taskq_barrier(struct taskq *tq)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	task_add(tq, &t);

	cond_wait(&c, "tqbar");
}

void
taskq_barrier_task(void *p)
{
	struct cond *c = p;
	cond_signal(c);
}

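/*
 * Note that task_set() resets t_flags, clearing TASK_ONQUEUE; it must
 * not be used on a task that may currently be queued, or the flag and
 * the worklist linkage will disagree.
 */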
void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	/* unlocked fast path; the flag is checked again under the mutex */
	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

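/*
 * Threads serving a TASKQ_CANTSLEEP queue run with P_CANTSLEEP set so
 * that a task which tries to sleep trips the assertion in the sleep
 * code. Waiting for more work is the one legitimate sleep, so it goes
 * through this wrapper, which lifts the flag around the msleep().
 */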
int
taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority,
    const char *wmesg, int tmo)
{
	u_int *flags = &curproc->p_flag;
	int rv;

	atomic_clearbits_int(flags, P_CANTSLEEP);
	rv = msleep(ident, mtx, priority, wmesg, tmo);
	atomic_setbits_int(flags, P_CANTSLEEP);

	return (rv);
}

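/*
 * Dequeue the next pending task into *work, sleeping via tqsleep until
 * one is available. Returns 0 once the queue has been destroyed and no
 * work remains, telling the calling worker thread to exit.
 */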
int
taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	/*
	 * Copy the task to the caller so the handler may re-add the
	 * same task without racing against its own execution.
	 */
	*work = *next;

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	/* if more work remains and there are other threads, kick one awake */
	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

void
taskq_thread(void *xtq)
{
	sleepfn tqsleep = msleep;
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) {
		tqsleep = taskq_sleep;
		atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP);
	}

	while (taskq_next_work(tq, &work, tqsleep)) {
		(*work.t_func)(work.t_arg);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP))
		atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	/* the last thread out tells taskq_destroy() it may free the tq */
	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}