xref: /spdk/test/unit/lib/thread/iobuf.c/iobuf_ut.c (revision c164db9ffe3718ad4e4f5bab380ccfa62c2fa672)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2023 Intel Corporation. All rights reserved.
3  */
4 
5 #include "spdk_internal/cunit.h"
6 
7 #include "common/lib/ut_multithread.c"
8 #include "unit/lib/json_mock.c"
9 
10 #include "spdk/config.h"
11 #include "spdk/thread.h"
12 
13 #include "thread/iobuf.c"
14 
/* Per-request bookkeeping used by the tests below.  The spdk_iobuf_entry is
 * embedded so that the get/foreach callbacks can recover the enclosing
 * ut_iobuf_entry via SPDK_CONTAINEROF().
 */
struct ut_iobuf_entry {
	struct spdk_iobuf_channel	*ioch;		/* channel buffers are requested from */
	struct spdk_iobuf_entry		iobuf;		/* embedded wait-queue entry */
	void				*buf;		/* buffer stored by the get callback */
	void				*buf2;		/* second buffer (iobuf_priority test) */
	uint32_t			thread_id;	/* ut thread index this entry belongs to */
	const char			*module;	/* iobuf module name the entry uses */
};
23 
/* Completion callback for spdk_iobuf_finish(): flags the caller-provided
 * int so the test can assert that teardown completed.
 */
static void
ut_iobuf_finish_cb(void *ctx)
{
	int *done = ctx;

	*done = 1;
}
29 
30 static void
31 ut_iobuf_get_buf_cb(struct spdk_iobuf_entry *entry, void *buf)
32 {
33 	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
34 
35 	ut_entry->buf = buf;
36 }
37 
38 static int
39 ut_iobuf_foreach_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, void *cb_arg)
40 {
41 	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
42 
43 	ut_entry->buf = cb_arg;
44 
45 	return 0;
46 }
47 
48 #define SMALL_BUFSIZE 4096
49 #define LARGE_BUFSIZE 8192
50 
/*
 * Exercise the small/large iobuf pools across two modules and two threads:
 * exhausting a pool queues further requests, returned buffers are handed to
 * waiters in FIFO order regardless of which module requested or released
 * them, queued entries can be aborted, and for_each_entry visits exactly
 * the entries queued on a given channel.
 */
