1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/rtprio.h>
#include <sys/queue.h>
#include <sys/sysctl.h>
#include <sys/kthread.h>
#include <sys/signalvar.h>
#include <sys/signal2.h>
#include <machine/cpu.h>
#include <sys/lock.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_kern.h>
#include <vm/vm_object.h>
#include <vm/vm_page.h>
#include <vm/vm_map.h>
#include <vm/vm_pager.h>
#include <vm/vm_extern.h>
#include <vm/vm_zone.h>

#include <sys/thread2.h>
#include <sys/msgport2.h>
#include <sys/spinlock2.h>
#include <sys/serialize.h>

#include <machine/stdarg.h>
#include <machine/cpufunc.h>
#ifdef SMP
#include <machine/smp.h>
#endif

#include <sys/malloc.h>
MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");

/************************************************************************
 *                          MESSAGE FUNCTIONS                           *
 ************************************************************************/

/*
 * lwkt_sendmsg()
 *
 *      Request asynchronous completion and call lwkt_beginmsg().  The
 *      target port can opt to execute the message synchronously or
 *      asynchronously and this function will automatically queue the
 *      response if the target executes the message synchronously.
 *
 *      On entry the message must have a reply port and must be idle
 *      (MSGF_DONE set, MSGF_QUEUED clear); this is asserted.  MSGF_REPLY,
 *      MSGF_SYNC and MSGF_DONE are cleared before the message is handed
 *      to lwkt_beginmsg().
 *
 *      NOTE: The message is in an indeterminate state until this call
 *      returns.  The caller should not mess with it (e.g. try to abort it)
 *      until then.
 */
void
lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    int error;

    KKASSERT(msg->ms_reply_port != NULL &&
             (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
    msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
    if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
        /*
         * Target port opted to execute the message synchronously so
         * queue the response.
         */
        lwkt_replymsg(msg, error);
    }
}

/*
 * lwkt_domsg()
 *
 *      Request synchronous completion and call lwkt_beginmsg().  The
 *      target port can opt to execute the message synchronously or
 *      asynchronously and this function will automatically block and
 *      wait for a response if the target executes the message
 *      asynchronously.
 *
 *      The same entry-state assertion as lwkt_sendmsg() applies.  MSGF_SYNC
 *      is set on the message before it is handed to lwkt_beginmsg().
 *      <flags> is passed through unchanged to lwkt_waitmsg().
 *
 *      Returns the value produced by lwkt_beginmsg() when the target ran
 *      the message synchronously, otherwise the value returned by
 *      lwkt_waitmsg().
 */
int
lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
{
    int error;

    KKASSERT(msg->ms_reply_port != NULL &&
             (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
    msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
    msg->ms_flags |= MSGF_SYNC;
    if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
        /*
         * Target port opted to execute the message asynchronously so
         * block and wait for a reply.
         */
        error = lwkt_waitmsg(msg, flags);
    } else {
        /*
         * Synchronous completion: no reply is routed through the reply
         * port, so mark the message done and replied right here.
         */
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
    }
    return(error);
}

/*
 * lwkt_forwardmsg()
 *
 *      Forward a message received on one port to another port.
 *
 *      The message must not be queued, done, or replied (asserted).  The
 *      target port's mp_putport vector is invoked directly inside a
 *      critical section; if it completes the message synchronously
 *      (returns anything other than EASYNC) the reply is issued here.
 *
 *      Returns the value returned by the target's mp_putport vector.
 */
int
lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    int error;

    crit_enter();
    KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
    if ((error = port->mp_putport(port, msg)) != EASYNC)
        lwkt_replymsg(msg, error);
    crit_exit();
    return(error);
}

/*
 * lwkt_abortmsg()
 *
 *      Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
 *      The caller must ensure that the message will not be both replied AND
 *      destroyed while the abort is in progress.
 *
 *      This function issues a callback which might block!
 */
