xref: /netbsd-src/external/bsd/jemalloc/dist/test/unit/mpsc_queue.c (revision 4439cfd0acf9c7dc90625e5cd83b2317a9ab8967)
1 #include "test/jemalloc_test.h"
2 
3 #include "jemalloc/internal/mpsc_queue.h"
4 
5 typedef struct elem_s elem_t;
6 typedef ql_head(elem_t) elem_list_t;
7 typedef mpsc_queue(elem_t) elem_mpsc_queue_t;
8 struct elem_s {
9 	int thread;
10 	int idx;
11 	ql_elm(elem_t) link;
12 };
13 
14 /* Include both proto and gen to make sure they match up. */
15 mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
16     elem_list_t);
17 mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
18     elem_list_t, link);
19 
20 static void
21 init_elems_simple(elem_t *elems, int nelems, int thread) {
22 	for (int i = 0; i < nelems; i++) {
23 		elems[i].thread = thread;
24 		elems[i].idx = i;
25 		ql_elm_new(&elems[i], link);
26 	}
27 }
28 
29 static void
30 check_elems_simple(elem_list_t *list, int nelems, int thread) {
31 	elem_t *elem;
32 	int next_idx = 0;
33 	ql_foreach(elem, list, link) {
34 		expect_d_lt(next_idx, nelems, "Too many list items");
35 		expect_d_eq(thread, elem->thread, "");
36 		expect_d_eq(next_idx, elem->idx, "List out of order");
37 		next_idx++;
38 	}
39 }
40 
41 TEST_BEGIN(test_simple) {
42 	enum {NELEMS = 10};
43 	elem_t elems[NELEMS];
44 	elem_list_t list;
45 	elem_mpsc_queue_t queue;
46 
47 	/* Pop empty queue onto empty list -> empty list */
48 	ql_new(&list);
49 	elem_mpsc_queue_new(&queue);
50 	elem_mpsc_queue_pop_batch(&queue, &list);
51 	expect_true(ql_empty(&list), "");
52 
53 	/* Pop empty queue onto nonempty list -> list unchanged */
54 	ql_new(&list);
55 	elem_mpsc_queue_new(&queue);
56 	init_elems_simple(elems, NELEMS, 0);
57 	for (int i = 0; i < NELEMS; i++) {
58 		ql_tail_insert(&list, &elems[i], link);
59 	}
60 	elem_mpsc_queue_pop_batch(&queue, &list);
61 	check_elems_simple(&list, NELEMS, 0);
62 
63 	/* Pop nonempty queue onto empty list -> list takes queue contents */
64 	ql_new(&list);
65 	elem_mpsc_queue_new(&queue);
66 	init_elems_simple(elems, NELEMS, 0);
67 	for (int i = 0; i < NELEMS; i++) {
68 		elem_mpsc_queue_push(&queue, &elems[i]);
69 	}
70 	elem_mpsc_queue_pop_batch(&queue, &list);
71 	check_elems_simple(&list, NELEMS, 0);
72 
73 	/* Pop nonempty queue onto nonempty list -> list gains queue contents */
74 	ql_new(&list);
75 	elem_mpsc_queue_new(&queue);
76 	init_elems_simple(elems, NELEMS, 0);
77 	for (int i = 0; i < NELEMS / 2; i++) {
78 		ql_tail_insert(&list, &elems[i], link);
79 	}
80 	for (int i = NELEMS / 2; i < NELEMS; i++) {
81 		elem_mpsc_queue_push(&queue, &elems[i]);
82 	}
83 	elem_mpsc_queue_pop_batch(&queue, &list);
84 	check_elems_simple(&list, NELEMS, 0);
85 
86 }
87 TEST_END
88 
89 TEST_BEGIN(test_push_single_or_batch) {
90 	enum {
91 		BATCH_MAX = 10,
92 		/*
93 		 * We'll push i items one-at-a-time, then i items as a batch,
94 		 * then i items as a batch again, as i ranges from 1 to
95 		 * BATCH_MAX.  So we need 3 times the sum of the numbers from 1
96 		 * to BATCH_MAX elements total.
97 		 */
98 		NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2
99 	};
100 	elem_t elems[NELEMS];
101 	init_elems_simple(elems, NELEMS, 0);
102 	elem_list_t list;
103 	ql_new(&list);
104 	elem_mpsc_queue_t queue;
105 	elem_mpsc_queue_new(&queue);
106 	int next_idx = 0;
107 	for (int i = 1; i < 10; i++) {
108 		/* Push i items 1 at a time. */
109 		for (int j = 0; j < i; j++) {
110 			elem_mpsc_queue_push(&queue, &elems[next_idx]);
111 			next_idx++;
112 		}
113 		/* Push i items in batch. */
114 		for (int j = 0; j < i; j++) {
115 			ql_tail_insert(&list, &elems[next_idx], link);
116 			next_idx++;
117 		}
118 		elem_mpsc_queue_push_batch(&queue, &list);
119 		expect_true(ql_empty(&list), "Batch push should empty source");
120 		/*
121 		 * Push i items in batch, again.  This tests two batches
122 		 * proceeding one after the other.
123 		 */
124 		for (int j = 0; j < i; j++) {
125 			ql_tail_insert(&list, &elems[next_idx], link);
126 			next_idx++;
127 		}
128 		elem_mpsc_queue_push_batch(&queue, &list);
129 		expect_true(ql_empty(&list), "Batch push should empty source");
130 	}
131 	expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push.");
132 
133 	expect_true(ql_empty(&list), "");
134 	elem_mpsc_queue_pop_batch(&queue, &list);
135 	check_elems_simple(&list, NELEMS, 0);
136 }
137 TEST_END
138 
139 TEST_BEGIN(test_multi_op) {
140 	enum {NELEMS = 20};
141 	elem_t elems[NELEMS];
142 	init_elems_simple(elems, NELEMS, 0);
143 	elem_list_t push_list;
144 	ql_new(&push_list);
145 	elem_list_t result_list;
146 	ql_new(&result_list);
147 	elem_mpsc_queue_t queue;
148 	elem_mpsc_queue_new(&queue);
149 
150 	int next_idx = 0;
151 	/* Push first quarter 1-at-a-time. */
152 	for (int i = 0; i < NELEMS / 4; i++) {
153 		elem_mpsc_queue_push(&queue, &elems[next_idx]);
154 		next_idx++;
155 	}
156 	/* Push second quarter in batch. */
157 	for (int i = NELEMS / 4; i < NELEMS / 2; i++) {
158 		ql_tail_insert(&push_list, &elems[next_idx], link);
159 		next_idx++;
160 	}
161 	elem_mpsc_queue_push_batch(&queue, &push_list);
162 	/* Batch pop all pushed elements. */
163 	elem_mpsc_queue_pop_batch(&queue, &result_list);
164 	/* Push third quarter in batch. */
165 	for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) {
166 		ql_tail_insert(&push_list, &elems[next_idx], link);
167 		next_idx++;
168 	}
169 	elem_mpsc_queue_push_batch(&queue, &push_list);
170 	/* Push last quarter one-at-a-time. */
171 	for (int i = 3 * NELEMS / 4; i < NELEMS; i++) {
172 		elem_mpsc_queue_push(&queue, &elems[next_idx]);
173 		next_idx++;
174 	}
175 	/* Pop them again.  Order of existing list should be preserved. */
176 	elem_mpsc_queue_pop_batch(&queue, &result_list);
177 
178 	check_elems_simple(&result_list, NELEMS, 0);
179 
180 }
181 TEST_END
182 
183 typedef struct pusher_arg_s pusher_arg_t;
184 struct pusher_arg_s {
185 	elem_mpsc_queue_t *queue;
186 	int thread;
187 	elem_t *elems;
188 	int nelems;
189 };
190 
191 typedef struct popper_arg_s popper_arg_t;
192 struct popper_arg_s {
193 	elem_mpsc_queue_t *queue;
194 	int npushers;
195 	int nelems_per_pusher;
196 	int *pusher_counts;
197 };
198 
199 static void *
200 thd_pusher(void *void_arg) {
201 	pusher_arg_t *arg = (pusher_arg_t *)void_arg;
202 	int next_idx = 0;
203 	while (next_idx < arg->nelems) {
204 		/* Push 10 items in batch. */
205 		elem_list_t list;
206 		ql_new(&list);
207 		int limit = next_idx + 10;
208 		while (next_idx < arg->nelems && next_idx < limit) {
209 			ql_tail_insert(&list, &arg->elems[next_idx], link);
210 			next_idx++;
211 		}
212 		elem_mpsc_queue_push_batch(arg->queue, &list);
213 		/* Push 10 items one-at-a-time. */
214 		limit = next_idx + 10;
215 		while (next_idx < arg->nelems && next_idx < limit) {
216 			elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]);
217 			next_idx++;
218 		}
219 
220 	}
221 	return NULL;
222 }
223 
224 static void *
225 thd_popper(void *void_arg) {
226 	popper_arg_t *arg = (popper_arg_t *)void_arg;
227 	int done_pushers = 0;
228 	while (done_pushers < arg->npushers) {
229 		elem_list_t list;
230 		ql_new(&list);
231 		elem_mpsc_queue_pop_batch(arg->queue, &list);
232 		elem_t *elem;
233 		ql_foreach(elem, &list, link) {
234 			int thread = elem->thread;
235 			int idx = elem->idx;
236 			expect_d_eq(arg->pusher_counts[thread], idx,
237 			    "Thread's pushes reordered");
238 			arg->pusher_counts[thread]++;
239 			if (arg->pusher_counts[thread]
240 			    == arg->nelems_per_pusher) {
241 				done_pushers++;
242 			}
243 		}
244 	}
245 	return NULL;
246 }
247 
248 TEST_BEGIN(test_multiple_threads) {
249 	enum {
250 		NPUSHERS = 4,
251 		NELEMS_PER_PUSHER = 1000*1000,
252 	};
253 	thd_t pushers[NPUSHERS];
254 	pusher_arg_t pusher_arg[NPUSHERS];
255 
256 	thd_t popper;
257 	popper_arg_t popper_arg;
258 
259 	elem_mpsc_queue_t queue;
260 	elem_mpsc_queue_new(&queue);
261 
262 	elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t));
263 	elem_t *elem_iter = elems;
264 	for (int i = 0; i < NPUSHERS; i++) {
265 		pusher_arg[i].queue = &queue;
266 		pusher_arg[i].thread = i;
267 		pusher_arg[i].elems = elem_iter;
268 		pusher_arg[i].nelems = NELEMS_PER_PUSHER;
269 
270 		init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i);
271 		elem_iter += NELEMS_PER_PUSHER;
272 	}
273 	popper_arg.queue = &queue;
274 	popper_arg.npushers = NPUSHERS;
275 	popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER;
276 	int pusher_counts[NPUSHERS] = {0};
277 	popper_arg.pusher_counts = pusher_counts;
278 
279 	thd_create(&popper, thd_popper, (void *)&popper_arg);
280 	for (int i = 0; i < NPUSHERS; i++) {
281 		thd_create(&pushers[i], thd_pusher, &pusher_arg[i]);
282 	}
283 
284 	thd_join(popper, NULL);
285 	for (int i = 0; i < NPUSHERS; i++) {
286 		thd_join(pushers[i], NULL);
287 	}
288 
289 	for (int i = 0; i < NPUSHERS; i++) {
290 		expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], "");
291 	}
292 
293 	free(elems);
294 }
295 TEST_END
296 
297 int
298 main(void) {
299 	return test_no_reentrancy(
300 	    test_simple,
301 	    test_push_single_or_batch,
302 	    test_multi_op,
303 	    test_multiple_threads);
304 }
305