/*	$OpenBSD: kern_task.c,v 1.8 2013/12/11 22:39:49 kettenis Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>

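/*
 * TASK_ONQUEUE is set in t_flags while a task is linked on a taskq's
 * tq_worklist, which lets task_add() and task_del() tell whether the
 * task is currently queued.
 */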
#define TASK_ONQUEUE	1

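/*
 * A taskq is a list of pending tasks protected by tq_mtx and serviced
 * by tq_nthreads worker threads.  tq_running counts the worker threads
 * that currently exist; tq_unlocked is set for IPL_MPSAFE queues, whose
 * workers run tasks without holding the kernel lock.
 */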
struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}		tq_state;
	unsigned int	tq_running;
	unsigned int	tq_nthreads;
	unsigned int	tq_unlocked;
	const char	*tq_name;

	struct mutex	tq_mtx;
	TAILQ_HEAD(, task) tq_worklist;
};

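/*
 * The system task queue, serviced by a single thread that runs tasks
 * with the kernel lock held.  A minimal usage sketch (the example_task
 * and example_func names here are hypothetical):
 *
 *	struct task example_task;
 *
 *	task_set(&example_task, example_func, arg, NULL);
 *	task_add(systq, &example_task);
 */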
struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	1,
	0,
	"systq",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
};

struct taskq *const systq = &taskq_sys;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

void
taskq_init(void)
{
	kthread_create_deferred(taskq_create_thread, systq);
}

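/*
 * Allocate and initialize a task queue.  Worker threads cannot be
 * created until the scheduler is running, so their creation is
 * deferred with kthread_create_deferred(); tasks added before then
 * simply wait on the worklist until a thread comes up.
 */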
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;

	if (ipl & IPL_MPSAFE)
		tq->tq_unlocked = 1;
	else
		tq->tq_unlocked = 0;
	ipl &= ~IPL_MPSAFE;

	mtx_init(&tq->tq_mtx, ipl);
	TAILQ_INIT(&tq->tq_worklist);

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}
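
/*
 * Example of a driver using its own taskq to defer work out of an
 * interrupt handler (an illustrative sketch, not part of this file;
 * all of the mydrv names are hypothetical):
 *
 *	struct mydrv_softc {
 *		struct taskq	*sc_tq;
 *		struct task	 sc_task;
 *	};
 *
 *	int
 *	mydrv_attach(struct mydrv_softc *sc)
 *	{
 *		sc->sc_tq = taskq_create("mydrvtq", 1, IPL_BIO);
 *		if (sc->sc_tq == NULL)
 *			return (ENOMEM);
 *		task_set(&sc->sc_task, mydrv_work, sc, NULL);
 *		return (0);
 *	}
 *
 *	int
 *	mydrv_intr(void *arg)
 *	{
 *		struct mydrv_softc *sc = arg;
 *
 *		task_add(sc->sc_tq, &sc->sc_task);
 *		return (1);
 *	}
 *
 * Because the taskq mutex is initialized at the queue's ipl (IPL_BIO
 * here), task_add() is safe to call from the interrupt handler, and
 * repeated adds while the task is still queued coalesce into a single
 * run of mydrv_work().
 */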

void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF);
}

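/*
 * Deferred until the kernel can create threads: spawn the requested
 * number of worker threads for the taskq, or free the taskq if it was
 * destroyed before this ran.
 */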
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF);
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

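/*
 * Initialize a task.  This must not be done while the task is on a
 * queue, since clearing t_flags drops TASK_ONQUEUE without unlinking
 * the task from the worklist.
 */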
void
task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2)
{
	t->t_func = fn;
	t->t_arg1 = arg1;
	t->t_arg2 = arg2;

	t->t_flags = 0;
}

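/*
 * Queue a task and wake a worker.  Returns 1 if the task was added,
 * or 0 if it was already pending; adding an already-queued task
 * coalesces into a single execution.
 */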
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

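/*
 * Remove a pending task.  Returns 1 if the task was removed, or 0 if
 * it was not on the queue; a task that a worker has already dequeued
 * will still run to completion.
 */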
int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

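/*
 * Wait for and dequeue the next task on behalf of a worker thread.
 * The task is copied out under the mutex so the caller's task
 * structure may be task_add()ed again while the work function runs.
 * Returns 0 once the queue is empty and being destroyed.
 */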
int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL)
		wakeup_one(tq);

	return (1);
}

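/*
 * Worker thread body: run tasks until the taskq is destroyed, then
 * wake taskq_destroy() when the last worker exits.  Workers for
 * IPL_MPSAFE queues drop the kernel lock for the duration of the
 * loop.
 */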
void
taskq_thread(void *xtq)
{
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (tq->tq_unlocked)
		KERNEL_UNLOCK();

	while (taskq_next_work(tq, &work))
		(*work.t_func)(work.t_arg1, work.t_arg2);

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (tq->tq_unlocked)
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}