static void
iobuf(void)
{
	struct spdk_iobuf_opts opts = {
		.small_pool_count = 2,
		.large_pool_count = 2,
		.small_bufsize = SMALL_BUFSIZE,
		.large_bufsize = LARGE_BUFSIZE,
	};
	struct ut_iobuf_entry *entry;
	struct spdk_iobuf_channel mod0_ch[2], mod1_ch[2];
	/* Entries 0-3 run on thread 0, entries 4-7 on thread 1 */
	struct ut_iobuf_entry mod0_entries[] = {
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 1, .module = "ut_module0", },
		{ .thread_id = 1, .module = "ut_module0", },
		{ .thread_id = 1, .module = "ut_module0", },
		{ .thread_id = 1, .module = "ut_module0", },
	};
	struct ut_iobuf_entry mod1_entries[] = {
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 1, .module = "ut_module1", },
		{ .thread_id = 1, .module = "ut_module1", },
		{ .thread_id = 1, .module = "ut_module1", },
		{ .thread_id = 1, .module = "ut_module1", },
	};
	int rc, finish = 0;
	uint32_t i;

	allocate_cores(2);
	allocate_threads(2);

	set_thread(0);

	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
	g_iobuf.opts = opts;
	rc = spdk_iobuf_initialize();
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module0");
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module1");
	CU_ASSERT_EQUAL(rc, 0);

	/* One channel per module per thread; caches are disabled (0, 0) */
	set_thread(0);
	rc = spdk_iobuf_channel_init(&mod0_ch[0], "ut_module0", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);
	set_thread(1);
	rc = spdk_iobuf_channel_init(&mod0_ch[1], "ut_module0", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);
	for (i = 0; i < SPDK_COUNTOF(mod0_entries); ++i) {
		mod0_entries[i].ioch = &mod0_ch[mod0_entries[i].thread_id];
	}
	set_thread(0);
	rc = spdk_iobuf_channel_init(&mod1_ch[0], "ut_module1", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);
	set_thread(1);
	rc = spdk_iobuf_channel_init(&mod1_ch[1], "ut_module1", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);
	for (i = 0; i < SPDK_COUNTOF(mod1_entries); ++i) {
		mod1_entries[i].ioch = &mod1_ch[mod1_entries[i].thread_id];
	}

	/* First check that it's possible to retrieve the whole pools from a single module */
	set_thread(0);
	entry = &mod0_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	/* The next two should be put onto the large buf wait queue */
	entry = &mod0_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	/* Pick the two next buffers from the small pool */
	set_thread(1);
	entry = &mod0_entries[4];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[5];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	/* The next two should be put onto the small buf wait queue */
	entry = &mod0_entries[6];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[7];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Now return one of the large buffers to the pool and verify that the first request's
	 * (entry 2) callback was executed and it was removed from the wait queue.
	 */
	set_thread(0);
	entry = &mod0_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[2];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[3];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Return the second buffer and check that the other request is satisfied */
	entry = &mod0_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Return the remaining two buffers */
	entry = &mod0_entries[2];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);

	/* Check that it didn't change the requests waiting for the small buffers */
	entry = &mod0_entries[6];
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[7];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Do the same test as above, this time using the small pool */
	set_thread(1);
	entry = &mod0_entries[4];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[6];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[7];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Return the second buffer and check that the other request is satisfied */
	entry = &mod0_entries[5];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[7];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Return the remaining two buffers */
	entry = &mod0_entries[6];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[7];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);

	/* Now check requesting buffers from different modules - first request all of them from one
	 * module, starting from the large pool
	 */
	set_thread(0);
	entry = &mod0_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	/* Request all of them from the small one */
	set_thread(1);
	entry = &mod0_entries[4];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[5];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Request one buffer per module from each pool */
	set_thread(0);
	entry = &mod1_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	/* Change the order from the small pool and request a buffer from mod0 first */
	set_thread(1);
	entry = &mod0_entries[6];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[4];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Now return one buffer to the large pool */
	set_thread(0);
	entry = &mod0_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);

	/* Make sure the request from mod1 got the buffer, as it was the first to request it */
	entry = &mod1_entries[0];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[3];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Return second buffer to the large pool and check the outstanding mod0 request */
	entry = &mod0_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Return the remaining two buffers */
	entry = &mod1_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);

	/* Check the same for the small pool, but this time the order of the request is reversed
	 * (mod0 before mod1)
	 */
	set_thread(1);
	entry = &mod0_entries[4];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[6];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	/* mod1 request was second in this case, so it still needs to wait */
	entry = &mod1_entries[4];
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Return the second requested buffer */
	entry = &mod0_entries[5];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod1_entries[4];
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Return the remaining two buffers */
	entry = &mod0_entries[6];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod1_entries[4];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);

	/* Request buffers to make the pools empty */
	set_thread(0);
	entry = &mod0_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod1_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod1_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Queue more requests from both modules */
	entry = &mod0_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Check that abort correctly removes an entry from the queue */
	entry = &mod0_entries[2];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod1_entries[3];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);

	/* Aborted entries were skipped: the next waiter in line gets the buffer */
	entry = &mod0_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(mod1_entries[2].buf);
	entry = &mod0_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(mod0_entries[3].buf);

	/* Clean up */
	entry = &mod1_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod1_entries[2];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod1_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod0_entries[3];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);

	/* Request buffers to make the pools empty */
	set_thread(0);
	entry = &mod0_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod1_entries[0];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod0_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);
	entry = &mod1_entries[1];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NOT_NULL(entry->buf);

	/* Request a buffer from each queue and each module on thread 0 */
	set_thread(0);
	entry = &mod0_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[2];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[3];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Do the same on thread 1 */
	set_thread(1);
	entry = &mod0_entries[6];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[6];
	entry->buf = spdk_iobuf_get(entry->ioch, LARGE_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod0_entries[7];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);
	entry = &mod1_entries[7];
	entry->buf = spdk_iobuf_get(entry->ioch, SMALL_BUFSIZE, &entry->iobuf, ut_iobuf_get_buf_cb);
	CU_ASSERT_PTR_NULL(entry->buf);

	/* Now do the foreach and check that correct entries are iterated over by assigning their
	 * ->buf pointers to different values.
	 */
	set_thread(0);
	rc = spdk_iobuf_for_each_entry(&mod0_ch[0], ut_iobuf_foreach_cb, (void *)0xdeadbeef);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_for_each_entry(&mod1_ch[0], ut_iobuf_foreach_cb, (void *)0xfeedbeef);
	CU_ASSERT_EQUAL(rc, 0);
	set_thread(1);
	rc = spdk_iobuf_for_each_entry(&mod0_ch[1], ut_iobuf_foreach_cb, (void *)0xcafebabe);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_for_each_entry(&mod1_ch[1], ut_iobuf_foreach_cb, (void *)0xbeefcafe);
	CU_ASSERT_EQUAL(rc, 0);

	/* thread 0 */
	CU_ASSERT_PTR_EQUAL(mod0_entries[2].buf, (void *)0xdeadbeef);
	CU_ASSERT_PTR_EQUAL(mod0_entries[3].buf, (void *)0xdeadbeef);
	CU_ASSERT_PTR_EQUAL(mod1_entries[2].buf, (void *)0xfeedbeef);
	CU_ASSERT_PTR_EQUAL(mod1_entries[3].buf, (void *)0xfeedbeef);
	/* thread 1 */
	CU_ASSERT_PTR_EQUAL(mod0_entries[6].buf, (void *)0xcafebabe);
	CU_ASSERT_PTR_EQUAL(mod0_entries[7].buf, (void *)0xcafebabe);
	CU_ASSERT_PTR_EQUAL(mod1_entries[6].buf, (void *)0xbeefcafe);
	CU_ASSERT_PTR_EQUAL(mod1_entries[7].buf, (void *)0xbeefcafe);

	/* Clean everything up */
	set_thread(0);
	entry = &mod0_entries[2];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod0_entries[3];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
	entry = &mod1_entries[2];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod1_entries[3];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);

	entry = &mod0_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod1_entries[0];
	spdk_iobuf_put(entry->ioch, entry->buf, LARGE_BUFSIZE);
	entry = &mod0_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);
	entry = &mod1_entries[1];
	spdk_iobuf_put(entry->ioch, entry->buf, SMALL_BUFSIZE);

	set_thread(1);
	entry = &mod0_entries[6];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod0_entries[7];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);
	entry = &mod1_entries[6];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, LARGE_BUFSIZE);
	entry = &mod1_entries[7];
	spdk_iobuf_entry_abort(entry->ioch, &entry->iobuf, SMALL_BUFSIZE);

	set_thread(0);
	spdk_iobuf_channel_fini(&mod0_ch[0]);
	poll_threads();
	spdk_iobuf_channel_fini(&mod1_ch[0]);
	poll_threads();
	set_thread(1);
	spdk_iobuf_channel_fini(&mod0_ch[1]);
	poll_threads();
	spdk_iobuf_channel_fini(&mod1_ch[1]);
	poll_threads();

	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
	poll_threads();

	CU_ASSERT_EQUAL(finish, 1);

	free_threads();
	free_cores();
}
455 
/*
 * Verify per-channel buffer caches: channel creation fails when the cache
 * cannot be filled from the pool, buffers released by one channel make room
 * for another channel's cache, and queued requests take priority over
 * refilling a channel's cache when a buffer is returned.
 */
