xref: /spdk/test/unit/lib/thread/iobuf.c/iobuf_ut.c (revision b02581a89058ebaebe03bd0e16e3b58adfe406c1)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2023 Intel Corporation. All rights reserved.
3  */
4 
5 #include "spdk_internal/cunit.h"
6 
7 #include "common/lib/ut_multithread.c"
8 #include "unit/lib/json_mock.c"
9 
10 #include "spdk/config.h"
11 #include "spdk/thread.h"
12 
13 #include "thread/iobuf.c"
14 
15 struct ut_iobuf_entry {
16 	struct spdk_iobuf_channel	*ioch;
17 	struct spdk_iobuf_entry		iobuf;
18 	void				*buf;
19 	uint32_t			thread_id;
20 	const char			*module;
21 };
22 
23 static void
24 ut_iobuf_finish_cb(void *ctx)
25 {
26 	*(int *)ctx = 1;
27 }
28 
29 static void
30 ut_iobuf_get_buf_cb(struct spdk_iobuf_entry *entry, void *buf)
31 {
32 	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
33 
34 	ut_entry->buf = buf;
35 }
36 
37 static int
38 ut_iobuf_foreach_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, void *cb_arg)
39 {
40 	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
41 
42 	ut_entry->buf = cb_arg;
43 
44 	return 0;
45 }
46 
47 #define SMALL_BUFSIZE 4096
48 #define LARGE_BUFSIZE 8192
49 
50 static void
51 iobuf(void)
52 {
53 	struct spdk_iobuf_opts opts = {
54 		.small_pool_count = 2,
55 		.large_pool_count = 2,
56 		.small_bufsize = SMALL_BUFSIZE,
57 		.large_bufsize = LARGE_BUFSIZE,
58 	};
59 	struct ut_iobuf_entry *entry;
60 	struct spdk_iobuf_channel mod0_ch[2], mod1_ch[2];
61 	struct ut_iobuf_entry mod0_entries[] = {
62 		{ .thread_id = 0, .module = "ut_module0", },
63 		{ .thread_id = 0, .module = "ut_module0", },
64 		{ .thread_id = 0, .module = "ut_module0", },
65 		{ .thread_id = 0, .module = "ut_module0", },
66 		{ .thread_id = 1, .module = "ut_module0", },
67 		{ .thread_id = 1, .module = "ut_module0", },
68 		{ .thread_id = 1, .module = "ut_module0", },
69 		{ .thread_id = 1, .module = "ut_module0", },
70 	};
71 	struct ut_iobuf_entry mod1_entries[] = {
72 		{ .thread_id = 0, .module = "ut_module1", },
73 		{ .thread_id = 0, .module = "ut_module1", },
74 		{ .thread_id = 0, .module = "ut_module1", },
75 		{ .thread_id = 0, .module = "ut_module1", },
76 		{ .thread_id = 1, .module = "ut_module1", },
77 		{ .thread_id = 1, .module = "ut_module1", },
78 		{ .thread_id = 1, .module = "ut_module1", },
79 		{ .thread_id = 1, .module = "ut_module1", },
80 	};
81 	int rc, finish = 0;
82 	uint32_t i;
83 
84 	allocate_cores(2);
85 	allocate_threads(2);
86 
87 	set_thread(0);
88 
89 	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
90 	g_iobuf.opts = opts;
91 	rc = spdk_iobuf_initialize();
92 	CU_ASSERT_EQUAL(rc, 0);
93 
94 	rc = spdk_iobuf_register_module("ut_module0");
95 	CU_ASSERT_EQUAL(rc, 0);
96 
97 	rc = spdk_iobuf_register_module("ut_module1");
98 	CU_ASSERT_EQUAL(rc, 0);
99 
100 	set_thread(0);
101 	rc = spdk_iobuf_channel_init(&mod0_ch[0], "ut_module0", 0, 0);
102 	CU_ASSERT_EQUAL(rc, 0);
103 	set_thread(1);
104 	rc = spdk_iobuf_channel_init(&mod0_ch[1], "ut_module0", 0, 0);
105 	CU_ASSERT_EQUAL(rc, 0);
106 	for (i = 0; i < SPDK_COUNTOF(mod0_entries); ++i) {
107 		mod0_entries[i].ioch = &mod0_ch[mod0_entries[i].thread_id];
108 	}
109 	set_thread(0);
110 	rc = spdk_iobuf_channel_init(&mod1_ch[0], "ut_module1", 0, 0);
111 	CU_ASSERT_EQUAL(rc, 0);
112 	set_thread(1);
113 	rc = spdk_iobuf_channel_init(&mod1_ch[1], "ut_module1", 0, 0);
114 	CU_ASSERT_EQUAL(rc, 0);
115 	for (i = 0; i < SPDK_COUNTOF(mod1_entries); ++i) {
116 		mod1_entries[i].ioch = &mod1_ch[mod1_entries[i].thread_id];
117 	}
118 
119 	/* First check that it's possible to retrieve the whole pools from a single module */
120 	set_thread(0);
121 	entry = &mod0_entries[0];
122 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
123 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
124 	entry = &mod0_entries[1];
125 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
126 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
127 	/* The next two should be put onto the large buf wait queue */
128 	entry = &mod0_entries[2];
129 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
130 	CU_ASSERT_PTR_NULL(entry->buf);
131 	entry = &mod0_entries[3];
132 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
133 	CU_ASSERT_PTR_NULL(entry->buf);
134 	/* Pick the two next buffers from the small pool */
135 	set_thread(1);
136 	entry = &mod0_entries[4];
137 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
138 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
139 	entry = &mod0_entries[5];
140 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
141 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
142 	/* The next two should be put onto the small buf wait queue */
143 	entry = &mod0_entries[6];
144 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
145 	CU_ASSERT_PTR_NULL(entry->buf);
146 	entry = &mod0_entries[7];
147 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
148 	CU_ASSERT_PTR_NULL(entry->buf);
149 
150 	/* Now return one of the large buffers to the pool and verify that the first request's
151 	 * (entry 2) callback was executed and it was removed from the wait queue.
152 	 */
153 	set_thread(0);
154 	entry = &mod0_entries[0];
155 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
156 	entry = &mod0_entries[2];
157 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
158 	entry = &mod0_entries[3];
159 	CU_ASSERT_PTR_NULL(entry->buf);
160 
161 	/* Return the second buffer and check that the other request is satisfied */
162 	entry = &mod0_entries[1];
163 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
164 	entry = &mod0_entries[3];
165 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
166 
167 	/* Return the remaining two buffers */
168 	entry = &mod0_entries[2];
169 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
170 	entry = &mod0_entries[3];
171 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
172 
173 	/* Check that it didn't change the requests waiting for the small buffers */
174 	entry = &mod0_entries[6];
175 	CU_ASSERT_PTR_NULL(entry->buf);
176 	entry = &mod0_entries[7];
177 	CU_ASSERT_PTR_NULL(entry->buf);
178 
179 	/* Do the same test as above, this time using the small pool */
180 	set_thread(1);
181 	entry = &mod0_entries[4];
182 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
183 	entry = &mod0_entries[6];
184 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
185 	entry = &mod0_entries[7];
186 	CU_ASSERT_PTR_NULL(entry->buf);
187 
188 	/* Return the second buffer and check that the other request is satisfied */
189 	entry = &mod0_entries[5];
190 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
191 	entry = &mod0_entries[7];
192 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
193 
194 	/* Return the remaining two buffers */
195 	entry = &mod0_entries[6];
196 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
197 	entry = &mod0_entries[7];
198 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
199 
200 	/* Now check requesting buffers from different modules - first request all of them from one
201 	 * module, starting from the large pool
202 	 */
203 	set_thread(0);
204 	entry = &mod0_entries[0];
205 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
206 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
207 	entry = &mod0_entries[1];
208 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
209 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
210 	/* Request all of them from the small one */
211 	set_thread(1);
212 	entry = &mod0_entries[4];
213 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
214 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
215 	entry = &mod0_entries[5];
216 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
217 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
218 
219 	/* Request one buffer per module from each pool  */
220 	set_thread(0);
221 	entry = &mod1_entries[0];
222 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
223 	CU_ASSERT_PTR_NULL(entry->buf);
224 	entry = &mod0_entries[3];
225 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
226 	CU_ASSERT_PTR_NULL(entry->buf);
227 	/* Change the order from the small pool and request a buffer from mod0 first */
228 	set_thread(1);
229 	entry = &mod0_entries[6];
230 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
231 	CU_ASSERT_PTR_NULL(entry->buf);
232 	entry = &mod1_entries[4];
233 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
234 	CU_ASSERT_PTR_NULL(entry->buf);
235 
236 	/* Now return one buffer to the large pool */
237 	set_thread(0);
238 	entry = &mod0_entries[0];
239 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
240 
241 	/* Make sure the request from mod1 got the buffer, as it was the first to request it */
242 	entry = &mod1_entries[0];
243 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
244 	entry = &mod0_entries[3];
245 	CU_ASSERT_PTR_NULL(entry->buf);
246 
247 	/* Return second buffer to the large pool and check the outstanding mod0 request */
248 	entry = &mod0_entries[1];
249 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
250 	entry = &mod0_entries[3];
251 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
252 
253 	/* Return the remaining two buffers */
254 	entry = &mod1_entries[0];
255 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
256 	entry = &mod0_entries[3];
257 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
258 
259 	/* Check the same for the small pool, but this time the order of the request is reversed
260 	 * (mod0 before mod1)
261 	 */
262 	set_thread(1);
263 	entry = &mod0_entries[4];
264 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
265 	entry = &mod0_entries[6];
266 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
267 	/* mod1 request was second in this case, so it still needs to wait */
268 	entry = &mod1_entries[4];
269 	CU_ASSERT_PTR_NULL(entry->buf);
270 
271 	/* Return the second requested buffer */
272 	entry = &mod0_entries[5];
273 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
274 	entry = &mod1_entries[4];
275 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
276 
277 	/* Return the remaining two buffers */
278 	entry = &mod0_entries[6];
279 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
280 	entry = &mod1_entries[4];
281 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
282 
283 	/* Request buffers to make the pools empty */
284 	set_thread(0);
285 	entry = &mod0_entries[0];
286 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
287 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
288 	entry = &mod1_entries[0];
289 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
290 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
291 	entry = &mod0_entries[1];
292 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
293 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
294 	entry = &mod1_entries[1];
295 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
296 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
297 
298 	/* Queue more requests from both modules */
299 	entry = &mod0_entries[2];
300 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
301 	CU_ASSERT_PTR_NULL(entry->buf);
302 	entry = &mod1_entries[2];
303 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
304 	CU_ASSERT_PTR_NULL(entry->buf);
305 	entry = &mod1_entries[3];
306 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
307 	CU_ASSERT_PTR_NULL(entry->buf);
308 	entry = &mod0_entries[3];
309 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
310 	CU_ASSERT_PTR_NULL(entry->buf);
311 
312 	/* Check that abort correctly remove an entry from the queue */
313 	entry = &mod0_entries[2];
314 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
315 	entry = &mod1_entries[3];
316 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
317 
318 	entry = &mod0_entries[0];
319 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
320 	CU_ASSERT_PTR_NOT_NULL(mod1_entries[2].buf);
321 	entry = &mod0_entries[1];
322 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
323 	CU_ASSERT_PTR_NOT_NULL(mod0_entries[3].buf);
324 
325 	/* Clean up */
326 	entry = &mod1_entries[0];
327 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
328 	entry = &mod1_entries[2];
329 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
330 	entry = &mod1_entries[1];
331 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
332 	entry = &mod0_entries[3];
333 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
334 
335 	/* Request buffers to make the pools empty */
336 	set_thread(0);
337 	entry = &mod0_entries[0];
338 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
339 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
340 	entry = &mod1_entries[0];
341 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
342 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
343 	entry = &mod0_entries[1];
344 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
345 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
346 	entry = &mod1_entries[1];
347 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
348 	CU_ASSERT_PTR_NOT_NULL(entry->buf);
349 
350 	/* Request a buffer from each queue and each module on thread 0 */
351 	set_thread(0);
352 	entry = &mod0_entries[2];
353 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
354 	CU_ASSERT_PTR_NULL(entry->buf);
355 	entry = &mod1_entries[2];
356 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
357 	CU_ASSERT_PTR_NULL(entry->buf);
358 	entry = &mod0_entries[3];
359 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
360 	CU_ASSERT_PTR_NULL(entry->buf);
361 	entry = &mod1_entries[3];
362 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
363 	CU_ASSERT_PTR_NULL(entry->buf);
364 
365 	/* Do the same on thread 1 */
366 	set_thread(1);
367 	entry = &mod0_entries[6];
368 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
369 	CU_ASSERT_PTR_NULL(entry->buf);
370 	entry = &mod1_entries[6];
371 	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
372 	CU_ASSERT_PTR_NULL(entry->buf);
373 	entry = &mod0_entries[7];
374 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
375 	CU_ASSERT_PTR_NULL(entry->buf);
376 	entry = &mod1_entries[7];
377 	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
378 	CU_ASSERT_PTR_NULL(entry->buf);
379 
380 	/* Now do the foreach and check that correct entries are iterated over by assigning their
381 	 * ->buf pointers to different values.
382 	 */
383 	set_thread(0);
384 	rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].large,
385 				       ut_iobuf_foreach_cb, (void *)0xdeadbeef);
386 	CU_ASSERT_EQUAL(rc, 0);
387 	rc = spdk_iobuf_for_each_entry(&mod0_ch[0], &mod0_ch[0].small,
388 				       ut_iobuf_foreach_cb, (void *)0xbeefdead);
389 	CU_ASSERT_EQUAL(rc, 0);
390 	rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].large,
391 				       ut_iobuf_foreach_cb, (void *)0xfeedbeef);
392 	CU_ASSERT_EQUAL(rc, 0);
393 	rc = spdk_iobuf_for_each_entry(&mod1_ch[0], &mod1_ch[0].small,
394 				       ut_iobuf_foreach_cb, (void *)0xbeeffeed);
395 	CU_ASSERT_EQUAL(rc, 0);
396 	set_thread(1);
397 	rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].large,
398 				       ut_iobuf_foreach_cb, (void *)0xcafebabe);
399 	CU_ASSERT_EQUAL(rc, 0);
400 	rc = spdk_iobuf_for_each_entry(&mod0_ch[1], &mod0_ch[1].small,
401 				       ut_iobuf_foreach_cb, (void *)0xbabecafe);
402 	CU_ASSERT_EQUAL(rc, 0);
403 	rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].large,
404 				       ut_iobuf_foreach_cb, (void *)0xbeefcafe);
405 	CU_ASSERT_EQUAL(rc, 0);
406 	rc = spdk_iobuf_for_each_entry(&mod1_ch[1], &mod1_ch[1].small,
407 				       ut_iobuf_foreach_cb, (void *)0xcafebeef);
408 	CU_ASSERT_EQUAL(rc, 0);
409 
410 	/* thread 0 */
411 	CU_ASSERT_PTR_EQUAL(mod0_entries[2].buf, (void *)0xdeadbeef);
412 	CU_ASSERT_PTR_EQUAL(mod0_entries[3].buf, (void *)0xbeefdead);
413 	CU_ASSERT_PTR_EQUAL(mod1_entries[2].buf, (void *)0xfeedbeef);
414 	CU_ASSERT_PTR_EQUAL(mod1_entries[3].buf, (void *)0xbeeffeed);
415 	/* thread 1 */
416 	CU_ASSERT_PTR_EQUAL(mod0_entries[6].buf, (void *)0xcafebabe);
417 	CU_ASSERT_PTR_EQUAL(mod0_entries[7].buf, (void *)0xbabecafe);
418 	CU_ASSERT_PTR_EQUAL(mod1_entries[6].buf, (void *)0xbeefcafe);
419 	CU_ASSERT_PTR_EQUAL(mod1_entries[7].buf, (void *)0xcafebeef);
420 
421 	/* Clean everything up */
422 	set_thread(0);
423 	entry = &mod0_entries[2];
424 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
425 	entry = &mod0_entries[3];
426 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
427 	entry = &mod1_entries[2];
428 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
429 	entry = &mod1_entries[3];
430 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
431 
432 	entry = &mod0_entries[0];
433 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
434 	entry = &mod1_entries[0];
435 	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
436 	entry = &mod0_entries[1];
437 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
438 	entry = &mod1_entries[1];
439 	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
440 
441 	set_thread(1);
442 	entry = &mod0_entries[6];
443 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
444 	entry = &mod0_entries[7];
445 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
446 	entry = &mod1_entries[6];
447 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
448 	entry = &mod1_entries[7];
449 	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
450 
451 	set_thread(0);
452 	spdk_iobuf_channel_fini(&mod0_ch[0]);
453 	poll_threads();
454 	spdk_iobuf_channel_fini(&mod1_ch[0]);
455 	poll_threads();
456 	set_thread(1);
457 	spdk_iobuf_channel_fini(&mod0_ch[1]);
458 	poll_threads();
459 	spdk_iobuf_channel_fini(&mod1_ch[1]);
460 	poll_threads();
461 
462 	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
463 	poll_threads();
464 
465 	CU_ASSERT_EQUAL(finish, 1);
466 
467 	free_threads();
468 	free_cores();
469 }
470 
471 static void
472 iobuf_cache(void)
473 {
474 	struct spdk_iobuf_opts opts = {
475 		.small_pool_count = 4,
476 		.large_pool_count = 4,
477 		.small_bufsize = SMALL_BUFSIZE,
478 		.large_bufsize = LARGE_BUFSIZE,
479 	};
480 	struct spdk_iobuf_channel iobuf_ch[2];
481 	struct ut_iobuf_entry *entry;
482 	struct ut_iobuf_entry mod0_entries[] = {
483 		{ .thread_id = 0, .module = "ut_module0", },
484 		{ .thread_id = 0, .module = "ut_module0", },
485 		{ .thread_id = 0, .module = "ut_module0", },
486 		{ .thread_id = 0, .module = "ut_module0", },
487 	};
488 	struct ut_iobuf_entry mod1_entries[] = {
489 		{ .thread_id = 0, .module = "ut_module1", },
490 		{ .thread_id = 0, .module = "ut_module1", },
491 	};
492 	int rc, finish = 0;
493 	uint32_t i, j, bufsize;
494 
495 	allocate_cores(1);
496 	allocate_threads(1);
497 
498 	set_thread(0);
499 
500 	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
501 	g_iobuf.opts = opts;
502 	rc = spdk_iobuf_initialize();
503 	CU_ASSERT_EQUAL(rc, 0);
504 
505 	rc = spdk_iobuf_register_module("ut_module0");
506 	CU_ASSERT_EQUAL(rc, 0);
507 
508 	rc = spdk_iobuf_register_module("ut_module1");
509 	CU_ASSERT_EQUAL(rc, 0);
510 
511 	/* First check that channel initialization fails when it's not possible to fill in the cache
512 	 * from the pool.
513 	 */
514 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 5, 1);
515 	CU_ASSERT_EQUAL(rc, -ENOMEM);
516 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 1, 5);
517 	CU_ASSERT_EQUAL(rc, -ENOMEM);
518 
519 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 4, 4);
520 	CU_ASSERT_EQUAL(rc, 0);
521 	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 4, 4);
522 	CU_ASSERT_EQUAL(rc, -ENOMEM);
523 
524 	spdk_iobuf_channel_fini(&iobuf_ch[0]);
525 	poll_threads();
526 
527 	/* Initialize one channel with cache, acquire buffers, and check that a second one can be
528 	 * created once the buffers acquired from the first one are returned to the pool
529 	 */
530 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
531 	CU_ASSERT_EQUAL(rc, 0);
532 
533 	for (i = 0; i < 3; ++i) {
534 		mod0_entries[i].buf = spdk_iobuf_get(&iobuf_ch[0], LARGE_BUFSIZE, &mod0_entries[i].iobuf,
535 						     ut_iobuf_get_buf_cb);
536 		CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf);
537 	}
538 
539 	/* The channels can be temporarily greedy, holding more buffers than their configured cache
540 	 * size. We can only guarantee that we can create a channel if all outstanding buffers
541 	 * have been returned. */
542 	for (i = 0; i < 3; ++i) {
543 		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, LARGE_BUFSIZE);
544 	}
545 
546 	/* The last buffer should be released back to the pool, so we should be able to create a new
547 	 * channel
548 	 */
549 	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
550 	CU_ASSERT_EQUAL(rc, 0);
551 
552 	spdk_iobuf_channel_fini(&iobuf_ch[0]);
553 	spdk_iobuf_channel_fini(&iobuf_ch[1]);
554 	poll_threads();
555 
556 	/* Check that the pool is only used when the cache is empty and that the cache guarantees a
557 	 * certain set of buffers
558 	 */
559 	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
560 	CU_ASSERT_EQUAL(rc, 0);
561 	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 1, 1);
562 	CU_ASSERT_EQUAL(rc, 0);
563 
564 	uint32_t buffer_sizes[] = { SMALL_BUFSIZE, LARGE_BUFSIZE };
565 	for (i = 0; i < SPDK_COUNTOF(buffer_sizes); ++i) {
566 		bufsize = buffer_sizes[i];
567 
568 		for (j = 0; j < 3; ++j) {
569 			entry = &mod0_entries[j];
570 			entry->buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &entry->iobuf,
571 						    ut_iobuf_get_buf_cb);
572 			CU_ASSERT_PTR_NOT_NULL(entry->buf);
573 		}
574 
575 		mod1_entries[0].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[0].iobuf,
576 						     ut_iobuf_get_buf_cb);
577 		CU_ASSERT_PTR_NOT_NULL(mod1_entries[0].buf);
578 
579 		/* The whole pool is exhausted now */
580 		mod1_entries[1].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[1].iobuf,
581 						     ut_iobuf_get_buf_cb);
582 		CU_ASSERT_PTR_NULL(mod1_entries[1].buf);
583 		mod0_entries[3].buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &mod0_entries[3].iobuf,
584 						     ut_iobuf_get_buf_cb);
585 		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);
586 
587 		/* If there are outstanding requests waiting for a buffer, they should have priority
588 		 * over filling in the cache, even if they're from different modules.
589 		 */
590 		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, bufsize);
591 		/* Also make sure the queue is FIFO and doesn't care about which module requested
592 		 * and which module released the buffer.
593 		 */
594 		CU_ASSERT_PTR_NOT_NULL(mod1_entries[1].buf);
595 		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);
596 
597 		/* Return the buffers back */
598 		spdk_iobuf_entry_abort(&iobuf_ch[0], &mod0_entries[3].iobuf, bufsize);
599 		for (j = 0; j < 2; ++j) {
600 			spdk_iobuf_put(&iobuf_ch[0], mod0_entries[j].buf, bufsize);
601 			spdk_iobuf_put(&iobuf_ch[1], mod1_entries[j].buf, bufsize);
602 		}
603 	}
604 
605 	spdk_iobuf_channel_fini(&iobuf_ch[0]);
606 	spdk_iobuf_channel_fini(&iobuf_ch[1]);
607 	poll_threads();
608 
609 	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
610 	poll_threads();
611 
612 	CU_ASSERT_EQUAL(finish, 1);
613 
614 	free_threads();
615 	free_cores();
616 }
617 
618 int
619 main(int argc, char **argv)
620 {
621 	CU_pSuite	suite = NULL;
622 	unsigned int	num_failures;
623 
624 	CU_initialize_registry();
625 
626 	suite = CU_add_suite("io_channel", NULL, NULL);
627 	CU_ADD_TEST(suite, iobuf);
628 	CU_ADD_TEST(suite, iobuf_cache);
629 
630 	num_failures = spdk_ut_run_tests(argc, argv, NULL);
631 	CU_cleanup_registry();
632 	return num_failures;
633 }
634