/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2023 Intel Corporation. All rights reserved.
 */

#include "spdk_internal/cunit.h"

#include "common/lib/ut_multithread.c"
#include "unit/lib/json_mock.c"

#include "spdk/config.h"
#include "spdk/thread.h"

#include "thread/iobuf.c"

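/* Per-request test context: the channel used, the queued iobuf entry, the buffer(s) received,
 * the owning thread, and the module name.
 */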
struct ut_iobuf_entry {
	struct spdk_iobuf_channel	*ioch;
	struct spdk_iobuf_entry		iobuf;
	void				*buf;
	void				*buf2;
	uint32_t			thread_id;
	const char			*module;
};

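/* Completion callback for spdk_iobuf_finish(); sets the flag the tests poll on */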
static void
ut_iobuf_finish_cb(void *ctx)
{
	*(int *)ctx = 1;
}

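/* Buffer-ready callback: stores the received buffer on the waiting entry */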
static void
ut_iobuf_get_buf_cb(struct spdk_iobuf_entry *entry, void *buf)
{
	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);

	ut_entry->buf = buf;
}

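/* Foreach callback: marks each visited entry by storing the cb_arg cookie in its ->buf */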
static int
ut_iobuf_foreach_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, void *cb_arg)
{
	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);

	ut_entry->buf = cb_arg;

	return 0;
}

#define SMALL_BUFSIZE 4096
#define LARGE_BUFSIZE 8192

static void
iobuf(void)
{
	struct spdk_iobuf_opts opts = {
		.small_pool_count = 2,
		.large_pool_count = 2,
		.small_bufsize = SMALL_BUFSIZE,
		.large_bufsize = LARGE_BUFSIZE,
	};
	struct ut_iobuf_entry *entry;
	struct spdk_iobuf_channel mod0_ch[2], mod1_ch[2];
	struct ut_iobuf_entry mod0_entries[] = {
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 1, .module = "ut_module0", },
		{ .thread_id = 1, .module = "ut_module0", },
		{ .thread_id = 1, .module = "ut_module0", },
		{ .thread_id = 1, .module = "ut_module0", },
	};
	struct ut_iobuf_entry mod1_entries[] = {
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 1, .module = "ut_module1", },
		{ .thread_id = 1, .module = "ut_module1", },
		{ .thread_id = 1, .module = "ut_module1", },
		{ .thread_id = 1, .module = "ut_module1", },
	};
	int rc, finish = 0;
	uint32_t i;

	allocate_cores(2);
	allocate_threads(2);

	set_thread(0);

	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
	g_iobuf.opts = opts;
	rc = spdk_iobuf_initialize();
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module0");
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module1");
	CU_ASSERT_EQUAL(rc, 0);

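	/* Create a channel for each module on each thread. The zero cache sizes mean the channels
	 * have no local cache, so every get/put goes straight to the global pools.
	 */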
	set_thread(0);
	rc = spdk_iobuf_channel_init(&mod0_ch[0], "ut_module0", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);
	set_thread(1);
	rc = spdk_iobuf_channel_init(&mod0_ch[1], "ut_module0", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);
	for (i = 0; i < SPDK_COUNTOF(mod0_entries); ++i) {
		mod0_entries[i].ioch = &mod0_ch[mod0_entries[i].thread_id];
	}
	set_thread(0);
	rc = spdk_iobuf_channel_init(&mod1_ch[0], "ut_module1", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);
	set_thread(1);
	rc = spdk_iobuf_channel_init(&mod1_ch[1], "ut_module1", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);
	for (i = 0; i < SPDK_COUNTOF(mod1_entries); ++i) {
		mod1_entries[i].ioch = &mod1_ch[mod1_entries[i].thread_id];
	}

	/* First check that a single module can acquire all of the buffers from both pools */
	set_thread(0);
	entry = &mod0_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	/* The next two should be put onto the large buf wait queue */
	entry = &mod0_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	/* Pick the next two buffers from the small pool */
	set_thread(1);
	entry = &mod0_entries[4];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[5];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	/* The next two should be put onto the small buf wait queue */
	entry = &mod0_entries[6];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[7];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Now return one of the large buffers to the pool and verify that the first request's
	 * (entry 2) callback was executed and it was removed from the wait queue.
	 */
	set_thread(0);
	entry = &mod0_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[2];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[3];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Return the second buffer and check that the other request is satisfied */
	entry = &mod0_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Return the remaining two buffers */
	entry = &mod0_entries[2];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);

	/* Check that it didn't change the requests waiting for the small buffers */
	entry = &mod0_entries[6];
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[7];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Do the same test as above, this time using the small pool */
	set_thread(1);
	entry = &mod0_entries[4];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[6];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[7];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Return the second buffer and check that the other request is satisfied */
	entry = &mod0_entries[5];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[7];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Return the remaining two buffers */
	entry = &mod0_entries[6];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[7];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);

	/* Now check requesting buffers from different modules. First, request all of them from one
	 * module, starting with the large pool.
	 */
	set_thread(0);
	entry = &mod0_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	/* Request all of the small buffers from the same module */
	set_thread(1);
	entry = &mod0_entries[4];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[5];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Request one buffer per module from each pool */
	set_thread(0);
	entry = &mod1_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	/* For the small pool, reverse the order and request a buffer from mod0 first */
	set_thread(1);
	entry = &mod0_entries[6];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[4];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Now return one buffer to the large pool */
	set_thread(0);
	entry = &mod0_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);

	/* Make sure the request from mod1 got the buffer, as it was the first to request it */
	entry = &mod1_entries[0];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[3];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Return the second buffer to the large pool and check the outstanding mod0 request */
	entry = &mod0_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Return the remaining two buffers */
	entry = &mod1_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);

	/* Check the same for the small pool, but this time the order of the requests is reversed
	 * (mod0 before mod1)
	 */
	set_thread(1);
	entry = &mod0_entries[4];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[6];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	/* The mod1 request was second in this case, so it still needs to wait */
	entry = &mod1_entries[4];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Return the second requested buffer */
	entry = &mod0_entries[5];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod1_entries[4];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Return the remaining two buffers */
	entry = &mod0_entries[6];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod1_entries[4];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);

	/* Request buffers to make the pools empty */
	set_thread(0);
	entry = &mod0_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod1_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod1_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Queue more requests from both modules */
	entry = &mod0_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Check that abort correctly removes an entry from the queue */
	entry = &mod0_entries[2];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod1_entries[3];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);

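	/* Return one buffer to each pool; with the aborted entries gone, the remaining waiters
	 * (mod1_entries[2] for the large pool, mod0_entries[3] for the small one) should get them.
	 */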
	entry = &mod0_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(mod1_entries[2].buf);
	entry = &mod0_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(mod0_entries[3].buf);

	/* Clean up */
	entry = &mod1_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod1_entries[2];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod1_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[3];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);

	/* Request buffers to make the pools empty */
	set_thread(0);
	entry = &mod0_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod1_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod1_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Queue a request from each module on each pool on thread 0 */
	set_thread(0);
	entry = &mod0_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Do the same on thread 1 */
	set_thread(1);
	entry = &mod0_entries[6];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[6];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[7];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[7];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Now do the foreach and check that the correct entries are iterated over by assigning
	 * a distinct value to each entry's ->buf pointer.
	 */
	set_thread(0);
	rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].large,
				       ut_iobuf_foreach_cb, (void *)0xdeadbeef);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].small,
				       ut_iobuf_foreach_cb, (void *)0xbeefdead);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].large,
				       ut_iobuf_foreach_cb, (void *)0xfeedbeef);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].small,
				       ut_iobuf_foreach_cb, (void *)0xbeeffeed);
	CU_ASSERT_EQUAL(rc, 0);
	set_thread(1);
	rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].large,
				       ut_iobuf_foreach_cb, (void *)0xcafebabe);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].small,
				       ut_iobuf_foreach_cb, (void *)0xbabecafe);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].large,
				       ut_iobuf_foreach_cb, (void *)0xbeefcafe);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].small,
				       ut_iobuf_foreach_cb, (void *)0xcafebeef);
	CU_ASSERT_EQUAL(rc, 0);

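	/* Each queued entry should have been visited and received its own channel's cookie */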
	/* thread 0 */
	CU_ASSERT_PTR_EQUAL(mod0_entries[2].buf, (void *)0xdeadbeef);
	CU_ASSERT_PTR_EQUAL(mod0_entries[3].buf, (void *)0xbeefdead);
	CU_ASSERT_PTR_EQUAL(mod1_entries[2].buf, (void *)0xfeedbeef);
	CU_ASSERT_PTR_EQUAL(mod1_entries[3].buf, (void *)0xbeeffeed);
	/* thread 1 */
	CU_ASSERT_PTR_EQUAL(mod0_entries[6].buf, (void *)0xcafebabe);
	CU_ASSERT_PTR_EQUAL(mod0_entries[7].buf, (void *)0xbabecafe);
	CU_ASSERT_PTR_EQUAL(mod1_entries[6].buf, (void *)0xbeefcafe);
	CU_ASSERT_PTR_EQUAL(mod1_entries[7].buf, (void *)0xcafebeef);

	/* Clean everything up */
	set_thread(0);
	entry = &mod0_entries[2];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
	entry = &mod1_entries[2];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod1_entries[3];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);

	entry = &mod0_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod1_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod1_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);

	set_thread(1);
	entry = &mod0_entries[6];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod0_entries[7];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
	entry = &mod1_entries[6];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod1_entries[7];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);

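	/* Release each channel on its owning thread, then shut the iobuf library down */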
	set_thread(0);
	spdk_iobuf_channel_fini(&mod0_ch[0]);
	poll_threads();
	spdk_iobuf_channel_fini(&mod1_ch[0]);
	poll_threads();
	set_thread(1);
	spdk_iobuf_channel_fini(&mod0_ch[1]);
	poll_threads();
	spdk_iobuf_channel_fini(&mod1_ch[1]);
	poll_threads();

	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
	poll_threads();

	CU_ASSERT_EQUAL(finish, 1);

	free_threads();
	free_cores();
}

static void
iobuf_cache(void)
{
	struct spdk_iobuf_opts opts = {
		.small_pool_count = 4,
		.large_pool_count = 4,
		.small_bufsize = SMALL_BUFSIZE,
		.large_bufsize = LARGE_BUFSIZE,
	};
	struct spdk_iobuf_channel iobuf_ch[2];
	struct ut_iobuf_entry *entry;
	struct ut_iobuf_entry mod0_entries[] = {
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
	};
	struct ut_iobuf_entry mod1_entries[] = {
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
	};
	int rc, finish = 0;
	uint32_t i, j, bufsize;

	allocate_cores(1);
	allocate_threads(1);

	set_thread(0);

	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
	g_iobuf.opts = opts;
	rc = spdk_iobuf_initialize();
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module0");
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module1");
	CU_ASSERT_EQUAL(rc, 0);

	/* First check that channel initialization fails when it's not possible to fill in the cache
	 * from the pool.
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 5, 1);
	CU_ASSERT_EQUAL(rc, -ENOMEM);
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 1, 5);
	CU_ASSERT_EQUAL(rc, -ENOMEM);

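	/* A single channel can claim the whole pool for its cache, leaving nothing for a second
	 * channel's cache.
	 */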
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 4, 4);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 4, 4);
	CU_ASSERT_EQUAL(rc, -ENOMEM);

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	poll_threads();

	/* Initialize one channel with cache, acquire buffers, and check that a second one can be
	 * created once the buffers acquired from the first one are returned to the pool
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);

	for (i = 0; i < 3; ++i) {
		mod0_entries[i].buf = spdk_iobuf_get(&iobuf_ch[0], LARGE_BUFSIZE, &mod0_entries[i].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf);
	}

	/* The channels can be temporarily greedy, holding more buffers than their configured cache
	 * size. We can only guarantee that we can create a channel if all outstanding buffers
	 * have been returned. */
	for (i = 0; i < 3; ++i) {
		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, LARGE_BUFSIZE);
	}

	/* The last buffer should be released back to the pool, so we should be able to create a new
	 * channel
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	spdk_iobuf_channel_fini(&iobuf_ch[1]);
	poll_threads();

	/* Check that the pool is only used when the cache is empty and that the cache guarantees a
	 * certain set of buffers
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 1, 1);
	CU_ASSERT_EQUAL(rc, 0);

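	/* Run the same scenario for both buffer sizes */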
	uint32_t buffer_sizes[] = { SMALL_BUFSIZE, LARGE_BUFSIZE };
	for (i = 0; i < SPDK_COUNTOF(buffer_sizes); ++i) {
		bufsize = buffer_sizes[i];

		for (j = 0; j < 3; ++j) {
			entry = &mod0_entries[j];
			entry->buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &entry->iobuf,
						    ut_iobuf_get_buf_cb);
			CU_ASSERT_PTR_NOT_NULL(entry->buf);
		}

		mod1_entries[0].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[0].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NOT_NULL(mod1_entries[0].buf);

		/* The whole pool is exhausted now */
		mod1_entries[1].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[1].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NULL(mod1_entries[1].buf);
		mod0_entries[3].buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &mod0_entries[3].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);

		/* If there are outstanding requests waiting for a buffer, they should have priority
		 * over filling in the cache, even if they're from different modules.
		 */
		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, bufsize);
		/* Also make sure the queue is FIFO and doesn't care about which module requested
		 * and which module released the buffer.
		 */
		CU_ASSERT_PTR_NOT_NULL(mod1_entries[1].buf);
		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);

		/* Return the buffers back */
		spdk_iobuf_entry_abort(&iobuf_ch[0], &mod0_entries[3].iobuf, bufsize);
		for (j = 0; j < 2; ++j) {
			spdk_iobuf_put(&iobuf_ch[0], mod0_entries[j].buf, bufsize);
			spdk_iobuf_put(&iobuf_ch[1], mod1_entries[j].buf, bufsize);
		}
	}

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	spdk_iobuf_channel_fini(&iobuf_ch[1]);
	poll_threads();

	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
	poll_threads();

	CU_ASSERT_EQUAL(finish, 1);

	free_threads();
	free_cores();
}

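/* Callback for the nested get issued from ut_iobuf_get_buf1_cb(); records the second buffer */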
static void
ut_iobuf_get_buf2_cb(struct spdk_iobuf_entry *entry, void *buf)
{
	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);

	CU_ASSERT_PTR_NOT_NULL(ut_entry->buf);
	CU_ASSERT_PTR_NULL(ut_entry->buf2);

	ut_entry->buf2 = buf;
}

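/* Records the first buffer and immediately requests a second one, which is expected to queue */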
static void
ut_iobuf_get_buf1_cb(struct spdk_iobuf_entry *entry, void *buf)
{
	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
	void *buf2;

	CU_ASSERT_PTR_NULL(ut_entry->buf);
	CU_ASSERT_PTR_NULL(ut_entry->buf2);
	ut_entry->buf = buf;

	buf2 = spdk_iobuf_get(ut_entry->ioch, SMALL_BUFSIZE, &ut_entry->iobuf,
			      ut_iobuf_get_buf2_cb);
	CU_ASSERT_PTR_NULL(buf2);
}

static void
iobuf_priority(void)
{
	struct spdk_iobuf_opts opts = {
		.small_pool_count = 2,
		.large_pool_count = 2,
		.small_bufsize = SMALL_BUFSIZE,
		.large_bufsize = LARGE_BUFSIZE,
	};
	struct ut_iobuf_entry entries[4] = {};
	struct spdk_iobuf_channel iobuf_ch;
	int rc, finish = 0;
	uint32_t i;

	allocate_cores(1);
	allocate_threads(1);

	set_thread(0);

	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
	g_iobuf.opts = opts;
	rc = spdk_iobuf_initialize();
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module");
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch, "ut_module", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);

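	/* All entries share the single channel */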
	for (i = 0; i < SPDK_COUNTOF(entries); ++i) {
		entries[i].ioch = &iobuf_ch;
	}

	/* Check that iobuf requests issued from within an iobuf get callback are prioritized */
	entries[0].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, NULL, NULL);
	CU_ASSERT_PTR_NOT_NULL(entries[0].buf);
	entries[1].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, NULL, NULL);
	CU_ASSERT_PTR_NOT_NULL(entries[1].buf);

	/* Queue two more requests; each will try to acquire a second buffer from within its
	 * callback */
	entries[2].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, &entries[2].iobuf,
					ut_iobuf_get_buf1_cb);
	CU_ASSERT_PTR_NULL(entries[2].buf);
	entries[3].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, &entries[3].iobuf,
					ut_iobuf_get_buf1_cb);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Return one of the iobufs - the first entry on the wait queue should get it */
	spdk_iobuf_put(&iobuf_ch, entries[0].buf, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[2].buf);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Return the second one; this time the same entry should get it, because it requested
	 * another buffer from inside its iobuf_get_cb */
	spdk_iobuf_put(&iobuf_ch, entries[1].buf, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[2].buf2);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Release both of its buffers; now the last entry should finally get them */
	spdk_iobuf_put(&iobuf_ch, entries[2].buf, SMALL_BUFSIZE);
	spdk_iobuf_put(&iobuf_ch, entries[2].buf2, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[3].buf);
	CU_ASSERT_PTR_NOT_NULL(entries[3].buf2);
	spdk_iobuf_put(&iobuf_ch, entries[3].buf, SMALL_BUFSIZE);
	spdk_iobuf_put(&iobuf_ch, entries[3].buf2, SMALL_BUFSIZE);

	spdk_iobuf_channel_fini(&iobuf_ch);
	poll_threads();

	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
	poll_threads();

	CU_ASSERT_EQUAL(finish, 1);

	free_threads();
	free_cores();
}

int
main(int argc, char **argv)
{
	CU_pSuite	suite = NULL;
	unsigned int	num_failures;

	CU_initialize_registry();

	suite = CU_add_suite("io_channel", NULL, NULL);
	CU_ADD_TEST(suite, iobuf);
	CU_ADD_TEST(suite, iobuf_cache);
	CU_ADD_TEST(suite, iobuf_priority);

	num_failures = spdk_ut_run_tests(argc, argv, NULL);
	CU_cleanup_registry();
	return num_failures;
}