static void
iobuf_cache(void)
{
	struct spdk_iobuf_opts opts = {
		.small_pool_count = 4,
		.large_pool_count = 4,
		.small_bufsize = SMALL_BUFSIZE,
		.large_bufsize = LARGE_BUFSIZE,
	};
	struct spdk_iobuf_channel iobuf_ch[2];
	struct ut_iobuf_entry *entry;
	struct ut_iobuf_entry mod0_entries[] = {
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
	};
	struct ut_iobuf_entry mod1_entries[] = {
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
	};
	int rc, finish = 0;
	uint32_t i, j, bufsize;

	allocate_cores(1);
	allocate_threads(1);

	set_thread(0);

	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
	g_iobuf.opts = opts;
	rc = spdk_iobuf_initialize();
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module0");
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module1");
	CU_ASSERT_EQUAL(rc, 0);

	/* First check that channel initialization fails when it's not possible to fill in the cache
	 * from the pool.
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 5, 1);
	CU_ASSERT_EQUAL(rc, -ENOMEM);
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 1, 5);
	CU_ASSERT_EQUAL(rc, -ENOMEM);

	/* A cache of 4+4 consumes both pools entirely, so a second cached channel must fail */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 4, 4);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 4, 4);
	CU_ASSERT_EQUAL(rc, -ENOMEM);

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	poll_threads();

	/* Initialize one channel with cache, acquire buffers, and check that a second one can be
	 * created once the buffers acquired from the first one are returned to the pool
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);

	for (i = 0; i < 3; ++i) {
		mod0_entries[i].buf = spdk_iobuf_get(&iobuf_ch[0], LARGE_BUFSIZE, &mod0_entries[i].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf);
	}

	/* The channels can be temporarily greedy, holding more buffers than their configured cache
	 * size. We can only guarantee that we can create a channel if all outstanding buffers
	 * have been returned. */
	for (i = 0; i < 3; ++i) {
		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, LARGE_BUFSIZE);
	}

	/* The last buffer should be released back to the pool, so we should be able to create a new
	 * channel
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	spdk_iobuf_channel_fini(&iobuf_ch[1]);
	poll_threads();

	/* Check that the pool is only used when the cache is empty and that the cache guarantees a
	 * certain set of buffers
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 1, 1);
	CU_ASSERT_EQUAL(rc, 0);

	/* Run the same scenario once for the small and once for the large pool */
	uint32_t buffer_sizes[] = { SMALL_BUFSIZE, LARGE_BUFSIZE };
	for (i = 0; i < SPDK_COUNTOF(buffer_sizes); ++i) {
		bufsize = buffer_sizes[i];

		for (j = 0; j < 3; ++j) {
			entry = &mod0_entries[j];
			entry->buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &entry->iobuf,
						    ut_iobuf_get_buf_cb);
			CU_ASSERT_PTR_NOT_NULL(entry->buf);
		}

		mod1_entries[0].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[0].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NOT_NULL(mod1_entries[0].buf);

		/* The whole pool is exhausted now */
		mod1_entries[1].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[1].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NULL(mod1_entries[1].buf);
		mod0_entries[3].buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &mod0_entries[3].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);

		/* If there are outstanding requests waiting for a buffer, they should have priority
		 * over filling in the cache, even if they're from different modules.
		 */
		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, bufsize);
		/* Also make sure the queue is FIFO and doesn't care about which module requested
		 * and which module released the buffer.
		 */
		CU_ASSERT_PTR_NOT_NULL(mod1_entries[1].buf);
		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);

		/* Return the buffers back */
		spdk_iobuf_entry_abort(&iobuf_ch[0], &mod0_entries[3].iobuf, bufsize);
		for (j = 0; j < 2; ++j) {
			spdk_iobuf_put(&iobuf_ch[0], mod0_entries[j].buf, bufsize);
			spdk_iobuf_put(&iobuf_ch[1], mod1_entries[j].buf, bufsize);
		}
	}

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	spdk_iobuf_channel_fini(&iobuf_ch[1]);
	poll_threads();

	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
	poll_threads();

	CU_ASSERT_EQUAL(finish, 1);

	free_threads();
	free_cores();
}
602 
603 static void
604 ut_iobuf_get_buf2_cb(struct spdk_iobuf_entry *entry, void *buf)
605 {
606 	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
607 
608 	CU_ASSERT_PTR_NOT_NULL(ut_entry->buf);
609 	CU_ASSERT_PTR_NULL(ut_entry->buf2);
610 
611 	ut_entry->buf2 = buf;
612 }
613 
614 static void
615 ut_iobuf_get_buf1_cb(struct spdk_iobuf_entry *entry, void *buf)
616 {
617 	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
618 	void *buf2;
619 
620 	CU_ASSERT_PTR_NULL(ut_entry->buf);
621 	CU_ASSERT_PTR_NULL(ut_entry->buf2);
622 	ut_entry->buf = buf;
623 
624 	buf2 = spdk_iobuf_get(ut_entry->ioch, SMALL_BUFSIZE, &ut_entry->iobuf,
625 			      ut_iobuf_get_buf2_cb);
626 	CU_ASSERT_PTR_NULL(buf2);
627 }
628 
/*
 * Verify that a buffer request issued from within an iobuf get callback is
 * prioritized: when the entry's callback re-requests a buffer, that nested
 * request is served before entries that were queued earlier.
 */