void
lwkt_abortmsg(lwkt_msg_t msg)
{
    /*
     * A critical section protects us from reply IPIs on this cpu.
     */
    crit_enter();

    /*
     * Shortcut the operation if the message has already been returned.
     * The callback typically constructs a lwkt_msg with the abort request,
     * issues it synchronously, and waits for completion.  The callback
     * is not required to actually abort the message and the target port,
     * upon receiving an abort request message generated by the callback
     * should check whether the original message has already completed or
     * not.
     */
    if (msg->ms_flags & MSGF_ABORTABLE) {
        if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
            msg->ms_abortfn(msg);
    }
    crit_exit();
}

/************************************************************************
 *                      PORT INITIALIZATION API                         *
 ************************************************************************/

/* Thread-owned port backend */
static void *lwkt_thread_getport(lwkt_port_t port);
static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/* Spinlock-protected port backend */
static void *lwkt_spin_getport(lwkt_port_t port);
static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/* Serializer-protected port backend (no dropmsg implementation) */
static void *lwkt_serialize_getport(lwkt_port_t port);
static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);

/* Special-purpose vectors: null reply and panic-on-use placeholders */
static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void *lwkt_panic_getport(lwkt_port_t port);
static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Core port initialization (internal)
 *
 * Zeroes the whole port structure, initializes both the normal and the
 * priority message queues, and installs the six backend vectors supplied
 * by the caller.  Because of the bzero(), any backend-specific field
 * (mpu_td, mpu_spin, mpu_serialize) must be set by the caller AFTER this
 * function returns.
 */
static __inline
void
_lwkt_initport(lwkt_port_t port,
               void *(*gportfn)(lwkt_port_t),
               int (*pportfn)(lwkt_port_t, lwkt_msg_t),
               int (*wmsgfn)(lwkt_msg_t, int),
               void *(*wportfn)(lwkt_port_t, int),
               void (*rportfn)(lwkt_port_t, lwkt_msg_t),
               void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
{
    bzero(port, sizeof(*port));
    TAILQ_INIT(&port->mp_msgq);
    TAILQ_INIT(&port->mp_msgq_prio);
    port->mp_getport = gportfn;
    port->mp_putport = pportfn;
    port->mp_waitmsg = wmsgfn;
    port->mp_waitport = wportfn;
    port->mp_replyport = rportfn;
    port->mp_dropmsg = dmsgfn;
}

/*
 * Schedule the target thread.
 *
 * NOTE: <flags> (the message flags) is accepted but currently not
 * examined; lwkt_schedule() is called unconditionally.  In particular
 * MSGF_NORESCHED is NOT honored by this implementation.
 *
 * This routine is called even if the thread is already scheduled.
 */
static __inline
void
_lwkt_schedule_msg(thread_t td, int flags)
{
    lwkt_schedule(td);
}

/*
 * lwkt_initport_thread()
 *
 *      Initialize a port for use by a particular thread.  The port may
 *      only be used by <td>.
 */
void
lwkt_initport_thread(lwkt_port_t port, thread_t td)
{
    _lwkt_initport(port,
                   lwkt_thread_getport,
                   lwkt_thread_putport,
                   lwkt_thread_waitmsg,
                   lwkt_thread_waitport,
                   lwkt_thread_replyport,
                   lwkt_thread_dropmsg);
    port->mpu_td = td;
}

/*
 * lwkt_initport_spin()
 *
 *      Initialize a port for use with descriptors that might be accessed
 *      via multiple LWPs, processes, or threads.
 *      Has somewhat more overhead than thread ports.
 */
void
lwkt_initport_spin(lwkt_port_t port)
{
    _lwkt_initport(port,
                   lwkt_spin_getport,
                   lwkt_spin_putport,
                   lwkt_spin_waitmsg,
                   lwkt_spin_waitport,
                   lwkt_spin_replyport,
                   lwkt_spin_dropmsg);
    /*
     * Only the spinlock is set up here; mpu_td is not assigned for spin
     * ports.  NOTE(review): lwkt_spin_dropmsg() asserts on mpu_td --
     * confirm how that is expected to hold for ports created here.
     */
    spin_init(&port->mpu_spin);
}

/*
 * lwkt_initport_serialize()
 *
 *      Initialize a port for use with descriptors that might be accessed
 *      via multiple LWPs, processes, or threads.  Callers of the resulting
 *      port's operations are assumed to hold the serializer (slz).
 *
 *      No drop operation exists for this backend: the dropmsg vector is
 *      set to the panic handler.
 */
void
lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
{
    _lwkt_initport(port,
                   lwkt_serialize_getport,
                   lwkt_serialize_putport,
                   lwkt_serialize_waitmsg,
                   lwkt_serialize_waitport,
                   lwkt_serialize_replyport,
                   lwkt_panic_dropmsg);
    port->mpu_serialize = slz;
}

/*
 * Similar to the standard initport, this function simply marks the message
 * as being done and does not attempt to return it to an originating port.
 *
 * Every vector other than the reply vector is set to a panic handler, so
 * the port can only be used as a reply port.
 */
void
lwkt_initport_replyonly_null(lwkt_port_t port)
{
    _lwkt_initport(port,
                   lwkt_panic_getport,
                   lwkt_panic_putport,
                   lwkt_panic_waitmsg,
                   lwkt_panic_waitport,
                   lwkt_null_replyport,
                   lwkt_panic_dropmsg);
}

/*
 * Initialize a reply-only port, typically used as a message sink.  Such
 * ports can only be used as a reply port.
 */
void
lwkt_initport_replyonly(lwkt_port_t port,
                        void (*rportfn)(lwkt_port_t, lwkt_msg_t))
{
    /* Caller supplies the reply vector; everything else panics on use. */
    _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
                   lwkt_panic_waitmsg, lwkt_panic_waitport,
                   rportfn, lwkt_panic_dropmsg);
}

