/*	$NetBSD: taskq.c,v 1.11 2019/08/20 08:12:50 hannken Exp $	*/

/*-
 * Copyright (c) 2019 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Juergen Hannken-Illjes.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/kcondvar.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/threadpool.h>

#include <sys/taskq.h>

struct taskq_executor {
	struct threadpool_job te_job;	/* Threadpool job serving the queue. */
	taskq_t *te_self;		/* Backpointer to the queue. */
	unsigned te_running:1;		/* True if the job is running. */
};

struct taskq {
	int tq_nthreads;		/* # of threads serving the queue. */
	pri_t tq_pri;			/* Scheduling priority. */
	uint_t tq_flags;		/* Saved flags from taskq_create. */
	int tq_active;			/* # of tasks (queued or running). */
	int tq_running;			/* # of jobs currently running. */
	int tq_waiting;			/* # of jobs currently idle. */
	unsigned tq_destroyed:1;	/* True if the queue is being destroyed. */
	kmutex_t tq_lock;		/* Queue and job lock. */
	kcondvar_t tq_cv;		/* Queue condvar. */
	struct taskq_executor *tq_executor; /* Array of executor jobs. */
	struct threadpool *tq_threadpool; /* Pool backing the jobs. */
	SIMPLEQ_HEAD(, taskq_ent) tq_list; /* Queue of tasks waiting. */
};

taskq_t *system_taskq;			/* General purpose task queue. */

static specificdata_key_t taskq_lwp_key; /* NULL, or the taskq this thread serves. */

/*
 * Threadpool job to service tasks from the task queue.
 * Runs until the task queue is destroyed or the queue has been
 * empty for 10 seconds.
 */
static void
task_executor(struct threadpool_job *job)
{
	struct taskq_executor *state = (struct taskq_executor *)job;
	taskq_t *tq = state->te_self;
	taskq_ent_t *tqe;
	bool is_dynamic;
	int error;

	lwp_setspecific(taskq_lwp_key, tq);

	mutex_enter(&tq->tq_lock);
	while (!tq->tq_destroyed) {
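		/*
		 * Idle policy: on a TASKQ_DYNAMIC queue an executor
		 * exits as soon as the queue is empty; otherwise it
		 * waits up to 10 seconds for new work and exits only
		 * if the queue is still empty after the timeout.
		 */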
		if (SIMPLEQ_EMPTY(&tq->tq_list)) {
			if (ISSET(tq->tq_flags, TASKQ_DYNAMIC))
				break;
			tq->tq_waiting++;
			error = cv_timedwait(&tq->tq_cv, &tq->tq_lock,
			    mstohz(10000));
			tq->tq_waiting--;
			if (SIMPLEQ_EMPTY(&tq->tq_list)) {
				if (error)
					break;
				continue;
			}
		}
		tqe = SIMPLEQ_FIRST(&tq->tq_list);
		KASSERT(tqe != NULL);
		SIMPLEQ_REMOVE_HEAD(&tq->tq_list, tqent_list);
		is_dynamic = tqe->tqent_dynamic;
		tqe->tqent_queued = 0;
		mutex_exit(&tq->tq_lock);

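		/*
		 * Run the task without holding the queue lock, so the
		 * task function may itself dispatch onto this queue.
		 * A dynamic entry is freed only after its task returns.
		 */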
		(*tqe->tqent_func)(tqe->tqent_arg);

		mutex_enter(&tq->tq_lock);
		if (is_dynamic)
			kmem_free(tqe, sizeof(*tqe));
		tq->tq_active--;
	}
	state->te_running = 0;
	tq->tq_running--;
	threadpool_job_done(job);
	mutex_exit(&tq->tq_lock);

	lwp_setspecific(taskq_lwp_key, NULL);
}

void
taskq_init(void)
{

	lwp_specific_key_create(&taskq_lwp_key, NULL);
	system_taskq = taskq_create("system_taskq", ncpu * 4, PRI_KERNEL,
	    4, 512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
	KASSERT(system_taskq != NULL);
}

void
taskq_fini(void)
{

	taskq_destroy(system_taskq);
	lwp_specific_key_delete(taskq_lwp_key);
}

/*
 * Dispatch a task entry, creating executors as needed.
 */
static void
taskq_dispatch_common(taskq_t *tq, taskq_ent_t *tqe, uint_t flags)
{
	int i;

	KASSERT(mutex_owned(&tq->tq_lock));

	if (ISSET(flags, TQ_FRONT))
		SIMPLEQ_INSERT_HEAD(&tq->tq_list, tqe, tqent_list);
	else
		SIMPLEQ_INSERT_TAIL(&tq->tq_list, tqe, tqent_list);
	tqe->tqent_queued = 1;
	tq->tq_active++;
	if (tq->tq_waiting) {
		cv_signal(&tq->tq_cv);
		mutex_exit(&tq->tq_lock);
		return;
	}
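	/*
	 * No executor is waiting on the condvar: if not all executor
	 * jobs are running yet, schedule an idle one to serve the task.
	 */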
	if (tq->tq_running < tq->tq_nthreads) {
		for (i = 0; i < tq->tq_nthreads; i++) {
			if (!tq->tq_executor[i].te_running) {
				tq->tq_executor[i].te_running = 1;
				tq->tq_running++;
				threadpool_schedule_job(tq->tq_threadpool,
				    &tq->tq_executor[i].te_job);
				break;
			}
		}
	}
	mutex_exit(&tq->tq_lock);
}

/*
 * Allocate and dispatch a task entry.
 */
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *tqe;

	KASSERT(!ISSET(flags, ~(TQ_SLEEP | TQ_NOSLEEP | TQ_NOQUEUE)));
	KASSERT(ISSET(tq->tq_flags, TASKQ_DYNAMIC) ||
	    !ISSET(flags, TQ_NOQUEUE));

	if (ISSET(flags, (TQ_SLEEP | TQ_NOSLEEP)) == TQ_NOSLEEP)
		tqe = kmem_alloc(sizeof(*tqe), KM_NOSLEEP);
	else
		tqe = kmem_alloc(sizeof(*tqe), KM_SLEEP);
	if (tqe == NULL)
		return (taskqid_t) NULL;

	mutex_enter(&tq->tq_lock);
	if (ISSET(flags, TQ_NOQUEUE) && tq->tq_active == tq->tq_nthreads) {
		mutex_exit(&tq->tq_lock);
		kmem_free(tqe, sizeof(*tqe));
		return (taskqid_t) NULL;
	}
	tqe->tqent_dynamic = 1;
	tqe->tqent_queued = 0;
	tqe->tqent_func = func;
	tqe->tqent_arg = arg;
	taskq_dispatch_common(tq, tqe, flags);

	return (taskqid_t) tqe;
}

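/*
 * Illustrative use of taskq_dispatch() -- a sketch only; my_task and
 * sc are hypothetical caller code, not part of this file:
 *
 *	static void
 *	my_task(void *arg)
 *	{
 *		struct my_softc *sc = arg;
 *
 *		(do the deferred work here)
 *	}
 *
 *	if (taskq_dispatch(system_taskq, my_task, sc, TQ_NOSLEEP) == 0)
 *		(allocation failed, run my_task another way)
 */
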
/*
 * Dispatch a preallocated task entry.
 * The caller is assumed to have zeroed it.
 */
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *tqe)
{

	KASSERT(!ISSET(flags, ~(TQ_FRONT)));

	tqe->tqent_func = func;
	tqe->tqent_arg = arg;
	mutex_enter(&tq->tq_lock);
	taskq_dispatch_common(tq, tqe, flags);
}

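/*
 * Illustrative use of taskq_dispatch_ent() -- a sketch only; struct
 * my_softc with its embedded, zeroed entry is hypothetical caller code:
 *
 *	struct my_softc {
 *		taskq_ent_t sc_tqe;	(zeroed at allocation time)
 *		(other fields)
 *	} *sc;
 *
 *	taskq_dispatch_ent(system_taskq, my_task, sc, 0, &sc->sc_tqe);
 */
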
/*
 * Wait until all tasks have completed.
 */
void
taskq_wait(taskq_t *tq)
{

	KASSERT(!taskq_member(tq, curlwp));

	mutex_enter(&tq->tq_lock);
	while (tq->tq_active)
		kpause("qwait", false, 1, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

/*
 * True if the current thread is an executor for this queue.
 */
int
taskq_member(taskq_t *tq, kthread_t *thread)
{

	KASSERT(thread == curlwp);

	return (lwp_getspecific(taskq_lwp_key) == tq);
}

/*
 * Create a task queue.
 * Allocation hints are ignored.
 */
taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
    int maxalloc, uint_t flags)
{
	int i;
	struct threadpool *threadpool;
	taskq_t *tq;

	KASSERT(!ISSET(flags,
	    ~(TASKQ_DYNAMIC | TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT)));

	if (threadpool_get(&threadpool, pri) != 0)
		return NULL;

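	/*
	 * With TASKQ_THREADS_CPU_PCT the nthreads argument is a
	 * percentage of the number of CPUs, with at least one thread.
	 */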
	if (ISSET(flags, TASKQ_THREADS_CPU_PCT))
		nthreads = MAX((ncpu * nthreads) / 100, 1);

	tq = kmem_zalloc(sizeof(*tq), KM_SLEEP);
	tq->tq_nthreads = nthreads;
	tq->tq_pri = pri;
	tq->tq_flags = flags;
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, IPL_NONE);
	cv_init(&tq->tq_cv, NULL, CV_DEFAULT, NULL);
	SIMPLEQ_INIT(&tq->tq_list);
	tq->tq_executor = kmem_alloc(sizeof(*tq->tq_executor) * nthreads,
	    KM_SLEEP);
	for (i = 0; i < nthreads; i++) {
		threadpool_job_init(&tq->tq_executor[i].te_job, task_executor,
		    &tq->tq_lock, "%s/%d", name, i);
		tq->tq_executor[i].te_self = tq;
		tq->tq_executor[i].te_running = 0;
	}
	tq->tq_threadpool = threadpool;

	return tq;
}

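/*
 * Illustrative creation of a private queue -- a sketch only; the name
 * and sizing are examples, and the allocation hints are ignored here:
 *
 *	taskq_t *tq;
 *
 *	tq = taskq_create("my_taskq", 100, PRI_KERNEL, 4, 512,
 *	    TASKQ_THREADS_CPU_PCT);	(100% == one thread per CPU)
 *	(dispatch work, wait for it, ...)
 *	taskq_destroy(tq);
 */
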
/*
 * Create a task queue.
 * The proc argument is ignored; the queue is created as by taskq_create().
 */
taskq_t *
taskq_create_proc(const char *name, int nthreads, pri_t pri, int minalloc,
    int maxalloc, struct proc *proc, uint_t flags)
{

	return taskq_create(name, nthreads, pri, minalloc, maxalloc, flags);
}

/*
 * Destroy a task queue.
 */
void
taskq_destroy(taskq_t *tq)
{
	int i;
	taskq_ent_t *tqe;

	KASSERT(!taskq_member(tq, curlwp));

	/* Wait for tasks to complete. */
	taskq_wait(tq);

	/* Mark destroyed and ask running executors to quit. */
	mutex_enter(&tq->tq_lock);
	tq->tq_destroyed = 1;
	cv_broadcast(&tq->tq_cv);

	/* Wait for all executors to quit. */
	while (tq->tq_running > 0)
		kpause("tqdestroy", false, 1, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);

	for (i = 0; i < tq->tq_nthreads; i++)
		threadpool_job_destroy(&tq->tq_executor[i].te_job);
	threadpool_put(tq->tq_threadpool, tq->tq_pri);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_cv);
	kmem_free(tq->tq_executor, sizeof(*tq->tq_executor) * tq->tq_nthreads);
	kmem_free(tq, sizeof(*tq));
}