static void
iobuf_priority(void)
{
	struct spdk_iobuf_opts opts = {
		.small_pool_count = 2,
		.large_pool_count = 2,
		.small_bufsize = SMALL_BUFSIZE,
		.large_bufsize = LARGE_BUFSIZE,
	};
	struct ut_iobuf_entry entries[4] = {};
	struct spdk_iobuf_channel iobuf_ch;
	int rc, finish = 0;
	uint32_t i;

	allocate_cores(1);
	allocate_threads(1);

	set_thread(0);

	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
	g_iobuf.opts = opts;
	rc = spdk_iobuf_initialize();
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module");
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch, "ut_module", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);

	for (i = 0; i < SPDK_COUNTOF(entries); ++i) {
		entries[i].ioch = &iobuf_ch;
	}

	/* Check that requests for an iobuf called from within the iobuf_get_cb are prioritized */
	entries[0].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, NULL, NULL);
	CU_ASSERT_PTR_NOT_NULL(entries[0].buf);
	entries[1].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, NULL, NULL);
	CU_ASSERT_PTR_NOT_NULL(entries[1].buf);

	/* Try to acquire two iobufs twice */
	entries[2].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, &entries[2].iobuf,
					ut_iobuf_get_buf1_cb);
	CU_ASSERT_PTR_NULL(entries[2].buf);
	entries[3].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, &entries[3].iobuf,
					ut_iobuf_get_buf1_cb);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Return one of the iobufs - the first entry on the wait queue should get it */
	spdk_iobuf_put(&iobuf_ch, entries[0].buf, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[2].buf);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Return the second one, this time the same entry should get it, because it requested
	 * inside its iobuf_get_cb */
	spdk_iobuf_put(&iobuf_ch, entries[1].buf, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[2].buf2);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Release it again, now the last entry should finally get it */
	spdk_iobuf_put(&iobuf_ch, entries[2].buf, SMALL_BUFSIZE);
	spdk_iobuf_put(&iobuf_ch, entries[2].buf2, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[3].buf);
	CU_ASSERT_PTR_NOT_NULL(entries[3].buf2);
	spdk_iobuf_put(&iobuf_ch, entries[3].buf, SMALL_BUFSIZE);
	spdk_iobuf_put(&iobuf_ch, entries[3].buf2, SMALL_BUFSIZE);

	spdk_iobuf_channel_fini(&iobuf_ch);
	poll_threads();

	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
	poll_threads();

	CU_ASSERT_EQUAL(finish, 1);

	free_threads();
	free_cores();
}
706 
707 int
708 main(int argc, char **argv)
709 {
710 	CU_pSuite	suite = NULL;
711 	unsigned int	num_failures;
712 
713 	CU_initialize_registry();
714 
715 	suite = CU_add_suite("io_channel", NULL, NULL);
716 	CU_ADD_TEST(suite, iobuf);
717 	CU_ADD_TEST(suite, iobuf_cache);
718 	CU_ADD_TEST(suite, iobuf_priority);
719 
720 	num_failures = spdk_ut_run_tests(argc, argv, NULL);
721 	CU_cleanup_registry();
722 	return num_failures;
723 }
724