/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2023 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"

#define IOBUF_MIN_SMALL_POOL_SIZE	64
#define IOBUF_MIN_LARGE_POOL_SIZE	8
#define IOBUF_DEFAULT_SMALL_POOL_SIZE	8192
#define IOBUF_DEFAULT_LARGE_POOL_SIZE	1024
#define IOBUF_ALIGNMENT			4096
#define IOBUF_MIN_SMALL_BUFSIZE		4096
#define IOBUF_MIN_LARGE_BUFSIZE		8192
#define IOBUF_DEFAULT_SMALL_BUFSIZE	(8 * 1024)
/* 132k may seem like an odd choice, but the default large buffer must be big
 * enough to hold the default maximum I/O size (128k) plus any per-buffer
 * metadata. Code paths that are explicitly configured compute the required
 * size exactly; this value only covers the default. */
#define IOBUF_DEFAULT_LARGE_BUFSIZE	(132 * 1024)
#define IOBUF_MAX_CHANNELS		64

SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");

static bool g_iobuf_is_initialized = false;

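/* Per-thread context for the iobuf io_device: wait queues for pending buffer
 * requests and the spdk_iobuf_channels registered on this thread. */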
struct iobuf_channel {
	spdk_iobuf_entry_stailq_t	small_queue;
	spdk_iobuf_entry_stailq_t	large_queue;
	struct spdk_iobuf_channel	*channels[IOBUF_MAX_CHANNELS];
};

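/* A consumer of the iobuf pools, registered by name via spdk_iobuf_register_module(). */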
struct iobuf_module {
	char				*name;
	TAILQ_ENTRY(iobuf_module)	tailq;
};

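/* Global iobuf state: the central small and large pools, their backing memory,
 * the effective options, and the list of registered modules. */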
struct iobuf {
	struct spdk_ring		*small_pool;
	struct spdk_ring		*large_pool;
	void				*small_pool_base;
	void				*large_pool_base;
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	spdk_iobuf_finish_cb		finish_cb;
	void				*finish_arg;
};

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.small_pool = NULL,
	.large_pool = NULL,
	.small_pool_base = NULL,
	.large_pool_base = NULL,
	.opts = {
		.small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
	},
};

struct iobuf_get_stats_ctx {
	struct spdk_iobuf_module_stats	*modules;
	uint32_t			num_modules;
	spdk_iobuf_get_stats_cb		cb_fn;
	void				*cb_arg;
};

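/* io_device create/destroy callbacks for the per-thread iobuf context. */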
static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;

	STAILQ_INIT(&ch->small_queue);
	STAILQ_INIT(&ch->large_queue);

	return 0;
}

static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch __attribute__((unused)) = ctx;

	assert(STAILQ_EMPTY(&ch->small_queue));
	assert(STAILQ_EMPTY(&ch->large_queue));
}

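/* Allocate the central rings and their DMA-able backing memory, seed the rings
 * with buffers, and register the iobuf io_device. */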
int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int rc = 0;
	uint64_t i;
	struct spdk_iobuf_buffer *buf;

	g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = g_iobuf.small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = g_iobuf.large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");
	g_iobuf_is_initialized = true;

	return 0;
error:
	spdk_free(g_iobuf.small_pool_base);
	spdk_ring_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.large_pool_base);
	spdk_ring_free(g_iobuf.large_pool);

	return rc;
}

static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

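	/* All buffers should be back in the pools at this point; a count mismatch
	 * means some module leaked buffers or is still holding them. */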
	if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_free(g_iobuf.small_pool_base);
	g_iobuf.small_pool_base = NULL;
	spdk_ring_free(g_iobuf.small_pool);
	g_iobuf.small_pool = NULL;

	spdk_free(g_iobuf.large_pool_base);
	g_iobuf.large_pool_base = NULL;
	spdk_ring_free(g_iobuf.large_pool);
	g_iobuf.large_pool = NULL;

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	if (!g_iobuf_is_initialized) {
		cb_fn(cb_arg);
		return;
	}

	g_iobuf_is_initialized = false;
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (!opts) {
		SPDK_ERRLOG("opts cannot be NULL\n");
		return -EINVAL;
	}

	if (!opts->opts_size) {
		SPDK_ERRLOG("opts_size inside opts cannot be zero\n");
		return -EINVAL;
	}

	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}

	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_BUFSIZE);
		return -EINVAL;
	}

	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_BUFSIZE);
		return -EINVAL;
	}

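	/* Copy a field only if the caller's opts struct is large enough to contain
	 * it, so binaries built against an older, smaller struct keep working. */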
#define SET_FIELD(field) \
	if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts->opts_size) { \
		g_iobuf.opts.field = opts->field; \
	} \

	SET_FIELD(small_pool_count);
	SET_FIELD(large_pool_count);
	SET_FIELD(small_bufsize);
	SET_FIELD(large_bufsize);

	g_iobuf.opts.opts_size = opts->opts_size;

#undef SET_FIELD

	return 0;
}

void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts, size_t opts_size)
{
	if (!opts) {
		SPDK_ERRLOG("opts should not be NULL\n");
		return;
	}

	if (!opts_size) {
		SPDK_ERRLOG("opts_size should not be zero\n");
		return;
	}

	opts->opts_size = opts_size;

#define SET_FIELD(field) \
	if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts_size) { \
		opts->field = g_iobuf.opts.field; \
	} \

	SET_FIELD(small_pool_count);
	SET_FIELD(large_pool_count);
	SET_FIELD(small_bufsize);
	SET_FIELD(large_bufsize);

#undef SET_FIELD

	/* Do not remove this statement; update it whenever a new field is added,
	 * and remember to add a matching SET_FIELD statement for that field. */
	SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_opts) == 32, "Incorrect size");
}

int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			break;
		}
	}

	if (module == NULL) {
		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
		return -ENODEV;
	}

	ioch = spdk_get_io_channel(&g_iobuf);
	if (ioch == NULL) {
		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
		return -ENOMEM;
	}

	iobuf_ch = spdk_io_channel_get_ctx(ioch);

	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
		if (iobuf_ch->channels[i] == NULL) {
			iobuf_ch->channels[i] = ch;
			break;
		}
	}

	if (i == IOBUF_MAX_CHANNELS) {
		SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i);
		/* ch hasn't been initialized yet, so it isn't safe to clean up through
		 * spdk_iobuf_channel_fini() here. */
		spdk_put_io_channel(ioch);
		return -ENOMEM;
	}

	ch->small.queue = &iobuf_ch->small_queue;
	ch->large.queue = &iobuf_ch->large_queue;
	ch->small.pool = g_iobuf.small_pool;
	ch->large.pool = g_iobuf.large_pool;
	ch->small.bufsize = g_iobuf.opts.small_bufsize;
	ch->large.bufsize = g_iobuf.opts.large_bufsize;
	ch->parent = ioch;
	ch->module = module;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_size = large_cache_size;
	ch->small.cache_count = 0;
	ch->large.cache_count = 0;

	STAILQ_INIT(&ch->small.cache);
	STAILQ_INIT(&ch->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %"PRIu32"/%"PRIu32" entries. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
				    name, i, small_cache_size, g_iobuf.opts.small_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
		ch->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %"PRIu32"/%"PRIu32" entries. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
				    name, i, large_cache_size, g_iobuf.opts.large_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
		ch->large.cache_count++;
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return -ENOMEM;
}

void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;
	struct iobuf_channel *iobuf_ch;
	uint32_t i;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		ch->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		ch->large.cache_count--;
	}

	assert(ch->small.cache_count == 0);
	assert(ch->large.cache_count == 0);

	iobuf_ch = spdk_io_channel_get_ctx(ch->parent);
	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
		if (iobuf_ch->channels[i] == ch) {
			iobuf_ch->channels[i] = NULL;
			break;
		}
	}

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}

int
spdk_iobuf_register_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			return -EEXIST;
		}
	}

	module = calloc(1, sizeof(*module));
	if (module == NULL) {
		return -ENOMEM;
	}

	module->name = strdup(name);
	if (module->name == NULL) {
		free(module);
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);

	return 0;
}

int
spdk_iobuf_unregister_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
			free(module->name);
			free(module);
			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *entry, *tmp;
	int rc;

	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
		/* We only want to iterate over the entries requested by the module which owns ch */
		if (entry->module != ch->module) {
			continue;
		}

		rc = cb_fn(ch, entry, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

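/* Remove a pending request from its wait queue. len must match the value passed
 * to spdk_iobuf_get(), as it selects between the small and large queues. */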
void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	struct spdk_iobuf_pool *pool;

	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

#define IOBUF_BATCH_SIZE 32

void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	buf = STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
		pool->stats.cache++;
	} else {
		struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
		size_t sz, i;

		/* If we're going to dequeue, we may as well dequeue a batch. */
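		/* Cap the batch at the cache size; with caching disabled
		 * (cache_size == 0), still dequeue a single buffer for the caller. */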
		sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
				       spdk_max(pool->cache_size, 1)));
		if (sz == 0) {
			if (entry) {
				STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
				entry->module = ch->module;
				entry->cb_fn = cb_fn;
				pool->stats.retry++;
			}

			return NULL;
		}

		pool->stats.main++;
		for (i = 0; i < (sz - 1); i++) {
			STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
			pool->cache_count++;
		}

		/* The last one is the one we'll return */
		buf = bufs[i];
	}

	return buf;
}

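/*
 * Typical consumer flow (an illustrative sketch; everything here other than the
 * spdk_iobuf_* calls is hypothetical):
 *
 *	static void
 *	buf_ready_cb(struct spdk_iobuf_entry *entry, void *buf)
 *	{
 *		// Invoked from spdk_iobuf_put() once a buffer becomes available.
 *	}
 *
 *	buf = spdk_iobuf_get(ch, len, &req->entry, buf_ready_cb);
 *	if (buf == NULL) {
 *		// Pools exhausted; the request is queued and buf_ready_cb()
 *		// fires when another channel returns a buffer.
 *		return;
 *	}
 *	// ... use buf ..., then return it with the same len:
 *	spdk_iobuf_put(ch, buf, len);
 */
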
void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;
	size_t sz;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		pool = &ch->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_size == 0) {
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
			return;
		}

		iobuf_buf = (struct spdk_iobuf_buffer *)buf;

		STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
		pool->cache_count++;

		/* The cache size may exceed the configured amount. We always dequeue from the
		 * central pool in batches of known size, so wait until at least a batch
		 * has been returned to actually return the buffers to the central pool. */
		sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
		if (pool->cache_count >= pool->cache_size + sz) {
			struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
			size_t i;

			for (i = 0; i < sz; i++) {
				bufs[i] = STAILQ_FIRST(&pool->cache);
				STAILQ_REMOVE_HEAD(&pool->cache, stailq);
				assert(pool->cache_count > 0);
				pool->cache_count--;
			}

			spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
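		/* entry->cb_fn() may have re-queued the entry at the tail, e.g. after
		 * calling spdk_iobuf_get() again and failing; move it back to the head
		 * so it keeps its place in line. */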
		if (spdk_unlikely(entry == STAILQ_LAST(pool->queue, spdk_iobuf_entry, stailq))) {
			STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
			STAILQ_INSERT_HEAD(pool->queue, entry, stailq);
		}
	}
}

static void
iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status)
{
	struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);

	ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg);
	free(ctx->modules);
	free(ctx);
}

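/* Runs on each thread: fold this thread's per-channel counters into the
 * per-module totals accumulated in ctx->modules. */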
static void
iobuf_get_channel_stats(struct spdk_io_channel_iter *iter)
{
	struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter);
	struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch);
	struct spdk_iobuf_channel *channel;
	struct iobuf_module *module;
	struct spdk_iobuf_module_stats *it;
	uint32_t i, j;

	for (i = 0; i < ctx->num_modules; ++i) {
		for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) {
			channel = iobuf_ch->channels[j];
			if (channel == NULL) {
				continue;
			}

			it = &ctx->modules[i];
			module = (struct iobuf_module *)channel->module;
			if (strcmp(it->module, module->name) == 0) {
				it->small_pool.cache += channel->small.stats.cache;
				it->small_pool.main += channel->small.stats.main;
				it->small_pool.retry += channel->small.stats.retry;
				it->large_pool.cache += channel->large.stats.cache;
				it->large_pool.main += channel->large.stats.main;
				it->large_pool.retry += channel->large.stats.retry;
				break;
			}
		}
	}

	spdk_for_each_channel_continue(iter, 0);
}

669 
670 int
671 spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg)
672 {
673 	struct iobuf_module *module;
674 	struct iobuf_get_stats_ctx *ctx;
675 	uint32_t i;
676 
677 	ctx = calloc(1, sizeof(*ctx));
678 	if (ctx == NULL) {
679 		return -ENOMEM;
680 	}
681 
682 	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
683 		++ctx->num_modules;
684 	}
685 
686 	ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats));
687 	if (ctx->modules == NULL) {
688 		free(ctx);
689 		return -ENOMEM;
690 	}
691 
692 	i = 0;
693 	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
694 		ctx->modules[i].module = module->name;
695 		++i;
696 	}
697 
698 	ctx->cb_fn = cb_fn;
699 	ctx->cb_arg = cb_arg;
700 
701 	spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx,
702 			      iobuf_get_channel_stats_done);
703 	return 0;
704 }
705