1 /*
2  *  Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
3  *  Copyright (C) 2007 The Regents of the University of California.
4  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
5  *  Written by Brian Behlendorf <behlendorf1@llnl.gov>.
6  *  UCRL-CODE-235197
7  *
8  *  This file is part of the SPL, Solaris Porting Layer.
9  *
10  *  The SPL is free software; you can redistribute it and/or modify it
11  *  under the terms of the GNU General Public License as published by the
12  *  Free Software Foundation; either version 2 of the License, or (at your
13  *  option) any later version.
14  *
15  *  The SPL is distributed in the hope that it will be useful, but WITHOUT
16  *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17  *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
18  *  for more details.
19  *
20  *  You should have received a copy of the GNU General Public License along
21  *  with the SPL.  If not, see <http://www.gnu.org/licenses/>.
22  *
23  *  Solaris Porting Layer (SPL) Task Queue Implementation.
24  */
25 /*
26  * Copyright (c) 2024, Klara Inc.
27  * Copyright (c) 2024, Syneto
28  */
29 
30 #include <sys/timer.h>
31 #include <sys/taskq.h>
32 #include <sys/kmem.h>
33 #include <sys/tsd.h>
34 #include <sys/trace_spl.h>
35 #include <sys/time.h>
36 #include <sys/atomic.h>
37 #include <sys/kstat.h>
38 #ifdef HAVE_CPU_HOTPLUG
39 #include <linux/cpuhotplug.h>
40 #endif
41 
42 typedef struct taskq_kstats {
43 	/* static values, for completeness */
44 	kstat_named_t tqks_threads_max;
45 	kstat_named_t tqks_entry_pool_min;
46 	kstat_named_t tqks_entry_pool_max;
47 
48 	/* gauges (inc/dec counters, current value) */
49 	kstat_named_t tqks_threads_active;
50 	kstat_named_t tqks_threads_idle;
51 	kstat_named_t tqks_threads_total;
52 	kstat_named_t tqks_tasks_pending;
53 	kstat_named_t tqks_tasks_priority;
54 	kstat_named_t tqks_tasks_total;
55 	kstat_named_t tqks_tasks_delayed;
56 	kstat_named_t tqks_entries_free;
57 
58 	/* counters (inc only, since taskq creation) */
59 	kstat_named_t tqks_threads_created;
60 	kstat_named_t tqks_threads_destroyed;
61 	kstat_named_t tqks_tasks_dispatched;
62 	kstat_named_t tqks_tasks_dispatched_delayed;
63 	kstat_named_t tqks_tasks_executed_normal;
64 	kstat_named_t tqks_tasks_executed_priority;
65 	kstat_named_t tqks_tasks_executed;
66 	kstat_named_t tqks_tasks_delayed_requeued;
67 	kstat_named_t tqks_tasks_cancelled;
68 	kstat_named_t tqks_thread_wakeups;
69 	kstat_named_t tqks_thread_wakeups_nowork;
70 	kstat_named_t tqks_thread_sleeps;
71 } taskq_kstats_t;
72 
73 static taskq_kstats_t taskq_kstats_template = {
74 	{ "threads_max",		KSTAT_DATA_UINT64 },
75 	{ "entry_pool_min",		KSTAT_DATA_UINT64 },
76 	{ "entry_pool_max",		KSTAT_DATA_UINT64 },
77 	{ "threads_active",		KSTAT_DATA_UINT64 },
78 	{ "threads_idle",		KSTAT_DATA_UINT64 },
79 	{ "threads_total",		KSTAT_DATA_UINT64 },
80 	{ "tasks_pending",		KSTAT_DATA_UINT64 },
81 	{ "tasks_priority",		KSTAT_DATA_UINT64 },
82 	{ "tasks_total",		KSTAT_DATA_UINT64 },
83 	{ "tasks_delayed",		KSTAT_DATA_UINT64 },
84 	{ "entries_free",		KSTAT_DATA_UINT64 },
85 
86 	{ "threads_created",		KSTAT_DATA_UINT64 },
87 	{ "threads_destroyed",		KSTAT_DATA_UINT64 },
88 	{ "tasks_dispatched",		KSTAT_DATA_UINT64 },
89 	{ "tasks_dispatched_delayed",	KSTAT_DATA_UINT64 },
90 	{ "tasks_executed_normal",	KSTAT_DATA_UINT64 },
91 	{ "tasks_executed_priority",	KSTAT_DATA_UINT64 },
92 	{ "tasks_executed",		KSTAT_DATA_UINT64 },
93 	{ "tasks_delayed_requeued",	KSTAT_DATA_UINT64 },
94 	{ "tasks_cancelled",		KSTAT_DATA_UINT64 },
95 	{ "thread_wakeups",		KSTAT_DATA_UINT64 },
96 	{ "thread_wakeups_nowork",	KSTAT_DATA_UINT64 },
97 	{ "thread_sleeps",		KSTAT_DATA_UINT64 },
98 };
99 
100 #define	TQSTAT_INC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, 1)
101 #define	TQSTAT_DEC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, -1)
102 
103 #define	_TQSTAT_MOD_LIST(mod, tq, t) do { \
104 	switch (t->tqent_flags & TQENT_LIST_MASK) {			\
105 	case TQENT_LIST_NONE: ASSERT(list_empty(&t->tqent_list)); break;\
106 	case TQENT_LIST_PENDING: mod(tq, tasks_pending); break;		\
107 	case TQENT_LIST_PRIORITY: mod(tq, tasks_priority); break;	\
108 	case TQENT_LIST_DELAY: mod(tq, tasks_delayed); break;		\
109 	}								\
110 } while (0)
111 #define	TQSTAT_INC_LIST(tq, t)	_TQSTAT_MOD_LIST(TQSTAT_INC, tq, t)
112 #define	TQSTAT_DEC_LIST(tq, t)	_TQSTAT_MOD_LIST(TQSTAT_DEC, tq, t)
113 
114 #define	TQENT_SET_LIST(t, l)	\
115 	t->tqent_flags = (t->tqent_flags & ~TQENT_LIST_MASK) | l;
116 
117 static int spl_taskq_thread_bind = 0;
118 module_param(spl_taskq_thread_bind, int, 0644);
119 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
120 
121 static uint_t spl_taskq_thread_timeout_ms = 5000;
122 /* BEGIN CSTYLED */
123 module_param(spl_taskq_thread_timeout_ms, uint, 0644);
124 /* END CSTYLED */
125 MODULE_PARM_DESC(spl_taskq_thread_timeout_ms,
126 	"Minimum idle threads exit interval for dynamic taskqs");
127 
128 static int spl_taskq_thread_dynamic = 1;
129 module_param(spl_taskq_thread_dynamic, int, 0444);
130 MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
131 
132 static int spl_taskq_thread_priority = 1;
133 module_param(spl_taskq_thread_priority, int, 0644);
134 MODULE_PARM_DESC(spl_taskq_thread_priority,
135 	"Allow non-default priority for taskq threads");
136 
137 static uint_t spl_taskq_thread_sequential = 4;
138 /* BEGIN CSTYLED */
139 module_param(spl_taskq_thread_sequential, uint, 0644);
140 /* END CSTYLED */
141 MODULE_PARM_DESC(spl_taskq_thread_sequential,
142 	"Create new taskq threads after N sequential tasks");
143 
144 /*
145  * Global system-wide dynamic task queue available for all consumers. This
146  * taskq is not intended for long-running tasks; instead, a dedicated taskq
147  * should be created.
148  */
149 taskq_t *system_taskq;
150 EXPORT_SYMBOL(system_taskq);
151 /* Global dynamic task queue for long delay */
152 taskq_t *system_delay_taskq;
153 EXPORT_SYMBOL(system_delay_taskq);
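
/*
 * Illustrative sketch (comment only, not compiled): a typical consumer
 * dispatches a short-lived function to the shared system_taskq and checks
 * for dispatch failure; taskq_dispatch() returns TASKQID_INVALID when no
 * entry could be allocated or the taskq is shutting down.  The callback
 * my_func() and its argument are hypothetical.
 *
 *	taskqid_t tqid = taskq_dispatch(system_taskq, my_func, arg, TQ_SLEEP);
 *	if (tqid == TASKQID_INVALID)
 *		return (ENOMEM);
 */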
154 
155 /* Private dedicated taskq for creating new taskq threads on demand. */
156 static taskq_t *dynamic_taskq;
157 static taskq_thread_t *taskq_thread_create(taskq_t *);
158 
159 #ifdef HAVE_CPU_HOTPLUG
160 /* Multi-callback id for cpu hotplugging. */
161 static int spl_taskq_cpuhp_state;
162 #endif
163 
164 /* List of all taskqs */
165 LIST_HEAD(tq_list);
166 struct rw_semaphore tq_list_sem;
167 static uint_t taskq_tsd;
168 
169 static int
170 task_km_flags(uint_t flags)
171 {
172 	if (flags & TQ_NOSLEEP)
173 		return (KM_NOSLEEP);
174 
175 	if (flags & TQ_PUSHPAGE)
176 		return (KM_PUSHPAGE);
177 
178 	return (KM_SLEEP);
179 }
180 
181 /*
182  * taskq_find_by_name - Find the largest instance number of a named taskq.
183  */
184 static int
185 taskq_find_by_name(const char *name)
186 {
187 	struct list_head *tql = NULL;
188 	taskq_t *tq;
189 
190 	list_for_each_prev(tql, &tq_list) {
191 		tq = list_entry(tql, taskq_t, tq_taskqs);
192 		if (strcmp(name, tq->tq_name) == 0)
193 			return (tq->tq_instance);
194 	}
195 	return (-1);
196 }
197 
198 /*
199  * NOTE: Must be called with tq->tq_lock held, returns a taskq_ent_t
200  * which is not attached to the free, work, or pending taskq lists.
201  */
202 static taskq_ent_t *
203 task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
204 {
205 	taskq_ent_t *t;
206 	int count = 0;
207 
208 	ASSERT(tq);
209 retry:
210 	/* Acquire taskq_ent_t's from free list if available */
211 	if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
212 		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
213 
214 		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
215 		ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
216 		ASSERT(!timer_pending(&t->tqent_timer));
217 
218 		list_del_init(&t->tqent_list);
219 		TQSTAT_DEC(tq, entries_free);
220 		return (t);
221 	}
222 
223 	/* Free list is empty and memory allocations are prohibited */
224 	if (flags & TQ_NOALLOC)
225 		return (NULL);
226 
227 	/* Hit maximum taskq_ent_t pool size */
228 	if (tq->tq_nalloc >= tq->tq_maxalloc) {
229 		if (flags & TQ_NOSLEEP)
230 			return (NULL);
231 
232 		/*
233 		 * Sleep periodically polling the free list for an available
234 		 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
235 		 * but we cannot block forever waiting for a taskq_ent_t to
236 		 * show up in the free list, otherwise a deadlock can happen.
237 		 *
238 		 * Therefore, we need to allocate a new task even if the number
239 		 * of allocated tasks is above tq->tq_maxalloc, but we still
240 		 * end up delaying the task allocation by one second, thereby
241 		 * throttling the task dispatch rate.
242 		 */
243 		spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
244 		schedule_timeout_interruptible(HZ / 100);
245 		spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
246 		    tq->tq_lock_class);
247 		if (count < 100) {
248 			count++;
249 			goto retry;
250 		}
251 	}
252 
253 	spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
254 	t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
255 	spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);
256 
257 	if (t) {
258 		taskq_init_ent(t);
259 		tq->tq_nalloc++;
260 	}
261 
262 	return (t);
263 }
264 
265 /*
266  * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
267  * to already be removed from the free, work, or pending taskq lists.
268  */
269 static void
270 task_free(taskq_t *tq, taskq_ent_t *t)
271 {
272 	ASSERT(tq);
273 	ASSERT(t);
274 	ASSERT(list_empty(&t->tqent_list));
275 	ASSERT(!timer_pending(&t->tqent_timer));
276 
277 	kmem_free(t, sizeof (taskq_ent_t));
278 	tq->tq_nalloc--;
279 }
280 
281 /*
282  * NOTE: Must be called with tq->tq_lock held, either destroys the
283  * taskq_ent_t if too many exist or moves it to the free list for later use.
284  */
285 static void
286 task_done(taskq_t *tq, taskq_ent_t *t)
287 {
288 	ASSERT(tq);
289 	ASSERT(t);
290 	ASSERT(list_empty(&t->tqent_list));
291 
292 	/* Wake tasks blocked in taskq_wait_id() */
293 	wake_up_all(&t->tqent_waitq);
294 
295 	if (tq->tq_nalloc <= tq->tq_minalloc) {
296 		t->tqent_id = TASKQID_INVALID;
297 		t->tqent_func = NULL;
298 		t->tqent_arg = NULL;
299 		t->tqent_flags = 0;
300 
301 		list_add_tail(&t->tqent_list, &tq->tq_free_list);
302 		TQSTAT_INC(tq, entries_free);
303 	} else {
304 		task_free(tq, t);
305 	}
306 }
307 
308 /*
309  * When a delayed task timer expires, remove it from the delay list and
310  * add it to the priority list for immediate processing.
311  */
312 static void
313 task_expire_impl(taskq_ent_t *t)
314 {
315 	taskq_ent_t *w;
316 	taskq_t *tq = t->tqent_taskq;
317 	struct list_head *l = NULL;
318 	unsigned long flags;
319 
320 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
321 
322 	if (t->tqent_flags & TQENT_FLAG_CANCEL) {
323 		ASSERT(list_empty(&t->tqent_list));
324 		spin_unlock_irqrestore(&tq->tq_lock, flags);
325 		return;
326 	}
327 
328 	t->tqent_birth = jiffies;
329 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
330 
331 	/*
332 	 * The priority list must be maintained in strict task id order
333 	 * from lowest to highest for lowest_id to be easily calculable.
334 	 */
335 	list_del(&t->tqent_list);
336 	list_for_each_prev(l, &tq->tq_prio_list) {
337 		w = list_entry(l, taskq_ent_t, tqent_list);
338 		if (w->tqent_id < t->tqent_id) {
339 			list_add(&t->tqent_list, l);
340 			break;
341 		}
342 	}
343 	if (l == &tq->tq_prio_list)
344 		list_add(&t->tqent_list, &tq->tq_prio_list);
345 
346 	spin_unlock_irqrestore(&tq->tq_lock, flags);
347 
348 	wake_up(&tq->tq_work_waitq);
349 
350 	TQSTAT_INC(tq, tasks_delayed_requeued);
351 }
352 
353 static void
354 task_expire(spl_timer_list_t tl)
355 {
356 	struct timer_list *tmr = (struct timer_list *)tl;
357 	taskq_ent_t *t = from_timer(t, tmr, tqent_timer);
358 	task_expire_impl(t);
359 }
360 
361 /*
362  * Returns the lowest incomplete taskqid_t.  The taskqid_t may
363  * be queued on the pending list, on the priority list, on the
364  * delay list, or on the work list currently being handled, but
365  * it is not 100% complete yet.
366  */
367 static taskqid_t
368 taskq_lowest_id(taskq_t *tq)
369 {
370 	taskqid_t lowest_id = tq->tq_next_id;
371 	taskq_ent_t *t;
372 	taskq_thread_t *tqt;
373 
374 	if (!list_empty(&tq->tq_pend_list)) {
375 		t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
376 		lowest_id = MIN(lowest_id, t->tqent_id);
377 	}
378 
379 	if (!list_empty(&tq->tq_prio_list)) {
380 		t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
381 		lowest_id = MIN(lowest_id, t->tqent_id);
382 	}
383 
384 	if (!list_empty(&tq->tq_delay_list)) {
385 		t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
386 		lowest_id = MIN(lowest_id, t->tqent_id);
387 	}
388 
389 	if (!list_empty(&tq->tq_active_list)) {
390 		tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
391 		    tqt_active_list);
392 		ASSERT(tqt->tqt_id != TASKQID_INVALID);
393 		lowest_id = MIN(lowest_id, tqt->tqt_id);
394 	}
395 
396 	return (lowest_id);
397 }
398 
399 /*
400  * Insert a task into a list keeping the list sorted by increasing taskqid.
401  */
402 static void
403 taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
404 {
405 	taskq_thread_t *w;
406 	struct list_head *l = NULL;
407 
408 	ASSERT(tq);
409 	ASSERT(tqt);
410 
411 	list_for_each_prev(l, &tq->tq_active_list) {
412 		w = list_entry(l, taskq_thread_t, tqt_active_list);
413 		if (w->tqt_id < tqt->tqt_id) {
414 			list_add(&tqt->tqt_active_list, l);
415 			break;
416 		}
417 	}
418 	if (l == &tq->tq_active_list)
419 		list_add(&tqt->tqt_active_list, &tq->tq_active_list);
420 }
421 
422 /*
423  * Find and return a task from the given list if it exists.  The list
424  * must be in lowest to highest task id order.
425  */
426 static taskq_ent_t *
427 taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
428 {
429 	struct list_head *l = NULL;
430 	taskq_ent_t *t;
431 
432 	list_for_each(l, lh) {
433 		t = list_entry(l, taskq_ent_t, tqent_list);
434 
435 		if (t->tqent_id == id)
436 			return (t);
437 
438 		if (t->tqent_id > id)
439 			break;
440 	}
441 
442 	return (NULL);
443 }
444 
445 /*
446  * Find an already dispatched task given the task id regardless of what
447  * state it is in.  If a task is still pending it will be returned.
448  * If a task is executing, then -EBUSY will be returned instead.
449  * If the task has already been run then NULL is returned.
450  */
451 static taskq_ent_t *
452 taskq_find(taskq_t *tq, taskqid_t id)
453 {
454 	taskq_thread_t *tqt;
455 	struct list_head *l = NULL;
456 	taskq_ent_t *t;
457 
458 	t = taskq_find_list(tq, &tq->tq_delay_list, id);
459 	if (t)
460 		return (t);
461 
462 	t = taskq_find_list(tq, &tq->tq_prio_list, id);
463 	if (t)
464 		return (t);
465 
466 	t = taskq_find_list(tq, &tq->tq_pend_list, id);
467 	if (t)
468 		return (t);
469 
470 	list_for_each(l, &tq->tq_active_list) {
471 		tqt = list_entry(l, taskq_thread_t, tqt_active_list);
472 		if (tqt->tqt_id == id) {
473 			/*
474 			 * Instead of returning tqt_task, we just return a non
475 			 * NULL value to prevent misuse, since tqt_task only
476 			 * has two valid fields.
477 			 */
478 			return (ERR_PTR(-EBUSY));
479 		}
480 	}
481 
482 	return (NULL);
483 }
484 
485 /*
486  * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
487  * taskq_wait() functions below.
488  *
489  * Taskq waiting is accomplished by tracking the lowest outstanding task
490  * id and the next available task id.  As tasks are dispatched they are
491  * added to the tail of the pending, priority, or delay lists.  As worker
492  * threads become available the tasks are removed from the heads of these
493  * lists and linked to the worker threads.  This ensures the lists are
494  * kept sorted by lowest to highest task id.
495  *
496  * Therefore the lowest outstanding task id can be quickly determined by
497  * checking the head item from all of these lists.  This value is stored
498  * with the taskq as the lowest id.  It only needs to be recalculated when
499  * either the task with the current lowest id completes or is canceled.
500  *
501  * By blocking until the lowest task id exceeds the passed task id the
502  * taskq_wait_outstanding() function can be easily implemented.  Similarly,
503  * by blocking until the lowest task id matches the next task id taskq_wait()
504  * can be implemented.
505  *
506  * Callers should be aware that when there are multiple worker threads it
507  * is possible for larger task ids to complete before smaller ones.  Also,
508  * when the taskq contains delayed tasks with small task ids, callers may
509  * block for a considerable length of time waiting for them to expire and
510  * execute.
511  */
512 static int
513 taskq_wait_id_check(taskq_t *tq, taskqid_t id)
514 {
515 	int rc;
516 	unsigned long flags;
517 
518 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
519 	rc = (taskq_find(tq, id) == NULL);
520 	spin_unlock_irqrestore(&tq->tq_lock, flags);
521 
522 	return (rc);
523 }
524 
525 /*
526  * The taskq_wait_id() function blocks until the passed task id completes.
527  * This does not guarantee that all lower task ids have completed.
528  */
529 void
530 taskq_wait_id(taskq_t *tq, taskqid_t id)
531 {
532 	wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
533 }
534 EXPORT_SYMBOL(taskq_wait_id);
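
/*
 * Illustrative sketch (comment only, not compiled): waiting on a single
 * dispatched task by its id; my_func() and arg are hypothetical.
 *
 *	taskqid_t tqid = taskq_dispatch(tq, my_func, arg, TQ_SLEEP);
 *	if (tqid != TASKQID_INVALID)
 *		taskq_wait_id(tq, tqid);
 *
 * On return my_func() has completed, but tasks with lower ids may still be
 * outstanding; use taskq_wait_outstanding() below for that guarantee.
 */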
535 
536 static int
537 taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
538 {
539 	int rc;
540 	unsigned long flags;
541 
542 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
543 	rc = (id < tq->tq_lowest_id);
544 	spin_unlock_irqrestore(&tq->tq_lock, flags);
545 
546 	return (rc);
547 }
548 
549 /*
550  * The taskq_wait_outstanding() function will block until all tasks with a
551  * lower taskqid than the passed 'id' have been completed.  Note that all
552  * task ids are assigned monotonically at dispatch time.  Zero may be
553  * passed for the id to indicate that all tasks dispatched up to this
554  * point, but not after, should be waited for.
555  */
556 void
557 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
558 {
559 	id = id ? id : tq->tq_next_id - 1;
560 	wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
561 }
562 EXPORT_SYMBOL(taskq_wait_outstanding);
563 
564 static int
565 taskq_wait_check(taskq_t *tq)
566 {
567 	int rc;
568 	unsigned long flags;
569 
570 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
571 	rc = (tq->tq_lowest_id == tq->tq_next_id);
572 	spin_unlock_irqrestore(&tq->tq_lock, flags);
573 
574 	return (rc);
575 }
576 
577 /*
578  * The taskq_wait() function will block until the taskq is empty.
579  * This means that if a taskq re-dispatches work to itself taskq_wait()
580  * callers will block indefinitely.
581  */
582 void
583 taskq_wait(taskq_t *tq)
584 {
585 	wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
586 }
587 EXPORT_SYMBOL(taskq_wait);
588 
589 int
590 taskq_member(taskq_t *tq, kthread_t *t)
591 {
592 	return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
593 }
594 EXPORT_SYMBOL(taskq_member);
595 
596 taskq_t *
597 taskq_of_curthread(void)
598 {
599 	return (tsd_get(taskq_tsd));
600 }
601 EXPORT_SYMBOL(taskq_of_curthread);
602 
603 /*
604  * Cancel an already dispatched task given the task id.  Still pending tasks
605  * will be immediately canceled, and if the task is active the function will
606  * block until it completes.  Preallocated tasks which are canceled must be
607  * freed by the caller.
608  */
609 int
610 taskq_cancel_id(taskq_t *tq, taskqid_t id)
611 {
612 	taskq_ent_t *t;
613 	int rc = ENOENT;
614 	unsigned long flags;
615 
616 	ASSERT(tq);
617 
618 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
619 	t = taskq_find(tq, id);
620 	if (t && t != ERR_PTR(-EBUSY)) {
621 		list_del_init(&t->tqent_list);
622 		TQSTAT_DEC_LIST(tq, t);
623 		TQSTAT_DEC(tq, tasks_total);
624 
625 		t->tqent_flags |= TQENT_FLAG_CANCEL;
626 		TQSTAT_INC(tq, tasks_cancelled);
627 
628 		/*
629 		 * When canceling the lowest outstanding task id we
630 		 * must recalculate the new lowest outstanding id.
631 		 */
632 		if (tq->tq_lowest_id == t->tqent_id) {
633 			tq->tq_lowest_id = taskq_lowest_id(tq);
634 			ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
635 		}
636 
637 		/*
638 		 * The task_expire() function takes the tq->tq_lock so drop
639 		 * the lock before synchronously cancelling the timer.
640 		 */
641 		if (timer_pending(&t->tqent_timer)) {
642 			spin_unlock_irqrestore(&tq->tq_lock, flags);
643 			del_timer_sync(&t->tqent_timer);
644 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
645 			    tq->tq_lock_class);
646 		}
647 
648 		if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
649 			task_done(tq, t);
650 
651 		rc = 0;
652 	}
653 	spin_unlock_irqrestore(&tq->tq_lock, flags);
654 
655 	if (t == ERR_PTR(-EBUSY)) {
656 		taskq_wait_id(tq, id);
657 		rc = EBUSY;
658 	}
659 
660 	return (rc);
661 }
662 EXPORT_SYMBOL(taskq_cancel_id);
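
/*
 * Illustrative sketch (comment only, not compiled) of interpreting the
 * return value; tqid is hypothetical:
 *
 *	int error = taskq_cancel_id(tq, tqid);
 *
 * error == 0:      the task was still pending and has been cancelled.
 * error == ENOENT: no such task was outstanding (already done or never
 *                  dispatched).
 * error == EBUSY:  the task was already executing; the call blocked until
 *                  it completed.
 */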
663 
664 static int taskq_thread_spawn(taskq_t *tq);
665 
666 taskqid_t
667 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
668 {
669 	taskq_ent_t *t;
670 	taskqid_t rc = TASKQID_INVALID;
671 	unsigned long irqflags;
672 
673 	ASSERT(tq);
674 	ASSERT(func);
675 
676 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
677 
678 	/* Taskq being destroyed and all tasks drained */
679 	if (!(tq->tq_flags & TASKQ_ACTIVE))
680 		goto out;
681 
682 	/* Do not queue the task unless there is an idle thread for it */
683 	ASSERT(tq->tq_nactive <= tq->tq_nthreads);
684 	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
685 		/* Dynamic taskq may be able to spawn another thread */
686 		if (taskq_thread_spawn(tq) == 0)
687 			goto out;
688 	}
689 
690 	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
691 		goto out;
692 
693 	spin_lock(&t->tqent_lock);
694 
695 	/* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
696 	if (flags & TQ_NOQUEUE) {
697 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
698 		list_add(&t->tqent_list, &tq->tq_prio_list);
699 	/* Queue to the priority list instead of the pending list */
700 	} else if (flags & TQ_FRONT) {
701 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
702 		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
703 	} else {
704 		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
705 		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
706 	}
707 	TQSTAT_INC_LIST(tq, t);
708 	TQSTAT_INC(tq, tasks_total);
709 
710 	t->tqent_id = rc = tq->tq_next_id;
711 	tq->tq_next_id++;
712 	t->tqent_func = func;
713 	t->tqent_arg = arg;
714 	t->tqent_taskq = tq;
715 	t->tqent_timer.function = NULL;
716 	t->tqent_timer.expires = 0;
717 
718 	t->tqent_birth = jiffies;
719 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
720 
721 	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
722 
723 	spin_unlock(&t->tqent_lock);
724 
725 	wake_up(&tq->tq_work_waitq);
726 
727 	TQSTAT_INC(tq, tasks_dispatched);
728 
729 	/* Spawn additional taskq threads if required. */
730 	if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
731 		(void) taskq_thread_spawn(tq);
732 out:
733 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
734 	return (rc);
735 }
736 EXPORT_SYMBOL(taskq_dispatch);
737 
738 taskqid_t
739 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
740     uint_t flags, clock_t expire_time)
741 {
742 	taskqid_t rc = TASKQID_INVALID;
743 	taskq_ent_t *t;
744 	unsigned long irqflags;
745 
746 	ASSERT(tq);
747 	ASSERT(func);
748 
749 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);
750 
751 	/* Taskq being destroyed and all tasks drained */
752 	if (!(tq->tq_flags & TASKQ_ACTIVE))
753 		goto out;
754 
755 	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
756 		goto out;
757 
758 	spin_lock(&t->tqent_lock);
759 
760 	/* Queue to the delay list for subsequent execution */
761 	list_add_tail(&t->tqent_list, &tq->tq_delay_list);
762 	TQENT_SET_LIST(t, TQENT_LIST_DELAY);
763 	TQSTAT_INC_LIST(tq, t);
764 	TQSTAT_INC(tq, tasks_total);
765 
766 	t->tqent_id = rc = tq->tq_next_id;
767 	tq->tq_next_id++;
768 	t->tqent_func = func;
769 	t->tqent_arg = arg;
770 	t->tqent_taskq = tq;
771 	t->tqent_timer.function = task_expire;
772 	t->tqent_timer.expires = (unsigned long)expire_time;
773 	add_timer(&t->tqent_timer);
774 
775 	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
776 
777 	spin_unlock(&t->tqent_lock);
778 
779 	TQSTAT_INC(tq, tasks_dispatched_delayed);
780 
781 	/* Spawn additional taskq threads if required. */
782 	if (tq->tq_nactive == tq->tq_nthreads)
783 		(void) taskq_thread_spawn(tq);
784 out:
785 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
786 	return (rc);
787 }
788 EXPORT_SYMBOL(taskq_dispatch_delay);
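
/*
 * Illustrative sketch (comment only, not compiled): expire_time is an
 * absolute time in jiffies (it is copied directly into the timer), so a
 * relative delay is typically expressed against ddi_get_lbolt().  The
 * callback my_func(), arg, and the 5 second delay are hypothetical.
 *
 *	taskqid_t tqid = taskq_dispatch_delay(tq, my_func, arg, TQ_SLEEP,
 *	    ddi_get_lbolt() + SEC_TO_TICK(5));
 */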
789 
790 void
791 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
792     taskq_ent_t *t)
793 {
794 	unsigned long irqflags;
795 	ASSERT(tq);
796 	ASSERT(func);
797 
798 	spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
799 	    tq->tq_lock_class);
800 
801 	/* Taskq being destroyed and all tasks drained */
802 	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
803 		t->tqent_id = TASKQID_INVALID;
804 		goto out;
805 	}
806 
807 	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
808 		/* Dynamic taskq may be able to spawn another thread */
809 		if (taskq_thread_spawn(tq) == 0)
810 			goto out;
811 		flags |= TQ_FRONT;
812 	}
813 
814 	spin_lock(&t->tqent_lock);
815 
816 	/*
817 	 * Make sure the entry is not on some other taskq; it is important to
818 	 * ASSERT() under lock
819 	 */
820 	ASSERT(taskq_empty_ent(t));
821 
822 	/*
823 	 * Mark it as a prealloc'd task.  This is important
824 	 * to ensure that we don't free it later.
825 	 */
826 	t->tqent_flags |= TQENT_FLAG_PREALLOC;
827 
828 	/* Queue to the priority list instead of the pending list */
829 	if (flags & TQ_FRONT) {
830 		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
831 		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
832 	} else {
833 		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
834 		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
835 	}
836 	TQSTAT_INC_LIST(tq, t);
837 	TQSTAT_INC(tq, tasks_total);
838 
839 	t->tqent_id = tq->tq_next_id;
840 	tq->tq_next_id++;
841 	t->tqent_func = func;
842 	t->tqent_arg = arg;
843 	t->tqent_taskq = tq;
844 
845 	t->tqent_birth = jiffies;
846 	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);
847 
848 	spin_unlock(&t->tqent_lock);
849 
850 	wake_up(&tq->tq_work_waitq);
851 
852 	TQSTAT_INC(tq, tasks_dispatched);
853 
854 	/* Spawn additional taskq threads if required. */
855 	if (tq->tq_nactive == tq->tq_nthreads)
856 		(void) taskq_thread_spawn(tq);
857 out:
858 	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
859 }
860 EXPORT_SYMBOL(taskq_dispatch_ent);
861 
862 int
863 taskq_empty_ent(taskq_ent_t *t)
864 {
865 	return (list_empty(&t->tqent_list));
866 }
867 EXPORT_SYMBOL(taskq_empty_ent);
868 
869 void
870 taskq_init_ent(taskq_ent_t *t)
871 {
872 	spin_lock_init(&t->tqent_lock);
873 	init_waitqueue_head(&t->tqent_waitq);
874 	timer_setup(&t->tqent_timer, NULL, 0);
875 	INIT_LIST_HEAD(&t->tqent_list);
876 	t->tqent_id = 0;
877 	t->tqent_func = NULL;
878 	t->tqent_arg = NULL;
879 	t->tqent_flags = 0;
880 	t->tqent_taskq = NULL;
881 }
882 EXPORT_SYMBOL(taskq_init_ent);
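
/*
 * Illustrative sketch (comment only, not compiled): the preallocated entry
 * interface avoids task_alloc() at dispatch time.  The taskq_ent_t is
 * usually embedded in a longer-lived object; my_obj, obj_tqent and
 * my_func() are hypothetical.
 *
 *	taskq_init_ent(&my_obj->obj_tqent);
 *	...
 *	taskq_dispatch_ent(tq, my_func, my_obj, 0, &my_obj->obj_tqent);
 *
 * If the taskq is inactive the entry's tqent_id is set to TASKQID_INVALID
 * and no work is queued.
 */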
883 
884 /*
885  * Return the next pending task, preference is given to tasks on the
886  * priority list which were dispatched with TQ_FRONT.
887  */
888 static taskq_ent_t *
889 taskq_next_ent(taskq_t *tq)
890 {
891 	struct list_head *list;
892 
893 	if (!list_empty(&tq->tq_prio_list))
894 		list = &tq->tq_prio_list;
895 	else if (!list_empty(&tq->tq_pend_list))
896 		list = &tq->tq_pend_list;
897 	else
898 		return (NULL);
899 
900 	return (list_entry(list->next, taskq_ent_t, tqent_list));
901 }
902 
903 /*
904  * Spawns a new thread for the specified taskq.
905  */
906 static void
907 taskq_thread_spawn_task(void *arg)
908 {
909 	taskq_t *tq = (taskq_t *)arg;
910 	unsigned long flags;
911 
912 	if (taskq_thread_create(tq) == NULL) {
913 		/* restore spawning count if failed */
914 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
915 		    tq->tq_lock_class);
916 		tq->tq_nspawn--;
917 		spin_unlock_irqrestore(&tq->tq_lock, flags);
918 	}
919 }
920 
921 /*
922  * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the
923  * current number of threads is insufficient to handle pending tasks.  These
924  * new threads must be created by the dedicated dynamic_taskq to avoid
925  * deadlocks between thread creation and memory reclaim.  The system_taskq
926  * which is also a dynamic taskq cannot be safely used for this.
927  */
928 static int
929 taskq_thread_spawn(taskq_t *tq)
930 {
931 	int spawning = 0;
932 
933 	if (!(tq->tq_flags & TASKQ_DYNAMIC))
934 		return (0);
935 
936 	tq->lastspawnstop = jiffies;
937 	if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
938 	    (tq->tq_flags & TASKQ_ACTIVE)) {
939 		spawning = (++tq->tq_nspawn);
940 		taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
941 		    tq, TQ_NOSLEEP);
942 	}
943 
944 	return (spawning);
945 }
946 
947 /*
948  * Threads in a dynamic taskq may exit once there is no more work to do.
949  * To prevent threads from being created and destroyed too often limit
950  * the exit rate to one per spl_taskq_thread_timeout_ms.
951  *
952  * The first thread in the thread list is treated as the primary thread.
953  * There is nothing special about the primary thread, but in order to
954  * avoid all the taskq pids changing, we opt to make it long running.
955  */
956 static int
957 taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
958 {
959 	ASSERT(!taskq_next_ent(tq));
960 	if (!(tq->tq_flags & TASKQ_DYNAMIC) || !spl_taskq_thread_dynamic)
961 		return (0);
962 	if (!(tq->tq_flags & TASKQ_ACTIVE))
963 		return (1);
964 	if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
965 	    tqt_thread_list) == tqt)
966 		return (0);
967 	ASSERT3U(tq->tq_nthreads, >, 1);
968 	if (tq->tq_nspawn != 0)
969 		return (0);
970 	if (time_before(jiffies, tq->lastspawnstop +
971 	    msecs_to_jiffies(spl_taskq_thread_timeout_ms)))
972 		return (0);
973 	tq->lastspawnstop = jiffies;
974 	return (1);
975 }
976 
977 static int
978 taskq_thread(void *args)
979 {
980 	DECLARE_WAITQUEUE(wait, current);
981 	sigset_t blocked;
982 	taskq_thread_t *tqt = args;
983 	taskq_t *tq;
984 	taskq_ent_t *t;
985 	int seq_tasks = 0;
986 	unsigned long flags;
987 	taskq_ent_t dup_task = {};
988 
989 	ASSERT(tqt);
990 	ASSERT(tqt->tqt_tq);
991 	tq = tqt->tqt_tq;
992 	current->flags |= PF_NOFREEZE;
993 
994 	(void) spl_fstrans_mark();
995 
996 	sigfillset(&blocked);
997 	sigprocmask(SIG_BLOCK, &blocked, NULL);
998 	flush_signals(current);
999 
1000 	tsd_set(taskq_tsd, tq);
1001 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1002 	/*
1003 	 * If we are dynamically spawned, decrease spawning count. Note that
1004 	 * we could be created during taskq_create, in which case we shouldn't
1005 	 * do the decrement. But it's fine because taskq_create will reset
1006 	 * tq_nspawn later.
1007 	 */
1008 	if (tq->tq_flags & TASKQ_DYNAMIC)
1009 		tq->tq_nspawn--;
1010 
1011 	/* Immediately exit if more threads than allowed were created. */
1012 	if (tq->tq_nthreads >= tq->tq_maxthreads)
1013 		goto error;
1014 
1015 	tq->tq_nthreads++;
1016 	list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
1017 	wake_up(&tq->tq_wait_waitq);
1018 	set_current_state(TASK_INTERRUPTIBLE);
1019 
1020 	TQSTAT_INC(tq, threads_total);
1021 
1022 	while (!kthread_should_stop()) {
1023 
1024 		if (list_empty(&tq->tq_pend_list) &&
1025 		    list_empty(&tq->tq_prio_list)) {
1026 
1027 			if (taskq_thread_should_stop(tq, tqt))
1028 				break;
1029 
1030 			add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
1031 			spin_unlock_irqrestore(&tq->tq_lock, flags);
1032 
1033 			TQSTAT_INC(tq, thread_sleeps);
1034 			TQSTAT_INC(tq, threads_idle);
1035 
1036 			schedule();
1037 			seq_tasks = 0;
1038 
1039 			TQSTAT_DEC(tq, threads_idle);
1040 			TQSTAT_INC(tq, thread_wakeups);
1041 
1042 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
1043 			    tq->tq_lock_class);
1044 			remove_wait_queue(&tq->tq_work_waitq, &wait);
1045 		} else {
1046 			__set_current_state(TASK_RUNNING);
1047 		}
1048 
1049 		if ((t = taskq_next_ent(tq)) != NULL) {
1050 			list_del_init(&t->tqent_list);
1051 			TQSTAT_DEC_LIST(tq, t);
1052 			TQSTAT_DEC(tq, tasks_total);
1053 
1054 			/*
1055 			 * A TQENT_FLAG_PREALLOC task may be reused or freed
1056 			 * during the task function call. Store tqent_id and
1057 			 * tqent_flags here.
1058 			 *
1059 			 * Also use an on stack taskq_ent_t for tqt_task
1060 			 * assignment in this case; we want to make sure
1061 			 * to duplicate all fields, so the values are
1062 			 * correct when it's accessed via DTRACE_PROBE*.
1063 			 */
1064 			tqt->tqt_id = t->tqent_id;
1065 			tqt->tqt_flags = t->tqent_flags;
1066 
1067 			if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
1068 				dup_task = *t;
1069 				t = &dup_task;
1070 			}
1071 			tqt->tqt_task = t;
1072 
1073 			taskq_insert_in_order(tq, tqt);
1074 			tq->tq_nactive++;
1075 			spin_unlock_irqrestore(&tq->tq_lock, flags);
1076 
1077 			TQSTAT_INC(tq, threads_active);
1078 			DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t);
1079 
1080 			/* Perform the requested task */
1081 			t->tqent_func(t->tqent_arg);
1082 
1083 			DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t);
1084 
1085 			TQSTAT_DEC(tq, threads_active);
1086 			if ((t->tqent_flags & TQENT_LIST_MASK) ==
1087 			    TQENT_LIST_PENDING)
1088 				TQSTAT_INC(tq, tasks_executed_normal);
1089 			else
1090 				TQSTAT_INC(tq, tasks_executed_priority);
1091 			TQSTAT_INC(tq, tasks_executed);
1092 
1093 			spin_lock_irqsave_nested(&tq->tq_lock, flags,
1094 			    tq->tq_lock_class);
1095 
1096 			tq->tq_nactive--;
1097 			list_del_init(&tqt->tqt_active_list);
1098 			tqt->tqt_task = NULL;
1099 
1100 			/* For prealloc'd tasks, we don't free anything. */
1101 			if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
1102 				task_done(tq, t);
1103 
1104 			/*
1105 			 * When the current lowest outstanding taskqid is
1106 			 * done calculate the new lowest outstanding id
1107 			 */
1108 			if (tq->tq_lowest_id == tqt->tqt_id) {
1109 				tq->tq_lowest_id = taskq_lowest_id(tq);
1110 				ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
1111 			}
1112 
1113 			/* Spawn additional taskq threads if required. */
1114 			if ((++seq_tasks) > spl_taskq_thread_sequential &&
1115 			    taskq_thread_spawn(tq))
1116 				seq_tasks = 0;
1117 
1118 			tqt->tqt_id = TASKQID_INVALID;
1119 			tqt->tqt_flags = 0;
1120 			wake_up_all(&tq->tq_wait_waitq);
1121 		} else
1122 			TQSTAT_INC(tq, thread_wakeups_nowork);
1123 
1124 		set_current_state(TASK_INTERRUPTIBLE);
1125 
1126 	}
1127 
1128 	__set_current_state(TASK_RUNNING);
1129 	tq->tq_nthreads--;
1130 	list_del_init(&tqt->tqt_thread_list);
1131 
1132 	TQSTAT_DEC(tq, threads_total);
1133 	TQSTAT_INC(tq, threads_destroyed);
1134 
1135 error:
1136 	kmem_free(tqt, sizeof (taskq_thread_t));
1137 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1138 
1139 	tsd_set(taskq_tsd, NULL);
1140 	thread_exit();
1141 
1142 	return (0);
1143 }
1144 
1145 static taskq_thread_t *
1146 taskq_thread_create(taskq_t *tq)
1147 {
1148 	static int last_used_cpu = 0;
1149 	taskq_thread_t *tqt;
1150 
1151 	tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
1152 	INIT_LIST_HEAD(&tqt->tqt_thread_list);
1153 	INIT_LIST_HEAD(&tqt->tqt_active_list);
1154 	tqt->tqt_tq = tq;
1155 	tqt->tqt_id = TASKQID_INVALID;
1156 
1157 	tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
1158 	    "%s", tq->tq_name);
1159 	if (tqt->tqt_thread == NULL) {
1160 		kmem_free(tqt, sizeof (taskq_thread_t));
1161 		return (NULL);
1162 	}
1163 
1164 	if (spl_taskq_thread_bind) {
1165 		last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
1166 		kthread_bind(tqt->tqt_thread, last_used_cpu);
1167 	}
1168 
1169 	if (spl_taskq_thread_priority)
1170 		set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
1171 
1172 	wake_up_process(tqt->tqt_thread);
1173 
1174 	TQSTAT_INC(tq, threads_created);
1175 
1176 	return (tqt);
1177 }
1178 
1179 static void
1180 taskq_stats_init(taskq_t *tq)
1181 {
1182 	taskq_sums_t *tqs = &tq->tq_sums;
1183 	wmsum_init(&tqs->tqs_threads_active, 0);
1184 	wmsum_init(&tqs->tqs_threads_idle, 0);
1185 	wmsum_init(&tqs->tqs_threads_total, 0);
1186 	wmsum_init(&tqs->tqs_tasks_pending, 0);
1187 	wmsum_init(&tqs->tqs_tasks_priority, 0);
1188 	wmsum_init(&tqs->tqs_tasks_total, 0);
1189 	wmsum_init(&tqs->tqs_tasks_delayed, 0);
1190 	wmsum_init(&tqs->tqs_entries_free, 0);
1191 	wmsum_init(&tqs->tqs_threads_created, 0);
1192 	wmsum_init(&tqs->tqs_threads_destroyed, 0);
1193 	wmsum_init(&tqs->tqs_tasks_dispatched, 0);
1194 	wmsum_init(&tqs->tqs_tasks_dispatched_delayed, 0);
1195 	wmsum_init(&tqs->tqs_tasks_executed_normal, 0);
1196 	wmsum_init(&tqs->tqs_tasks_executed_priority, 0);
1197 	wmsum_init(&tqs->tqs_tasks_executed, 0);
1198 	wmsum_init(&tqs->tqs_tasks_delayed_requeued, 0);
1199 	wmsum_init(&tqs->tqs_tasks_cancelled, 0);
1200 	wmsum_init(&tqs->tqs_thread_wakeups, 0);
1201 	wmsum_init(&tqs->tqs_thread_wakeups_nowork, 0);
1202 	wmsum_init(&tqs->tqs_thread_sleeps, 0);
1203 }
1204 
1205 static void
1206 taskq_stats_fini(taskq_t *tq)
1207 {
1208 	taskq_sums_t *tqs = &tq->tq_sums;
1209 	wmsum_fini(&tqs->tqs_threads_active);
1210 	wmsum_fini(&tqs->tqs_threads_idle);
1211 	wmsum_fini(&tqs->tqs_threads_total);
1212 	wmsum_fini(&tqs->tqs_tasks_pending);
1213 	wmsum_fini(&tqs->tqs_tasks_priority);
1214 	wmsum_fini(&tqs->tqs_tasks_total);
1215 	wmsum_fini(&tqs->tqs_tasks_delayed);
1216 	wmsum_fini(&tqs->tqs_entries_free);
1217 	wmsum_fini(&tqs->tqs_threads_created);
1218 	wmsum_fini(&tqs->tqs_threads_destroyed);
1219 	wmsum_fini(&tqs->tqs_tasks_dispatched);
1220 	wmsum_fini(&tqs->tqs_tasks_dispatched_delayed);
1221 	wmsum_fini(&tqs->tqs_tasks_executed_normal);
1222 	wmsum_fini(&tqs->tqs_tasks_executed_priority);
1223 	wmsum_fini(&tqs->tqs_tasks_executed);
1224 	wmsum_fini(&tqs->tqs_tasks_delayed_requeued);
1225 	wmsum_fini(&tqs->tqs_tasks_cancelled);
1226 	wmsum_fini(&tqs->tqs_thread_wakeups);
1227 	wmsum_fini(&tqs->tqs_thread_wakeups_nowork);
1228 	wmsum_fini(&tqs->tqs_thread_sleeps);
1229 }
1230 
1231 static int
1232 taskq_kstats_update(kstat_t *ksp, int rw)
1233 {
1234 	if (rw == KSTAT_WRITE)
1235 		return (EACCES);
1236 
1237 	taskq_t *tq = ksp->ks_private;
1238 	taskq_kstats_t *tqks = ksp->ks_data;
1239 
1240 	tqks->tqks_threads_max.value.ui64 = tq->tq_maxthreads;
1241 	tqks->tqks_entry_pool_min.value.ui64 = tq->tq_minalloc;
1242 	tqks->tqks_entry_pool_max.value.ui64 = tq->tq_maxalloc;
1243 
1244 	taskq_sums_t *tqs = &tq->tq_sums;
1245 
1246 	tqks->tqks_threads_active.value.ui64 =
1247 	    wmsum_value(&tqs->tqs_threads_active);
1248 	tqks->tqks_threads_idle.value.ui64 =
1249 	    wmsum_value(&tqs->tqs_threads_idle);
1250 	tqks->tqks_threads_total.value.ui64 =
1251 	    wmsum_value(&tqs->tqs_threads_total);
1252 	tqks->tqks_tasks_pending.value.ui64 =
1253 	    wmsum_value(&tqs->tqs_tasks_pending);
1254 	tqks->tqks_tasks_priority.value.ui64 =
1255 	    wmsum_value(&tqs->tqs_tasks_priority);
1256 	tqks->tqks_tasks_total.value.ui64 =
1257 	    wmsum_value(&tqs->tqs_tasks_total);
1258 	tqks->tqks_tasks_delayed.value.ui64 =
1259 	    wmsum_value(&tqs->tqs_tasks_delayed);
1260 	tqks->tqks_entries_free.value.ui64 =
1261 	    wmsum_value(&tqs->tqs_entries_free);
1262 	tqks->tqks_threads_created.value.ui64 =
1263 	    wmsum_value(&tqs->tqs_threads_created);
1264 	tqks->tqks_threads_destroyed.value.ui64 =
1265 	    wmsum_value(&tqs->tqs_threads_destroyed);
1266 	tqks->tqks_tasks_dispatched.value.ui64 =
1267 	    wmsum_value(&tqs->tqs_tasks_dispatched);
1268 	tqks->tqks_tasks_dispatched_delayed.value.ui64 =
1269 	    wmsum_value(&tqs->tqs_tasks_dispatched_delayed);
1270 	tqks->tqks_tasks_executed_normal.value.ui64 =
1271 	    wmsum_value(&tqs->tqs_tasks_executed_normal);
1272 	tqks->tqks_tasks_executed_priority.value.ui64 =
1273 	    wmsum_value(&tqs->tqs_tasks_executed_priority);
1274 	tqks->tqks_tasks_executed.value.ui64 =
1275 	    wmsum_value(&tqs->tqs_tasks_executed);
1276 	tqks->tqks_tasks_delayed_requeued.value.ui64 =
1277 	    wmsum_value(&tqs->tqs_tasks_delayed_requeued);
1278 	tqks->tqks_tasks_cancelled.value.ui64 =
1279 	    wmsum_value(&tqs->tqs_tasks_cancelled);
1280 	tqks->tqks_thread_wakeups.value.ui64 =
1281 	    wmsum_value(&tqs->tqs_thread_wakeups);
1282 	tqks->tqks_thread_wakeups_nowork.value.ui64 =
1283 	    wmsum_value(&tqs->tqs_thread_wakeups_nowork);
1284 	tqks->tqks_thread_sleeps.value.ui64 =
1285 	    wmsum_value(&tqs->tqs_thread_sleeps);
1286 
1287 	return (0);
1288 }
1289 
1290 static void
1291 taskq_kstats_init(taskq_t *tq)
1292 {
1293 	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1294 	snprintf(name, sizeof (name), "%s.%d", tq->tq_name, tq->tq_instance);
1295 
1296 	kstat_t *ksp = kstat_create("taskq", 0, name, "misc",
1297 	    KSTAT_TYPE_NAMED, sizeof (taskq_kstats_t) / sizeof (kstat_named_t),
1298 	    KSTAT_FLAG_VIRTUAL);
1299 
1300 	if (ksp == NULL)
1301 		return;
1302 
1303 	ksp->ks_private = tq;
1304 	ksp->ks_update = taskq_kstats_update;
1305 	ksp->ks_data = kmem_alloc(sizeof (taskq_kstats_t), KM_SLEEP);
1306 	memcpy(ksp->ks_data, &taskq_kstats_template, sizeof (taskq_kstats_t));
1307 	kstat_install(ksp);
1308 
1309 	tq->tq_ksp = ksp;
1310 }
1311 
1312 static void
1313 taskq_kstats_fini(taskq_t *tq)
1314 {
1315 	if (tq->tq_ksp == NULL)
1316 		return;
1317 
1318 	kmem_free(tq->tq_ksp->ks_data, sizeof (taskq_kstats_t));
1319 	kstat_delete(tq->tq_ksp);
1320 
1321 	tq->tq_ksp = NULL;
1322 }
1323 
1324 taskq_t *
1325 taskq_create(const char *name, int threads_arg, pri_t pri,
1326     int minalloc, int maxalloc, uint_t flags)
1327 {
1328 	taskq_t *tq;
1329 	taskq_thread_t *tqt;
1330 	int count = 0, rc = 0, i;
1331 	unsigned long irqflags;
1332 	int nthreads = threads_arg;
1333 
1334 	ASSERT(name != NULL);
1335 	ASSERT(minalloc >= 0);
1336 	ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
1337 
1338 	/* Scale the number of threads using nthreads as a percentage */
1339 	if (flags & TASKQ_THREADS_CPU_PCT) {
1340 		ASSERT(nthreads <= 100);
1341 		ASSERT(nthreads >= 0);
1342 		nthreads = MIN(threads_arg, 100);
1343 		nthreads = MAX(nthreads, 0);
1344 		nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
1345 	}
1346 
1347 	tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
1348 	if (tq == NULL)
1349 		return (NULL);
1350 
1351 	tq->tq_hp_support = B_FALSE;
1352 #ifdef HAVE_CPU_HOTPLUG
1353 	if (flags & TASKQ_THREADS_CPU_PCT) {
1354 		tq->tq_hp_support = B_TRUE;
1355 		if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
1356 		    &tq->tq_hp_cb_node) != 0) {
1357 			kmem_free(tq, sizeof (*tq));
1358 			return (NULL);
1359 		}
1360 	}
1361 #endif
1362 
1363 	spin_lock_init(&tq->tq_lock);
1364 	INIT_LIST_HEAD(&tq->tq_thread_list);
1365 	INIT_LIST_HEAD(&tq->tq_active_list);
1366 	tq->tq_name = kmem_strdup(name);
1367 	tq->tq_nactive = 0;
1368 	tq->tq_nthreads = 0;
1369 	tq->tq_nspawn = 0;
1370 	tq->tq_maxthreads = nthreads;
1371 	tq->tq_cpu_pct = threads_arg;
1372 	tq->tq_pri = pri;
1373 	tq->tq_minalloc = minalloc;
1374 	tq->tq_maxalloc = maxalloc;
1375 	tq->tq_nalloc = 0;
1376 	tq->tq_flags = (flags | TASKQ_ACTIVE);
1377 	tq->tq_next_id = TASKQID_INITIAL;
1378 	tq->tq_lowest_id = TASKQID_INITIAL;
1379 	tq->lastspawnstop = jiffies;
1380 	INIT_LIST_HEAD(&tq->tq_free_list);
1381 	INIT_LIST_HEAD(&tq->tq_pend_list);
1382 	INIT_LIST_HEAD(&tq->tq_prio_list);
1383 	INIT_LIST_HEAD(&tq->tq_delay_list);
1384 	init_waitqueue_head(&tq->tq_work_waitq);
1385 	init_waitqueue_head(&tq->tq_wait_waitq);
1386 	tq->tq_lock_class = TQ_LOCK_GENERAL;
1387 	INIT_LIST_HEAD(&tq->tq_taskqs);
1388 	taskq_stats_init(tq);
1389 
1390 	if (flags & TASKQ_PREPOPULATE) {
1391 		spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
1392 		    tq->tq_lock_class);
1393 
1394 		for (i = 0; i < minalloc; i++)
1395 			task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
1396 			    &irqflags));
1397 
1398 		spin_unlock_irqrestore(&tq->tq_lock, irqflags);
1399 	}
1400 
1401 	if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
1402 		nthreads = 1;
1403 
1404 	for (i = 0; i < nthreads; i++) {
1405 		tqt = taskq_thread_create(tq);
1406 		if (tqt == NULL)
1407 			rc = 1;
1408 		else
1409 			count++;
1410 	}
1411 
1412 	/* Wait for all threads to be started before potential destroy */
1413 	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
1414 	/*
1415 	 * The taskq_thread()s may have touched tq_nspawn, but since they were
1416 	 * not dynamically spawned we don't want that counted, so reset it to 0.
1417 	 */
1418 	tq->tq_nspawn = 0;
1419 
1420 	if (rc) {
1421 		taskq_destroy(tq);
1422 		return (NULL);
1423 	}
1424 
1425 	down_write(&tq_list_sem);
1426 	tq->tq_instance = taskq_find_by_name(name) + 1;
1427 	list_add_tail(&tq->tq_taskqs, &tq_list);
1428 	up_write(&tq_list_sem);
1429 
1430 	/* Install kstats late, because the name includes tq_instance */
1431 	taskq_kstats_init(tq);
1432 
1433 	return (tq);
1434 }
1435 EXPORT_SYMBOL(taskq_create);
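
/*
 * Illustrative sketch (comment only, not compiled): a dedicated dynamic
 * taskq sized as a percentage of the online CPUs; the name and the 75%
 * figure are hypothetical.
 *
 *	taskq_t *tq = taskq_create("my_taskq", 75, maxclsyspri, boot_ncpus,
 *	    INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC |
 *	    TASKQ_THREADS_CPU_PCT);
 */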
1436 
1437 void
1438 taskq_destroy(taskq_t *tq)
1439 {
1440 	struct task_struct *thread;
1441 	taskq_thread_t *tqt;
1442 	taskq_ent_t *t;
1443 	unsigned long flags;
1444 
1445 	ASSERT(tq);
1446 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1447 	tq->tq_flags &= ~TASKQ_ACTIVE;
1448 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1449 
1450 #ifdef HAVE_CPU_HOTPLUG
1451 	if (tq->tq_hp_support) {
1452 		VERIFY0(cpuhp_state_remove_instance_nocalls(
1453 		    spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
1454 	}
1455 #endif
1456 	/*
1457 	 * When TASKQ_ACTIVE is clear new tasks may not be added nor may
1458 	 * new worker threads be spawned for a dynamic taskq.
1459 	 */
1460 	if (dynamic_taskq != NULL)
1461 		taskq_wait_outstanding(dynamic_taskq, 0);
1462 
1463 	taskq_wait(tq);
1464 
1465 	taskq_kstats_fini(tq);
1466 
1467 	/* remove taskq from global list used by the kstats */
1468 	down_write(&tq_list_sem);
1469 	list_del(&tq->tq_taskqs);
1470 	up_write(&tq_list_sem);
1471 
1472 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1473 	/* wait for spawning threads to insert themselves to the list */
1474 	while (tq->tq_nspawn) {
1475 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1476 		schedule_timeout_interruptible(1);
1477 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1478 		    tq->tq_lock_class);
1479 	}
1480 
1481 	/*
1482 	 * Signal each thread to exit and block until it does.  Each thread
1483 	 * is responsible for removing itself from the list and freeing its
1484 	 * taskq_thread_t.  This allows for idle threads to opt to remove
1485 	 * themselves from the taskq.  They can be recreated as needed.
1486 	 */
1487 	while (!list_empty(&tq->tq_thread_list)) {
1488 		tqt = list_entry(tq->tq_thread_list.next,
1489 		    taskq_thread_t, tqt_thread_list);
1490 		thread = tqt->tqt_thread;
1491 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1492 
1493 		kthread_stop(thread);
1494 
1495 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1496 		    tq->tq_lock_class);
1497 	}
1498 
1499 	while (!list_empty(&tq->tq_free_list)) {
1500 		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
1501 
1502 		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
1503 
1504 		list_del_init(&t->tqent_list);
1505 		task_free(tq, t);
1506 	}
1507 
1508 	ASSERT0(tq->tq_nthreads);
1509 	ASSERT0(tq->tq_nalloc);
1510 	ASSERT0(tq->tq_nspawn);
1511 	ASSERT(list_empty(&tq->tq_thread_list));
1512 	ASSERT(list_empty(&tq->tq_active_list));
1513 	ASSERT(list_empty(&tq->tq_free_list));
1514 	ASSERT(list_empty(&tq->tq_pend_list));
1515 	ASSERT(list_empty(&tq->tq_prio_list));
1516 	ASSERT(list_empty(&tq->tq_delay_list));
1517 
1518 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1519 
1520 	taskq_stats_fini(tq);
1521 	kmem_strfree(tq->tq_name);
1522 	kmem_free(tq, sizeof (taskq_t));
1523 }
1524 EXPORT_SYMBOL(taskq_destroy);
1525 
1526 /*
1527  * Create a taskq with a specified number of pool threads. Allocate
1528  * and return an array of nthreads kthread_t pointers, one for each
1529  * thread in the pool. The array is not ordered and must be freed
1530  * by the caller.
1531  */
1532 taskq_t *
1533 taskq_create_synced(const char *name, int nthreads, pri_t pri,
1534     int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
1535 {
1536 	taskq_t *tq;
1537 	taskq_thread_t *tqt;
1538 	int i = 0;
1539 	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
1540 	    KM_SLEEP);
1541 
1542 	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
1543 
1544 	/* taskq_create spawns all the threads before returning */
1545 	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
1546 	    flags | TASKQ_PREPOPULATE);
1547 	VERIFY(tq != NULL);
1548 	VERIFY(tq->tq_nthreads == nthreads);
1549 
1550 	list_for_each_entry(tqt, &tq->tq_thread_list, tqt_thread_list) {
1551 		kthreads[i] = tqt->tqt_thread;
1552 		i++;
1553 	}
1554 
1555 	ASSERT3S(i, ==, nthreads);
1556 	*ktpp = kthreads;
1557 
1558 	return (tq);
1559 }
1560 EXPORT_SYMBOL(taskq_create_synced);
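
/*
 * Illustrative sketch (comment only, not compiled): the caller owns the
 * returned thread array and must free it; the names and thread count are
 * hypothetical.
 *
 *	kthread_t **threads;
 *	taskq_t *tq = taskq_create_synced("my_pool", 8, minclsyspri, 8,
 *	    INT_MAX, 0, &threads);
 *	...
 *	kmem_free(threads, sizeof (kthread_t *) * 8);
 */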
1561 
1562 static kstat_t *taskq_summary_ksp = NULL;
1563 
1564 static int
1565 spl_taskq_kstat_headers(char *buf, size_t size)
1566 {
1567 	size_t n = snprintf(buf, size,
1568 	    "%-20s | %-17s | %-23s\n"
1569 	    "%-20s | %-17s | %-23s\n"
1570 	    "%-20s | %-17s | %-23s\n",
1571 	    "", "threads", "tasks on queue",
1572 	    "taskq name", "tot [act idl] max", " pend [ norm  high] dly",
1573 	    "--------------------", "-----------------",
1574 	    "-----------------------");
1575 	return (n >= size ? ENOMEM : 0);
1576 }
1577 
1578 static int
1579 spl_taskq_kstat_data(char *buf, size_t size, void *data)
1580 {
1581 	struct list_head *tql = NULL;
1582 	taskq_t *tq;
1583 	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
1584 	char threads[25];
1585 	char tasks[30];
1586 	size_t n;
1587 	int err = 0;
1588 
1589 	down_read(&tq_list_sem);
1590 	list_for_each_prev(tql, &tq_list) {
1591 		tq = list_entry(tql, taskq_t, tq_taskqs);
1592 
1593 		mutex_enter(tq->tq_ksp->ks_lock);
1594 		taskq_kstats_update(tq->tq_ksp, KSTAT_READ);
1595 		taskq_kstats_t *tqks = tq->tq_ksp->ks_data;
1596 
1597 		snprintf(name, sizeof (name), "%s.%d", tq->tq_name,
1598 		    tq->tq_instance);
1599 		snprintf(threads, sizeof (threads), "%3llu [%3llu %3llu] %3llu",
1600 		    tqks->tqks_threads_total.value.ui64,
1601 		    tqks->tqks_threads_active.value.ui64,
1602 		    tqks->tqks_threads_idle.value.ui64,
1603 		    tqks->tqks_threads_max.value.ui64);
1604 		snprintf(tasks, sizeof (tasks), "%5llu [%5llu %5llu] %3llu",
1605 		    tqks->tqks_tasks_total.value.ui64,
1606 		    tqks->tqks_tasks_pending.value.ui64,
1607 		    tqks->tqks_tasks_priority.value.ui64,
1608 		    tqks->tqks_tasks_delayed.value.ui64);
1609 
1610 		mutex_exit(tq->tq_ksp->ks_lock);
1611 
1612 		n = snprintf(buf, size, "%-20s | %-17s | %-23s\n",
1613 		    name, threads, tasks);
1614 		if (n >= size) {
1615 			err = ENOMEM;
1616 			break;
1617 		}
1618 
1619 		buf = &buf[n];
1620 		size -= n;
1621 	}
1622 
1623 	up_read(&tq_list_sem);
1624 
1625 	return (err);
1626 }
1627 
1628 static void
1629 spl_taskq_kstat_init(void)
1630 {
1631 	kstat_t *ksp = kstat_create("taskq", 0, "summary", "misc",
1632 	    KSTAT_TYPE_RAW, 0, KSTAT_FLAG_VIRTUAL);
1633 
1634 	if (ksp == NULL)
1635 		return;
1636 
1637 	ksp->ks_data = (void *)(uintptr_t)1;
1638 	ksp->ks_ndata = 1;
1639 	kstat_set_raw_ops(ksp, spl_taskq_kstat_headers,
1640 	    spl_taskq_kstat_data, NULL);
1641 	kstat_install(ksp);
1642 
1643 	taskq_summary_ksp = ksp;
1644 }
1645 
1646 static void
1647 spl_taskq_kstat_fini(void)
1648 {
1649 	if (taskq_summary_ksp == NULL)
1650 		return;
1651 
1652 	kstat_delete(taskq_summary_ksp);
1653 	taskq_summary_ksp = NULL;
1654 }
1655 
1656 static unsigned int spl_taskq_kick = 0;
1657 
1658 /*
1659  * 2.6.36 API Change
1660  * module_param_cb was introduced to take kernel_param_ops and
1661  * module_param_call was marked obsolete.  Also, set and get operations
1662  * were changed to take a 'const struct kernel_param *'.
1663  */
1664 static int
1665 #ifdef module_param_cb
1666 param_set_taskq_kick(const char *val, const struct kernel_param *kp)
1667 #else
1668 param_set_taskq_kick(const char *val, struct kernel_param *kp)
1669 #endif
1670 {
1671 	int ret;
1672 	taskq_t *tq = NULL;
1673 	taskq_ent_t *t;
1674 	unsigned long flags;
1675 
1676 	ret = param_set_uint(val, kp);
1677 	if (ret < 0 || !spl_taskq_kick)
1678 		return (ret);
1679 	/* reset value */
1680 	spl_taskq_kick = 0;
1681 
1682 	down_read(&tq_list_sem);
1683 	list_for_each_entry(tq, &tq_list, tq_taskqs) {
1684 		spin_lock_irqsave_nested(&tq->tq_lock, flags,
1685 		    tq->tq_lock_class);
1686 		/* Check if the first pending task is older than 5 seconds */
1687 		t = taskq_next_ent(tq);
1688 		if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
1689 			(void) taskq_thread_spawn(tq);
1690 			printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
1691 			    tq->tq_name, tq->tq_instance);
1692 		}
1693 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1694 	}
1695 	up_read(&tq_list_sem);
1696 	return (ret);
1697 }
1698 
1699 #ifdef module_param_cb
1700 static const struct kernel_param_ops param_ops_taskq_kick = {
1701 	.set = param_set_taskq_kick,
1702 	.get = param_get_uint,
1703 };
1704 module_param_cb(spl_taskq_kick, &param_ops_taskq_kick, &spl_taskq_kick, 0644);
1705 #else
1706 module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
1707 	&spl_taskq_kick, 0644);
1708 #endif
1709 MODULE_PARM_DESC(spl_taskq_kick,
1710 	"Write nonzero to kick stuck taskqs to spawn more threads");
1711 
1712 #ifdef HAVE_CPU_HOTPLUG
1713 /*
1714  * This callback will be called exactly once for each core that comes online,
1715  * for each dynamic taskq. We attempt to expand taskqs that have
1716  * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
1717  * time, to correctly determine whether or not to add a thread.
1718  */
1719 static int
1720 spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
1721 {
1722 	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1723 	unsigned long flags;
1724 	int err = 0;
1725 
1726 	ASSERT(tq);
1727 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1728 
1729 	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
1730 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1731 		return (err);
1732 	}
1733 
1734 	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1735 	int nthreads = MIN(tq->tq_cpu_pct, 100);
1736 	nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
1737 	tq->tq_maxthreads = nthreads;
1738 
1739 	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1740 	    tq->tq_maxthreads > tq->tq_nthreads) {
1741 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1742 		taskq_thread_t *tqt = taskq_thread_create(tq);
1743 		if (tqt == NULL)
1744 			err = -1;
1745 		return (err);
1746 	}
1747 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1748 	return (err);
1749 }
1750 
1751 /*
1752  * While we don't support offlining CPUs, it is possible that CPUs will fail
1753  * to online successfully. We do need to be able to handle this case
1754  * gracefully.
1755  */
1756 static int
1757 spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
1758 {
1759 	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
1760 	unsigned long flags;
1761 
1762 	ASSERT(tq);
1763 	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
1764 
1765 	if (!(tq->tq_flags & TASKQ_ACTIVE))
1766 		goto out;
1767 
1768 	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
1769 	int nthreads = MIN(tq->tq_cpu_pct, 100);
1770 	nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
1771 	tq->tq_maxthreads = nthreads;
1772 
1773 	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
1774 	    tq->tq_maxthreads < tq->tq_nthreads) {
1775 		ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
1776 		taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
1777 		    taskq_thread_t, tqt_thread_list);
1778 		struct task_struct *thread = tqt->tqt_thread;
1779 		spin_unlock_irqrestore(&tq->tq_lock, flags);
1780 
1781 		kthread_stop(thread);
1782 
1783 		return (0);
1784 	}
1785 
1786 out:
1787 	spin_unlock_irqrestore(&tq->tq_lock, flags);
1788 	return (0);
1789 }
1790 #endif
1791 
1792 int
1793 spl_taskq_init(void)
1794 {
1795 	init_rwsem(&tq_list_sem);
1796 	tsd_create(&taskq_tsd, NULL);
1797 
1798 #ifdef HAVE_CPU_HOTPLUG
1799 	spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
1800 	    "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);
1801 #endif
1802 
1803 	system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
1804 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1805 	if (system_taskq == NULL)
1806 		return (-ENOMEM);
1807 
1808 	system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
1809 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
1810 	if (system_delay_taskq == NULL) {
1811 #ifdef HAVE_CPU_HOTPLUG
1812 		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1813 #endif
1814 		taskq_destroy(system_taskq);
1815 		return (-ENOMEM);
1816 	}
1817 
1818 	dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
1819 	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
1820 	if (dynamic_taskq == NULL) {
1821 #ifdef HAVE_CPU_HOTPLUG
1822 		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1823 #endif
1824 		taskq_destroy(system_taskq);
1825 		taskq_destroy(system_delay_taskq);
1826 		return (-ENOMEM);
1827 	}
1828 
1829 	/*
1830 	 * This is used to annotate tq_lock, so
1831 	 *   taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
1832 	 * does not trigger a lockdep warning re: possible recursive locking
1833 	 */
1834 	dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;
1835 
1836 	spl_taskq_kstat_init();
1837 
1838 	return (0);
1839 }
1840 
1841 void
1842 spl_taskq_fini(void)
1843 {
1844 	spl_taskq_kstat_fini();
1845 
1846 	taskq_destroy(dynamic_taskq);
1847 	dynamic_taskq = NULL;
1848 
1849 	taskq_destroy(system_delay_taskq);
1850 	system_delay_taskq = NULL;
1851 
1852 	taskq_destroy(system_taskq);
1853 	system_taskq = NULL;
1854 
1855 	tsd_destroy(&taskq_tsd);
1856 
1857 #ifdef HAVE_CPU_HOTPLUG
1858 	cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
1859 	spl_taskq_cpuhp_state = 0;
1860 #endif
1861 }
1862