xref: /freebsd-src/sys/contrib/openzfs/module/zfs/bqueue.c (revision 15f0b8c309dea1dcb14d3e374686576ff68ac43f)
1eda14cbcSMatt Macy /*
2eda14cbcSMatt Macy  * CDDL HEADER START
3eda14cbcSMatt Macy  *
4eda14cbcSMatt Macy  * This file and its contents are supplied under the terms of the
5eda14cbcSMatt Macy  * Common Development and Distribution License ("CDDL"), version 1.0.
6eda14cbcSMatt Macy  * You may only use this file in accordance with the terms of version
7eda14cbcSMatt Macy  * 1.0 of the CDDL.
8eda14cbcSMatt Macy  *
9eda14cbcSMatt Macy  * A full copy of the text of the CDDL should have accompanied this
10eda14cbcSMatt Macy  * source.  A copy of the CDDL is also available via the Internet at
11eda14cbcSMatt Macy  * http://www.illumos.org/license/CDDL.
12eda14cbcSMatt Macy  *
13eda14cbcSMatt Macy  * CDDL HEADER END
14eda14cbcSMatt Macy  */
15eda14cbcSMatt Macy /*
16eda14cbcSMatt Macy  * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
17eda14cbcSMatt Macy  */
18eda14cbcSMatt Macy 
19eda14cbcSMatt Macy #include	<sys/bqueue.h>
20eda14cbcSMatt Macy #include	<sys/zfs_context.h>
21eda14cbcSMatt Macy 
22eda14cbcSMatt Macy static inline bqueue_node_t *
obj2node(bqueue_t * q,void * data)23eda14cbcSMatt Macy obj2node(bqueue_t *q, void *data)
24eda14cbcSMatt Macy {
25eda14cbcSMatt Macy 	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26eda14cbcSMatt Macy }
27eda14cbcSMatt Macy 
28eda14cbcSMatt Macy /*
29eda14cbcSMatt Macy  * Initialize a blocking queue  The maximum capacity of the queue is set to
30*15f0b8c3SMartin Matuska  * size.  Types that are stored in a bqueue must contain a bqueue_node_t, and
31*15f0b8c3SMartin Matuska  * node_offset must be its offset from the start of the struct. fill_fraction
32*15f0b8c3SMartin Matuska  * is a performance tuning value; when the queue is full, any threads
33*15f0b8c3SMartin Matuska  * attempting to enqueue records will block.  They will block until they're
34*15f0b8c3SMartin Matuska  * signaled, which will occur when the queue is at least 1/fill_fraction
35eda14cbcSMatt Macy  * empty.  Similar behavior occurs on dequeue; if the queue is empty, threads
36*15f0b8c3SMartin Matuska  * block.  They will be signalled when the queue has 1/fill_fraction full.
37*15f0b8c3SMartin Matuska  * As a result, you must call bqueue_enqueue_flush() when you enqueue your
38*15f0b8c3SMartin Matuska  * final record on a thread, in case the dequeuing threads are currently
39*15f0b8c3SMartin Matuska  * blocked and that enqueue does not cause them to be woken. Alternatively,
40*15f0b8c3SMartin Matuska  * this behavior can be disabled (causing signaling to happen immediately) by
41*15f0b8c3SMartin Matuska  * setting fill_fraction to any value larger than size. Return 0 on success,
42*15f0b8c3SMartin Matuska  * or -1 on failure.
43*15f0b8c3SMartin Matuska  *
44*15f0b8c3SMartin Matuska  * Note: The caller must ensure that for a given bqueue_t, there's only a
45*15f0b8c3SMartin Matuska  * single call to bqueue_enqueue() running at a time (e.g. by calling only
46*15f0b8c3SMartin Matuska  * from a single thread, or with locking around the call). Similarly, the
47*15f0b8c3SMartin Matuska  * caller must ensure that there's only a single call to bqueue_dequeue()
48*15f0b8c3SMartin Matuska  * running at a time. However, the one call to bqueue_enqueue() may be
49*15f0b8c3SMartin Matuska  * invoked concurrently with the one call to bqueue_dequeue().
50eda14cbcSMatt Macy  */
51eda14cbcSMatt Macy int
bqueue_init(bqueue_t * q,uint_t fill_fraction,size_t size,size_t node_offset)52c7046f76SMartin Matuska bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
53eda14cbcSMatt Macy {
54eda14cbcSMatt Macy 	if (fill_fraction == 0) {
55eda14cbcSMatt Macy 		return (-1);
56eda14cbcSMatt Macy 	}
57eda14cbcSMatt Macy 	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
58eda14cbcSMatt Macy 	    node_offset + offsetof(bqueue_node_t, bqn_node));
59*15f0b8c3SMartin Matuska 	list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
60*15f0b8c3SMartin Matuska 	    node_offset + offsetof(bqueue_node_t, bqn_node));
61*15f0b8c3SMartin Matuska 	list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
62*15f0b8c3SMartin Matuska 	    node_offset + offsetof(bqueue_node_t, bqn_node));
63eda14cbcSMatt Macy 	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
64eda14cbcSMatt Macy 	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
65eda14cbcSMatt Macy 	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
66eda14cbcSMatt Macy 	q->bq_node_offset = node_offset;
67eda14cbcSMatt Macy 	q->bq_size = 0;
68*15f0b8c3SMartin Matuska 	q->bq_dequeuing_size = 0;
69*15f0b8c3SMartin Matuska 	q->bq_enqueuing_size = 0;
70eda14cbcSMatt Macy 	q->bq_maxsize = size;
71eda14cbcSMatt Macy 	q->bq_fill_fraction = fill_fraction;
72eda14cbcSMatt Macy 	return (0);
73eda14cbcSMatt Macy }
74eda14cbcSMatt Macy 
75eda14cbcSMatt Macy /*
76eda14cbcSMatt Macy  * Destroy a blocking queue.  This function asserts that there are no
77eda14cbcSMatt Macy  * elements in the queue, and no one is blocked on the condition
78eda14cbcSMatt Macy  * variables.
79eda14cbcSMatt Macy  */
80eda14cbcSMatt Macy void
bqueue_destroy(bqueue_t * q)81eda14cbcSMatt Macy bqueue_destroy(bqueue_t *q)
82eda14cbcSMatt Macy {
83eda14cbcSMatt Macy 	mutex_enter(&q->bq_lock);
84eda14cbcSMatt Macy 	ASSERT0(q->bq_size);
85*15f0b8c3SMartin Matuska 	ASSERT0(q->bq_dequeuing_size);
86*15f0b8c3SMartin Matuska 	ASSERT0(q->bq_enqueuing_size);
87eda14cbcSMatt Macy 	cv_destroy(&q->bq_add_cv);
88eda14cbcSMatt Macy 	cv_destroy(&q->bq_pop_cv);
89eda14cbcSMatt Macy 	list_destroy(&q->bq_list);
90*15f0b8c3SMartin Matuska 	list_destroy(&q->bq_dequeuing_list);
91*15f0b8c3SMartin Matuska 	list_destroy(&q->bq_enqueuing_list);
92eda14cbcSMatt Macy 	mutex_exit(&q->bq_lock);
93eda14cbcSMatt Macy 	mutex_destroy(&q->bq_lock);
94eda14cbcSMatt Macy }
95eda14cbcSMatt Macy 
96eda14cbcSMatt Macy static void
bqueue_enqueue_impl(bqueue_t * q,void * data,size_t item_size,boolean_t flush)97c7046f76SMartin Matuska bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
98eda14cbcSMatt Macy {
99eda14cbcSMatt Macy 	ASSERT3U(item_size, >, 0);
100eda14cbcSMatt Macy 	ASSERT3U(item_size, <=, q->bq_maxsize);
101*15f0b8c3SMartin Matuska 
102eda14cbcSMatt Macy 	obj2node(q, data)->bqn_size = item_size;
103*15f0b8c3SMartin Matuska 	q->bq_enqueuing_size += item_size;
104*15f0b8c3SMartin Matuska 	list_insert_tail(&q->bq_enqueuing_list, data);
105*15f0b8c3SMartin Matuska 
106*15f0b8c3SMartin Matuska 	if (flush ||
107*15f0b8c3SMartin Matuska 	    q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
108*15f0b8c3SMartin Matuska 		/* Append the enquing list to the shared list. */
109*15f0b8c3SMartin Matuska 		mutex_enter(&q->bq_lock);
110*15f0b8c3SMartin Matuska 		while (q->bq_size > q->bq_maxsize) {
111eda14cbcSMatt Macy 			cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
112eda14cbcSMatt Macy 		}
113*15f0b8c3SMartin Matuska 		q->bq_size += q->bq_enqueuing_size;
114*15f0b8c3SMartin Matuska 		list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
115*15f0b8c3SMartin Matuska 		q->bq_enqueuing_size = 0;
116eda14cbcSMatt Macy 		cv_broadcast(&q->bq_pop_cv);
117eda14cbcSMatt Macy 		mutex_exit(&q->bq_lock);
118eda14cbcSMatt Macy 	}
119*15f0b8c3SMartin Matuska }
120eda14cbcSMatt Macy 
121eda14cbcSMatt Macy /*
122eda14cbcSMatt Macy  * Add data to q, consuming size units of capacity.  If there is insufficient
123eda14cbcSMatt Macy  * capacity to consume size units, block until capacity exists.  Asserts size is
124eda14cbcSMatt Macy  * > 0.
125eda14cbcSMatt Macy  */
126eda14cbcSMatt Macy void
bqueue_enqueue(bqueue_t * q,void * data,size_t item_size)127c7046f76SMartin Matuska bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
128eda14cbcSMatt Macy {
129eda14cbcSMatt Macy 	bqueue_enqueue_impl(q, data, item_size, B_FALSE);
130eda14cbcSMatt Macy }
131eda14cbcSMatt Macy 
132eda14cbcSMatt Macy /*
133eda14cbcSMatt Macy  * Enqueue an entry, and then flush the queue.  This forces the popping threads
134eda14cbcSMatt Macy  * to wake up, even if we're below the fill fraction.  We have this in a single
135eda14cbcSMatt Macy  * function, rather than having a separate call, because it prevents race
136*15f0b8c3SMartin Matuska  * conditions between the enqueuing thread and the dequeuing thread, where the
137*15f0b8c3SMartin Matuska  * enqueueing thread will wake up the dequeuing thread, that thread will
138eda14cbcSMatt Macy  * destroy the condvar before the enqueuing thread is done.
139eda14cbcSMatt Macy  */
140eda14cbcSMatt Macy void
bqueue_enqueue_flush(bqueue_t * q,void * data,size_t item_size)141c7046f76SMartin Matuska bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
142eda14cbcSMatt Macy {
143eda14cbcSMatt Macy 	bqueue_enqueue_impl(q, data, item_size, B_TRUE);
144eda14cbcSMatt Macy }
145eda14cbcSMatt Macy 
146eda14cbcSMatt Macy /*
147eda14cbcSMatt Macy  * Take the first element off of q.  If there are no elements on the queue, wait
148eda14cbcSMatt Macy  * until one is put there.  Return the removed element.
149eda14cbcSMatt Macy  */
150eda14cbcSMatt Macy void *
bqueue_dequeue(bqueue_t * q)151eda14cbcSMatt Macy bqueue_dequeue(bqueue_t *q)
152eda14cbcSMatt Macy {
153*15f0b8c3SMartin Matuska 	void *ret = list_remove_head(&q->bq_dequeuing_list);
154*15f0b8c3SMartin Matuska 	if (ret == NULL) {
155*15f0b8c3SMartin Matuska 		/*
156*15f0b8c3SMartin Matuska 		 * Dequeuing list is empty.  Wait for there to be something on
157*15f0b8c3SMartin Matuska 		 * the shared list, then move the entire shared list to the
158*15f0b8c3SMartin Matuska 		 * dequeuing list.
159*15f0b8c3SMartin Matuska 		 */
160eda14cbcSMatt Macy 		mutex_enter(&q->bq_lock);
161eda14cbcSMatt Macy 		while (q->bq_size == 0) {
162eda14cbcSMatt Macy 			cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
163eda14cbcSMatt Macy 		}
164*15f0b8c3SMartin Matuska 		ASSERT0(q->bq_dequeuing_size);
165*15f0b8c3SMartin Matuska 		ASSERT(list_is_empty(&q->bq_dequeuing_list));
166*15f0b8c3SMartin Matuska 		list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
167*15f0b8c3SMartin Matuska 		q->bq_dequeuing_size = q->bq_size;
168*15f0b8c3SMartin Matuska 		q->bq_size = 0;
169*15f0b8c3SMartin Matuska 		cv_broadcast(&q->bq_add_cv);
170eda14cbcSMatt Macy 		mutex_exit(&q->bq_lock);
171*15f0b8c3SMartin Matuska 		ret = list_remove_head(&q->bq_dequeuing_list);
172eda14cbcSMatt Macy 	}
173*15f0b8c3SMartin Matuska 	q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
174*15f0b8c3SMartin Matuska 	return (ret);
175eda14cbcSMatt Macy }
176