1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 
36 */ 37 38 #include <sys/param.h> 39 #include <sys/systm.h> 40 #include <sys/kernel.h> 41 #include <sys/proc.h> 42 #include <sys/rtprio.h> 43 #include <sys/queue.h> 44 #include <sys/sysctl.h> 45 #include <sys/kthread.h> 46 #include <sys/signalvar.h> 47 #include <sys/signal2.h> 48 #include <machine/cpu.h> 49 #include <sys/lock.h> 50 51 #include <vm/vm.h> 52 #include <vm/vm_param.h> 53 #include <vm/vm_kern.h> 54 #include <vm/vm_object.h> 55 #include <vm/vm_page.h> 56 #include <vm/vm_map.h> 57 #include <vm/vm_pager.h> 58 #include <vm/vm_extern.h> 59 #include <vm/vm_zone.h> 60 61 #include <sys/thread2.h> 62 #include <sys/msgport2.h> 63 #include <sys/spinlock2.h> 64 #include <sys/serialize.h> 65 66 #include <machine/stdarg.h> 67 #include <machine/cpufunc.h> 68 #include <machine/smp.h> 69 70 #include <sys/malloc.h> 71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 72 73 /************************************************************************ 74 * MESSAGE FUNCTIONS * 75 ************************************************************************/ 76 77 /* 78 * lwkt_sendmsg() 79 * 80 * Request asynchronous completion and call lwkt_beginmsg(). The 81 * target port can opt to execute the message synchronously or 82 * asynchronously and this function will automatically queue the 83 * response if the target executes the message synchronously. 84 * 85 * NOTE: The message is in an indeterminant state until this call 86 * returns. The caller should not mess with it (e.g. try to abort it) 87 * until then. 88 */ 89 void 90 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 91 { 92 int error; 93 94 KKASSERT(msg->ms_reply_port != NULL && 95 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 96 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 97 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 98 /* 99 * Target port opted to execute the message synchronously so 100 * queue the response. 
101 */ 102 lwkt_replymsg(msg, error); 103 } 104 } 105 106 /* 107 * lwkt_domsg() 108 * 109 * Request synchronous completion and call lwkt_beginmsg(). The 110 * target port can opt to execute the message synchronously or 111 * asynchronously and this function will automatically block and 112 * wait for a response if the target executes the message 113 * asynchronously. 114 */ 115 int 116 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 117 { 118 int error; 119 120 KKASSERT(msg->ms_reply_port != NULL && 121 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 122 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 123 msg->ms_flags |= MSGF_SYNC; 124 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 125 /* 126 * Target port opted to execute the message asynchronously so 127 * block and wait for a reply. 128 */ 129 error = lwkt_waitmsg(msg, flags); 130 } else { 131 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 132 } 133 return(error); 134 } 135 136 /* 137 * lwkt_forwardmsg() 138 * 139 * Forward a message received on one port to another port. 140 */ 141 int 142 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 143 { 144 int error; 145 146 crit_enter(); 147 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 148 if ((error = port->mp_putport(port, msg)) != EASYNC) 149 lwkt_replymsg(msg, error); 150 crit_exit(); 151 return(error); 152 } 153 154 /* 155 * lwkt_abortmsg() 156 * 157 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 158 * The caller must ensure that the message will not be both replied AND 159 * destroyed while the abort is in progress. 160 * 161 * This function issues a callback which might block! 162 */ 163 void 164 lwkt_abortmsg(lwkt_msg_t msg) 165 { 166 /* 167 * A critical section protects us from reply IPIs on this cpu. 168 */ 169 crit_enter(); 170 171 /* 172 * Shortcut the operation if the message has already been returned. 
173 * The callback typically constructs a lwkt_msg with the abort request, 174 * issues it synchronously, and waits for completion. The callback 175 * is not required to actually abort the message and the target port, 176 * upon receiving an abort request message generated by the callback 177 * should check whether the original message has already completed or 178 * not. 179 */ 180 if (msg->ms_flags & MSGF_ABORTABLE) { 181 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 182 msg->ms_abortfn(msg); 183 } 184 crit_exit(); 185 } 186 187 /************************************************************************ 188 * PORT INITIALIZATION API * 189 ************************************************************************/ 190 191 static void *lwkt_thread_getport(lwkt_port_t port); 192 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 193 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 194 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 195 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 196 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 197 198 static void *lwkt_spin_getport(lwkt_port_t port); 199 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 200 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 201 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 202 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 203 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 204 205 static void *lwkt_serialize_getport(lwkt_port_t port); 206 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg); 207 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags); 208 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags); 209 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg); 210 211 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 212 static void *lwkt_panic_getport(lwkt_port_t 
port); 213 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 214 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 215 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 216 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 217 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 218 219 /* 220 * Core port initialization (internal) 221 */ 222 static __inline 223 void 224 _lwkt_initport(lwkt_port_t port, 225 void *(*gportfn)(lwkt_port_t), 226 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 227 int (*wmsgfn)(lwkt_msg_t, int), 228 void *(*wportfn)(lwkt_port_t, int), 229 void (*rportfn)(lwkt_port_t, lwkt_msg_t), 230 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t)) 231 { 232 bzero(port, sizeof(*port)); 233 TAILQ_INIT(&port->mp_msgq); 234 TAILQ_INIT(&port->mp_msgq_prio); 235 port->mp_getport = gportfn; 236 port->mp_putport = pportfn; 237 port->mp_waitmsg = wmsgfn; 238 port->mp_waitport = wportfn; 239 port->mp_replyport = rportfn; 240 port->mp_dropmsg = dmsgfn; 241 } 242 243 /* 244 * Schedule the target thread. If the message flags contains MSGF_NORESCHED 245 * we tell the scheduler not to reschedule if td is at a higher priority. 246 * 247 * This routine is called even if the thread is already scheduled. 248 */ 249 static __inline 250 void 251 _lwkt_schedule_msg(thread_t td, int flags) 252 { 253 lwkt_schedule(td); 254 } 255 256 /* 257 * lwkt_initport_thread() 258 * 259 * Initialize a port for use by a particular thread. The port may 260 * only be used by <td>. 261 */ 262 void 263 lwkt_initport_thread(lwkt_port_t port, thread_t td) 264 { 265 _lwkt_initport(port, 266 lwkt_thread_getport, 267 lwkt_thread_putport, 268 lwkt_thread_waitmsg, 269 lwkt_thread_waitport, 270 lwkt_thread_replyport, 271 lwkt_thread_dropmsg); 272 port->mpu_td = td; 273 } 274 275 /* 276 * lwkt_initport_spin() 277 * 278 * Initialize a port for use with descriptors that might be accessed 279 * via multiple LWPs, processes, or threads. 
Has somewhat more 280 * overhead then thread ports. 281 */ 282 void 283 lwkt_initport_spin(lwkt_port_t port, thread_t td) 284 { 285 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t); 286 287 if (td == NULL) 288 dmsgfn = lwkt_panic_dropmsg; 289 else 290 dmsgfn = lwkt_spin_dropmsg; 291 292 _lwkt_initport(port, 293 lwkt_spin_getport, 294 lwkt_spin_putport, 295 lwkt_spin_waitmsg, 296 lwkt_spin_waitport, 297 lwkt_spin_replyport, 298 dmsgfn); 299 spin_init(&port->mpu_spin); 300 port->mpu_td = td; 301 } 302 303 /* 304 * lwkt_initport_serialize() 305 * 306 * Initialize a port for use with descriptors that might be accessed 307 * via multiple LWPs, processes, or threads. Callers are assumed to 308 * have held the serializer (slz). 309 */ 310 void 311 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 312 { 313 _lwkt_initport(port, 314 lwkt_serialize_getport, 315 lwkt_serialize_putport, 316 lwkt_serialize_waitmsg, 317 lwkt_serialize_waitport, 318 lwkt_serialize_replyport, 319 lwkt_panic_dropmsg); 320 port->mpu_serialize = slz; 321 } 322 323 /* 324 * Similar to the standard initport, this function simply marks the message 325 * as being done and does not attempt to return it to an originating port. 326 */ 327 void 328 lwkt_initport_replyonly_null(lwkt_port_t port) 329 { 330 _lwkt_initport(port, 331 lwkt_panic_getport, 332 lwkt_panic_putport, 333 lwkt_panic_waitmsg, 334 lwkt_panic_waitport, 335 lwkt_null_replyport, 336 lwkt_panic_dropmsg); 337 } 338 339 /* 340 * Initialize a reply-only port, typically used as a message sink. Such 341 * ports can only be used as a reply port. 
342 */ 343 void 344 lwkt_initport_replyonly(lwkt_port_t port, 345 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 346 { 347 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 348 lwkt_panic_waitmsg, lwkt_panic_waitport, 349 rportfn, lwkt_panic_dropmsg); 350 } 351 352 void 353 lwkt_initport_putonly(lwkt_port_t port, 354 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 355 { 356 _lwkt_initport(port, lwkt_panic_getport, pportfn, 357 lwkt_panic_waitmsg, lwkt_panic_waitport, 358 lwkt_panic_replyport, lwkt_panic_dropmsg); 359 } 360 361 void 362 lwkt_initport_panic(lwkt_port_t port) 363 { 364 _lwkt_initport(port, 365 lwkt_panic_getport, lwkt_panic_putport, 366 lwkt_panic_waitmsg, lwkt_panic_waitport, 367 lwkt_panic_replyport, lwkt_panic_dropmsg); 368 } 369 370 static __inline 371 void 372 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 373 { 374 lwkt_msg_queue *queue; 375 376 /* 377 * normal case, remove and return the message. 378 */ 379 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 380 queue = &port->mp_msgq_prio; 381 else 382 queue = &port->mp_msgq; 383 TAILQ_REMOVE(queue, msg, ms_node); 384 385 /* 386 * atomic op needed for spin ports 387 */ 388 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED); 389 } 390 391 static __inline 392 void 393 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) 394 { 395 lwkt_msg_queue *queue; 396 397 /* 398 * atomic op needed for spin ports 399 */ 400 atomic_set_int(&msg->ms_flags, MSGF_QUEUED); 401 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 402 queue = &port->mp_msgq_prio; 403 else 404 queue = &port->mp_msgq; 405 TAILQ_INSERT_TAIL(queue, msg, ms_node); 406 } 407 408 static __inline 409 lwkt_msg_t 410 _lwkt_pollmsg(lwkt_port_t port) 411 { 412 lwkt_msg_t msg; 413 414 msg = TAILQ_FIRST(&port->mp_msgq_prio); 415 if (__predict_false(msg != NULL)) 416 return msg; 417 418 /* 419 * Priority queue has no message, fallback to non-priority queue. 
420 */ 421 return TAILQ_FIRST(&port->mp_msgq); 422 } 423 424 static __inline 425 void 426 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) 427 { 428 /* 429 * atomic op needed for spin ports 430 */ 431 _lwkt_pushmsg(port, msg); 432 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE); 433 } 434 435 /************************************************************************ 436 * THREAD PORT BACKEND * 437 ************************************************************************ 438 * 439 * This backend is used when the port a message is retrieved from is owned 440 * by a single thread (the calling thread). Messages are IPId to the 441 * correct cpu before being enqueued to a port. Note that this is fairly 442 * optimal since scheduling would have had to do an IPI anyway if the 443 * message were headed to a different cpu. 444 */ 445 446 /* 447 * This function completes reply processing for the default case in the 448 * context of the originating cpu. 449 */ 450 static 451 void 452 lwkt_thread_replyport_remote(lwkt_msg_t msg) 453 { 454 lwkt_port_t port = msg->ms_reply_port; 455 int flags; 456 457 /* 458 * Chase any thread migration that occurs 459 */ 460 if (port->mpu_td->td_gd != mycpu) { 461 lwkt_send_ipiq(port->mpu_td->td_gd, 462 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 463 return; 464 } 465 466 /* 467 * Cleanup 468 */ 469 #ifdef INVARIANTS 470 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 471 msg->ms_flags &= ~MSGF_INTRANSIT; 472 #endif 473 flags = msg->ms_flags; 474 if (msg->ms_flags & MSGF_SYNC) { 475 cpu_sfence(); 476 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 477 } else { 478 _lwkt_enqueue_reply(port, msg); 479 } 480 if (port->mp_flags & MSGPORTF_WAITING) 481 _lwkt_schedule_msg(port->mpu_td, flags); 482 } 483 484 /* 485 * lwkt_thread_replyport() - Backend to lwkt_replymsg() 486 * 487 * Called with the reply port as an argument but in the context of the 488 * original target port. Completion must occur on the target port's 489 * cpu. 
490 * 491 * The critical section protects us from IPIs on the this CPU. 492 */ 493 static 494 void 495 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) 496 { 497 int flags; 498 499 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0); 500 501 if (msg->ms_flags & MSGF_SYNC) { 502 /* 503 * If a synchronous completion has been requested, just wakeup 504 * the message without bothering to queue it to the target port. 505 * 506 * Assume the target thread is non-preemptive, so no critical 507 * section is required. 508 */ 509 if (port->mpu_td->td_gd == mycpu) { 510 flags = msg->ms_flags; 511 cpu_sfence(); 512 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 513 if (port->mp_flags & MSGPORTF_WAITING) 514 _lwkt_schedule_msg(port->mpu_td, flags); 515 } else { 516 #ifdef INVARIANTS 517 msg->ms_flags |= MSGF_INTRANSIT; 518 #endif 519 msg->ms_flags |= MSGF_REPLY; 520 lwkt_send_ipiq(port->mpu_td->td_gd, 521 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 522 } 523 } else { 524 /* 525 * If an asynchronous completion has been requested the message 526 * must be queued to the reply port. 527 * 528 * A critical section is required to interlock the port queue. 529 */ 530 if (port->mpu_td->td_gd == mycpu) { 531 crit_enter(); 532 _lwkt_enqueue_reply(port, msg); 533 if (port->mp_flags & MSGPORTF_WAITING) 534 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 535 crit_exit(); 536 } else { 537 #ifdef INVARIANTS 538 msg->ms_flags |= MSGF_INTRANSIT; 539 #endif 540 msg->ms_flags |= MSGF_REPLY; 541 lwkt_send_ipiq(port->mpu_td->td_gd, 542 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 543 } 544 } 545 } 546 547 /* 548 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg() 549 * 550 * This function could _only_ be used when caller is in the same thread 551 * as the message's target port owner thread. 
552 */ 553 static void 554 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 555 { 556 KASSERT(port->mpu_td == curthread, 557 ("message could only be dropped in the same thread " 558 "as the message target port thread")); 559 crit_enter_quick(port->mpu_td); 560 _lwkt_pullmsg(port, msg); 561 msg->ms_flags |= MSGF_DONE; 562 crit_exit_quick(port->mpu_td); 563 } 564 565 /* 566 * lwkt_thread_putport() - Backend to lwkt_beginmsg() 567 * 568 * Called with the target port as an argument but in the context of the 569 * reply port. This function always implements an asynchronous put to 570 * the target message port, and thus returns EASYNC. 571 * 572 * The message must already have cleared MSGF_DONE and MSGF_REPLY 573 */ 574 static 575 void 576 lwkt_thread_putport_remote(lwkt_msg_t msg) 577 { 578 lwkt_port_t port = msg->ms_target_port; 579 580 /* 581 * Chase any thread migration that occurs 582 */ 583 if (port->mpu_td->td_gd != mycpu) { 584 lwkt_send_ipiq(port->mpu_td->td_gd, 585 (ipifunc1_t)lwkt_thread_putport_remote, msg); 586 return; 587 } 588 589 /* 590 * Cleanup 591 */ 592 #ifdef INVARIANTS 593 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 594 msg->ms_flags &= ~MSGF_INTRANSIT; 595 #endif 596 _lwkt_pushmsg(port, msg); 597 if (port->mp_flags & MSGPORTF_WAITING) 598 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 599 } 600 601 static 602 int 603 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg) 604 { 605 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 606 607 msg->ms_target_port = port; 608 if (port->mpu_td->td_gd == mycpu) { 609 crit_enter(); 610 _lwkt_pushmsg(port, msg); 611 if (port->mp_flags & MSGPORTF_WAITING) 612 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 613 crit_exit(); 614 } else { 615 #ifdef INVARIANTS 616 msg->ms_flags |= MSGF_INTRANSIT; 617 #endif 618 lwkt_send_ipiq(port->mpu_td->td_gd, 619 (ipifunc1_t)lwkt_thread_putport_remote, msg); 620 } 621 return (EASYNC); 622 } 623 624 /* 625 * lwkt_thread_getport() 626 * 627 * Retrieve the next 
message from the port or NULL if no messages 628 * are ready. 629 */ 630 static 631 void * 632 lwkt_thread_getport(lwkt_port_t port) 633 { 634 lwkt_msg_t msg; 635 636 KKASSERT(port->mpu_td == curthread); 637 638 crit_enter_quick(port->mpu_td); 639 if ((msg = _lwkt_pollmsg(port)) != NULL) 640 _lwkt_pullmsg(port, msg); 641 crit_exit_quick(port->mpu_td); 642 return(msg); 643 } 644 645 /* 646 * lwkt_thread_waitmsg() 647 * 648 * Wait for a particular message to be replied. We must be the only 649 * thread waiting on the message. The port must be owned by the 650 * caller. 651 */ 652 static 653 int 654 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 655 { 656 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 657 ("can't wait dropable message")); 658 659 if ((msg->ms_flags & MSGF_DONE) == 0) { 660 /* 661 * If the done bit was not set we have to block until it is. 662 */ 663 lwkt_port_t port = msg->ms_reply_port; 664 thread_t td = curthread; 665 int sentabort; 666 667 KKASSERT(port->mpu_td == td); 668 crit_enter_quick(td); 669 sentabort = 0; 670 671 while ((msg->ms_flags & MSGF_DONE) == 0) { 672 port->mp_flags |= MSGPORTF_WAITING; 673 if (sentabort == 0) { 674 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 675 lwkt_abortmsg(msg); 676 } 677 } else { 678 lwkt_sleep("waitabt", 0); 679 } 680 port->mp_flags &= ~MSGPORTF_WAITING; 681 } 682 if (msg->ms_flags & MSGF_QUEUED) 683 _lwkt_pullmsg(port, msg); 684 crit_exit_quick(td); 685 } else { 686 /* 687 * If the done bit was set we only have to mess around with the 688 * message if it is queued on the reply port. 689 */ 690 if (msg->ms_flags & MSGF_QUEUED) { 691 lwkt_port_t port = msg->ms_reply_port; 692 thread_t td = curthread; 693 694 KKASSERT(port->mpu_td == td); 695 crit_enter_quick(td); 696 _lwkt_pullmsg(port, msg); 697 crit_exit_quick(td); 698 } 699 } 700 return(msg->ms_error); 701 } 702 703 /* 704 * lwkt_thread_waitport() 705 * 706 * Wait for a new message to be available on the port. 
We must be the
 * only thread waiting on the port.  The port must be owned by caller.
 *
 * Returns the dequeued message, or NULL if the sleep returned an error
 * before a message became available.
 */
static void *
lwkt_thread_waitport(lwkt_port_t port, int flags)
{
    thread_t td = curthread;
    lwkt_msg_t msg;
    int error;

    KKASSERT(port->mpu_td == td);

    crit_enter_quick(td);
    for (;;) {
        msg = _lwkt_pollmsg(port);
        if (msg != NULL) {
            _lwkt_pullmsg(port, msg);
            break;
        }

        /* nothing pending, flag the port and go to sleep */
        port->mp_flags |= MSGPORTF_WAITING;
        error = lwkt_sleep("waitport", flags);
        port->mp_flags &= ~MSGPORTF_WAITING;
        if (error)
            break;      /* msg is NULL at this point */
    }
    crit_exit_quick(td);

    return (msg);
}

/************************************************************************
 *                         SPIN PORT BACKEND                            *
 ************************************************************************
 *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
747 */ 748 749 static 750 void * 751 lwkt_spin_getport(lwkt_port_t port) 752 { 753 lwkt_msg_t msg; 754 755 spin_lock(&port->mpu_spin); 756 if ((msg = _lwkt_pollmsg(port)) != NULL) 757 _lwkt_pullmsg(port, msg); 758 spin_unlock(&port->mpu_spin); 759 return(msg); 760 } 761 762 static 763 int 764 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 765 { 766 int dowakeup; 767 768 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 769 770 msg->ms_target_port = port; 771 spin_lock(&port->mpu_spin); 772 _lwkt_pushmsg(port, msg); 773 dowakeup = 0; 774 if (port->mp_flags & MSGPORTF_WAITING) { 775 port->mp_flags &= ~MSGPORTF_WAITING; 776 dowakeup = 1; 777 } 778 spin_unlock(&port->mpu_spin); 779 if (dowakeup) 780 wakeup(port); 781 return (EASYNC); 782 } 783 784 static 785 int 786 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 787 { 788 lwkt_port_t port; 789 int sentabort; 790 int error; 791 792 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 793 ("can't wait dropable message")); 794 795 if ((msg->ms_flags & MSGF_DONE) == 0) { 796 port = msg->ms_reply_port; 797 sentabort = 0; 798 spin_lock(&port->mpu_spin); 799 while ((msg->ms_flags & MSGF_DONE) == 0) { 800 void *won; 801 802 /* 803 * If message was sent synchronously from the beginning 804 * the wakeup will be on the message structure, else it 805 * will be on the port structure. 806 */ 807 if (msg->ms_flags & MSGF_SYNC) { 808 won = msg; 809 atomic_set_int(&msg->ms_flags, MSGF_WAITING); 810 } else { 811 won = port; 812 port->mp_flags |= MSGPORTF_WAITING; 813 } 814 815 /* 816 * Only messages which support abort can be interrupted. 817 * We must still wait for message completion regardless. 
818 */ 819 if ((flags & PCATCH) && sentabort == 0) { 820 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 821 if (error) { 822 sentabort = error; 823 spin_unlock(&port->mpu_spin); 824 lwkt_abortmsg(msg); 825 spin_lock(&port->mpu_spin); 826 } 827 } else { 828 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0); 829 } 830 /* see note at the top on the MSGPORTF_WAITING flag */ 831 } 832 /* 833 * Turn EINTR into ERESTART if the signal indicates. 834 */ 835 if (sentabort && msg->ms_error == EINTR) 836 msg->ms_error = sentabort; 837 if (msg->ms_flags & MSGF_QUEUED) 838 _lwkt_pullmsg(port, msg); 839 spin_unlock(&port->mpu_spin); 840 } else { 841 if (msg->ms_flags & MSGF_QUEUED) { 842 port = msg->ms_reply_port; 843 spin_lock(&port->mpu_spin); 844 _lwkt_pullmsg(port, msg); 845 spin_unlock(&port->mpu_spin); 846 } 847 } 848 return(msg->ms_error); 849 } 850 851 static 852 void * 853 lwkt_spin_waitport(lwkt_port_t port, int flags) 854 { 855 lwkt_msg_t msg; 856 int error; 857 858 spin_lock(&port->mpu_spin); 859 while ((msg = _lwkt_pollmsg(port)) == NULL) { 860 port->mp_flags |= MSGPORTF_WAITING; 861 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0); 862 /* see note at the top on the MSGPORTF_WAITING flag */ 863 if (error) { 864 spin_unlock(&port->mpu_spin); 865 return(NULL); 866 } 867 } 868 _lwkt_pullmsg(port, msg); 869 spin_unlock(&port->mpu_spin); 870 return(msg); 871 } 872 873 static 874 void 875 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 876 { 877 int dowakeup; 878 879 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 880 881 if (msg->ms_flags & MSGF_SYNC) { 882 /* 883 * If a synchronous completion has been requested, just wakeup 884 * the message without bothering to queue it to the target port. 
885 */ 886 spin_lock(&port->mpu_spin); 887 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 888 dowakeup = 0; 889 if (msg->ms_flags & MSGF_WAITING) { 890 atomic_clear_int(&msg->ms_flags, MSGF_WAITING); 891 dowakeup = 1; 892 } 893 spin_unlock(&port->mpu_spin); 894 if (dowakeup) 895 wakeup(msg); 896 } else { 897 /* 898 * If an asynchronous completion has been requested the message 899 * must be queued to the reply port. 900 */ 901 spin_lock(&port->mpu_spin); 902 _lwkt_enqueue_reply(port, msg); 903 dowakeup = 0; 904 if (port->mp_flags & MSGPORTF_WAITING) { 905 port->mp_flags &= ~MSGPORTF_WAITING; 906 dowakeup = 1; 907 } 908 spin_unlock(&port->mpu_spin); 909 if (dowakeup) 910 wakeup(port); 911 } 912 } 913 914 /* 915 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg() 916 * 917 * This function could _only_ be used when caller is in the same thread 918 * as the message's target port owner thread. 919 */ 920 static void 921 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 922 { 923 KASSERT(port->mpu_td == curthread, 924 ("message could only be dropped in the same thread " 925 "as the message target port thread\n")); 926 spin_lock(&port->mpu_spin); 927 _lwkt_pullmsg(port, msg); 928 msg->ms_flags |= MSGF_DONE; 929 spin_unlock(&port->mpu_spin); 930 } 931 932 /************************************************************************ 933 * SERIALIZER PORT BACKEND * 934 ************************************************************************ 935 * 936 * This backend uses serializer to protect port accessing. Callers are 937 * assumed to have serializer held. This kind of port is usually created 938 * by network device driver along with _one_ lwkt thread to pipeline 939 * operations which may temporarily release serializer. 940 * 941 * Implementation is based on SPIN PORT BACKEND. 
942 */ 943 944 static 945 void * 946 lwkt_serialize_getport(lwkt_port_t port) 947 { 948 lwkt_msg_t msg; 949 950 ASSERT_SERIALIZED(port->mpu_serialize); 951 952 if ((msg = _lwkt_pollmsg(port)) != NULL) 953 _lwkt_pullmsg(port, msg); 954 return(msg); 955 } 956 957 static 958 int 959 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg) 960 { 961 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 962 ASSERT_SERIALIZED(port->mpu_serialize); 963 964 msg->ms_target_port = port; 965 _lwkt_pushmsg(port, msg); 966 if (port->mp_flags & MSGPORTF_WAITING) { 967 port->mp_flags &= ~MSGPORTF_WAITING; 968 wakeup(port); 969 } 970 return (EASYNC); 971 } 972 973 static 974 int 975 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags) 976 { 977 lwkt_port_t port; 978 int sentabort; 979 int error; 980 981 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 982 ("can't wait dropable message")); 983 984 if ((msg->ms_flags & MSGF_DONE) == 0) { 985 port = msg->ms_reply_port; 986 987 ASSERT_SERIALIZED(port->mpu_serialize); 988 989 sentabort = 0; 990 while ((msg->ms_flags & MSGF_DONE) == 0) { 991 void *won; 992 993 /* 994 * If message was sent synchronously from the beginning 995 * the wakeup will be on the message structure, else it 996 * will be on the port structure. 997 */ 998 if (msg->ms_flags & MSGF_SYNC) { 999 won = msg; 1000 } else { 1001 won = port; 1002 port->mp_flags |= MSGPORTF_WAITING; 1003 } 1004 1005 /* 1006 * Only messages which support abort can be interrupted. 1007 * We must still wait for message completion regardless. 
1008 */ 1009 if ((flags & PCATCH) && sentabort == 0) { 1010 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0); 1011 if (error) { 1012 sentabort = error; 1013 lwkt_serialize_exit(port->mpu_serialize); 1014 lwkt_abortmsg(msg); 1015 lwkt_serialize_enter(port->mpu_serialize); 1016 } 1017 } else { 1018 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0); 1019 } 1020 /* see note at the top on the MSGPORTF_WAITING flag */ 1021 } 1022 /* 1023 * Turn EINTR into ERESTART if the signal indicates. 1024 */ 1025 if (sentabort && msg->ms_error == EINTR) 1026 msg->ms_error = sentabort; 1027 if (msg->ms_flags & MSGF_QUEUED) 1028 _lwkt_pullmsg(port, msg); 1029 } else { 1030 if (msg->ms_flags & MSGF_QUEUED) { 1031 port = msg->ms_reply_port; 1032 1033 ASSERT_SERIALIZED(port->mpu_serialize); 1034 _lwkt_pullmsg(port, msg); 1035 } 1036 } 1037 return(msg->ms_error); 1038 } 1039 1040 static 1041 void * 1042 lwkt_serialize_waitport(lwkt_port_t port, int flags) 1043 { 1044 lwkt_msg_t msg; 1045 int error; 1046 1047 ASSERT_SERIALIZED(port->mpu_serialize); 1048 1049 while ((msg = _lwkt_pollmsg(port)) == NULL) { 1050 port->mp_flags |= MSGPORTF_WAITING; 1051 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0); 1052 /* see note at the top on the MSGPORTF_WAITING flag */ 1053 if (error) 1054 return(NULL); 1055 } 1056 _lwkt_pullmsg(port, msg); 1057 return(msg); 1058 } 1059 1060 static 1061 void 1062 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg) 1063 { 1064 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 1065 ASSERT_SERIALIZED(port->mpu_serialize); 1066 1067 if (msg->ms_flags & MSGF_SYNC) { 1068 /* 1069 * If a synchronous completion has been requested, just wakeup 1070 * the message without bothering to queue it to the target port. 1071 */ 1072 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1073 wakeup(msg); 1074 } else { 1075 /* 1076 * If an asynchronous completion has been requested the message 1077 * must be queued to the reply port. 
1078 */ 1079 _lwkt_enqueue_reply(port, msg); 1080 if (port->mp_flags & MSGPORTF_WAITING) { 1081 port->mp_flags &= ~MSGPORTF_WAITING; 1082 wakeup(port); 1083 } 1084 } 1085 } 1086 1087 /************************************************************************ 1088 * PANIC AND SPECIAL PORT FUNCTIONS * 1089 ************************************************************************/ 1090 1091 /* 1092 * You can point a port's reply vector at this function if you just want 1093 * the message marked done, without any queueing or signaling. This is 1094 * often used for structure-embedded messages. 1095 */ 1096 static 1097 void 1098 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg) 1099 { 1100 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1101 } 1102 1103 static 1104 void * 1105 lwkt_panic_getport(lwkt_port_t port) 1106 { 1107 panic("lwkt_getport() illegal on port %p", port); 1108 } 1109 1110 static 1111 int 1112 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg) 1113 { 1114 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg); 1115 } 1116 1117 static 1118 int 1119 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags) 1120 { 1121 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg); 1122 } 1123 1124 static 1125 void * 1126 lwkt_panic_waitport(lwkt_port_t port, int flags) 1127 { 1128 panic("port %p cannot be waited on", port); 1129 } 1130 1131 static 1132 void 1133 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) 1134 { 1135 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); 1136 } 1137 1138 static 1139 void 1140 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 1141 { 1142 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg); 1143 } 1144