/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2023 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"

#define IOBUF_MIN_SMALL_POOL_SIZE 64
#define IOBUF_MIN_LARGE_POOL_SIZE 8
#define IOBUF_DEFAULT_SMALL_POOL_SIZE 8192
#define IOBUF_DEFAULT_LARGE_POOL_SIZE 1024
#define IOBUF_ALIGNMENT 4096
#define IOBUF_MIN_SMALL_BUFSIZE 4096
#define IOBUF_MIN_LARGE_BUFSIZE 8192
#define IOBUF_DEFAULT_SMALL_BUFSIZE (8 * 1024)
/* 132k is a weird choice at first, but this needs to be large enough to accommodate
 * the default maximum size (128k) plus metadata everywhere. For code paths that
 * are explicitly configured, the math is instead done properly. This is only
 * for the default. */
#define IOBUF_DEFAULT_LARGE_BUFSIZE (132 * 1024)

SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");

struct iobuf_channel {
	spdk_iobuf_entry_stailq_t small_queue;
	spdk_iobuf_entry_stailq_t large_queue;
};

struct iobuf_module {
	char *name;
	TAILQ_ENTRY(iobuf_module) tailq;
};

struct iobuf {
	struct spdk_ring *small_pool;
	struct spdk_ring *large_pool;
	void *small_pool_base;
	void *large_pool_base;
	struct spdk_iobuf_opts opts;
	TAILQ_HEAD(, iobuf_module) modules;
	spdk_iobuf_finish_cb finish_cb;
	void *finish_arg;
};

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.small_pool = NULL,
	.large_pool = NULL,
	.small_pool_base = NULL,
	.large_pool_base = NULL,
	.opts = {
		.small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
	},
};

static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;

	STAILQ_INIT(&ch->small_queue);
	STAILQ_INIT(&ch->large_queue);

	return 0;
}

static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch __attribute__((unused)) = ctx;

	assert(STAILQ_EMPTY(&ch->small_queue));
	assert(STAILQ_EMPTY(&ch->large_queue));
}

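/*
 * spdk_iobuf_initialize() builds the two global buffer pools described by
 * g_iobuf.opts: one multi-producer/multi-consumer ring per pool, backed by a
 * single DMA-able allocation holding all of that pool's buffers. Buffer sizes
 * are rounded up to IOBUF_ALIGNMENT so every element stays aligned, the rings
 * are seeded with pointers into the backing allocation, and the iobuf
 * io_device is registered so per-thread channels can be created later.
 */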
int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int rc = 0;
	uint64_t i;
	struct spdk_iobuf_buffer *buf;

	g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = g_iobuf.small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = g_iobuf.large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");

	return 0;
error:
	spdk_free(g_iobuf.small_pool_base);
	spdk_ring_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.large_pool_base);
	spdk_ring_free(g_iobuf.large_pool);

	return rc;
}

static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

	if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_free(g_iobuf.small_pool_base);
	g_iobuf.small_pool_base = NULL;
	spdk_ring_free(g_iobuf.small_pool);
	g_iobuf.small_pool = NULL;

	spdk_free(g_iobuf.large_pool_base);
	g_iobuf.large_pool_base = NULL;
	spdk_ring_free(g_iobuf.large_pool);
	g_iobuf.large_pool = NULL;

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}

	g_iobuf.opts = *opts;

	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			    IOBUF_MIN_SMALL_BUFSIZE);
		g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE;
	}

	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_LARGE_BUFSIZE);
		g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE;
	}

	return 0;
}

void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
	*opts = g_iobuf.opts;
}

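/*
 * Typical usage (a minimal sketch, not taken from this file): a module first
 * registers its name with spdk_iobuf_register_module() during its own
 * initialization, then calls spdk_iobuf_channel_init() from its per-thread
 * I/O-channel create callback. The names below (mod_ch, "my_module") and the
 * cache sizes are illustrative only.
 *
 *	rc = spdk_iobuf_register_module("my_module");
 *	...
 *	rc = spdk_iobuf_channel_init(&mod_ch->iobuf, "my_module", 128, 32);
 *	if (rc != 0) {
 *		return rc;
 *	}
 */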
Automatically increasing.\n", 221 IOBUF_MIN_LARGE_BUFSIZE); 222 g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE; 223 } 224 225 return 0; 226 } 227 228 void 229 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts) 230 { 231 *opts = g_iobuf.opts; 232 } 233 234 int 235 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 236 uint32_t small_cache_size, uint32_t large_cache_size) 237 { 238 struct spdk_io_channel *ioch; 239 struct iobuf_channel *iobuf_ch; 240 struct iobuf_module *module; 241 struct spdk_iobuf_buffer *buf; 242 uint32_t i; 243 244 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 245 if (strcmp(name, module->name) == 0) { 246 break; 247 } 248 } 249 250 if (module == NULL) { 251 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 252 return -ENODEV; 253 } 254 255 ioch = spdk_get_io_channel(&g_iobuf); 256 if (ioch == NULL) { 257 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 258 return -ENOMEM; 259 } 260 261 iobuf_ch = spdk_io_channel_get_ctx(ioch); 262 263 ch->small.queue = &iobuf_ch->small_queue; 264 ch->large.queue = &iobuf_ch->large_queue; 265 ch->small.pool = g_iobuf.small_pool; 266 ch->large.pool = g_iobuf.large_pool; 267 ch->small.bufsize = g_iobuf.opts.small_bufsize; 268 ch->large.bufsize = g_iobuf.opts.large_bufsize; 269 ch->parent = ioch; 270 ch->module = module; 271 ch->small.cache_size = small_cache_size; 272 ch->large.cache_size = large_cache_size; 273 ch->small.cache_count = 0; 274 ch->large.cache_count = 0; 275 276 STAILQ_INIT(&ch->small.cache); 277 STAILQ_INIT(&ch->large.cache); 278 279 for (i = 0; i < small_cache_size; ++i) { 280 if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) { 281 SPDK_ERRLOG("Failed to populate iobuf small buffer cache. " 282 "You may need to increase spdk_iobuf_opts.small_pool_count.\n"); 283 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 284 "this value.\n"); 285 goto error; 286 } 287 STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); 288 ch->small.cache_count++; 289 } 290 for (i = 0; i < large_cache_size; ++i) { 291 if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) { 292 SPDK_ERRLOG("Failed to populate iobuf large buffer cache. 
" 293 "You may need to increase spdk_iobuf_opts.large_pool_count.\n"); 294 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 295 "this value.\n"); 296 goto error; 297 } 298 STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); 299 ch->large.cache_count++; 300 } 301 302 return 0; 303 error: 304 spdk_iobuf_channel_fini(ch); 305 306 return -ENOMEM; 307 } 308 309 void 310 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 311 { 312 struct spdk_iobuf_entry *entry __attribute__((unused)); 313 struct spdk_iobuf_buffer *buf; 314 315 /* Make sure none of the wait queue entries are coming from this module */ 316 STAILQ_FOREACH(entry, ch->small.queue, stailq) { 317 assert(entry->module != ch->module); 318 } 319 STAILQ_FOREACH(entry, ch->large.queue, stailq) { 320 assert(entry->module != ch->module); 321 } 322 323 /* Release cached buffers back to the pool */ 324 while (!STAILQ_EMPTY(&ch->small.cache)) { 325 buf = STAILQ_FIRST(&ch->small.cache); 326 STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); 327 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 328 ch->small.cache_count--; 329 } 330 while (!STAILQ_EMPTY(&ch->large.cache)) { 331 buf = STAILQ_FIRST(&ch->large.cache); 332 STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); 333 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 334 ch->large.cache_count--; 335 } 336 337 assert(ch->small.cache_count == 0); 338 assert(ch->large.cache_count == 0); 339 340 spdk_put_io_channel(ch->parent); 341 ch->parent = NULL; 342 } 343 344 int 345 spdk_iobuf_register_module(const char *name) 346 { 347 struct iobuf_module *module; 348 349 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 350 if (strcmp(name, module->name) == 0) { 351 return -EEXIST; 352 } 353 } 354 355 module = calloc(1, sizeof(*module)); 356 if (module == NULL) { 357 return -ENOMEM; 358 } 359 360 module->name = strdup(name); 361 if (module->name == NULL) { 362 free(module); 363 return -ENOMEM; 364 } 365 366 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 367 368 return 0; 369 } 370 371 int 372 spdk_iobuf_unregister_module(const char *name) 373 { 374 struct iobuf_module *module; 375 376 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 377 if (strcmp(name, module->name) == 0) { 378 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 379 free(module->name); 380 free(module); 381 return 0; 382 } 383 } 384 385 return -ENOENT; 386 } 387 388 int 389 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, 390 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 391 { 392 struct spdk_iobuf_entry *entry, *tmp; 393 int rc; 394 395 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 396 /* We only want to iterate over the entries requested by the module which owns ch */ 397 if (entry->module != ch->module) { 398 continue; 399 } 400 401 rc = cb_fn(ch, entry, cb_ctx); 402 if (rc != 0) { 403 return rc; 404 } 405 } 406 407 return 0; 408 } 409 410 void 411 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 412 uint64_t len) 413 { 414 struct spdk_iobuf_pool *pool; 415 416 if (len <= ch->small.bufsize) { 417 pool = &ch->small; 418 } else { 419 assert(len <= ch->large.bufsize); 420 pool = &ch->large; 421 } 422 423 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 424 } 425 426 #define IOBUF_BATCH_SIZE 32 427 428 void * 429 spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, 430 struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) 431 { 432 struct spdk_iobuf_pool *pool; 433 void *buf; 434 435 
void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	buf = (void *)STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
	} else {
		struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
		size_t sz, i;

		/* If we're going to dequeue, we may as well dequeue a batch. */
		sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
				       spdk_max(pool->cache_size, 1)));
		if (sz == 0) {
			if (entry) {
				STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
				entry->module = ch->module;
				entry->cb_fn = cb_fn;
			}

			return NULL;
		}

		for (i = 0; i < (sz - 1); i++) {
			STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
			pool->cache_count++;
		}

		/* The last one is the one we'll return */
		buf = bufs[i];
	}

	return (char *)buf;
}

void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;
	size_t sz;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		pool = &ch->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_size == 0) {
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
			return;
		}

		iobuf_buf = (struct spdk_iobuf_buffer *)buf;

		STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
		pool->cache_count++;

		/* The cache size may exceed the configured amount. We always dequeue from the
		 * central pool in batches of known size, so wait until at least a batch
		 * has been returned to actually return the buffers to the central pool. */
		sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
		if (pool->cache_count >= pool->cache_size + sz) {
			struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
			size_t i;

			for (i = 0; i < sz; i++) {
				bufs[i] = STAILQ_FIRST(&pool->cache);
				STAILQ_REMOVE_HEAD(&pool->cache, stailq);
				assert(pool->cache_count > 0);
				pool->cache_count--;
			}

			spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
	}
}