1 /* $NetBSD: taskq.c,v 1.11 2019/08/20 08:12:50 hannken Exp $ */
2
3 /*-
4 * Copyright (c) 2019 The NetBSD Foundation, Inc.
5 * All rights reserved.
6 *
7 * This code is derived from software contributed to The NetBSD Foundation
8 * by Juergen Hannken-Illjes.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/kcondvar.h>
35 #include <sys/kernel.h>
36 #include <sys/kmem.h>
37 #include <sys/mutex.h>
38 #include <sys/proc.h>
39 #include <sys/threadpool.h>
40
41 #include <sys/taskq.h>
42
43 struct taskq_executor {
44 struct threadpool_job te_job; /* Threadpool job serving the queue. */
45 taskq_t *te_self; /* Backpointer to the queue. */
46 unsigned te_running:1; /* True if the job is running. */
47 };
48
49 struct taskq {
50 int tq_nthreads; /* # of threads serving queue. */
51 pri_t tq_pri; /* Scheduling priority. */
52 uint_t tq_flags; /* Saved flags from taskq_create. */
53 int tq_active; /* # of tasks (queued or running). */
54 int tq_running; /* # of jobs currently running. */
55 int tq_waiting; /* # of jobs currently idle. */
56 unsigned tq_destroyed:1; /* True if queue gets destroyed. */
57 kmutex_t tq_lock; /* Queue and job lock. */
58 kcondvar_t tq_cv; /* Queue condvar. */
59 struct taskq_executor *tq_executor; /* Array of jobs. */
60 struct threadpool *tq_threadpool; /* Pool backing the jobs. */
61 SIMPLEQ_HEAD(, taskq_ent) tq_list; /* Queue of tasks waiting. */
62 };
63
64 taskq_t *system_taskq; /* General purpose task queue. */
65
66 static specificdata_key_t taskq_lwp_key; /* Null or taskq this thread runs. */
67
68 /*
69 * Threadpool job to service tasks from task queue.
70 * Runs until the task queue gets destroyed or the queue is empty for 10 secs.
71 */
72 static void
task_executor(struct threadpool_job * job)73 task_executor(struct threadpool_job *job)
74 {
75 struct taskq_executor *state = (struct taskq_executor *)job;
76 taskq_t *tq = state->te_self;
77 taskq_ent_t *tqe;
78 bool is_dynamic;
79 int error;
80
81 lwp_setspecific(taskq_lwp_key, tq);
82
83 mutex_enter(&tq->tq_lock);
84 while (!tq->tq_destroyed) {
85 if (SIMPLEQ_EMPTY(&tq->tq_list)) {
86 if (ISSET(tq->tq_flags, TASKQ_DYNAMIC))
87 break;
88 tq->tq_waiting++;
89 error = cv_timedwait(&tq->tq_cv, &tq->tq_lock,
90 mstohz(10000));
91 tq->tq_waiting--;
92 if (SIMPLEQ_EMPTY(&tq->tq_list)) {
93 if (error)
94 break;
95 continue;
96 }
97 }
98 tqe = SIMPLEQ_FIRST(&tq->tq_list);
99 KASSERT(tqe != NULL);
100 SIMPLEQ_REMOVE_HEAD(&tq->tq_list, tqent_list);
101 is_dynamic = tqe->tqent_dynamic;
102 tqe->tqent_queued = 0;
103 mutex_exit(&tq->tq_lock);
104
105 (*tqe->tqent_func)(tqe->tqent_arg);
106
107 mutex_enter(&tq->tq_lock);
108 if (is_dynamic)
109 kmem_free(tqe, sizeof(*tqe));
110 tq->tq_active--;
111 }
112 state->te_running = 0;
113 tq->tq_running--;
114 threadpool_job_done(job);
115 mutex_exit(&tq->tq_lock);
116
117 lwp_setspecific(taskq_lwp_key, NULL);
118 }
119
120 void
taskq_init(void)121 taskq_init(void)
122 {
123
124 lwp_specific_key_create(&taskq_lwp_key, NULL);
125 system_taskq = taskq_create("system_taskq", ncpu * 4, PRI_KERNEL,
126 4, 512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
127 KASSERT(system_taskq != NULL);
128 }
129
130 void
taskq_fini(void)131 taskq_fini(void)
132 {
133
134 taskq_destroy(system_taskq);
135 lwp_specific_key_delete(taskq_lwp_key);
136 }
137
138 /*
139 * Dispatch a task entry creating executors as neeeded.
140 */
141 static void
taskq_dispatch_common(taskq_t * tq,taskq_ent_t * tqe,uint_t flags)142 taskq_dispatch_common(taskq_t *tq, taskq_ent_t *tqe, uint_t flags)
143 {
144 int i;
145
146 KASSERT(mutex_owned(&tq->tq_lock));
147
148 if (ISSET(flags, TQ_FRONT))
149 SIMPLEQ_INSERT_HEAD(&tq->tq_list, tqe, tqent_list);
150 else
151 SIMPLEQ_INSERT_TAIL(&tq->tq_list, tqe, tqent_list);
152 tqe->tqent_queued = 1;
153 tq->tq_active++;
154 if (tq->tq_waiting) {
155 cv_signal(&tq->tq_cv);
156 mutex_exit(&tq->tq_lock);
157 return;
158 }
159 if (tq->tq_running < tq->tq_nthreads) {
160 for (i = 0; i < tq->tq_nthreads; i++) {
161 if (!tq->tq_executor[i].te_running) {
162 tq->tq_executor[i].te_running = 1;
163 tq->tq_running++;
164 threadpool_schedule_job(tq->tq_threadpool,
165 &tq->tq_executor[i].te_job);
166 break;
167 }
168 }
169 }
170 mutex_exit(&tq->tq_lock);
171 }
172
173 /*
174 * Allocate and dispatch a task entry.
175 */
176 taskqid_t
taskq_dispatch(taskq_t * tq,task_func_t func,void * arg,uint_t flags)177 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
178 {
179 taskq_ent_t *tqe;
180
181 KASSERT(!ISSET(flags, ~(TQ_SLEEP | TQ_NOSLEEP | TQ_NOQUEUE)));
182 KASSERT(ISSET(tq->tq_flags, TASKQ_DYNAMIC) ||
183 !ISSET(flags, TQ_NOQUEUE));
184
185 if (ISSET(flags, (TQ_SLEEP | TQ_NOSLEEP)) == TQ_NOSLEEP)
186 tqe = kmem_alloc(sizeof(*tqe), KM_NOSLEEP);
187 else
188 tqe = kmem_alloc(sizeof(*tqe), KM_SLEEP);
189 if (tqe == NULL)
190 return (taskqid_t) NULL;
191
192 mutex_enter(&tq->tq_lock);
193 if (ISSET(flags, TQ_NOQUEUE) && tq->tq_active == tq->tq_nthreads) {
194 mutex_exit(&tq->tq_lock);
195 kmem_free(tqe, sizeof(*tqe));
196 return (taskqid_t) NULL;
197 }
198 tqe->tqent_dynamic = 1;
199 tqe->tqent_queued = 0;
200 tqe->tqent_func = func;
201 tqe->tqent_arg = arg;
202 taskq_dispatch_common(tq, tqe, flags);
203
204 return (taskqid_t) tqe;
205 }
206
207 /*
208 * Dispatch a preallocated task entry.
209 * Assume caller zeroed it.
210 */
211 void
taskq_dispatch_ent(taskq_t * tq,task_func_t func,void * arg,uint_t flags,taskq_ent_t * tqe)212 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
213 taskq_ent_t *tqe)
214 {
215
216 KASSERT(!ISSET(flags, ~(TQ_FRONT)));
217
218 tqe->tqent_func = func;
219 tqe->tqent_arg = arg;
220 mutex_enter(&tq->tq_lock);
221 taskq_dispatch_common(tq, tqe, flags);
222 }
223
224 /*
225 * Wait until all tasks have completed.
226 */
227 void
taskq_wait(taskq_t * tq)228 taskq_wait(taskq_t *tq)
229 {
230
231 KASSERT(!taskq_member(tq, curlwp));
232
233 mutex_enter(&tq->tq_lock);
234 while (tq->tq_active)
235 kpause("qwait", false, 1, &tq->tq_lock);
236 mutex_exit(&tq->tq_lock);
237 }
238
239 /*
240 * True if the current thread is an executor for this queue.
241 */
242 int
taskq_member(taskq_t * tq,kthread_t * thread)243 taskq_member(taskq_t *tq, kthread_t *thread)
244 {
245
246 KASSERT(thread == curlwp);
247
248 return (lwp_getspecific(taskq_lwp_key) == tq);
249 }
250
251 /*
252 * Create a task queue.
253 * Allocation hints are ignored.
254 */
255 taskq_t *
taskq_create(const char * name,int nthreads,pri_t pri,int minalloc,int maxalloc,uint_t flags)256 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
257 int maxalloc, uint_t flags)
258 {
259 int i;
260 struct threadpool *threadpool;
261 taskq_t *tq;
262
263 KASSERT(!ISSET(flags,
264 ~(TASKQ_DYNAMIC | TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT)));
265
266 if (threadpool_get(&threadpool, pri) != 0)
267 return NULL;
268
269 if (ISSET(flags, TASKQ_THREADS_CPU_PCT))
270 nthreads = MAX((ncpu * nthreads) / 100, 1);
271
272 tq = kmem_zalloc(sizeof(*tq), KM_SLEEP);
273 tq->tq_nthreads = nthreads;
274 tq->tq_pri = pri;
275 tq->tq_flags = flags;
276 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, IPL_NONE);
277 cv_init(&tq->tq_cv, NULL, CV_DEFAULT, NULL);
278 SIMPLEQ_INIT(&tq->tq_list);
279 tq->tq_executor = kmem_alloc(sizeof(*tq->tq_executor) * nthreads,
280 KM_SLEEP);
281 for (i = 0; i < nthreads; i++) {
282 threadpool_job_init(&tq->tq_executor[i].te_job, task_executor,
283 &tq->tq_lock, "%s/%d", name, i);
284 tq->tq_executor[i].te_self = tq;
285 tq->tq_executor[i].te_running = 0;
286 }
287 tq->tq_threadpool = threadpool;
288
289 return tq;
290 }
291
292 taskq_t *
taskq_create_proc(const char * name,int nthreads,pri_t pri,int minalloc,int maxalloc,struct proc * proc,uint_t flags)293 taskq_create_proc(const char *name, int nthreads, pri_t pri, int minalloc,
294 int maxalloc, struct proc *proc, uint_t flags)
295 {
296
297 return taskq_create(name, nthreads, pri, minalloc, maxalloc, flags);
298 }
299
300 /*
301 * Destroy a task queue.
302 */
303 void
taskq_destroy(taskq_t * tq)304 taskq_destroy(taskq_t *tq)
305 {
306 int i;
307 taskq_ent_t *tqe;
308
309 KASSERT(!taskq_member(tq, curlwp));
310
311 /* Wait for tasks to complete. */
312 taskq_wait(tq);
313
314 /* Mark destroyed and ask running executors to quit. */
315 mutex_enter(&tq->tq_lock);
316 tq->tq_destroyed = 1;
317 cv_broadcast(&tq->tq_cv);
318
319 /* Wait for all executors to quit. */
320 while (tq->tq_running > 0)
321 kpause("tqdestroy", false, 1, &tq->tq_lock);
322 mutex_exit(&tq->tq_lock);
323
324 for (i = 0; i < tq->tq_nthreads; i++)
325 threadpool_job_destroy(&tq->tq_executor[i].te_job);
326 threadpool_put(tq->tq_threadpool, tq->tq_pri);
327 mutex_destroy(&tq->tq_lock);
328 cv_destroy(&tq->tq_cv);
329 kmem_free(tq->tq_executor, sizeof(*tq->tq_executor) * tq->tq_nthreads);
330 kmem_free(tq, sizeof(*tq));
331 }
332