1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/thread.h" 37 #include "spdk/log.h" 38 39 #ifdef __linux__ 40 #include <sys/prctl.h> 41 #endif 42 43 #ifdef __FreeBSD__ 44 #include <pthread_np.h> 45 #endif 46 47 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 48 49 struct io_device { 50 void *io_device; 51 spdk_io_channel_create_cb create_cb; 52 spdk_io_channel_destroy_cb destroy_cb; 53 spdk_io_device_unregister_cb unregister_cb; 54 struct spdk_thread *unregister_thread; 55 uint32_t ctx_size; 56 uint32_t for_each_count; 57 TAILQ_ENTRY(io_device) tailq; 58 59 uint32_t refcnt; 60 61 bool unregistered; 62 }; 63 64 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 65 66 struct spdk_thread { 67 pthread_t thread_id; 68 spdk_thread_pass_msg msg_fn; 69 spdk_start_poller start_poller_fn; 70 spdk_stop_poller stop_poller_fn; 71 void *thread_ctx; 72 TAILQ_HEAD(, spdk_io_channel) io_channels; 73 TAILQ_ENTRY(spdk_thread) tailq; 74 char *name; 75 }; 76 77 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 78 static uint32_t g_thread_count = 0; 79 80 static struct spdk_thread * 81 _get_thread(void) 82 { 83 pthread_t thread_id; 84 struct spdk_thread *thread; 85 86 thread_id = pthread_self(); 87 88 thread = NULL; 89 TAILQ_FOREACH(thread, &g_threads, tailq) { 90 if (thread->thread_id == thread_id) { 91 return thread; 92 } 93 } 94 95 return NULL; 96 } 97 98 static void 99 _set_thread_name(const char *thread_name) 100 { 101 #if defined(__linux__) 102 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 103 #elif defined(__FreeBSD__) 104 pthread_set_name_np(pthread_self(), thread_name); 105 #else 106 #error missing platform support for thread name 107 #endif 108 } 109 110 struct spdk_thread * 111 spdk_allocate_thread(spdk_thread_pass_msg msg_fn, 112 spdk_start_poller start_poller_fn, 113 spdk_stop_poller stop_poller_fn, 114 void *thread_ctx, const char *name) 115 { 116 struct spdk_thread *thread; 117 118 pthread_mutex_lock(&g_devlist_mutex); 119 120 thread = _get_thread(); 121 if (thread) { 122 SPDK_ERRLOG("Double allocated SPDK thread\n"); 123 pthread_mutex_unlock(&g_devlist_mutex); 124 return NULL; 125 } 126 127 thread = calloc(1, sizeof(*thread)); 128 if (!thread) { 129 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 130 pthread_mutex_unlock(&g_devlist_mutex); 131 return NULL; 132 } 133 134 thread->thread_id = pthread_self(); 135 thread->msg_fn = msg_fn; 136 thread->start_poller_fn = start_poller_fn; 137 thread->stop_poller_fn = stop_poller_fn; 138 thread->thread_ctx = thread_ctx; 139 TAILQ_INIT(&thread->io_channels); 140 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 141 g_thread_count++; 142 if (name) { 143 _set_thread_name(name); 144 thread->name = strdup(name); 145 } 146 147 pthread_mutex_unlock(&g_devlist_mutex); 148 149 return thread; 150 } 151 152 void 153 spdk_free_thread(void) 154 { 155 struct spdk_thread *thread; 156 157 pthread_mutex_lock(&g_devlist_mutex); 158 159 thread = _get_thread(); 160 if (!thread) { 161 SPDK_ERRLOG("No thread allocated\n"); 162 pthread_mutex_unlock(&g_devlist_mutex); 163 return; 164 } 165 166 assert(g_thread_count > 0); 167 g_thread_count--; 168 TAILQ_REMOVE(&g_threads, thread, tailq); 169 free(thread->name); 170 free(thread); 171 172 pthread_mutex_unlock(&g_devlist_mutex); 173 } 174 175 uint32_t 176 spdk_thread_get_count(void) 177 { 178 /* 179 * Return cached value of the current thread count. We could acquire the 180 * lock and iterate through the TAILQ of threads to count them, but that 181 * count could still be invalidated after we release the lock. 182 */ 183 return g_thread_count; 184 } 185 186 struct spdk_thread * 187 spdk_get_thread(void) 188 { 189 struct spdk_thread *thread; 190 191 pthread_mutex_lock(&g_devlist_mutex); 192 193 thread = _get_thread(); 194 if (!thread) { 195 SPDK_ERRLOG("No thread allocated\n"); 196 } 197 198 pthread_mutex_unlock(&g_devlist_mutex); 199 200 return thread; 201 } 202 203 const char * 204 spdk_thread_get_name(const struct spdk_thread *thread) 205 { 206 return thread->name; 207 } 208 209 void 210 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx) 211 { 212 thread->msg_fn(fn, ctx, thread->thread_ctx); 213 } 214 215 216 struct spdk_poller * 217 spdk_poller_register(spdk_poller_fn fn, 218 void *arg, 219 uint64_t period_microseconds) 220 { 221 struct spdk_thread *thread; 222 struct spdk_poller *poller; 223 224 thread = spdk_get_thread(); 225 if (!thread) { 226 abort(); 227 } 228 229 if (!thread->start_poller_fn || !thread->stop_poller_fn) { 230 SPDK_ERRLOG("No related functions to start requested poller\n"); 231 abort(); 232 } 233 234 poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds); 235 if (!poller) { 236 SPDK_ERRLOG("Unable to start requested poller\n"); 237 abort(); 238 } 239 240 return poller; 241 } 242 243 void 244 spdk_poller_unregister(struct spdk_poller **ppoller) 245 { 246 struct spdk_thread *thread; 247 struct spdk_poller *poller; 248 249 poller = *ppoller; 250 if (poller == NULL) { 251 return; 252 } 253 254 *ppoller = NULL; 255 256 thread = spdk_get_thread(); 257 258 if (thread) { 259 thread->stop_poller_fn(poller, thread->thread_ctx); 260 } 261 } 262 263 struct call_thread { 264 struct spdk_thread *cur_thread; 265 spdk_thread_fn fn; 266 void *ctx; 267 268 struct spdk_thread *orig_thread; 269 spdk_thread_fn cpl; 270 }; 271 272 static void 273 spdk_on_thread(void *ctx) 274 { 275 struct call_thread *ct = ctx; 276 277 ct->fn(ct->ctx); 278 279 pthread_mutex_lock(&g_devlist_mutex); 280 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 281 pthread_mutex_unlock(&g_devlist_mutex); 282 283 if (!ct->cur_thread) { 284 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 285 free(ctx); 286 } else { 287 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 288 } 289 } 290 291 void 292 spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl) 293 { 294 struct call_thread *ct; 295 296 ct = calloc(1, sizeof(*ct)); 297 if (!ct) { 298 SPDK_ERRLOG("Unable to perform thread iteration\n"); 299 cpl(ctx); 300 return; 301 } 302 303 ct->fn = fn; 304 ct->ctx = ctx; 305 ct->cpl = cpl; 306 307 pthread_mutex_lock(&g_devlist_mutex); 308 ct->orig_thread = _get_thread(); 309 ct->cur_thread = TAILQ_FIRST(&g_threads); 310 pthread_mutex_unlock(&g_devlist_mutex); 311 312 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 313 } 314 315 void 316 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 317 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size) 318 { 319 struct io_device *dev, *tmp; 320 321 assert(io_device != NULL); 322 assert(create_cb != NULL); 323 assert(destroy_cb != NULL); 324 325 dev = calloc(1, sizeof(struct io_device)); 326 if (dev == NULL) { 327 SPDK_ERRLOG("could not allocate io_device\n"); 328 return; 329 } 330 331 dev->io_device = io_device; 332 dev->create_cb = create_cb; 333 dev->destroy_cb = destroy_cb; 334 dev->unregister_cb = NULL; 335 dev->ctx_size = ctx_size; 336 dev->for_each_count = 0; 337 dev->unregistered = false; 338 dev->refcnt = 0; 339 340 pthread_mutex_lock(&g_devlist_mutex); 341 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 342 if (tmp->io_device == io_device) { 343 SPDK_ERRLOG("io_device %p already registered\n", io_device); 344 free(dev); 345 pthread_mutex_unlock(&g_devlist_mutex); 346 return; 347 } 348 } 349 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 350 pthread_mutex_unlock(&g_devlist_mutex); 351 } 352 353 static void 354 _finish_unregister(void *arg) 355 { 356 struct io_device *dev = arg; 357 358 dev->unregister_cb(dev->io_device); 359 free(dev); 360 } 361 362 static void 363 _spdk_io_device_free(struct io_device *dev) 364 { 365 if (dev->unregister_cb == NULL) { 366 free(dev); 367 } else { 368 assert(dev->unregister_thread != NULL); 369 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 370 } 371 } 372 373 void 374 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 375 { 376 struct io_device *dev; 377 uint32_t refcnt; 378 struct spdk_thread *thread; 379 380 thread = spdk_get_thread(); 381 382 pthread_mutex_lock(&g_devlist_mutex); 383 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 384 if (dev->io_device == io_device) { 385 break; 386 } 387 } 388 389 if (!dev) { 390 SPDK_ERRLOG("io_device %p not found\n", io_device); 391 assert(false); 392 pthread_mutex_unlock(&g_devlist_mutex); 393 return; 394 } 395 396 if (dev->for_each_count > 0) { 397 SPDK_ERRLOG("io_device %p has %u for_each calls outstanding\n", io_device, dev->for_each_count); 398 pthread_mutex_unlock(&g_devlist_mutex); 399 return; 400 } 401 402 dev->unregister_cb = unregister_cb; 403 dev->unregistered = true; 404 TAILQ_REMOVE(&g_io_devices, dev, tailq); 405 refcnt = dev->refcnt; 406 dev->unregister_thread = thread; 407 pthread_mutex_unlock(&g_devlist_mutex); 408 409 if (refcnt > 0) { 410 /* defer deletion */ 411 return; 412 } 413 414 _spdk_io_device_free(dev); 415 } 416 417 struct spdk_io_channel * 418 spdk_get_io_channel(void *io_device) 419 { 420 struct spdk_io_channel *ch; 421 struct spdk_thread *thread; 422 struct io_device *dev; 423 int rc; 424 425 pthread_mutex_lock(&g_devlist_mutex); 426 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 427 if (dev->io_device == io_device) { 428 break; 429 } 430 } 431 if (dev == NULL) { 432 SPDK_ERRLOG("could not find io_device %p\n", io_device); 433 pthread_mutex_unlock(&g_devlist_mutex); 434 return NULL; 435 } 436 437 thread = _get_thread(); 438 if (!thread) { 439 SPDK_ERRLOG("No thread allocated\n"); 440 pthread_mutex_unlock(&g_devlist_mutex); 441 return NULL; 442 } 443 444 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 445 if (ch->dev == dev) { 446 ch->ref++; 447 /* 448 * An I/O channel already exists for this device on this 449 * thread, so return it. 450 */ 451 pthread_mutex_unlock(&g_devlist_mutex); 452 return ch; 453 } 454 } 455 456 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 457 if (ch == NULL) { 458 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 459 pthread_mutex_unlock(&g_devlist_mutex); 460 return NULL; 461 } 462 463 ch->dev = dev; 464 ch->destroy_cb = dev->destroy_cb; 465 ch->thread = thread; 466 ch->ref = 1; 467 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 468 469 dev->refcnt++; 470 471 pthread_mutex_unlock(&g_devlist_mutex); 472 473 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 474 if (rc == -1) { 475 pthread_mutex_lock(&g_devlist_mutex); 476 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 477 dev->refcnt--; 478 free(ch); 479 pthread_mutex_unlock(&g_devlist_mutex); 480 return NULL; 481 } 482 483 return ch; 484 } 485 486 static void 487 _spdk_put_io_channel(void *arg) 488 { 489 struct spdk_io_channel *ch = arg; 490 bool do_remove_dev = true; 491 492 assert(ch->thread == spdk_get_thread()); 493 assert(ch->ref == 0); 494 495 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 496 497 pthread_mutex_lock(&g_devlist_mutex); 498 ch->dev->refcnt--; 499 500 if (!ch->dev->unregistered) { 501 do_remove_dev = false; 502 } 503 504 if (ch->dev->refcnt > 0) { 505 do_remove_dev = false; 506 } 507 508 pthread_mutex_unlock(&g_devlist_mutex); 509 510 if (do_remove_dev) { 511 _spdk_io_device_free(ch->dev); 512 } 513 free(ch); 514 } 515 516 void 517 spdk_put_io_channel(struct spdk_io_channel *ch) 518 { 519 ch->ref--; 520 521 if (ch->ref == 0) { 522 /* If this was the last reference, remove the channel from the list */ 523 pthread_mutex_lock(&g_devlist_mutex); 524 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 525 pthread_mutex_unlock(&g_devlist_mutex); 526 527 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 528 } 529 } 530 531 struct spdk_io_channel * 532 spdk_io_channel_from_ctx(void *ctx) 533 { 534 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 535 } 536 537 struct spdk_thread * 538 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 539 { 540 return ch->thread; 541 } 542 543 struct spdk_io_channel_iter { 544 void *io_device; 545 struct io_device *dev; 546 spdk_channel_msg fn; 547 int status; 548 void *ctx; 549 struct spdk_io_channel *ch; 550 551 struct spdk_thread *cur_thread; 552 553 struct spdk_thread *orig_thread; 554 spdk_channel_for_each_cpl cpl; 555 }; 556 557 void * 558 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 559 { 560 return i->io_device; 561 } 562 563 struct spdk_io_channel * 564 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 565 { 566 return i->ch; 567 } 568 569 void * 570 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 571 { 572 return i->ctx; 573 } 574 575 static void 576 _call_completion(void *ctx) 577 { 578 struct spdk_io_channel_iter *i = ctx; 579 580 if (i->cpl != NULL) { 581 i->cpl(i, i->status); 582 } 583 free(i); 584 } 585 586 static void 587 _call_channel(void *ctx) 588 { 589 struct spdk_io_channel_iter *i = ctx; 590 struct spdk_io_channel *ch; 591 592 /* 593 * It is possible that the channel was deleted before this 594 * message had a chance to execute. If so, skip calling 595 * the fn() on this thread. 596 */ 597 pthread_mutex_lock(&g_devlist_mutex); 598 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 599 if (ch->dev->io_device == i->io_device) { 600 break; 601 } 602 } 603 pthread_mutex_unlock(&g_devlist_mutex); 604 605 if (ch) { 606 i->fn(i); 607 } else { 608 spdk_for_each_channel_continue(i, 0); 609 } 610 } 611 612 void 613 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 614 spdk_channel_for_each_cpl cpl) 615 { 616 struct spdk_thread *thread; 617 struct spdk_io_channel *ch; 618 struct spdk_io_channel_iter *i; 619 620 i = calloc(1, sizeof(*i)); 621 if (!i) { 622 SPDK_ERRLOG("Unable to allocate iterator\n"); 623 return; 624 } 625 626 i->io_device = io_device; 627 i->fn = fn; 628 i->ctx = ctx; 629 i->cpl = cpl; 630 631 pthread_mutex_lock(&g_devlist_mutex); 632 i->orig_thread = _get_thread(); 633 634 TAILQ_FOREACH(thread, &g_threads, tailq) { 635 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 636 if (ch->dev->io_device == io_device) { 637 ch->dev->for_each_count++; 638 i->dev = ch->dev; 639 i->cur_thread = thread; 640 i->ch = ch; 641 pthread_mutex_unlock(&g_devlist_mutex); 642 spdk_thread_send_msg(thread, _call_channel, i); 643 return; 644 } 645 } 646 } 647 648 pthread_mutex_unlock(&g_devlist_mutex); 649 650 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 651 } 652 653 void 654 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 655 { 656 struct spdk_thread *thread; 657 struct spdk_io_channel *ch; 658 659 assert(i->cur_thread == spdk_get_thread()); 660 661 i->status = status; 662 663 pthread_mutex_lock(&g_devlist_mutex); 664 if (status) { 665 goto end; 666 } 667 thread = TAILQ_NEXT(i->cur_thread, tailq); 668 while (thread) { 669 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 670 if (ch->dev->io_device == i->io_device) { 671 i->cur_thread = thread; 672 i->ch = ch; 673 pthread_mutex_unlock(&g_devlist_mutex); 674 spdk_thread_send_msg(thread, _call_channel, i); 675 return; 676 } 677 } 678 thread = TAILQ_NEXT(thread, tailq); 679 } 680 681 end: 682 i->dev->for_each_count--; 683 i->ch = NULL; 684 pthread_mutex_unlock(&g_devlist_mutex); 685 686 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 687 } 688