/*
 * Initialize a put-only port.  The caller supplies the put vector
 * (pportfn); every other vector, including reply, is a panic handler.
 */
void
lwkt_initport_putonly(lwkt_port_t port,
                      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
{
    _lwkt_initport(port, lwkt_panic_getport, pportfn,
                   lwkt_panic_waitmsg, lwkt_panic_waitport,
                   lwkt_panic_replyport, lwkt_panic_dropmsg);
}

/*
 * Initialize a port on which every operation panics.
 */
void
lwkt_initport_panic(lwkt_port_t port)
{
    _lwkt_initport(port,
                   lwkt_panic_getport, lwkt_panic_putport,
                   lwkt_panic_waitmsg, lwkt_panic_waitport,
                   lwkt_panic_replyport, lwkt_panic_dropmsg);
}

/*
 * Remove <msg> from the port queue it is on and clear MSGF_QUEUED.
 *
 * The queue is selected from the message's MSGF_PRIORITY flag.  No locking
 * is performed here; the callers in this file invoke it from inside a
 * critical section, with the port spinlock held, or under the serializer.
 */
static __inline
void
_lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    lwkt_msg_queue *queue;

    /*
     * normal case, remove and return the message.
     */
    if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
        queue = &port->mp_msgq_prio;
    else
        queue = &port->mp_msgq;
    TAILQ_REMOVE(queue, msg, ms_node);

    /*
     * atomic op needed for spin ports
     */
    atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
}

/*
 * Mark <msg> as queued and append it to the tail of the port queue
 * selected by the message's MSGF_PRIORITY flag.
 *
 * MSGF_QUEUED is set before the message is linked into the queue.  As with
 * _lwkt_pullmsg(), the caller provides the locking.
 */
static __inline
void
_lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    lwkt_msg_queue *queue;

    /*
     * atomic op needed for spin ports
     */
    atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
    if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
        queue = &port->mp_msgq_prio;
    else
        queue = &port->mp_msgq;
    TAILQ_INSERT_TAIL(queue, msg, ms_node);
}

