/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2023 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"

#define IOBUF_MIN_SMALL_POOL_SIZE	64
#define IOBUF_MIN_LARGE_POOL_SIZE	8
#define IOBUF_DEFAULT_SMALL_POOL_SIZE	8192
#define IOBUF_DEFAULT_LARGE_POOL_SIZE	1024
#define IOBUF_ALIGNMENT			4096
#define IOBUF_MIN_SMALL_BUFSIZE		4096
#define IOBUF_MIN_LARGE_BUFSIZE		8192
#define IOBUF_DEFAULT_SMALL_BUFSIZE	(8 * 1024)
/* 132k is a weird choice at first, but this needs to be large enough to accommodate
 * the default maximum size (128k) plus metadata everywhere. For code paths that
 * are explicitly configured, the math is instead done properly. This is only
 * for the default. */
#define IOBUF_DEFAULT_LARGE_BUFSIZE	(132 * 1024)
#define IOBUF_MAX_CHANNELS		64

SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");

static bool g_iobuf_is_initialized = false;

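/*
 * Per-NUMA-node portion of a thread's iobuf channel: wait queues holding
 * requests that couldn't be satisfied immediately and are waiting for a
 * buffer of the given size class to be returned.
 */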
struct iobuf_channel_node {
	spdk_iobuf_entry_stailq_t	small_queue;
	spdk_iobuf_entry_stailq_t	large_queue;
};

struct iobuf_channel {
	struct iobuf_channel_node	node[SPDK_CONFIG_MAX_NUMA_NODES];
	struct spdk_iobuf_channel	*channels[IOBUF_MAX_CHANNELS];
};

struct iobuf_module {
	char				*name;
	TAILQ_ENTRY(iobuf_module)	tailq;
};

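/*
 * Per-NUMA-node central pools. Each pool is a lockless MP/MC ring of buffer
 * pointers carved out of a single contiguous, DMA-capable allocation
 * (*_pool_base).
 */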
struct iobuf_node {
	struct spdk_ring		*small_pool;
	struct spdk_ring		*large_pool;
	void				*small_pool_base;
	void				*large_pool_base;
};

struct iobuf {
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	spdk_iobuf_finish_cb		finish_cb;
	void				*finish_arg;
	struct iobuf_node		node[SPDK_CONFIG_MAX_NUMA_NODES];
};

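/*
 * Iterate over all NUMA node IDs when NUMA awareness is enabled, or visit
 * only index 0 otherwise. The loop terminates once the NUMA ID reaches
 * INT32_MAX, which spdk_env_get_next_numa_id() returns past the last node.
 */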
#define IOBUF_FOREACH_NUMA_ID(i)						\
	for (i = g_iobuf.opts.enable_numa ? spdk_env_get_first_numa_id() : 0;	\
	     i < INT32_MAX;							\
	     i = g_iobuf.opts.enable_numa ? spdk_env_get_next_numa_id(i) : INT32_MAX)

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.node = {},
	.opts = {
		.small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
	},
};

struct iobuf_get_stats_ctx {
	struct spdk_iobuf_module_stats	*modules;
	uint32_t			num_modules;
	spdk_iobuf_get_stats_cb		cb_fn;
	void				*cb_arg;
};

static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;
	struct iobuf_channel_node *node;
	int32_t i;

	IOBUF_FOREACH_NUMA_ID(i) {
		node = &ch->node[i];
		STAILQ_INIT(&node->small_queue);
		STAILQ_INIT(&node->large_queue);
	}

	return 0;
}

static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;
	struct iobuf_channel_node *node __attribute__((unused));
	int32_t i;

	IOBUF_FOREACH_NUMA_ID(i) {
		node = &ch->node[i];
		assert(STAILQ_EMPTY(&node->small_queue));
		assert(STAILQ_EMPTY(&node->large_queue));
	}
}

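/*
 * Create the central small/large pools for a single NUMA node: allocate one
 * ring and one contiguous pinned buffer region per size class, then slice the
 * region into fixed-size buffers and enqueue them onto the ring.
 */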
static int
iobuf_node_initialize(struct iobuf_node *node, uint32_t numa_id)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	struct spdk_iobuf_buffer *buf;
	uint64_t i;
	int rc;

	if (!g_iobuf.opts.enable_numa) {
		numa_id = SPDK_ENV_NUMA_ID_ANY;
	}

	node->small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					    numa_id);
	if (!node->small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	node->small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
					    NULL, numa_id, SPDK_MALLOC_DMA);
	if (node->small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	node->large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					    numa_id);
	if (!node->large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	node->large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
					    NULL, numa_id, SPDK_MALLOC_DMA);
	if (node->large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = node->small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = node->large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL);
	}

	return 0;

error:
	spdk_free(node->small_pool_base);
	spdk_ring_free(node->small_pool);
	spdk_free(node->large_pool_base);
	spdk_ring_free(node->large_pool);
	memset(node, 0, sizeof(*node));

	return rc;
}

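/*
 * Tear down a node's pools. A ring count that differs from the configured
 * pool size at this point means some buffers were never returned (leaked).
 */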
static void
iobuf_node_free(struct iobuf_node *node)
{
	if (node->small_pool == NULL) {
		/* This node didn't get allocated, so just return immediately. */
		return;
	}

	if (spdk_ring_count(node->small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(node->small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_ring_count(node->large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(node->large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_free(node->small_pool_base);
	node->small_pool_base = NULL;
	spdk_ring_free(node->small_pool);
	node->small_pool = NULL;

	spdk_free(node->large_pool_base);
	node->large_pool_base = NULL;
	spdk_ring_free(node->large_pool);
	node->large_pool = NULL;
}

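/* Allocate the per-NUMA-node pools and register the iobuf io_device. */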
int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	struct iobuf_node *node;
	int32_t i;
	int rc = 0;

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
	opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);

	IOBUF_FOREACH_NUMA_ID(i) {
		node = &g_iobuf.node[i];
		rc = iobuf_node_initialize(node, i);
		if (rc) {
			goto err;
		}
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");
	g_iobuf_is_initialized = true;

	return 0;

err:
	IOBUF_FOREACH_NUMA_ID(i) {
		node = &g_iobuf.node[i];
		iobuf_node_free(node);
	}
	return rc;
}

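/*
 * Final stage of spdk_iobuf_finish(): runs once all io_channels are released,
 * frees the registered modules and the per-node pools, then invokes the
 * caller's completion callback.
 */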
static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;
	struct iobuf_node *node;
	int32_t i;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

	IOBUF_FOREACH_NUMA_ID(i) {
		node = &g_iobuf.node[i];
		iobuf_node_free(node);
	}

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	if (!g_iobuf_is_initialized) {
		cb_fn(cb_arg);
		return;
	}

	g_iobuf_is_initialized = false;
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

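/*
 * Options are copied field by field, guarded by the caller-provided opts_size,
 * so that callers compiled against an older, smaller struct spdk_iobuf_opts
 * remain compatible (see the SET_FIELD macro below).
 */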
int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (!opts) {
		SPDK_ERRLOG("opts cannot be NULL\n");
		return -1;
	}

	if (!opts->opts_size) {
		SPDK_ERRLOG("opts_size inside opts cannot be zero value\n");
		return -1;
	}

	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}

	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}

	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_BUFSIZE);
		return -EINVAL;
	}

	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_BUFSIZE);
		return -EINVAL;
	}

	if (opts->enable_numa &&
	    spdk_env_get_last_numa_id() >= SPDK_CONFIG_MAX_NUMA_NODES) {
		SPDK_ERRLOG("max NUMA ID %" PRIu32 " cannot be supported with "
			    "SPDK_CONFIG_MAX_NUMA_NODES %" PRIu32 "\n",
			    spdk_env_get_last_numa_id(), SPDK_CONFIG_MAX_NUMA_NODES);
		SPDK_ERRLOG("Re-configure with --max-numa-nodes=%" PRIu32 "\n",
			    spdk_env_get_last_numa_id() + 1);
		return -EINVAL;
	}

#define SET_FIELD(field) \
	if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts->opts_size) { \
		g_iobuf.opts.field = opts->field; \
	} \

	SET_FIELD(small_pool_count);
	SET_FIELD(large_pool_count);
	SET_FIELD(small_bufsize);
	SET_FIELD(large_bufsize);
	SET_FIELD(enable_numa);

	g_iobuf.opts.opts_size = opts->opts_size;

#undef SET_FIELD

	return 0;
}

void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts, size_t opts_size)
{
	if (!opts) {
		SPDK_ERRLOG("opts should not be NULL\n");
		return;
	}

	if (!opts_size) {
		SPDK_ERRLOG("opts_size should not be zero value\n");
		return;
	}

	opts->opts_size = opts_size;

#define SET_FIELD(field) \
	if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts_size) { \
		opts->field = g_iobuf.opts.field; \
	} \

	SET_FIELD(small_pool_count);
	SET_FIELD(large_pool_count);
	SET_FIELD(small_bufsize);
	SET_FIELD(large_bufsize);
	SET_FIELD(enable_numa);

#undef SET_FIELD

	/* Do not remove this statement. Update the size below whenever a new field
	 * is added, and remember to add a matching SET_FIELD statement for it. */
	SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_opts) == 40, "Incorrect size");
}

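/*
 * Wire up one NUMA node of a module's channel: point the per-channel cache at
 * the node's central pools and the thread's shared wait queues, and set the
 * cache limits. The cache itself starts empty; iobuf_channel_node_populate()
 * fills it.
 */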
static void
iobuf_channel_node_init(struct spdk_iobuf_channel *ch, struct iobuf_channel *iobuf_ch,
			int32_t numa_id, uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct iobuf_node *node = &g_iobuf.node[numa_id];
	struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
	struct iobuf_channel_node *ch_node = &iobuf_ch->node[numa_id];

	cache->small.queue = &ch_node->small_queue;
	cache->large.queue = &ch_node->large_queue;
	cache->small.pool = node->small_pool;
	cache->large.pool = node->large_pool;
	cache->small.bufsize = g_iobuf.opts.small_bufsize;
	cache->large.bufsize = g_iobuf.opts.large_bufsize;
	cache->small.cache_size = small_cache_size;
	cache->large.cache_size = large_cache_size;
	cache->small.cache_count = 0;
	cache->large.cache_count = 0;

	STAILQ_INIT(&cache->small.cache);
	STAILQ_INIT(&cache->large.cache);
}

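/*
 * Pre-fill a channel's per-node cache by dequeuing buffers from the central
 * pools. Failure here means the pools are too small for the sum of all
 * configured per-channel caches.
 */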
static int
iobuf_channel_node_populate(struct spdk_iobuf_channel *ch, const char *name, int32_t numa_id)
{
	struct iobuf_node *node = &g_iobuf.node[numa_id];
	struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
	uint32_t small_cache_size = cache->small.cache_size;
	uint32_t large_cache_size = cache->large.cache_size;
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(node->small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
				    name, i, small_cache_size, g_iobuf.opts.small_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			return -ENOMEM;
		}
		STAILQ_INSERT_TAIL(&cache->small.cache, buf, stailq);
		cache->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(node->large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
				    name, i, large_cache_size, g_iobuf.opts.large_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			return -ENOMEM;
		}
		STAILQ_INSERT_TAIL(&cache->large.cache, buf, stailq);
		cache->large.cache_count++;
	}

	return 0;
}

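/*
 * A minimal usage sketch (the module name "example" and the cache sizes are
 * hypothetical): a module registers itself once, and each thread then sets up
 * its own channel before getting/putting buffers:
 *
 *	spdk_iobuf_register_module("example");
 *	...
 *	struct spdk_iobuf_channel ch;
 *	int rc = spdk_iobuf_channel_init(&ch, "example", 32, 4);
 *	if (rc != 0) {
 *		// handle error
 *	}
 */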
int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	uint32_t i;
	int32_t numa_id;
	int rc;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			break;
		}
	}

	if (module == NULL) {
		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
		return -ENODEV;
	}

	ioch = spdk_get_io_channel(&g_iobuf);
	if (ioch == NULL) {
		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
		return -ENOMEM;
	}

	iobuf_ch = spdk_io_channel_get_ctx(ioch);

	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
		if (iobuf_ch->channels[i] == NULL) {
			iobuf_ch->channels[i] = ch;
			break;
		}
	}

	if (i == IOBUF_MAX_CHANNELS) {
		SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i);
		rc = -ENOMEM;
		goto error;
	}

	ch->parent = ioch;
	ch->module = module;

	IOBUF_FOREACH_NUMA_ID(numa_id) {
		iobuf_channel_node_init(ch, iobuf_ch, numa_id,
					small_cache_size, large_cache_size);
	}

	IOBUF_FOREACH_NUMA_ID(numa_id) {
		rc = iobuf_channel_node_populate(ch, name, numa_id);
		if (rc) {
			goto error;
		}
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return rc;
}

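/*
 * Return all cached buffers for one NUMA node back to the central pools. The
 * wait queues are shared with the other modules' channels on this thread, so
 * only assert that none of the pending entries belong to the module being
 * torn down.
 */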
static void
iobuf_channel_node_fini(struct spdk_iobuf_channel *ch, int32_t numa_id)
{
	struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
	struct iobuf_node *node = &g_iobuf.node[numa_id];
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, cache->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, cache->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&cache->small.cache)) {
		buf = STAILQ_FIRST(&cache->small.cache);
		STAILQ_REMOVE_HEAD(&cache->small.cache, stailq);
		spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL);
		cache->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&cache->large.cache)) {
		buf = STAILQ_FIRST(&cache->large.cache);
		STAILQ_REMOVE_HEAD(&cache->large.cache, stailq);
		spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL);
		cache->large.cache_count--;
	}

	assert(cache->small.cache_count == 0);
	assert(cache->large.cache_count == 0);
}

void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct iobuf_channel *iobuf_ch;
	uint32_t i;

	IOBUF_FOREACH_NUMA_ID(i) {
		iobuf_channel_node_fini(ch, i);
	}

	iobuf_ch = spdk_io_channel_get_ctx(ch->parent);
	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
		if (iobuf_ch->channels[i] == ch) {
			iobuf_ch->channels[i] = NULL;
			break;
		}
	}

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}

int
spdk_iobuf_register_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			return -EEXIST;
		}
	}

	module = calloc(1, sizeof(*module));
	if (module == NULL) {
		return -ENOMEM;
	}

	module->name = strdup(name);
	if (module->name == NULL) {
		free(module);
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);

	return 0;
}

int
spdk_iobuf_unregister_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
			free(module->name);
			free(module);
			return 0;
		}
	}

	return -ENOENT;
}

static int
iobuf_pool_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool_cache *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *entry, *tmp;
	int rc;

	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
		/* We only want to iterate over the entries requested by the module which owns ch */
		if (entry->module != ch->module) {
			continue;
		}

		rc = cb_fn(ch, entry, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_node_cache *cache;
	uint32_t i;
	int rc;

	IOBUF_FOREACH_NUMA_ID(i) {
		cache = &ch->cache[i];

		rc = iobuf_pool_for_each_entry(ch, &cache->small, cb_fn, cb_ctx);
		if (rc != 0) {
			return rc;
		}
		rc = iobuf_pool_for_each_entry(ch, &cache->large, cb_fn, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

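/*
 * Remove a pending entry from the wait queue of the size class matching len.
 * Returns true if the entry was found and removed on this NUMA node.
 */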
static bool
iobuf_entry_abort_node(struct spdk_iobuf_channel *ch, int32_t numa_id,
		       struct spdk_iobuf_entry *entry, uint64_t len)
{
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;
	struct spdk_iobuf_entry *e;

	cache = &ch->cache[numa_id];

	if (len <= cache->small.bufsize) {
		pool = &cache->small;
	} else {
		assert(len <= cache->large.bufsize);
		pool = &cache->large;
	}

	STAILQ_FOREACH(e, pool->queue, stailq) {
		if (e == entry) {
			STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
			return true;
		}
	}

	return false;
}

void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	uint32_t i;

	IOBUF_FOREACH_NUMA_ID(i) {
		iobuf_entry_abort_node(ch, i, entry, len);
	}
}

#define IOBUF_BATCH_SIZE 32

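/*
 * Fast path: serve the request from the channel's local cache. On a miss,
 * dequeue a batch from the central pool to amortize the ring operation; if
 * the pool is empty too, queue the entry (when one is provided) until a
 * buffer is returned via spdk_iobuf_put().
 */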
void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;
	void *buf;

	cache = &ch->cache[0];

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= cache->small.bufsize) {
		pool = &cache->small;
	} else {
		assert(len <= cache->large.bufsize);
		pool = &cache->large;
	}

	buf = (void *)STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
		pool->stats.cache++;
	} else {
		struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
		size_t sz, i;

		/* If we're going to dequeue, we may as well dequeue a batch. */
		sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
				       spdk_max(pool->cache_size, 1)));
		if (sz == 0) {
			if (entry) {
				STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
				entry->module = ch->module;
				entry->cb_fn = cb_fn;
				pool->stats.retry++;
			}

			return NULL;
		}

		pool->stats.main++;
		for (i = 0; i < (sz - 1); i++) {
			STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
			pool->cache_count++;
		}

		/* The last one is the one we'll return */
		buf = bufs[i];
	}

	return (char *)buf;
}

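/*
 * Return a buffer. If someone is waiting, hand the buffer directly to the
 * oldest entry; otherwise put it into the local cache and, once a full batch
 * above the configured cache size has accumulated, flush a batch back to the
 * central pool.
 */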
void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;
	uint32_t numa_id;
	size_t sz;

	if (g_iobuf.opts.enable_numa) {
		numa_id = spdk_mem_get_numa_id(buf, NULL);
	} else {
		numa_id = 0;
	}

	cache = &ch->cache[numa_id];

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= cache->small.bufsize) {
		pool = &cache->small;
	} else {
		pool = &cache->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_size == 0) {
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
			return;
		}

		iobuf_buf = (struct spdk_iobuf_buffer *)buf;

		STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
		pool->cache_count++;

		/* The cache size may exceed the configured amount. We always dequeue from the
		 * central pool in batches of known size, so wait until at least a batch
		 * has been returned to actually return the buffers to the central pool. */
		sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
		if (pool->cache_count >= pool->cache_size + sz) {
			struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
			size_t i;

			for (i = 0; i < sz; i++) {
				bufs[i] = STAILQ_FIRST(&pool->cache);
				STAILQ_REMOVE_HEAD(&pool->cache, stailq);
				assert(pool->cache_count > 0);
				pool->cache_count--;
			}

			spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
	}
}

static void
iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status)
{
	struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);

	ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg);
	free(ctx->modules);
	free(ctx);
}

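/*
 * Aggregate per-channel statistics into the per-module totals. Runs on each
 * thread via spdk_for_each_channel(); every iobuf channel on the thread
 * contributes its small/large cache, main-pool, and retry counters.
 */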
static void
iobuf_get_channel_stats(struct spdk_io_channel_iter *iter)
{
	struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter);
	struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch);
	struct spdk_iobuf_channel *channel;
	struct iobuf_module *module;
	struct spdk_iobuf_module_stats *it;
	uint32_t i, j;

	for (i = 0; i < ctx->num_modules; ++i) {
		for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) {
			channel = iobuf_ch->channels[j];
			if (channel == NULL) {
				continue;
			}

			it = &ctx->modules[i];
			module = (struct iobuf_module *)channel->module;
			if (strcmp(it->module, module->name) == 0) {
				struct spdk_iobuf_pool_cache *cache;
				int32_t numa_id;

				IOBUF_FOREACH_NUMA_ID(numa_id) {
					cache = &channel->cache[numa_id].small;
					it->small_pool.cache += cache->stats.cache;
					it->small_pool.main += cache->stats.main;
					it->small_pool.retry += cache->stats.retry;

					cache = &channel->cache[numa_id].large;
					it->large_pool.cache += cache->stats.cache;
					it->large_pool.main += cache->stats.main;
					it->large_pool.retry += cache->stats.retry;
				}
				break;
			}
		}
	}

	spdk_for_each_channel_continue(iter, 0);
}

int
spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg)
{
	struct iobuf_module *module;
	struct iobuf_get_stats_ctx *ctx;
	uint32_t i;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		++ctx->num_modules;
	}

	ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats));
	if (ctx->modules == NULL) {
		free(ctx);
		return -ENOMEM;
	}

	i = 0;
	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		ctx->modules[i].module = module->name;
		++i;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx,
			      iobuf_get_channel_stats_done);
	return 0;
}
899