1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/thread.h" 37 #include "spdk/log.h" 38 39 #ifdef __linux__ 40 #include <sys/prctl.h> 41 #endif 42 43 #ifdef __FreeBSD__ 44 #include <pthread_np.h> 45 #endif 46 47 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; 48 49 struct io_device { 50 void *io_device; 51 spdk_io_channel_create_cb create_cb; 52 spdk_io_channel_destroy_cb destroy_cb; 53 spdk_io_device_unregister_cb unregister_cb; 54 struct spdk_thread *unregister_thread; 55 uint32_t ctx_size; 56 uint32_t for_each_count; 57 TAILQ_ENTRY(io_device) tailq; 58 59 uint32_t refcnt; 60 61 bool unregistered; 62 }; 63 64 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); 65 66 struct spdk_thread { 67 pthread_t thread_id; 68 spdk_thread_pass_msg msg_fn; 69 spdk_start_poller start_poller_fn; 70 spdk_stop_poller stop_poller_fn; 71 void *thread_ctx; 72 TAILQ_HEAD(, spdk_io_channel) io_channels; 73 TAILQ_ENTRY(spdk_thread) tailq; 74 char *name; 75 }; 76 77 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); 78 79 static struct spdk_thread * 80 _get_thread(void) 81 { 82 pthread_t thread_id; 83 struct spdk_thread *thread; 84 85 thread_id = pthread_self(); 86 87 thread = NULL; 88 TAILQ_FOREACH(thread, &g_threads, tailq) { 89 if (thread->thread_id == thread_id) { 90 return thread; 91 } 92 } 93 94 return NULL; 95 } 96 97 static void 98 _set_thread_name(const char *thread_name) 99 { 100 #if defined(__linux__) 101 prctl(PR_SET_NAME, thread_name, 0, 0, 0); 102 #elif defined(__FreeBSD__) 103 pthread_set_name_np(pthread_self(), thread_name); 104 #else 105 #error missing platform support for thread name 106 #endif 107 } 108 109 struct spdk_thread * 110 spdk_allocate_thread(spdk_thread_pass_msg msg_fn, 111 spdk_start_poller start_poller_fn, 112 spdk_stop_poller stop_poller_fn, 113 void *thread_ctx, const char *name) 114 { 115 struct spdk_thread *thread; 116 117 pthread_mutex_lock(&g_devlist_mutex); 118 119 thread = _get_thread(); 120 if (thread) { 121 SPDK_ERRLOG("Double allocated SPDK thread\n"); 122 pthread_mutex_unlock(&g_devlist_mutex); 123 return NULL; 124 } 125 126 thread = calloc(1, sizeof(*thread)); 127 if (!thread) { 128 SPDK_ERRLOG("Unable to allocate memory for thread\n"); 129 pthread_mutex_unlock(&g_devlist_mutex); 130 return NULL; 131 } 132 133 thread->thread_id = pthread_self(); 134 thread->msg_fn = msg_fn; 135 thread->start_poller_fn = start_poller_fn; 136 thread->stop_poller_fn = stop_poller_fn; 137 thread->thread_ctx = thread_ctx; 138 TAILQ_INIT(&thread->io_channels); 139 TAILQ_INSERT_TAIL(&g_threads, thread, tailq); 140 if (name) { 141 _set_thread_name(name); 142 thread->name = strdup(name); 143 } 144 145 pthread_mutex_unlock(&g_devlist_mutex); 146 147 return thread; 148 } 149 150 void 151 spdk_free_thread(void) 152 { 153 struct spdk_thread *thread; 154 155 pthread_mutex_lock(&g_devlist_mutex); 156 157 thread = _get_thread(); 158 if (!thread) { 159 SPDK_ERRLOG("No thread allocated\n"); 160 pthread_mutex_unlock(&g_devlist_mutex); 161 return; 162 } 163 164 TAILQ_REMOVE(&g_threads, thread, tailq); 165 free(thread->name); 166 free(thread); 167 168 pthread_mutex_unlock(&g_devlist_mutex); 169 } 170 171 struct spdk_thread * 172 spdk_get_thread(void) 173 { 174 struct spdk_thread *thread; 175 176 pthread_mutex_lock(&g_devlist_mutex); 177 178 thread = _get_thread(); 179 if (!thread) { 180 SPDK_ERRLOG("No thread allocated\n"); 181 } 182 183 pthread_mutex_unlock(&g_devlist_mutex); 184 185 return thread; 186 } 187 188 const char * 189 spdk_thread_get_name(const struct spdk_thread *thread) 190 { 191 return thread->name; 192 } 193 194 void 195 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx) 196 { 197 thread->msg_fn(fn, ctx, thread->thread_ctx); 198 } 199 200 201 struct spdk_poller * 202 spdk_poller_register(spdk_poller_fn fn, 203 void *arg, 204 uint64_t period_microseconds) 205 { 206 struct spdk_thread *thread; 207 struct spdk_poller *poller; 208 209 thread = spdk_get_thread(); 210 if (!thread) { 211 abort(); 212 } 213 214 if (!thread->start_poller_fn || !thread->stop_poller_fn) { 215 SPDK_ERRLOG("No related functions to start requested poller\n"); 216 abort(); 217 } 218 219 poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds); 220 if (!poller) { 221 SPDK_ERRLOG("Unable to start requested poller\n"); 222 abort(); 223 } 224 225 return poller; 226 } 227 228 void 229 spdk_poller_unregister(struct spdk_poller **ppoller) 230 { 231 struct spdk_thread *thread; 232 struct spdk_poller *poller; 233 234 poller = *ppoller; 235 if (poller == NULL) { 236 return; 237 } 238 239 *ppoller = NULL; 240 241 thread = spdk_get_thread(); 242 243 if (thread) { 244 thread->stop_poller_fn(poller, thread->thread_ctx); 245 } 246 } 247 248 struct call_thread { 249 struct spdk_thread *cur_thread; 250 spdk_thread_fn fn; 251 void *ctx; 252 253 struct spdk_thread *orig_thread; 254 spdk_thread_fn cpl; 255 }; 256 257 static void 258 spdk_on_thread(void *ctx) 259 { 260 struct call_thread *ct = ctx; 261 262 ct->fn(ct->ctx); 263 264 pthread_mutex_lock(&g_devlist_mutex); 265 ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); 266 pthread_mutex_unlock(&g_devlist_mutex); 267 268 if (!ct->cur_thread) { 269 spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); 270 free(ctx); 271 } else { 272 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx); 273 } 274 } 275 276 void 277 spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl) 278 { 279 struct call_thread *ct; 280 281 ct = calloc(1, sizeof(*ct)); 282 if (!ct) { 283 SPDK_ERRLOG("Unable to perform thread iteration\n"); 284 cpl(ctx); 285 return; 286 } 287 288 ct->fn = fn; 289 ct->ctx = ctx; 290 ct->cpl = cpl; 291 292 pthread_mutex_lock(&g_devlist_mutex); 293 ct->orig_thread = _get_thread(); 294 ct->cur_thread = TAILQ_FIRST(&g_threads); 295 pthread_mutex_unlock(&g_devlist_mutex); 296 297 spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct); 298 } 299 300 void 301 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, 302 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size) 303 { 304 struct io_device *dev, *tmp; 305 306 assert(io_device != NULL); 307 assert(create_cb != NULL); 308 assert(destroy_cb != NULL); 309 310 dev = calloc(1, sizeof(struct io_device)); 311 if (dev == NULL) { 312 SPDK_ERRLOG("could not allocate io_device\n"); 313 return; 314 } 315 316 dev->io_device = io_device; 317 dev->create_cb = create_cb; 318 dev->destroy_cb = destroy_cb; 319 dev->unregister_cb = NULL; 320 dev->ctx_size = ctx_size; 321 dev->for_each_count = 0; 322 dev->unregistered = false; 323 dev->refcnt = 0; 324 325 pthread_mutex_lock(&g_devlist_mutex); 326 TAILQ_FOREACH(tmp, &g_io_devices, tailq) { 327 if (tmp->io_device == io_device) { 328 SPDK_ERRLOG("io_device %p already registered\n", io_device); 329 free(dev); 330 pthread_mutex_unlock(&g_devlist_mutex); 331 return; 332 } 333 } 334 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); 335 pthread_mutex_unlock(&g_devlist_mutex); 336 } 337 338 static void 339 _finish_unregister(void *arg) 340 { 341 struct io_device *dev = arg; 342 343 dev->unregister_cb(dev->io_device); 344 free(dev); 345 } 346 347 static void 348 _spdk_io_device_free(struct io_device *dev) 349 { 350 if (dev->unregister_cb == NULL) { 351 free(dev); 352 } else { 353 assert(dev->unregister_thread != NULL); 354 spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); 355 } 356 } 357 358 void 359 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) 360 { 361 struct io_device *dev; 362 uint32_t refcnt; 363 struct spdk_thread *thread; 364 365 thread = spdk_get_thread(); 366 367 pthread_mutex_lock(&g_devlist_mutex); 368 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 369 if (dev->io_device == io_device) { 370 break; 371 } 372 } 373 374 if (!dev) { 375 SPDK_ERRLOG("io_device %p not found\n", io_device); 376 pthread_mutex_unlock(&g_devlist_mutex); 377 return; 378 } 379 380 if (dev->for_each_count > 0) { 381 SPDK_ERRLOG("io_device %p has %u for_each calls outstanding\n", io_device, dev->for_each_count); 382 pthread_mutex_unlock(&g_devlist_mutex); 383 return; 384 } 385 386 dev->unregister_cb = unregister_cb; 387 dev->unregistered = true; 388 TAILQ_REMOVE(&g_io_devices, dev, tailq); 389 refcnt = dev->refcnt; 390 dev->unregister_thread = thread; 391 pthread_mutex_unlock(&g_devlist_mutex); 392 393 if (refcnt > 0) { 394 /* defer deletion */ 395 return; 396 } 397 398 _spdk_io_device_free(dev); 399 } 400 401 struct spdk_io_channel * 402 spdk_get_io_channel(void *io_device) 403 { 404 struct spdk_io_channel *ch; 405 struct spdk_thread *thread; 406 struct io_device *dev; 407 int rc; 408 409 pthread_mutex_lock(&g_devlist_mutex); 410 TAILQ_FOREACH(dev, &g_io_devices, tailq) { 411 if (dev->io_device == io_device) { 412 break; 413 } 414 } 415 if (dev == NULL) { 416 SPDK_ERRLOG("could not find io_device %p\n", io_device); 417 pthread_mutex_unlock(&g_devlist_mutex); 418 return NULL; 419 } 420 421 thread = _get_thread(); 422 if (!thread) { 423 SPDK_ERRLOG("No thread allocated\n"); 424 pthread_mutex_unlock(&g_devlist_mutex); 425 return NULL; 426 } 427 428 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 429 if (ch->dev == dev) { 430 ch->ref++; 431 /* 432 * An I/O channel already exists for this device on this 433 * thread, so return it. 434 */ 435 pthread_mutex_unlock(&g_devlist_mutex); 436 return ch; 437 } 438 } 439 440 ch = calloc(1, sizeof(*ch) + dev->ctx_size); 441 if (ch == NULL) { 442 SPDK_ERRLOG("could not calloc spdk_io_channel\n"); 443 pthread_mutex_unlock(&g_devlist_mutex); 444 return NULL; 445 } 446 447 ch->dev = dev; 448 ch->destroy_cb = dev->destroy_cb; 449 ch->thread = thread; 450 ch->ref = 1; 451 TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); 452 453 dev->refcnt++; 454 455 pthread_mutex_unlock(&g_devlist_mutex); 456 457 rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); 458 if (rc == -1) { 459 pthread_mutex_lock(&g_devlist_mutex); 460 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 461 dev->refcnt--; 462 free(ch); 463 pthread_mutex_unlock(&g_devlist_mutex); 464 return NULL; 465 } 466 467 return ch; 468 } 469 470 static void 471 _spdk_put_io_channel(void *arg) 472 { 473 struct spdk_io_channel *ch = arg; 474 bool do_remove_dev = true; 475 476 assert(ch->thread == spdk_get_thread()); 477 assert(ch->ref == 0); 478 479 ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); 480 481 pthread_mutex_lock(&g_devlist_mutex); 482 ch->dev->refcnt--; 483 484 if (!ch->dev->unregistered) { 485 do_remove_dev = false; 486 } 487 488 if (ch->dev->refcnt > 0) { 489 do_remove_dev = false; 490 } 491 492 pthread_mutex_unlock(&g_devlist_mutex); 493 494 if (do_remove_dev) { 495 _spdk_io_device_free(ch->dev); 496 } 497 free(ch); 498 } 499 500 void 501 spdk_put_io_channel(struct spdk_io_channel *ch) 502 { 503 ch->ref--; 504 505 if (ch->ref == 0) { 506 /* If this was the last reference, remove the channel from the list */ 507 pthread_mutex_lock(&g_devlist_mutex); 508 TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); 509 pthread_mutex_unlock(&g_devlist_mutex); 510 511 spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch); 512 } 513 } 514 515 struct spdk_io_channel * 516 spdk_io_channel_from_ctx(void *ctx) 517 { 518 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); 519 } 520 521 struct spdk_thread * 522 spdk_io_channel_get_thread(struct spdk_io_channel *ch) 523 { 524 return ch->thread; 525 } 526 527 struct spdk_io_channel_iter { 528 void *io_device; 529 struct io_device *dev; 530 spdk_channel_msg fn; 531 int status; 532 void *ctx; 533 struct spdk_io_channel *ch; 534 535 struct spdk_thread *cur_thread; 536 537 struct spdk_thread *orig_thread; 538 spdk_channel_for_each_cpl cpl; 539 }; 540 541 void * 542 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) 543 { 544 return i->io_device; 545 } 546 547 struct spdk_io_channel * 548 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) 549 { 550 return i->ch; 551 } 552 553 void * 554 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) 555 { 556 return i->ctx; 557 } 558 559 static void 560 _call_completion(void *ctx) 561 { 562 struct spdk_io_channel_iter *i = ctx; 563 564 if (i->cpl != NULL) { 565 i->cpl(i, i->status); 566 } 567 free(i); 568 } 569 570 static void 571 _call_channel(void *ctx) 572 { 573 struct spdk_io_channel_iter *i = ctx; 574 struct spdk_io_channel *ch; 575 576 /* 577 * It is possible that the channel was deleted before this 578 * message had a chance to execute. If so, skip calling 579 * the fn() on this thread. 580 */ 581 pthread_mutex_lock(&g_devlist_mutex); 582 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { 583 if (ch->dev->io_device == i->io_device) { 584 break; 585 } 586 } 587 pthread_mutex_unlock(&g_devlist_mutex); 588 589 if (ch) { 590 i->fn(i); 591 } else { 592 spdk_for_each_channel_continue(i, 0); 593 } 594 } 595 596 void 597 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, 598 spdk_channel_for_each_cpl cpl) 599 { 600 struct spdk_thread *thread; 601 struct spdk_io_channel *ch; 602 struct spdk_io_channel_iter *i; 603 604 i = calloc(1, sizeof(*i)); 605 if (!i) { 606 SPDK_ERRLOG("Unable to allocate iterator\n"); 607 return; 608 } 609 610 i->io_device = io_device; 611 i->fn = fn; 612 i->ctx = ctx; 613 i->cpl = cpl; 614 615 pthread_mutex_lock(&g_devlist_mutex); 616 i->orig_thread = _get_thread(); 617 618 TAILQ_FOREACH(thread, &g_threads, tailq) { 619 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 620 if (ch->dev->io_device == io_device) { 621 ch->dev->for_each_count++; 622 i->dev = ch->dev; 623 i->cur_thread = thread; 624 i->ch = ch; 625 pthread_mutex_unlock(&g_devlist_mutex); 626 spdk_thread_send_msg(thread, _call_channel, i); 627 return; 628 } 629 } 630 } 631 632 pthread_mutex_unlock(&g_devlist_mutex); 633 634 cpl(i, 0); 635 636 free(i); 637 } 638 639 void 640 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) 641 { 642 struct spdk_thread *thread; 643 struct spdk_io_channel *ch; 644 645 assert(i->cur_thread == spdk_get_thread()); 646 647 i->status = status; 648 649 pthread_mutex_lock(&g_devlist_mutex); 650 if (status) { 651 goto end; 652 } 653 thread = TAILQ_NEXT(i->cur_thread, tailq); 654 while (thread) { 655 TAILQ_FOREACH(ch, &thread->io_channels, tailq) { 656 if (ch->dev->io_device == i->io_device) { 657 i->cur_thread = thread; 658 i->ch = ch; 659 pthread_mutex_unlock(&g_devlist_mutex); 660 spdk_thread_send_msg(thread, _call_channel, i); 661 return; 662 } 663 } 664 thread = TAILQ_NEXT(thread, tailq); 665 } 666 667 end: 668 i->dev->for_each_count--; 669 i->ch = NULL; 670 pthread_mutex_unlock(&g_devlist_mutex); 671 672 spdk_thread_send_msg(i->orig_thread, _call_completion, i); 673 } 674