1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/thread.h" 37 #include "spdk/log.h" 38 39 #ifdef __linux__ 40 #include <sys/prctl.h> 41 #endif 42 43 #ifdef __FreeBSD__ 44 #include <pthread_np.h> 45 #endif 46 47 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 48 49 struct io_device { 50 void *io_device; 51 spdk_io_channel_create_cb create_cb; 52 spdk_io_channel_destroy_cb destroy_cb; 53 spdk_io_device_unregister_cb unregister_cb; 54 struct spdk_thread *unregister_thread; 55 uint32_t ctx_size; 56 uint32_t for_each_count; 57 TAILQ_ENTRY(io_device) tailq; 58 59 uint32_t refcnt; 60 61 bool unregistered; 62 }; 63 64 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 65 66 struct spdk_thread { 67 pthread_t thread_id; 68 spdk_thread_pass_msg msg_fn; 69 spdk_start_poller start_poller_fn; 70 spdk_stop_poller stop_poller_fn; 71 void *thread_ctx; 72 TAILQ_HEAD(, spdk_io_channel) io_channels; 73 TAILQ_ENTRY(spdk_thread) tailq; 74 char *name; 75 }; 76 77 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 78 static uint32_t g_thread_count = 0; 79 80 static struct spdk_thread * 81 _get_thread(void) 82 { 83 pthread_t thread_id; 84 struct spdk_thread *thread; 85 86 thread_id = pthread_self(); 87 88 thread = NULL; 89 TAILQ_FOREACH(thread, &g_threads, tailq) { 90 if (thread->thread_id == thread_id) { 91 return thread; 92 } 93 } 94 95 return NULL; 96 } 97 98 static void 99 _set_thread_name(const char *thread_name) 100 { 101 #if defined(__linux__) 102 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 103 #elif defined(__FreeBSD__) 104 pthread_set_name_np(pthread_self(), thread_name); 105 #else 106 #error missing platform support for thread name 107 #endif 108 } 109 110 struct spdk_thread * 111 spdk_allocate_thread(spdk_thread_pass_msg msg_fn, 112 spdk_start_poller start_poller_fn, 113 spdk_stop_poller stop_poller_fn, 114 void *thread_ctx, const char *name) 115 { 116 struct spdk_thread *thread; 117 118 pthread_mutex_lock(&g_devlist_mutex); 119 120 thread = _get_thread(); 121 if (thread) { 122 SPDK_ERRLOG("Double allocated SPDK thread\n"); 123 pthread_mutex_unlock(&g_devlist_mutex); 124 return NULL; 125 } 126 127 thread = calloc(1, sizeof(*thread)); 128 if (!thread) { 129 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 130 pthread_mutex_unlock(&g_devlist_mutex); 131 return NULL; 132 } 133 134 thread->thread_id = pthread_self(); 135 thread->msg_fn = msg_fn; 136 thread->start_poller_fn = start_poller_fn; 137 thread->stop_poller_fn = stop_poller_fn; 138 thread->thread_ctx = thread_ctx; 139 TAILQ_INIT(&thread->io_channels); 140 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 141 g_thread_count++; 142 if (name) { 143 _set_thread_name(name); 144 thread->name = strdup(name); 145 } 146 147 pthread_mutex_unlock(&g_devlist_mutex); 148 149 return thread; 150 } 151 152 void 153 spdk_free_thread(void) 154 { 155 struct spdk_thread *thread; 156 157 pthread_mutex_lock(&g_devlist_mutex); 158 159 thread = _get_thread(); 160 if (!thread) { 161 SPDK_ERRLOG("No thread allocated\n"); 162 pthread_mutex_unlock(&g_devlist_mutex); 163 return; 164 } 165 166 assert(g_thread_count > 0); 167 g_thread_count--; 168 TAILQ_REMOVE(&g_threads, thread, tailq); 169 free(thread->name); 170 free(thread); 171 172 pthread_mutex_unlock(&g_devlist_mutex); 173 } 174 175 uint32_t 176 spdk_thread_get_count(void) 177 { 178 /* 179 * Return cached value of the current thread count. We could acquire the 180 * lock and iterate through the TAILQ of threads to count them, but that 181 * count could still be invalidated after we release the lock. 182 */ 183 return g_thread_count; 184 } 185 186 struct spdk_thread * 187 spdk_get_thread(void) 188 { 189 struct spdk_thread *thread; 190 191 pthread_mutex_lock(&g_devlist_mutex); 192 193 thread = _get_thread(); 194 if (!thread) { 195 SPDK_ERRLOG("No thread allocated\n"); 196 } 197 198 pthread_mutex_unlock(&g_devlist_mutex); 199 200 return thread; 201 } 202 203 const char * 204 spdk_thread_get_name(const struct spdk_thread *thread) 205 { 206 return thread->name; 207 } 208 209 void 210 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx) 211 { 212 thread->msg_fn(fn, ctx, thread->thread_ctx); 213 } 214 215 216 struct spdk_poller * 217 spdk_poller_register(spdk_poller_fn fn, 218 void *arg, 219 uint64_t period_microseconds) 220 { 221 struct spdk_thread *thread; 222 struct spdk_poller *poller; 223 224 thread = spdk_get_thread(); 225 if (!thread) { 226 assert(false); 227 return NULL; 228 } 229 230 if (!thread->start_poller_fn || !thread->stop_poller_fn) { 231 SPDK_ERRLOG("No related functions to start requested poller\n"); 232 assert(false); 233 return NULL; 234 } 235 236 poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds); 237 if (!poller) { 238 SPDK_ERRLOG("Unable to start requested poller\n"); 239 assert(false); 240 return NULL; 241 } 242 243 return poller; 244 } 245 246 void 247 spdk_poller_unregister(struct spdk_poller **ppoller) 248 { 249 struct spdk_thread *thread; 250 struct spdk_poller *poller; 251 252 poller = *ppoller; 253 if (poller == NULL) { 254 return; 255 } 256 257 *ppoller = NULL; 258 259 thread = spdk_get_thread(); 260 261 if (thread) { 262 thread->stop_poller_fn(poller, thread->thread_ctx); 263 } 264 } 265 266 struct call_thread { 267 struct spdk_thread *cur_thread; 268 spdk_thread_fn fn; 269 void *ctx; 270 271 struct spdk_thread *orig_thread; 272 spdk_thread_fn cpl; 273 }; 274 275 static void 276 spdk_on_thread(void *ctx) 277 { 278 struct call_thread *ct = ctx; 279 280 ct->fn(ct->ctx); 281 282 pthread_mutex_lock(&g_devlist_mutex); 283 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 284 pthread_mutex_unlock(&g_devlist_mutex); 285 286 if (!ct->cur_thread) { 287 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 288 free(ctx); 289 } else { 290 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 291 } 292 } 293 294 void 295 spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl) 296 { 297 struct call_thread *ct; 298 299 ct = calloc(1, sizeof(*ct)); 300 if (!ct) { 301 SPDK_ERRLOG("Unable to perform thread iteration\n"); 302 cpl(ctx); 303 return; 304 } 305 306 ct->fn = fn; 307 ct->ctx = ctx; 308 ct->cpl = cpl; 309 310 pthread_mutex_lock(&g_devlist_mutex); 311 ct->orig_thread = _get_thread(); 312 ct->cur_thread = TAILQ_FIRST(&g_threads); 313 pthread_mutex_unlock(&g_devlist_mutex); 314 315 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 316 } 317 318 void 319 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 320 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size) 321 { 322 struct io_device *dev, *tmp; 323 324 assert(io_device != NULL); 325 assert(create_cb != NULL); 326 assert(destroy_cb != NULL); 327 328 dev = calloc(1, sizeof(struct io_device)); 329 if (dev == NULL) { 330 SPDK_ERRLOG("could not allocate io_device\n"); 331 return; 332 } 333 334 dev->io_device = io_device; 335 dev->create_cb = create_cb; 336 dev->destroy_cb = destroy_cb; 337 dev->unregister_cb = NULL; 338 dev->ctx_size = ctx_size; 339 dev->for_each_count = 0; 340 dev->unregistered = false; 341 dev->refcnt = 0; 342 343 pthread_mutex_lock(&g_devlist_mutex); 344 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 345 if (tmp->io_device == io_device) { 346 SPDK_ERRLOG("io_device %p already registered\n", io_device); 347 free(dev); 348 pthread_mutex_unlock(&g_devlist_mutex); 349 return; 350 } 351 } 352 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 353 pthread_mutex_unlock(&g_devlist_mutex); 354 } 355 356 static void 357 _finish_unregister(void *arg) 358 { 359 struct io_device *dev = arg; 360 361 dev->unregister_cb(dev->io_device); 362 free(dev); 363 } 364 365 static void 366 _spdk_io_device_free(struct io_device *dev) 367 { 368 if (dev->unregister_cb == NULL) { 369 free(dev); 370 } else { 371 assert(dev->unregister_thread != NULL); 372 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 373 } 374 } 375 376 void 377 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 378 { 379 struct io_device *dev; 380 uint32_t refcnt; 381 struct spdk_thread *thread; 382 383 thread = spdk_get_thread(); 384 385 pthread_mutex_lock(&g_devlist_mutex); 386 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 387 if (dev->io_device == io_device) { 388 break; 389 } 390 } 391 392 if (!dev) { 393 SPDK_ERRLOG("io_device %p not found\n", io_device); 394 assert(false); 395 pthread_mutex_unlock(&g_devlist_mutex); 396 return; 397 } 398 399 if (dev->for_each_count > 0) { 400 SPDK_ERRLOG("io_device %p has %u for_each calls outstanding\n", io_device, dev->for_each_count); 401 pthread_mutex_unlock(&g_devlist_mutex); 402 return; 403 } 404 405 dev->unregister_cb = unregister_cb; 406 dev->unregistered = true; 407 TAILQ_REMOVE(&g_io_devices, dev, tailq); 408 refcnt = dev->refcnt; 409 dev->unregister_thread = thread; 410 pthread_mutex_unlock(&g_devlist_mutex); 411 412 if (refcnt > 0) { 413 /* defer deletion */ 414 return; 415 } 416 417 _spdk_io_device_free(dev); 418 } 419 420 struct spdk_io_channel * 421 spdk_get_io_channel(void *io_device) 422 { 423 struct spdk_io_channel *ch; 424 struct spdk_thread *thread; 425 struct io_device *dev; 426 int rc; 427 428 pthread_mutex_lock(&g_devlist_mutex); 429 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 430 if (dev->io_device == io_device) { 431 break; 432 } 433 } 434 if (dev == NULL) { 435 SPDK_ERRLOG("could not find io_device %p\n", io_device); 436 pthread_mutex_unlock(&g_devlist_mutex); 437 return NULL; 438 } 439 440 thread = _get_thread(); 441 if (!thread) { 442 SPDK_ERRLOG("No thread allocated\n"); 443 pthread_mutex_unlock(&g_devlist_mutex); 444 return NULL; 445 } 446 447 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 448 if (ch->dev == dev) { 449 ch->ref++; 450 /* 451 * An I/O channel already exists for this device on this 452 * thread, so return it. 453 */ 454 pthread_mutex_unlock(&g_devlist_mutex); 455 return ch; 456 } 457 } 458 459 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 460 if (ch == NULL) { 461 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 462 pthread_mutex_unlock(&g_devlist_mutex); 463 return NULL; 464 } 465 466 ch->dev = dev; 467 ch->destroy_cb = dev->destroy_cb; 468 ch->thread = thread; 469 ch->ref = 1; 470 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 471 472 dev->refcnt++; 473 474 pthread_mutex_unlock(&g_devlist_mutex); 475 476 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 477 if (rc == -1) { 478 pthread_mutex_lock(&g_devlist_mutex); 479 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 480 dev->refcnt--; 481 free(ch); 482 pthread_mutex_unlock(&g_devlist_mutex); 483 return NULL; 484 } 485 486 return ch; 487 } 488 489 static void 490 _spdk_put_io_channel(void *arg) 491 { 492 struct spdk_io_channel *ch = arg; 493 bool do_remove_dev = true; 494 495 assert(ch->thread == spdk_get_thread()); 496 497 if (ch->ref > 0) { 498 /* 499 * Another reference to the associated io_device was requested 500 * after this message was sent but before it had a chance to 501 * execute. 502 */ 503 return; 504 } 505 506 pthread_mutex_lock(&g_devlist_mutex); 507 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 508 pthread_mutex_unlock(&g_devlist_mutex); 509 510 /* Don't hold the devlist mutex while the destroy_cb is called. */ 511 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 512 513 pthread_mutex_lock(&g_devlist_mutex); 514 ch->dev->refcnt--; 515 516 if (!ch->dev->unregistered) { 517 do_remove_dev = false; 518 } 519 520 if (ch->dev->refcnt > 0) { 521 do_remove_dev = false; 522 } 523 524 pthread_mutex_unlock(&g_devlist_mutex); 525 526 if (do_remove_dev) { 527 _spdk_io_device_free(ch->dev); 528 } 529 free(ch); 530 } 531 532 void 533 spdk_put_io_channel(struct spdk_io_channel *ch) 534 { 535 ch->ref--; 536 537 if (ch->ref == 0) { 538 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 539 } 540 } 541 542 struct spdk_io_channel * 543 spdk_io_channel_from_ctx(void *ctx) 544 { 545 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 546 } 547 548 struct spdk_thread * 549 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 550 { 551 return ch->thread; 552 } 553 554 struct spdk_io_channel_iter { 555 void *io_device; 556 struct io_device *dev; 557 spdk_channel_msg fn; 558 int status; 559 void *ctx; 560 struct spdk_io_channel *ch; 561 562 struct spdk_thread *cur_thread; 563 564 struct spdk_thread *orig_thread; 565 spdk_channel_for_each_cpl cpl; 566 }; 567 568 void * 569 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 570 { 571 return i->io_device; 572 } 573 574 struct spdk_io_channel * 575 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 576 { 577 return i->ch; 578 } 579 580 void * 581 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 582 { 583 return i->ctx; 584 } 585 586 static void 587 _call_completion(void *ctx) 588 { 589 struct spdk_io_channel_iter *i = ctx; 590 591 if (i->cpl != NULL) { 592 i->cpl(i, i->status); 593 } 594 free(i); 595 } 596 597 static void 598 _call_channel(void *ctx) 599 { 600 struct spdk_io_channel_iter *i = ctx; 601 struct spdk_io_channel *ch; 602 603 /* 604 * It is possible that the channel was deleted before this 605 * message had a chance to execute. If so, skip calling 606 * the fn() on this thread. 607 */ 608 pthread_mutex_lock(&g_devlist_mutex); 609 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 610 if (ch->dev->io_device == i->io_device) { 611 break; 612 } 613 } 614 pthread_mutex_unlock(&g_devlist_mutex); 615 616 if (ch) { 617 i->fn(i); 618 } else { 619 spdk_for_each_channel_continue(i, 0); 620 } 621 } 622 623 void 624 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 625 spdk_channel_for_each_cpl cpl) 626 { 627 struct spdk_thread *thread; 628 struct spdk_io_channel *ch; 629 struct spdk_io_channel_iter *i; 630 631 i = calloc(1, sizeof(*i)); 632 if (!i) { 633 SPDK_ERRLOG("Unable to allocate iterator\n"); 634 return; 635 } 636 637 i->io_device = io_device; 638 i->fn = fn; 639 i->ctx = ctx; 640 i->cpl = cpl; 641 642 pthread_mutex_lock(&g_devlist_mutex); 643 i->orig_thread = _get_thread(); 644 645 TAILQ_FOREACH(thread, &g_threads, tailq) { 646 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 647 if (ch->dev->io_device == io_device) { 648 ch->dev->for_each_count++; 649 i->dev = ch->dev; 650 i->cur_thread = thread; 651 i->ch = ch; 652 pthread_mutex_unlock(&g_devlist_mutex); 653 spdk_thread_send_msg(thread, _call_channel, i); 654 return; 655 } 656 } 657 } 658 659 pthread_mutex_unlock(&g_devlist_mutex); 660 661 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 662 } 663 664 void 665 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 666 { 667 struct spdk_thread *thread; 668 struct spdk_io_channel *ch; 669 670 assert(i->cur_thread == spdk_get_thread()); 671 672 i->status = status; 673 674 pthread_mutex_lock(&g_devlist_mutex); 675 if (status) { 676 goto end; 677 } 678 thread = TAILQ_NEXT(i->cur_thread, tailq); 679 while (thread) { 680 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 681 if (ch->dev->io_device == i->io_device) { 682 i->cur_thread = thread; 683 i->ch = ch; 684 pthread_mutex_unlock(&g_devlist_mutex); 685 spdk_thread_send_msg(thread, _call_channel, i); 686 return; 687 } 688 } 689 thread = TAILQ_NEXT(thread, tailq); 690 } 691 692 end: 693 i->dev->for_each_count--; 694 i->ch = NULL; 695 pthread_mutex_unlock(&g_devlist_mutex); 696 697 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 698 } 699