/*
 * Peek at the next message on the port without removing it.
 *
 * The priority queue is always consulted first.  Returns NULL if both
 * queues are empty.
 */
static __inline
lwkt_msg_t
_lwkt_pollmsg(lwkt_port_t port)
{
    lwkt_msg_t msg;

    msg = TAILQ_FIRST(&port->mp_msgq_prio);
    if (__predict_false(msg != NULL))
        return msg;

    /*
     * Priority queue has no message, fallback to non-priority queue.
     */
    return TAILQ_FIRST(&port->mp_msgq);
}

/*
 * Queue a reply on <port> and then flag the message as replied and done.
 *
 * The message is linked into the queue (which sets MSGF_QUEUED) before
 * MSGF_REPLY | MSGF_DONE are set.
 */
static __inline
void
_lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
{
    /*
     * atomic op needed for spin ports
     */
    _lwkt_pushmsg(port, msg);
    atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
}

/************************************************************************
 *                      THREAD PORT BACKEND                             *
 ************************************************************************
 *
 * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are sent via IPI to
 * the correct cpu before being enqueued to a port.  Note that this is
 * fairly optimal since scheduling would have had to do an IPI anyway if
 * the message were headed to a different cpu.
 */

#ifdef SMP

/*
 * This function completes reply processing for the default case in the
 * context of the originating cpu.
 *
 * It is dispatched via lwkt_send_ipiq() with the message as its argument.
 * If the thread owning the reply port is no longer on this cpu the IPI is
 * re-sent to the thread's current cpu.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        return;
    }

    /*
     * Cleanup
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /*
     * Snapshot the flags before the message is marked done; the snapshot
     * is what gets passed to _lwkt_schedule_msg() below.
     */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
        /* Synchronous: just flag completion, do not queue. */
        cpu_sfence();
        msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
        /* Asynchronous: queue the reply on the reply port. */
        _lwkt_enqueue_reply(port, msg);
    }
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, flags);
}

#endif

/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.
 * Completion must occur on the cpu of the thread that owns <port>
 * (port->mpu_td->td_gd); if that is not the current cpu the work is
 * handed off with an IPI to lwkt_thread_replyport_remote().
 *
 * The critical section protects us from IPIs on this CPU.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         *
         * Assume the target thread is non-preemptive, so no critical
         * section is required.
         */
#ifdef SMP
        if (port->mpu_td->td_gd == mycpu) {
#endif
            /*
             * Snapshot the flags before marking the message done; the
             * snapshot is what is passed to _lwkt_schedule_msg().
             */
            flags = msg->ms_flags;
            cpu_sfence();
            msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, flags);
#ifdef SMP
        } else {
            /*
             * Owner is on another cpu.  Only MSGF_REPLY is set here;
             * MSGF_DONE is set by the remote handler on the owner's cpu.
             */
#ifdef INVARIANTS
            msg->ms_flags |= MSGF_INTRANSIT;
#endif
            msg->ms_flags |= MSGF_REPLY;
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
#endif
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         *
         * A critical section is required to interlock the port queue.
         */
#ifdef SMP
        if (port->mpu_td->td_gd == mycpu) {
#endif
            crit_enter();
            _lwkt_enqueue_reply(port, msg);
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
            crit_exit();
#ifdef SMP
        } else {
            /*
             * Owner is on another cpu; the remote handler performs the
             * actual enqueue there.
             */
#ifdef INVARIANTS
            msg->ms_flags |= MSGF_INTRANSIT;
#endif
            msg->ms_flags |= MSGF_REPLY;
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
#endif
    }
}

/*
 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
 *
 * This function could _only_ be used when caller is in the same thread
 * as the message's target port owner thread.
 */
static void
lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    KASSERT(port->mpu_td == curthread,
            ("message could only be dropped in the same thread "
             "as the message target port thread"));
    /* Dequeue inside a critical section, then mark the message done. */
    crit_enter_quick(port->mpu_td);
    _lwkt_pullmsg(port, msg);
    msg->ms_flags |= MSGF_DONE;
    crit_exit_quick(port->mpu_td);
}

