1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2023 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/env.h" 7 #include "spdk/util.h" 8 #include "spdk/likely.h" 9 #include "spdk/log.h" 10 #include "spdk/thread.h" 11 12 #define IOBUF_MIN_SMALL_POOL_SIZE 64 13 #define IOBUF_MIN_LARGE_POOL_SIZE 8 14 #define IOBUF_DEFAULT_SMALL_POOL_SIZE 8192 15 #define IOBUF_DEFAULT_LARGE_POOL_SIZE 1024 16 #define IOBUF_ALIGNMENT 4096 17 #define IOBUF_MIN_SMALL_BUFSIZE 4096 18 #define IOBUF_MIN_LARGE_BUFSIZE 8192 19 #define IOBUF_DEFAULT_SMALL_BUFSIZE (8 * 1024) 20 /* 132k is a weird choice at first, but this needs to be large enough to accomodate 21 * the default maximum size (128k) plus metadata everywhere. For code paths that 22 * are explicitly configured, the math is instead done properly. This is only 23 * for the default. */ 24 #define IOBUF_DEFAULT_LARGE_BUFSIZE (132 * 1024) 25 26 SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE, 27 "Invalid data offset"); 28 29 struct iobuf_channel { 30 spdk_iobuf_entry_stailq_t small_queue; 31 spdk_iobuf_entry_stailq_t large_queue; 32 }; 33 34 struct iobuf_module { 35 char *name; 36 TAILQ_ENTRY(iobuf_module) tailq; 37 }; 38 39 struct iobuf { 40 struct spdk_ring *small_pool; 41 struct spdk_ring *large_pool; 42 void *small_pool_base; 43 void *large_pool_base; 44 struct spdk_iobuf_opts opts; 45 TAILQ_HEAD(, iobuf_module) modules; 46 spdk_iobuf_finish_cb finish_cb; 47 void *finish_arg; 48 }; 49 50 static struct iobuf g_iobuf = { 51 .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), 52 .small_pool = NULL, 53 .large_pool = NULL, 54 .small_pool_base = NULL, 55 .large_pool_base = NULL, 56 .opts = { 57 .small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE, 58 .large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE, 59 .small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE, 60 .large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE, 61 }, 62 }; 63 64 static int 65 iobuf_channel_create_cb(void *io_device, void *ctx) 66 { 67 struct iobuf_channel *ch = ctx; 68 69 STAILQ_INIT(&ch->small_queue); 70 STAILQ_INIT(&ch->large_queue); 71 72 return 0; 73 } 74 75 static void 76 iobuf_channel_destroy_cb(void *io_device, void *ctx) 77 { 78 struct iobuf_channel *ch __attribute__((unused)) = ctx; 79 80 assert(STAILQ_EMPTY(&ch->small_queue)); 81 assert(STAILQ_EMPTY(&ch->large_queue)); 82 } 83 84 int 85 spdk_iobuf_initialize(void) 86 { 87 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 88 int rc = 0; 89 uint64_t i; 90 struct spdk_iobuf_buffer *buf; 91 92 g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count, 93 SPDK_ENV_SOCKET_ID_ANY); 94 if (!g_iobuf.small_pool) { 95 SPDK_ERRLOG("Failed to create small iobuf pool\n"); 96 rc = -ENOMEM; 97 goto error; 98 } 99 100 /* Round up to the nearest alignment so that each element remains aligned */ 101 opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT); 102 g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT, 103 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 104 if (g_iobuf.small_pool_base == NULL) { 105 SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n"); 106 rc = -ENOMEM; 107 goto error; 108 } 109 110 g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count, 111 SPDK_ENV_SOCKET_ID_ANY); 112 if (!g_iobuf.large_pool) { 113 SPDK_ERRLOG("Failed to create large iobuf pool\n"); 114 rc = -ENOMEM; 115 goto error; 116 } 117 118 /* Round up to the nearest alignment so that each element remains aligned */ 119 opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT); 120 g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT, 121 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 122 if (g_iobuf.large_pool_base == NULL) { 123 SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n"); 124 rc = -ENOMEM; 125 goto error; 126 } 127 128 for (i = 0; i < opts->small_pool_count; i++) { 129 buf = g_iobuf.small_pool_base + i * opts->small_bufsize; 130 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 131 } 132 133 for (i = 0; i < opts->large_pool_count; i++) { 134 buf = g_iobuf.large_pool_base + i * opts->large_bufsize; 135 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 136 } 137 138 spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, 139 sizeof(struct iobuf_channel), "iobuf"); 140 141 return 0; 142 error: 143 spdk_free(g_iobuf.small_pool_base); 144 spdk_ring_free(g_iobuf.small_pool); 145 spdk_free(g_iobuf.large_pool_base); 146 spdk_ring_free(g_iobuf.large_pool); 147 148 return rc; 149 } 150 151 static void 152 iobuf_unregister_cb(void *io_device) 153 { 154 struct iobuf_module *module; 155 156 while (!TAILQ_EMPTY(&g_iobuf.modules)) { 157 module = TAILQ_FIRST(&g_iobuf.modules); 158 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 159 free(module->name); 160 free(module); 161 } 162 163 if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) { 164 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", 165 spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count); 166 } 167 168 if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) { 169 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", 170 spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count); 171 } 172 173 spdk_free(g_iobuf.small_pool_base); 174 g_iobuf.small_pool_base = NULL; 175 spdk_ring_free(g_iobuf.small_pool); 176 g_iobuf.small_pool = NULL; 177 178 spdk_free(g_iobuf.large_pool_base); 179 g_iobuf.large_pool_base = NULL; 180 spdk_ring_free(g_iobuf.large_pool); 181 g_iobuf.large_pool = NULL; 182 183 if (g_iobuf.finish_cb != NULL) { 184 g_iobuf.finish_cb(g_iobuf.finish_arg); 185 } 186 } 187 188 void 189 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) 190 { 191 g_iobuf.finish_cb = cb_fn; 192 g_iobuf.finish_arg = cb_arg; 193 194 spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); 195 } 196 197 int 198 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) 199 { 200 if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { 201 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", 202 IOBUF_MIN_SMALL_POOL_SIZE); 203 return -EINVAL; 204 } 205 if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { 206 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", 207 IOBUF_MIN_LARGE_POOL_SIZE); 208 return -EINVAL; 209 } 210 211 g_iobuf.opts = *opts; 212 213 if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { 214 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n", 215 IOBUF_MIN_SMALL_BUFSIZE); 216 g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE; 217 } 218 219 if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { 220 SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n", 221 IOBUF_MIN_LARGE_BUFSIZE); 222 g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE; 223 } 224 225 return 0; 226 } 227 228 void 229 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts) 230 { 231 *opts = g_iobuf.opts; 232 } 233 234 int 235 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 236 uint32_t small_cache_size, uint32_t large_cache_size) 237 { 238 struct spdk_io_channel *ioch; 239 struct iobuf_channel *iobuf_ch; 240 struct iobuf_module *module; 241 struct spdk_iobuf_buffer *buf; 242 uint32_t i; 243 244 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 245 if (strcmp(name, module->name) == 0) { 246 break; 247 } 248 } 249 250 if (module == NULL) { 251 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 252 return -ENODEV; 253 } 254 255 ioch = spdk_get_io_channel(&g_iobuf); 256 if (ioch == NULL) { 257 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 258 return -ENOMEM; 259 } 260 261 iobuf_ch = spdk_io_channel_get_ctx(ioch); 262 263 ch->small.queue = &iobuf_ch->small_queue; 264 ch->large.queue = &iobuf_ch->large_queue; 265 ch->small.pool = g_iobuf.small_pool; 266 ch->large.pool = g_iobuf.large_pool; 267 ch->small.bufsize = g_iobuf.opts.small_bufsize; 268 ch->large.bufsize = g_iobuf.opts.large_bufsize; 269 ch->parent = ioch; 270 ch->module = module; 271 ch->small.cache_size = small_cache_size; 272 ch->large.cache_size = large_cache_size; 273 ch->small.cache_count = 0; 274 ch->large.cache_count = 0; 275 276 STAILQ_INIT(&ch->small.cache); 277 STAILQ_INIT(&ch->large.cache); 278 279 for (i = 0; i < small_cache_size; ++i) { 280 if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) { 281 SPDK_ERRLOG("Failed to populate iobuf small buffer cache. " 282 "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n", 283 g_iobuf.opts.small_pool_count); 284 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 285 "this value.\n"); 286 goto error; 287 } 288 STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); 289 ch->small.cache_count++; 290 } 291 for (i = 0; i < large_cache_size; ++i) { 292 if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) { 293 SPDK_ERRLOG("Failed to populate iobuf large buffer cache. " 294 "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n", 295 g_iobuf.opts.large_pool_count); 296 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 297 "this value.\n"); 298 goto error; 299 } 300 STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); 301 ch->large.cache_count++; 302 } 303 304 return 0; 305 error: 306 spdk_iobuf_channel_fini(ch); 307 308 return -ENOMEM; 309 } 310 311 void 312 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 313 { 314 struct spdk_iobuf_entry *entry __attribute__((unused)); 315 struct spdk_iobuf_buffer *buf; 316 317 /* Make sure none of the wait queue entries are coming from this module */ 318 STAILQ_FOREACH(entry, ch->small.queue, stailq) { 319 assert(entry->module != ch->module); 320 } 321 STAILQ_FOREACH(entry, ch->large.queue, stailq) { 322 assert(entry->module != ch->module); 323 } 324 325 /* Release cached buffers back to the pool */ 326 while (!STAILQ_EMPTY(&ch->small.cache)) { 327 buf = STAILQ_FIRST(&ch->small.cache); 328 STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); 329 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 330 ch->small.cache_count--; 331 } 332 while (!STAILQ_EMPTY(&ch->large.cache)) { 333 buf = STAILQ_FIRST(&ch->large.cache); 334 STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); 335 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 336 ch->large.cache_count--; 337 } 338 339 assert(ch->small.cache_count == 0); 340 assert(ch->large.cache_count == 0); 341 342 spdk_put_io_channel(ch->parent); 343 ch->parent = NULL; 344 } 345 346 int 347 spdk_iobuf_register_module(const char *name) 348 { 349 struct iobuf_module *module; 350 351 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 352 if (strcmp(name, module->name) == 0) { 353 return -EEXIST; 354 } 355 } 356 357 module = calloc(1, sizeof(*module)); 358 if (module == NULL) { 359 return -ENOMEM; 360 } 361 362 module->name = strdup(name); 363 if (module->name == NULL) { 364 free(module); 365 return -ENOMEM; 366 } 367 368 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 369 370 return 0; 371 } 372 373 int 374 spdk_iobuf_unregister_module(const char *name) 375 { 376 struct iobuf_module *module; 377 378 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 379 if (strcmp(name, module->name) == 0) { 380 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 381 free(module->name); 382 free(module); 383 return 0; 384 } 385 } 386 387 return -ENOENT; 388 } 389 390 int 391 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, 392 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 393 { 394 struct spdk_iobuf_entry *entry, *tmp; 395 int rc; 396 397 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 398 /* We only want to iterate over the entries requested by the module which owns ch */ 399 if (entry->module != ch->module) { 400 continue; 401 } 402 403 rc = cb_fn(ch, entry, cb_ctx); 404 if (rc != 0) { 405 return rc; 406 } 407 } 408 409 return 0; 410 } 411 412 void 413 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 414 uint64_t len) 415 { 416 struct spdk_iobuf_pool *pool; 417 418 if (len <= ch->small.bufsize) { 419 pool = &ch->small; 420 } else { 421 assert(len <= ch->large.bufsize); 422 pool = &ch->large; 423 } 424 425 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 426 } 427 428 #define IOBUF_BATCH_SIZE 32 429 430 void * 431 spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, 432 struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) 433 { 434 struct spdk_iobuf_pool *pool; 435 void *buf; 436 437 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 438 if (len <= ch->small.bufsize) { 439 pool = &ch->small; 440 } else { 441 assert(len <= ch->large.bufsize); 442 pool = &ch->large; 443 } 444 445 buf = (void *)STAILQ_FIRST(&pool->cache); 446 if (buf) { 447 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 448 assert(pool->cache_count > 0); 449 pool->cache_count--; 450 } else { 451 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 452 size_t sz, i; 453 454 /* If we're going to dequeue, we may as well dequeue a batch. */ 455 sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE, 456 spdk_max(pool->cache_size, 1))); 457 if (sz == 0) { 458 if (entry) { 459 STAILQ_INSERT_TAIL(pool->queue, entry, stailq); 460 entry->module = ch->module; 461 entry->cb_fn = cb_fn; 462 } 463 464 return NULL; 465 } 466 467 for (i = 0; i < (sz - 1); i++) { 468 STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq); 469 pool->cache_count++; 470 } 471 472 /* The last one is the one we'll return */ 473 buf = bufs[i]; 474 } 475 476 return (char *)buf; 477 } 478 479 void 480 spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) 481 { 482 struct spdk_iobuf_entry *entry; 483 struct spdk_iobuf_buffer *iobuf_buf; 484 struct spdk_iobuf_pool *pool; 485 size_t sz; 486 487 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 488 if (len <= ch->small.bufsize) { 489 pool = &ch->small; 490 } else { 491 pool = &ch->large; 492 } 493 494 if (STAILQ_EMPTY(pool->queue)) { 495 if (pool->cache_size == 0) { 496 spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL); 497 return; 498 } 499 500 iobuf_buf = (struct spdk_iobuf_buffer *)buf; 501 502 STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq); 503 pool->cache_count++; 504 505 /* The cache size may exceed the configured amount. We always dequeue from the 506 * central pool in batches of known size, so wait until at least a batch 507 * has been returned to actually return the buffers to the central pool. */ 508 sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size); 509 if (pool->cache_count >= pool->cache_size + sz) { 510 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 511 size_t i; 512 513 for (i = 0; i < sz; i++) { 514 bufs[i] = STAILQ_FIRST(&pool->cache); 515 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 516 assert(pool->cache_count > 0); 517 pool->cache_count--; 518 } 519 520 spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL); 521 } 522 } else { 523 entry = STAILQ_FIRST(pool->queue); 524 STAILQ_REMOVE_HEAD(pool->queue, stailq); 525 entry->cb_fn(entry, buf); 526 } 527 } 528