/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2023 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"

#define IOBUF_MIN_SMALL_POOL_SIZE	64
#define IOBUF_MIN_LARGE_POOL_SIZE	8
#define IOBUF_DEFAULT_SMALL_POOL_SIZE	8192
#define IOBUF_DEFAULT_LARGE_POOL_SIZE	1024
#define IOBUF_ALIGNMENT			4096
#define IOBUF_MIN_SMALL_BUFSIZE		4096
#define IOBUF_MIN_LARGE_BUFSIZE		8192
#define IOBUF_DEFAULT_SMALL_BUFSIZE	(8 * 1024)
/* 132k looks like a weird choice at first, but it needs to be large enough to
 * accommodate the default maximum I/O size (128k) plus metadata everywhere:
 * 132k = 128k of data plus 4k of headroom, which is already a multiple of
 * IOBUF_ALIGNMENT. For code paths that are explicitly configured, the math is
 * instead done properly. This is only for the default. */
#define IOBUF_DEFAULT_LARGE_BUFSIZE	(132 * 1024)

SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");

struct iobuf_channel {
	spdk_iobuf_entry_stailq_t small_queue;
	spdk_iobuf_entry_stailq_t large_queue;
};

struct iobuf_module {
	char				*name;
	TAILQ_ENTRY(iobuf_module)	tailq;
};

struct iobuf {
	struct spdk_ring		*small_pool;
	struct spdk_ring		*large_pool;
	void				*small_pool_base;
	void				*large_pool_base;
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	spdk_iobuf_finish_cb		finish_cb;
	void				*finish_arg;
};

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.small_pool = NULL,
	.large_pool = NULL,
	.small_pool_base = NULL,
	.large_pool_base = NULL,
	.opts = {
		.small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
	},
};

static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;

	STAILQ_INIT(&ch->small_queue);
	STAILQ_INIT(&ch->large_queue);

	return 0;
}

static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch __attribute__((unused)) = ctx;

	assert(STAILQ_EMPTY(&ch->small_queue));
	assert(STAILQ_EMPTY(&ch->large_queue));
}

int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int rc = 0;
	uint64_t i;
	struct spdk_iobuf_buffer *buf;

	g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = g_iobuf.small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = g_iobuf.large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");

	return 0;
error:
	spdk_free(g_iobuf.small_pool_base);
	spdk_ring_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.large_pool_base);
	spdk_ring_free(g_iobuf.large_pool);

	return rc;
}
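
/*
 * Typical initialization flow (a sketch, not part of this file): each
 * consumer registers a module name and the application then brings up the
 * pools, e.g.:
 *
 *	spdk_iobuf_register_module("my_module");
 *	spdk_iobuf_initialize();
 *
 * Per-thread buffer caches only become usable once each thread creates a
 * channel via spdk_iobuf_channel_init() below.
 */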

static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

	if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_free(g_iobuf.small_pool_base);
	g_iobuf.small_pool_base = NULL;
	spdk_ring_free(g_iobuf.small_pool);
	g_iobuf.small_pool = NULL;

	spdk_free(g_iobuf.large_pool_base);
	g_iobuf.large_pool_base = NULL;
	spdk_ring_free(g_iobuf.large_pool);
	g_iobuf.large_pool = NULL;

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}

	g_iobuf.opts = *opts;

	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_WARNLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_SMALL_BUFSIZE);
		g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE;
	}

	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_LARGE_BUFSIZE);
		g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE;
	}

	return 0;
}
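
/*
 * Usage sketch: options must be set before spdk_iobuf_initialize(), since the
 * pools are sized and allocated there. The count below is illustrative:
 *
 *	struct spdk_iobuf_opts opts;
 *
 *	spdk_iobuf_get_opts(&opts);
 *	opts.large_pool_count = 4096;
 *	spdk_iobuf_set_opts(&opts);
 */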

void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
	*opts = g_iobuf.opts;
}

int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			break;
		}
	}

	if (module == NULL) {
		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
		return -ENODEV;
	}

	ioch = spdk_get_io_channel(&g_iobuf);
	if (ioch == NULL) {
		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
		return -ENOMEM;
	}

	iobuf_ch = spdk_io_channel_get_ctx(ioch);

	ch->small.queue = &iobuf_ch->small_queue;
	ch->large.queue = &iobuf_ch->large_queue;
	ch->small.pool = g_iobuf.small_pool;
	ch->large.pool = g_iobuf.large_pool;
	ch->small.bufsize = g_iobuf.opts.small_bufsize;
	ch->large.bufsize = g_iobuf.opts.large_bufsize;
	ch->parent = ioch;
	ch->module = module;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_size = large_cache_size;
	ch->small.cache_count = 0;
	ch->large.cache_count = 0;

	STAILQ_INIT(&ch->small.cache);
	STAILQ_INIT(&ch->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
		ch->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
		ch->large.cache_count++;
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return -ENOMEM;
}
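
/*
 * Usage sketch, assuming "my_module" was registered earlier via
 * spdk_iobuf_register_module(); typically called from a module's own I/O
 * channel create callback so each thread gets a private cache (the cache
 * sizes below are illustrative):
 *
 *	struct spdk_iobuf_channel ch;
 *	int rc;
 *
 *	rc = spdk_iobuf_channel_init(&ch, "my_module", 128, 16);
 */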

void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		ch->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		ch->large.cache_count--;
	}

	assert(ch->small.cache_count == 0);
	assert(ch->large.cache_count == 0);

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}

int
spdk_iobuf_register_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			return -EEXIST;
		}
	}

	module = calloc(1, sizeof(*module));
	if (module == NULL) {
		return -ENOMEM;
	}

	module->name = strdup(name);
	if (module->name == NULL) {
		free(module);
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);

	return 0;
}

int
spdk_iobuf_unregister_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
			free(module->name);
			free(module);
			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *entry, *tmp;
	int rc;

	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
		/* We only want to iterate over the entries requested by the module which owns ch */
		if (entry->module != ch->module) {
			continue;
		}

		rc = cb_fn(ch, entry, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}
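
/*
 * Sketch of a hypothetical callback that aborts all of a module's pending
 * buffer requests on this channel, e.g. during teardown (MY_BUFSIZE stands in
 * for whatever length the module originally requested):
 *
 *	static int
 *	abort_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
 *		 void *ctx)
 *	{
 *		spdk_iobuf_entry_abort(ch, entry, MY_BUFSIZE);
 *		return 0;
 *	}
 *
 * It would be invoked as spdk_iobuf_for_each_entry(&ch, &ch.small, abort_cb, NULL).
 */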

void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	struct spdk_iobuf_pool *pool;

	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

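/* Buffers are moved between a channel's cache and the shared rings in batches
 * of up to this many elements to amortize the cost of the ring operations. */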
#define IOBUF_BATCH_SIZE 32

void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	buf = (void *)STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
	} else {
		struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
		size_t sz, i;

		/* If we're going to dequeue, we may as well dequeue a batch. */
		sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
				       spdk_max(pool->cache_size, 1)));
		if (sz == 0) {
			if (entry) {
				STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
				entry->module = ch->module;
				entry->cb_fn = cb_fn;
			}

			return NULL;
		}

		for (i = 0; i < (sz - 1); i++) {
			STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
			pool->cache_count++;
		}

		/* The last one is the one we'll return */
		buf = bufs[i];
	}

	return buf;
}
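
/*
 * Usage sketch: try to get a buffer, falling back to waiting for one. The
 * entry (assumed here to be embedded in a hypothetical request struct) must
 * remain valid until the get_buf_done() callback fires:
 *
 *	buf = spdk_iobuf_get(ch, len, &req->iobuf_entry, get_buf_done);
 *	if (buf == NULL) {
 *		// Out of buffers; get_buf_done() runs once one is put back.
 *		return;
 *	}
 */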

void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;
	size_t sz;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		pool = &ch->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_size == 0) {
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
			return;
		}

		iobuf_buf = (struct spdk_iobuf_buffer *)buf;

		STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
		pool->cache_count++;

		/* The cache size may exceed the configured amount. We always dequeue from the
		 * central pool in batches of known size, so wait until at least a batch
		 * has been returned to actually return the buffers to the central pool. */
		sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
		if (pool->cache_count >= pool->cache_size + sz) {
			struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
			size_t i;

			for (i = 0; i < sz; i++) {
				bufs[i] = STAILQ_FIRST(&pool->cache);
				STAILQ_REMOVE_HEAD(&pool->cache, stailq);
				assert(pool->cache_count > 0);
				pool->cache_count--;
			}

			spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
		}
	} else {
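		/* A request is waiting for a buffer: hand this one directly to the
		 * first waiter rather than returning it to the cache or pool. */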
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
	}
}