/*
 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
 *
 * Called with the target port as an argument but in the context of the
 * reply port.  This function always implements an asynchronous put to
 * the target message port, and thus returns EASYNC.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY
 */

#ifdef SMP

/*
 * IPI handler which completes a cross-cpu put on the cpu of the thread
 * that owns the target port.  If the owner has migrated since the IPI was
 * sent, the IPI is forwarded to the owner's current cpu.
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
        return;
    }

    /*
     * Cleanup
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
}

#endif

static
int
lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);

    /* Record the target so the remote handler can find the port. */
    msg->ms_target_port = port;
#ifdef SMP
    if (port->mpu_td->td_gd == mycpu) {
#endif
        /* Owner is on this cpu: enqueue directly. */
        crit_enter();
        _lwkt_pushmsg(port, msg);
        if (port->mp_flags & MSGPORTF_WAITING)
            _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
        crit_exit();
#ifdef SMP
    } else {
        /* Owner is on another cpu: hand the enqueue off via IPI. */
#ifdef INVARIANTS
        msg->ms_flags |= MSGF_INTRANSIT;
#endif
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
    }
#endif
    return (EASYNC);
}

/*
 * lwkt_thread_getport()
 *
 *      Retrieve the next message from the port or NULL if no messages
 *      are ready.  The caller must be the thread that owns the port
 *      (asserted).  Priority messages are returned first.
 */
static
void *
lwkt_thread_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    KKASSERT(port->mpu_td == curthread);

    crit_enter_quick(port->mpu_td);
    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    crit_exit_quick(port->mpu_td);
    return(msg);
}

/*
 * lwkt_thread_waitmsg()
 *
 *      Wait for a particular message to be replied.  We must be the only
 *      thread waiting on the message.  The port must be owned by the
 *      caller.
 *
 *      <flags> is passed to lwkt_sleep().  If the first sleep returns
 *      non-zero, lwkt_abortmsg() is called once and the function then
 *      keeps sleeping (with flags 0) until the message is done.
 *
 *      Returns msg->ms_error.
 */
static
int
lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
{
    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
        /*
         * If the done bit was not set we have to block until it is.
         */
        lwkt_port_t port = msg->ms_reply_port;
        thread_t td = curthread;
        int sentabort;

        KKASSERT(port->mpu_td == td);
        crit_enter_quick(td);
        sentabort = 0;

        while ((msg->ms_flags & MSGF_DONE) == 0) {
            port->mp_flags |= MSGPORTF_WAITING;
            if (sentabort == 0) {
                /* First interruption: request an abort, then keep waiting. */
                if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
                    lwkt_abortmsg(msg);
                }
            } else {
                lwkt_sleep("waitabt", 0);
            }
            port->mp_flags &= ~MSGPORTF_WAITING;
        }
        /* An asynchronously replied message is still on the reply queue. */
        if (msg->ms_flags & MSGF_QUEUED)
            _lwkt_pullmsg(port, msg);
        crit_exit_quick(td);
    } else {
        /*
         * If the done bit was set we only have to mess around with the
         * message if it is queued on the reply port.
         */
        if (msg->ms_flags & MSGF_QUEUED) {
            lwkt_port_t port = msg->ms_reply_port;
            thread_t td = curthread;

            KKASSERT(port->mpu_td == td);
            crit_enter_quick(td);
            _lwkt_pullmsg(port, msg);
            crit_exit_quick(td);
        }
    }
    return(msg->ms_error);
}

/*
 * lwkt_thread_waitport()
 *
 *      Wait for a new message to be available on the port.  We must be
 *      the only thread waiting on the port.  The port must be owned by
 *      the caller.
 *
 *      Returns the dequeued message, or NULL if lwkt_sleep() returned an
 *      error before a message became available.
 */
