xref: /openbsd-src/sys/kern/kern_task.c (revision e5157e49389faebcb42b7237d55fbf096d9c2523)
1 /*	$OpenBSD: kern_task.c,v 1.12 2014/11/01 23:58:28 tedu Exp $ */
2 
3 /*
4  * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/param.h>
20 #include <sys/systm.h>
21 #include <sys/malloc.h>
22 #include <sys/mutex.h>
23 #include <sys/kthread.h>
24 #include <sys/task.h>
25 
/* flag bit kept in t_flags while a task is linked on a taskq worklist */
#define TASK_ONQUEUE	1

struct taskq {
	enum {
		TQ_S_CREATED,	/* queue exists; worker threads not forked yet */
		TQ_S_RUNNING,	/* taskq_create_thread() has started workers */
		TQ_S_DESTROYED	/* taskq_destroy() called; workers must exit */
	}		tq_state;
	unsigned int	tq_running;	/* number of worker threads alive */
	unsigned int	tq_nthreads;	/* number of worker threads wanted */
	unsigned int	tq_unlocked;	/* nonzero: workers drop the kernel lock */
	const char	*tq_name;	/* thread name, also used in panics */

	/* tq_mtx serializes tq_state, tq_running and the worklist */
	struct mutex	tq_mtx;
	TAILQ_HEAD(, task) tq_worklist;	/* pending tasks, FIFO */
};
42 
43 struct taskq taskq_sys = {
44 	TQ_S_CREATED,
45 	0,
46 	1,
47 	0,
48 	"systq",
49 	MUTEX_INITIALIZER(IPL_HIGH),
50 	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
51 };
52 
53 struct taskq taskq_sys_mp = {
54 	TQ_S_CREATED,
55 	0,
56 	1,
57 	1,
58 	"systqmp",
59 	MUTEX_INITIALIZER(IPL_HIGH),
60 	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist)
61 };
62 
/* public handles for the two built-in system task queues */
struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);
70 
71 void
72 taskq_init(void)
73 {
74 	kthread_create_deferred(taskq_create_thread, systq);
75 	kthread_create_deferred(taskq_create_thread, systqmp);
76 }
77 
78 struct taskq *
79 taskq_create(const char *name, unsigned int nthreads, int ipl)
80 {
81 	struct taskq *tq;
82 
83 	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
84 	if (tq == NULL)
85 		return (NULL);
86 
87 	tq->tq_state = TQ_S_CREATED;
88 	tq->tq_running = 0;
89 	tq->tq_nthreads = nthreads;
90 	tq->tq_name = name;
91 
92 	if (ipl & IPL_MPSAFE)
93 		tq->tq_unlocked = 1;
94 	else
95 		tq->tq_unlocked = 0;
96 	ipl &= ~IPL_MPSAFE;
97 
98 	mtx_init(&tq->tq_mtx, ipl);
99 	TAILQ_INIT(&tq->tq_worklist);
100 
101 	/* try to create a thread to guarantee that tasks will be serviced */
102 	kthread_create_deferred(taskq_create_thread, tq);
103 
104 	return (tq);
105 }
106 
107 void
108 taskq_destroy(struct taskq *tq)
109 {
110 	mtx_enter(&tq->tq_mtx);
111 	switch (tq->tq_state) {
112 	case TQ_S_CREATED:
113 		/* tq is still referenced by taskq_create_thread */
114 		tq->tq_state = TQ_S_DESTROYED;
115 		mtx_leave(&tq->tq_mtx);
116 		return;
117 
118 	case TQ_S_RUNNING:
119 		tq->tq_state = TQ_S_DESTROYED;
120 		break;
121 
122 	default:
123 		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
124 	}
125 
126 	while (tq->tq_running > 0) {
127 		wakeup(tq);
128 		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
129 	}
130 	mtx_leave(&tq->tq_mtx);
131 
132 	free(tq, M_DEVBUF, sizeof(*tq));
133 }
134 
/*
 * Deferred-kthread callback: fork the worker threads for a taskq.
 *
 * Runs once per queue after the scheduler is up.  If taskq_destroy()
 * already ran on a never-started queue (TQ_S_DESTROYED), this holds the
 * last reference and frees it here.  Otherwise the queue transitions to
 * TQ_S_RUNNING and one kthread per tq_nthreads is created.
 */
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		/* destroyed before we ran; we own the final reference */
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		/* count the thread before dropping the mutex to create it */
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}
179 
180 void
181 task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2)
182 {
183 	t->t_func = fn;
184 	t->t_arg1 = arg1;
185 	t->t_arg2 = arg2;
186 
187 	t->t_flags = 0;
188 }
189 
190 int
191 task_add(struct taskq *tq, struct task *w)
192 {
193 	int rv = 0;
194 
195 	mtx_enter(&tq->tq_mtx);
196 	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
197 		rv = 1;
198 		SET(w->t_flags, TASK_ONQUEUE);
199 		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
200 	}
201 	mtx_leave(&tq->tq_mtx);
202 
203 	if (rv)
204 		wakeup_one(tq);
205 
206 	return (rv);
207 }
208 
209 int
210 task_del(struct taskq *tq, struct task *w)
211 {
212 	int rv = 0;
213 
214 	mtx_enter(&tq->tq_mtx);
215 	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
216 		rv = 1;
217 		CLR(w->t_flags, TASK_ONQUEUE);
218 		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
219 	}
220 	mtx_leave(&tq->tq_mtx);
221 
222 	return (rv);
223 }
224 
/*
 * Dequeue the next task for a worker thread, sleeping until one is
 * available.  The task is copied into *work so the caller can run it
 * without holding a pointer the owner might reuse.  Returns 1 with a
 * task in *work, or 0 when the queue is being destroyed and the worker
 * should exit.
 */
int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			/* queue is shutting down; tell the worker to stop */
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	/* peek for more work before dropping the mutex */
	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	/* if more tasks are pending, rouse another worker */
	if (next != NULL)
		wakeup_one(tq);

	return (1);
}
253 
/*
 * Body of a taskq worker thread: run tasks until taskq_next_work()
 * reports the queue is being destroyed, then clean up and exit.
 */
void
taskq_thread(void *xtq)
{
	struct taskq *tq = xtq;
	struct task work;
	int last;

	/* MP-safe queues run their tasks without the kernel lock */
	if (tq->tq_unlocked)
		KERNEL_UNLOCK();

	while (taskq_next_work(tq, &work)) {
		(*work.t_func)(work.t_arg1, work.t_arg2);
		/* yield between tasks so we do not hog the CPU */
		sched_pause();
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	/* reacquire the kernel lock before exiting the thread */
	if (tq->tq_unlocked)
		KERNEL_LOCK();

	/* the last worker out wakes taskq_destroy() */
	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}
281