static
void *
lwkt_thread_waitport(lwkt_port_t port, int flags)
{
    thread_t td = curthread;
    lwkt_msg_t msg;
    int error;

    KKASSERT(port->mpu_td == td);
    crit_enter_quick(td);
    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = lwkt_sleep("waitport", flags);
        port->mp_flags &= ~MSGPORTF_WAITING;
        /* msg is NULL here, so the early exit returns NULL. */
        if (error)
            goto done;
    }
    _lwkt_pullmsg(port, msg);
done:
    crit_exit_quick(td);
    return(msg);
}

/************************************************************************
 *                         SPIN PORT BACKEND                            *
 ************************************************************************
 *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
 */

/*
 * Retrieve the next message from a spin port, or NULL if none is queued.
 * Priority messages are returned first.  The port spinlock is taken and
 * released internally.
 */
static
void *
lwkt_spin_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    spin_lock(&port->mpu_spin);
    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    spin_unlock(&port->mpu_spin);
    return(msg);
}

/*
 * Queue a message on a spin port.  Always asynchronous (returns EASYNC).
 *
 * If a waiter has flagged MSGPORTF_WAITING the flag is cleared under the
 * spinlock and wakeup() is issued on the port after the lock is released.
 */
static
int
lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    int dowakeup;

    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);

    msg->ms_target_port = port;
    spin_lock(&port->mpu_spin);
    _lwkt_pushmsg(port, msg);
    dowakeup = 0;
    if (port->mp_flags & MSGPORTF_WAITING) {
        port->mp_flags &= ~MSGPORTF_WAITING;
        dowakeup = 1;
    }
    spin_unlock(&port->mpu_spin);
    if (dowakeup)
        wakeup(port);
    return (EASYNC);
}

/*
 * Wait for a particular message to be replied on a spin reply port.
 *
 * Sleeps via ssleep() with the port spinlock as the interlock.  The sleep
 * is interruptible only when <flags> contains PCATCH and no abort has been
 * sent yet; after the first interruption lwkt_abortmsg() is called (with
 * the spinlock temporarily released) and waiting continues
 * uninterruptibly.
 *
 * Returns msg->ms_error.
 */
static
int
lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
        port = msg->ms_reply_port;
        sentabort = 0;
        spin_lock(&port->mpu_spin);
        while ((msg->ms_flags & MSGF_DONE) == 0) {
            void *won;

            /*
             * If message was sent synchronously from the beginning
             * the wakeup will be on the message structure, else it
             * will be on the port structure.
             */
            if (msg->ms_flags & MSGF_SYNC) {
                won = msg;
                atomic_set_int(&msg->ms_flags, MSGF_WAITING);
            } else {
                won = port;
                port->mp_flags |= MSGPORTF_WAITING;
            }

            /*
             * Only messages which support abort can be interrupted.
             * We must still wait for message completion regardless.
             */
            if ((flags & PCATCH) && sentabort == 0) {
                error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
                if (error) {
                    sentabort = error;
                    /* The abort callback may block: drop the spinlock. */
                    spin_unlock(&port->mpu_spin);
                    lwkt_abortmsg(msg);
                    spin_lock(&port->mpu_spin);
                }
            } else {
                error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
            }
            /* see note at the top on the MSGPORTF_WAITING flag */
        }
        /*
         * Turn EINTR into ERESTART if the signal indicates.
         * (The replacement value is whatever the interrupted ssleep()
         * returned.)
         */
        if (sentabort && msg->ms_error == EINTR)
            msg->ms_error = sentabort;
        if (msg->ms_flags & MSGF_QUEUED)
            _lwkt_pullmsg(port, msg);
        spin_unlock(&port->mpu_spin);
    } else {
        /* Already done; just dequeue it if it is still on the reply port. */
        if (msg->ms_flags & MSGF_QUEUED) {
            port = msg->ms_reply_port;
            spin_lock(&port->mpu_spin);
            _lwkt_pullmsg(port, msg);
            spin_unlock(&port->mpu_spin);
        }
    }
    return(msg->ms_error);
}

/*
 * Wait for a message to become available on a spin port and dequeue it.
 *
 * <flags> is passed to ssleep().  Returns the message, or NULL if ssleep()
 * returned an error.
 */
static
void *
lwkt_spin_waitport(lwkt_port_t port, int flags)
{
    lwkt_msg_t msg;
    int error;

    spin_lock(&port->mpu_spin);
    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
        /* see note at the top on the MSGPORTF_WAITING flag */
        if (error) {
            spin_unlock(&port->mpu_spin);
            return(NULL);
        }
    }
    _lwkt_pullmsg(port, msg);
    spin_unlock(&port->mpu_spin);
    return(msg);
}

/*
 * Reply backend for spin ports.
 *
 * Synchronous messages are flagged done in place and the waiter, if any
 * (MSGF_WAITING), is woken on the message address.  Asynchronous messages
 * are queued on the reply port and the waiter, if any (MSGPORTF_WAITING),
 * is woken on the port address.  In both cases wakeup() is issued after
 * the spinlock has been released.
 */
static
void
lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int dowakeup;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         */
        spin_lock(&port->mpu_spin);
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
        dowakeup = 0;
        if (msg->ms_flags & MSGF_WAITING) {
            atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
            dowakeup = 1;
        }
        spin_unlock(&port->mpu_spin);
        if (dowakeup)
            wakeup(msg);
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         */
        spin_lock(&port->mpu_spin);
        _lwkt_enqueue_reply(port, msg);
        dowakeup = 0;
        if (port->mp_flags & MSGPORTF_WAITING) {
            port->mp_flags &= ~MSGPORTF_WAITING;
            dowakeup = 1;
        }
        spin_unlock(&port->mpu_spin);
        if (dowakeup)
            wakeup(port);
    }
}

/*
 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
 *
 * This function could _only_ be used when caller is in the same thread
 * as the message's target port owner thread.
 *
 * NOTE(review): the assertion below tests port->mpu_td, but
 * lwkt_initport_spin() only initializes mpu_spin and never assigns
 * mpu_td.  Confirm against the lwkt_port definition whether this
 * assertion can hold for spin ports.
 */
static void
lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    KASSERT(port->mpu_td == curthread,
            ("message could only be dropped in the same thread "
             "as the message target port thread\n"));
    spin_lock(&port->mpu_spin);
    _lwkt_pullmsg(port, msg);
    msg->ms_flags |= MSGF_DONE;
    spin_unlock(&port->mpu_spin);
}

/************************************************************************
 *                      SERIALIZER PORT BACKEND                         *
 ************************************************************************
 *
 * This backend uses serializer to protect port accessing.  Callers are
 * assumed to have serializer held.  This kind of port is usually created
 * by network device driver along with _one_ lwkt thread to pipeline
 * operations which may temporarily release serializer.
 *
 * Implementation is based on SPIN PORT BACKEND.
 */

/*
 * Retrieve the next message from a serializer port, or NULL if none is
 * queued.  The caller must hold the port's serializer (asserted).
 */
static
void *
lwkt_serialize_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    ASSERT_SERIALIZED(port->mpu_serialize);

    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    return(msg);
}

/*
 * Queue a message on a serializer port.  Always asynchronous (returns
 * EASYNC).  The caller must hold the port's serializer (asserted).
 */
static
int
lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
    ASSERT_SERIALIZED(port->mpu_serialize);

    msg->ms_target_port = port;
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING) {
        port->mp_flags &= ~MSGPORTF_WAITING;
        wakeup(port);
    }
    return (EASYNC);
}

/*
 * Wait for a particular message to be replied on a serializer reply port.
 *
 * Sleeps via zsleep() with the port's serializer as the interlock.  The
 * sleep is interruptible only when <flags> contains PCATCH and no abort
 * has been sent yet; after the first interruption lwkt_abortmsg() is
 * called with the serializer temporarily released, and waiting continues
 * uninterruptibly.
 *
 * Returns msg->ms_error.
 */
static
int
lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
        port = msg->ms_reply_port;

        ASSERT_SERIALIZED(port->mpu_serialize);

        sentabort = 0;
        while ((msg->ms_flags & MSGF_DONE) == 0) {
            void *won;

            /*
             * If message was sent synchronously from the beginning
             * the wakeup will be on the message structure, else it
             * will be on the port structure.
             */
            if (msg->ms_flags & MSGF_SYNC) {
                won = msg;
            } else {
                won = port;
                port->mp_flags |= MSGPORTF_WAITING;
            }

            /*
             * Only messages which support abort can be interrupted.
             * We must still wait for message completion regardless.
             */
            if ((flags & PCATCH) && sentabort == 0) {
                error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
                if (error) {
                    sentabort = error;
                    /* The abort callback may block: drop the serializer. */
                    lwkt_serialize_exit(port->mpu_serialize);
                    lwkt_abortmsg(msg);
                    lwkt_serialize_enter(port->mpu_serialize);
                }
            } else {
                error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
            }
            /* see note at the top on the MSGPORTF_WAITING flag */
        }
        /*
         * Turn EINTR into ERESTART if the signal indicates.
         * (The replacement value is whatever the interrupted zsleep()
         * returned.)
         */
        if (sentabort && msg->ms_error == EINTR)
            msg->ms_error = sentabort;
        if (msg->ms_flags & MSGF_QUEUED)
            _lwkt_pullmsg(port, msg);
    } else {
        /* Already done; just dequeue it if it is still on the reply port. */
        if (msg->ms_flags & MSGF_QUEUED) {
            port = msg->ms_reply_port;

            ASSERT_SERIALIZED(port->mpu_serialize);
            _lwkt_pullmsg(port, msg);
        }
    }
    return(msg->ms_error);
}

/*
 * Wait for a message to become available on a serializer port and dequeue
 * it.  The caller must hold the port's serializer (asserted).
 *
 * <flags> is passed to zsleep().  Returns the message, or NULL if zsleep()
 * returned an error.
 */
static
void *
lwkt_serialize_waitport(lwkt_port_t port, int flags)
{
    lwkt_msg_t msg;
    int error;

    ASSERT_SERIALIZED(port->mpu_serialize);

    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
        /* see note at the top on the MSGPORTF_WAITING flag */
        if (error)
            return(NULL);
    }
    _lwkt_pullmsg(port, msg);
    return(msg);
}

/*
 * Reply backend for serializer ports.  The caller must hold the port's
 * serializer (asserted).
 *
 * Synchronous messages are flagged done in place and wakeup() is issued
 * unconditionally on the message address.  Asynchronous messages are
 * queued on the reply port and a waiter, if flagged, is woken on the port
 * address.
 */
static
void
lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
    ASSERT_SERIALIZED(port->mpu_serialize);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         */
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
        wakeup(msg);
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         */
        _lwkt_enqueue_reply(port, msg);
        if (port->mp_flags & MSGPORTF_WAITING) {
            port->mp_flags &= ~MSGPORTF_WAITING;
            wakeup(port);
        }
    }
}

/************************************************************************
 *                 PANIC AND SPECIAL PORT FUNCTIONS                     *
 ************************************************************************/

/*
 * You can point a port's reply vector at this function if you just want
 * the message marked done, without any queueing or signaling.  This is
 * often used for structure-embedded messages.
 */
static
void
lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
}

/*
 * The remaining functions are placeholder vectors for operations a given
 * port type does not support.  Each one panics with a message identifying
 * the offending port and/or message.
 */
static
void *
lwkt_panic_getport(lwkt_port_t port)
{
    panic("lwkt_getport() illegal on port %p", port);
}

static
int
lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
}

static
int
lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
{
    panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
}

static
void *
lwkt_panic_waitport(lwkt_port_t port, int flags)
{
    panic("port %p cannot be waited on", port);
}

static
void
lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
}

static
void
lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
}
