xref: /dflybsd-src/sys/kern/lwkt_msgport.c (revision fda7d3889b1114d34ad3a52a7257a2b80fe24e4c)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37 
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60 
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65 
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69 
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72 
73 /************************************************************************
74  *				MESSAGE FUNCTIONS			*
75  ************************************************************************/
76 
77 /*
78  * lwkt_sendmsg()
79  *
80  *	Request asynchronous completion and call lwkt_beginmsg().  The
81  *	target port can opt to execute the message synchronously or
82  *	asynchronously and this function will automatically queue the
83  *	response if the target executes the message synchronously.
84  *
85  *	NOTE: The message is in an indeterminant state until this call
86  *	returns.  The caller should not mess with it (e.g. try to abort it)
87  *	until then.
88  */
89 void
90 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
91 {
92     int error;
93 
94     KKASSERT(msg->ms_reply_port != NULL &&
95 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
96     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
97     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
98 	/*
99 	 * Target port opted to execute the message synchronously so
100 	 * queue the response.
101 	 */
102 	lwkt_replymsg(msg, error);
103     }
104 }
105 
106 /*
107  * lwkt_domsg()
108  *
109  *	Request synchronous completion and call lwkt_beginmsg().  The
110  *	target port can opt to execute the message synchronously or
111  *	asynchronously and this function will automatically block and
112  *	wait for a response if the target executes the message
113  *	asynchronously.
114  */
115 int
116 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
117 {
118     int error;
119 
120     KKASSERT(msg->ms_reply_port != NULL &&
121 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
122     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
123     msg->ms_flags |= MSGF_SYNC;
124     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
125 	/*
126 	 * Target port opted to execute the message asynchronously so
127 	 * block and wait for a reply.
128 	 */
129 	error = lwkt_waitmsg(msg, flags);
130     } else {
131 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
132     }
133     return(error);
134 }
135 
136 /*
137  * lwkt_forwardmsg()
138  *
139  * Forward a message received on one port to another port.
140  */
141 int
142 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
143 {
144     int error;
145 
146     crit_enter();
147     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
148     if ((error = port->mp_putport(port, msg)) != EASYNC)
149 	lwkt_replymsg(msg, error);
150     crit_exit();
151     return(error);
152 }
153 
154 /*
155  * lwkt_abortmsg()
156  *
157  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
158  * The caller must ensure that the message will not be both replied AND
159  * destroyed while the abort is in progress.
160  *
161  * This function issues a callback which might block!
162  */
163 void
164 lwkt_abortmsg(lwkt_msg_t msg)
165 {
166     /*
167      * A critical section protects us from reply IPIs on this cpu.
168      */
169     crit_enter();
170 
171     /*
172      * Shortcut the operation if the message has already been returned.
173      * The callback typically constructs a lwkt_msg with the abort request,
174      * issues it synchronously, and waits for completion.  The callback
175      * is not required to actually abort the message and the target port,
176      * upon receiving an abort request message generated by the callback
177      * should check whether the original message has already completed or
178      * not.
179      */
180     if (msg->ms_flags & MSGF_ABORTABLE) {
181 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
182 	    msg->ms_abortfn(msg);
183     }
184     crit_exit();
185 }
186 
187 /************************************************************************
188  *			PORT INITIALIZATION API				*
189  ************************************************************************/
190 
191 static void *lwkt_thread_getport(lwkt_port_t port);
192 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
193 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
194 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
195 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
196 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
197 
198 static void *lwkt_spin_getport(lwkt_port_t port);
199 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
200 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
201 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
202 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
203 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
204 
205 static void *lwkt_serialize_getport(lwkt_port_t port);
206 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
207 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
208 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
209 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
210 
211 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
212 static void *lwkt_panic_getport(lwkt_port_t port);
213 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
214 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
215 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
216 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
217 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
218 
219 /*
220  * Core port initialization (internal)
221  */
222 static __inline
223 void
224 _lwkt_initport(lwkt_port_t port,
225 	       void *(*gportfn)(lwkt_port_t),
226 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
227 	       int (*wmsgfn)(lwkt_msg_t, int),
228 	       void *(*wportfn)(lwkt_port_t, int),
229 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
230 	       void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
231 {
232     bzero(port, sizeof(*port));
233     TAILQ_INIT(&port->mp_msgq);
234     TAILQ_INIT(&port->mp_msgq_prio);
235     port->mp_getport = gportfn;
236     port->mp_putport = pportfn;
237     port->mp_waitmsg =  wmsgfn;
238     port->mp_waitport =  wportfn;
239     port->mp_replyport = rportfn;
240     port->mp_dropmsg = dmsgfn;
241 }
242 
243 /*
244  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
245  * we tell the scheduler not to reschedule if td is at a higher priority.
246  *
247  * This routine is called even if the thread is already scheduled.
248  */
249 static __inline
250 void
251 _lwkt_schedule_msg(thread_t td, int flags)
252 {
253     lwkt_schedule(td);
254 }
255 
256 /*
257  * lwkt_initport_thread()
258  *
259  *	Initialize a port for use by a particular thread.  The port may
260  *	only be used by <td>.
261  */
262 void
263 lwkt_initport_thread(lwkt_port_t port, thread_t td)
264 {
265     _lwkt_initport(port,
266 		   lwkt_thread_getport,
267 		   lwkt_thread_putport,
268 		   lwkt_thread_waitmsg,
269 		   lwkt_thread_waitport,
270 		   lwkt_thread_replyport,
271 		   lwkt_thread_dropmsg);
272     port->mpu_td = td;
273 }
274 
275 /*
276  * lwkt_initport_spin()
277  *
278  *	Initialize a port for use with descriptors that might be accessed
279  *	via multiple LWPs, processes, or threads.  Has somewhat more
280  *	overhead then thread ports.
281  */
282 void
283 lwkt_initport_spin(lwkt_port_t port, thread_t td)
284 {
285     void (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
286 
287     if (td == NULL)
288 	dmsgfn = lwkt_panic_dropmsg;
289     else
290 	dmsgfn = lwkt_spin_dropmsg;
291 
292     _lwkt_initport(port,
293 		   lwkt_spin_getport,
294 		   lwkt_spin_putport,
295 		   lwkt_spin_waitmsg,
296 		   lwkt_spin_waitport,
297 		   lwkt_spin_replyport,
298 		   dmsgfn);
299     spin_init(&port->mpu_spin);
300     port->mpu_td = td;
301 }
302 
303 /*
304  * lwkt_initport_serialize()
305  *
306  *	Initialize a port for use with descriptors that might be accessed
307  *	via multiple LWPs, processes, or threads.  Callers are assumed to
308  *	have held the serializer (slz).
309  */
310 void
311 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
312 {
313     _lwkt_initport(port,
314 		   lwkt_serialize_getport,
315 		   lwkt_serialize_putport,
316 		   lwkt_serialize_waitmsg,
317 		   lwkt_serialize_waitport,
318 		   lwkt_serialize_replyport,
319 		   lwkt_panic_dropmsg);
320     port->mpu_serialize = slz;
321 }
322 
323 /*
324  * Similar to the standard initport, this function simply marks the message
325  * as being done and does not attempt to return it to an originating port.
326  */
327 void
328 lwkt_initport_replyonly_null(lwkt_port_t port)
329 {
330     _lwkt_initport(port,
331 		   lwkt_panic_getport,
332 		   lwkt_panic_putport,
333 		   lwkt_panic_waitmsg,
334 		   lwkt_panic_waitport,
335 		   lwkt_null_replyport,
336 		   lwkt_panic_dropmsg);
337 }
338 
339 /*
340  * Initialize a reply-only port, typically used as a message sink.  Such
341  * ports can only be used as a reply port.
342  */
343 void
344 lwkt_initport_replyonly(lwkt_port_t port,
345 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
346 {
347     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
348 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
349 			 rportfn, lwkt_panic_dropmsg);
350 }
351 
352 void
353 lwkt_initport_putonly(lwkt_port_t port,
354 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
355 {
356     _lwkt_initport(port, lwkt_panic_getport, pportfn,
357 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
358 			 lwkt_panic_replyport, lwkt_panic_dropmsg);
359 }
360 
361 void
362 lwkt_initport_panic(lwkt_port_t port)
363 {
364     _lwkt_initport(port,
365 		   lwkt_panic_getport, lwkt_panic_putport,
366 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
367 		   lwkt_panic_replyport, lwkt_panic_dropmsg);
368 }
369 
370 static __inline
371 void
372 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
373 {
374     lwkt_msg_queue *queue;
375 
376     /*
377      * normal case, remove and return the message.
378      */
379     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
380 	queue = &port->mp_msgq_prio;
381     else
382 	queue = &port->mp_msgq;
383     TAILQ_REMOVE(queue, msg, ms_node);
384 
385     /*
386      * atomic op needed for spin ports
387      */
388     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
389 }
390 
391 static __inline
392 void
393 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
394 {
395     lwkt_msg_queue *queue;
396 
397     /*
398      * atomic op needed for spin ports
399      */
400     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
401     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
402 	queue = &port->mp_msgq_prio;
403     else
404     	queue = &port->mp_msgq;
405     TAILQ_INSERT_TAIL(queue, msg, ms_node);
406 }
407 
408 static __inline
409 lwkt_msg_t
410 _lwkt_pollmsg(lwkt_port_t port)
411 {
412     lwkt_msg_t msg;
413 
414     msg = TAILQ_FIRST(&port->mp_msgq_prio);
415     if (__predict_false(msg != NULL))
416 	return msg;
417 
418     /*
419      * Priority queue has no message, fallback to non-priority queue.
420      */
421     return TAILQ_FIRST(&port->mp_msgq);
422 }
423 
424 static __inline
425 void
426 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
427 {
428     /*
429      * atomic op needed for spin ports
430      */
431     _lwkt_pushmsg(port, msg);
432     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
433 }
434 
435 /************************************************************************
436  *			THREAD PORT BACKEND				*
437  ************************************************************************
438  *
 * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are sent by IPI to the
 * owning thread's cpu before being enqueued to a port.  Note that this is
 * fairly optimal since scheduling would have had to do an IPI anyway if the
 * message were headed to a different cpu.
444  */
445 
/*
 * This function completes reply processing for the default case in the
 * context of the originating cpu.
 *
 * It runs as an IPI function, queued with lwkt_send_ipiq() by
 * lwkt_thread_replyport() or by itself.  If the reply port's owning thread
 * is not on the current cpu (it migrated after the IPI was sent), the
 * message is re-sent to that thread's current cpu.  Once on the owner's
 * cpu:
 *   - synchronous (MSGF_SYNC) messages are just marked
 *     MSGF_REPLY | MSGF_DONE;
 *   - asynchronous messages are queued on the reply port via
 *     _lwkt_enqueue_reply().
 * The owning thread is then scheduled if it is blocked on the port
 * (MSGPORTF_WAITING).
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
	lwkt_send_ipiq(port->mpu_td->td_gd,
		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	return;
    }

    /*
     * Cleanup
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /* Snapshot the flags before the message is marked done/queued. */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * Store fence before publishing MSGF_DONE.  Presumably this orders
	 * the replier's earlier stores to the message ahead of the DONE
	 * bit that the waiter polls -- confirm.
	 */
	cpu_sfence();
	msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
	_lwkt_enqueue_reply(port, msg);
    }
    if (port->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(port->mpu_td, flags);
}
483 
/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.  Completion must occur on the cpu of the thread
 * that owns the reply port (port->mpu_td).  If that thread is on the
 * current cpu, completion happens here directly.  Otherwise the message is
 * flagged MSGF_REPLY (plus MSGF_INTRANSIT under INVARIANTS) and handed to
 * lwkt_thread_replyport_remote() on the owner's cpu via IPI.
 *
 * The critical section protects us from IPIs on this CPU.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * If a synchronous completion has been requested, just wakeup
	 * the message without bothering to queue it to the target port.
	 *
	 * Assume the target thread is non-preemptive, so no critical
	 * section is required.
	 */
	if (port->mpu_td->td_gd == mycpu) {
	    /* Snapshot flags before MSGF_DONE is published. */
	    flags = msg->ms_flags;
	    /* Store fence before the DONE bit becomes visible. */
	    cpu_sfence();
	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, flags);
	} else {
#ifdef INVARIANTS
	    msg->ms_flags |= MSGF_INTRANSIT;
#endif
	    /* MSGF_DONE is set later, on the owner's cpu. */
	    msg->ms_flags |= MSGF_REPLY;
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
    } else {
	/*
	 * If an asynchronous completion has been requested the message
	 * must be queued to the reply port.
	 *
	 * A critical section is required to interlock the port queue.
	 */
	if (port->mpu_td->td_gd == mycpu) {
	    crit_enter();
	    _lwkt_enqueue_reply(port, msg);
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
	    crit_exit();
	} else {
#ifdef INVARIANTS
	    msg->ms_flags |= MSGF_INTRANSIT;
#endif
	    /* Queued (and MSGF_DONE set) later, on the owner's cpu. */
	    msg->ms_flags |= MSGF_REPLY;
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
    }
}
546 
547 /*
548  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
549  *
550  * This function could _only_ be used when caller is in the same thread
551  * as the message's target port owner thread.
552  */
553 static void
554 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
555 {
556     KASSERT(port->mpu_td == curthread,
557     	    ("message could only be dropped in the same thread "
558 	     "as the message target port thread"));
559     crit_enter_quick(port->mpu_td);
560     _lwkt_pullmsg(port, msg);
561     msg->ms_flags |= MSGF_DONE;
562     crit_exit_quick(port->mpu_td);
563 }
564 
565 /*
566  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
567  *
568  * Called with the target port as an argument but in the context of the
569  * reply port.  This function always implements an asynchronous put to
570  * the target message port, and thus returns EASYNC.
571  *
572  * The message must already have cleared MSGF_DONE and MSGF_REPLY
573  */
574 static
575 void
576 lwkt_thread_putport_remote(lwkt_msg_t msg)
577 {
578     lwkt_port_t port = msg->ms_target_port;
579 
580     /*
581      * Chase any thread migration that occurs
582      */
583     if (port->mpu_td->td_gd != mycpu) {
584 	lwkt_send_ipiq(port->mpu_td->td_gd,
585 		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
586 	return;
587     }
588 
589     /*
590      * Cleanup
591      */
592 #ifdef INVARIANTS
593     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
594     msg->ms_flags &= ~MSGF_INTRANSIT;
595 #endif
596     _lwkt_pushmsg(port, msg);
597     if (port->mp_flags & MSGPORTF_WAITING)
598 	_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
599 }
600 
601 static
602 int
603 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
604 {
605     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
606 
607     msg->ms_target_port = port;
608     if (port->mpu_td->td_gd == mycpu) {
609 	crit_enter();
610 	_lwkt_pushmsg(port, msg);
611 	if (port->mp_flags & MSGPORTF_WAITING)
612 	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
613 	crit_exit();
614     } else {
615 #ifdef INVARIANTS
616 	msg->ms_flags |= MSGF_INTRANSIT;
617 #endif
618 	lwkt_send_ipiq(port->mpu_td->td_gd,
619 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
620     }
621     return (EASYNC);
622 }
623 
624 /*
625  * lwkt_thread_getport()
626  *
627  *	Retrieve the next message from the port or NULL if no messages
628  *	are ready.
629  */
630 static
631 void *
632 lwkt_thread_getport(lwkt_port_t port)
633 {
634     lwkt_msg_t msg;
635 
636     KKASSERT(port->mpu_td == curthread);
637 
638     crit_enter_quick(port->mpu_td);
639     if ((msg = _lwkt_pollmsg(port)) != NULL)
640 	_lwkt_pullmsg(port, msg);
641     crit_exit_quick(port->mpu_td);
642     return(msg);
643 }
644 
/*
 * lwkt_thread_waitmsg()
 *
 *	Wait for a particular message to be replied.  We must be the only
 *	thread waiting on the message.  The port must be owned by the
 *	caller.
 *
 *	If the first lwkt_sleep() returns non-zero (with the caller's
 *	flags), lwkt_abortmsg() is called once.  The wait then continues
 *	with flags 0 ("waitabt") until MSGF_DONE is set.  If the replied
 *	message is still queued on the reply port, it is dequeued.  Returns
 *	the message's ms_error; the sleep's own return value is not
 *	propagated.  Dropable messages may not be waited on.
 */
static
int
lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
{
    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	/*
	 * If the done bit was not set we have to block until it is.
	 */
	lwkt_port_t port = msg->ms_reply_port;
	thread_t td = curthread;
	int sentabort;

	KKASSERT(port->mpu_td == td);
	crit_enter_quick(td);
	sentabort = 0;

	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    /* Ask repliers on our cpu to schedule us. */
	    port->mp_flags |= MSGPORTF_WAITING;
	    if (sentabort == 0) {
		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
		    /* First failed sleep: request an abort exactly once. */
		    lwkt_abortmsg(msg);
		}
	    } else {
		/* Abort already requested; wait with flags 0. */
		lwkt_sleep("waitabt", 0);
	    }
	    port->mp_flags &= ~MSGPORTF_WAITING;
	}
	/* Asynchronous replies are also queued on the reply port. */
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
	crit_exit_quick(td);
    } else {
	/*
	 * If the done bit was set we only have to mess around with the
	 * message if it is queued on the reply port.
	 */
	if (msg->ms_flags & MSGF_QUEUED) {
	    lwkt_port_t port = msg->ms_reply_port;
	    thread_t td = curthread;

	    KKASSERT(port->mpu_td == td);
	    crit_enter_quick(td);
	    _lwkt_pullmsg(port, msg);
	    crit_exit_quick(td);
	}
    }
    return(msg->ms_error);
}
702 
703 /*
704  * lwkt_thread_waitport()
705  *
706  *	Wait for a new message to be available on the port.  We must be the
707  *	the only thread waiting on the port.  The port must be owned by caller.
708  */
709 static
710 void *
711 lwkt_thread_waitport(lwkt_port_t port, int flags)
712 {
713     thread_t td = curthread;
714     lwkt_msg_t msg;
715     int error;
716 
717     KKASSERT(port->mpu_td == td);
718     crit_enter_quick(td);
719     while ((msg = _lwkt_pollmsg(port)) == NULL) {
720 	port->mp_flags |= MSGPORTF_WAITING;
721 	error = lwkt_sleep("waitport", flags);
722 	port->mp_flags &= ~MSGPORTF_WAITING;
723 	if (error)
724 	    goto done;
725     }
726     _lwkt_pullmsg(port, msg);
727 done:
728     crit_exit_quick(td);
729     return(msg);
730 }
731 
732 /************************************************************************
733  *			   SPIN PORT BACKEND				*
734  ************************************************************************
735  *
736  * This backend uses spinlocks instead of making assumptions about which
737  * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
747  */
748 
749 static
750 void *
751 lwkt_spin_getport(lwkt_port_t port)
752 {
753     lwkt_msg_t msg;
754 
755     spin_lock(&port->mpu_spin);
756     if ((msg = _lwkt_pollmsg(port)) != NULL)
757 	_lwkt_pullmsg(port, msg);
758     spin_unlock(&port->mpu_spin);
759     return(msg);
760 }
761 
762 static
763 int
764 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
765 {
766     int dowakeup;
767 
768     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
769 
770     msg->ms_target_port = port;
771     spin_lock(&port->mpu_spin);
772     _lwkt_pushmsg(port, msg);
773     dowakeup = 0;
774     if (port->mp_flags & MSGPORTF_WAITING) {
775 	port->mp_flags &= ~MSGPORTF_WAITING;
776 	dowakeup = 1;
777     }
778     spin_unlock(&port->mpu_spin);
779     if (dowakeup)
780 	wakeup(port);
781     return (EASYNC);
782 }
783 
/*
 * Wait for a message on a spin port to be replied, then dequeue it from
 * the reply port if it was queued there.  Returns the message's ms_error.
 *
 * Synchronous messages are slept on via the message itself (with
 * MSGF_WAITING set); asynchronous ones are slept on via the reply port
 * (with MSGPORTF_WAITING set).  All sleeping happens under the reply
 * port's spinlock through ssleep().  With PCATCH the first interrupted
 * sleep triggers lwkt_abortmsg() (with the spinlock dropped), after which
 * the wait continues without PCATCH.  If the reply then carries EINTR,
 * it is replaced by the error the interrupted sleep returned.
 * Dropable messages may not be waited on.
 */
static
int
lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	port = msg->ms_reply_port;
	sentabort = 0;
	spin_lock(&port->mpu_spin);
	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    void *won;

	    /*
	     * If message was sent synchronously from the beginning
	     * the wakeup will be on the message structure, else it
	     * will be on the port structure.
	     */
	    if (msg->ms_flags & MSGF_SYNC) {
		won = msg;
		atomic_set_int(&msg->ms_flags, MSGF_WAITING);
	    } else {
		won = port;
		port->mp_flags |= MSGPORTF_WAITING;
	    }

	    /*
	     * The sleep is interruptible only if the caller passed PCATCH,
	     * and only until the first interruption.  lwkt_abortmsg() then
	     * invokes the abort callback only for MSGF_ABORTABLE messages.
	     * We must still wait for message completion regardless.
	     */
	    if ((flags & PCATCH) && sentabort == 0) {
		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
		if (error) {
		    sentabort = error;
		    /* The abort callback might block: drop the spinlock. */
		    spin_unlock(&port->mpu_spin);
		    lwkt_abortmsg(msg);
		    spin_lock(&port->mpu_spin);
		}
	    } else {
		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
	    }
	    /*
	     * See the note on the MSGPORTF_WAITING flag at the top of the
	     * spin port backend section: the waker clears it, not us.
	     */
	}
	/*
	 * Turn EINTR into ERESTART if the signal indicates.
	 */
	if (sentabort && msg->ms_error == EINTR)
	    msg->ms_error = sentabort;
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
	spin_unlock(&port->mpu_spin);
    } else {
	/* Already done: only dequeue it if it was queued on the port. */
	if (msg->ms_flags & MSGF_QUEUED) {
	    port = msg->ms_reply_port;
	    spin_lock(&port->mpu_spin);
	    _lwkt_pullmsg(port, msg);
	    spin_unlock(&port->mpu_spin);
	}
    }
    return(msg->ms_error);
}
850 
851 static
852 void *
853 lwkt_spin_waitport(lwkt_port_t port, int flags)
854 {
855     lwkt_msg_t msg;
856     int error;
857 
858     spin_lock(&port->mpu_spin);
859     while ((msg = _lwkt_pollmsg(port)) == NULL) {
860 	port->mp_flags |= MSGPORTF_WAITING;
861 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
862 	/* see note at the top on the MSGPORTF_WAITING flag */
863 	if (error) {
864 	    spin_unlock(&port->mpu_spin);
865 	    return(NULL);
866 	}
867     }
868     _lwkt_pullmsg(port, msg);
869     spin_unlock(&port->mpu_spin);
870     return(msg);
871 }
872 
873 static
874 void
875 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
876 {
877     int dowakeup;
878 
879     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
880 
881     if (msg->ms_flags & MSGF_SYNC) {
882 	/*
883 	 * If a synchronous completion has been requested, just wakeup
884 	 * the message without bothering to queue it to the target port.
885 	 */
886 	spin_lock(&port->mpu_spin);
887 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
888 	dowakeup = 0;
889 	if (msg->ms_flags & MSGF_WAITING) {
890 		atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
891 		dowakeup = 1;
892 	}
893 	spin_unlock(&port->mpu_spin);
894 	if (dowakeup)
895 		wakeup(msg);
896     } else {
897 	/*
898 	 * If an asynchronous completion has been requested the message
899 	 * must be queued to the reply port.
900 	 */
901 	spin_lock(&port->mpu_spin);
902 	_lwkt_enqueue_reply(port, msg);
903 	dowakeup = 0;
904 	if (port->mp_flags & MSGPORTF_WAITING) {
905 	    port->mp_flags &= ~MSGPORTF_WAITING;
906 	    dowakeup = 1;
907 	}
908 	spin_unlock(&port->mpu_spin);
909 	if (dowakeup)
910 	    wakeup(port);
911     }
912 }
913 
914 /*
915  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
916  *
917  * This function could _only_ be used when caller is in the same thread
918  * as the message's target port owner thread.
919  */
920 static void
921 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
922 {
923     KASSERT(port->mpu_td == curthread,
924     	    ("message could only be dropped in the same thread "
925 	     "as the message target port thread\n"));
926     spin_lock(&port->mpu_spin);
927     _lwkt_pullmsg(port, msg);
928     msg->ms_flags |= MSGF_DONE;
929     spin_unlock(&port->mpu_spin);
930 }
931 
932 /************************************************************************
933  *			  SERIALIZER PORT BACKEND			*
934  ************************************************************************
935  *
 * This backend uses a serializer to protect port access.  Callers are
 * assumed to hold the serializer.  This kind of port is usually created
 * by a network device driver along with _one_ lwkt thread to pipeline
 * operations which may temporarily release the serializer.
940  *
941  * Implementation is based on SPIN PORT BACKEND.
942  */
943 
944 static
945 void *
946 lwkt_serialize_getport(lwkt_port_t port)
947 {
948     lwkt_msg_t msg;
949 
950     ASSERT_SERIALIZED(port->mpu_serialize);
951 
952     if ((msg = _lwkt_pollmsg(port)) != NULL)
953 	_lwkt_pullmsg(port, msg);
954     return(msg);
955 }
956 
957 static
958 int
959 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
960 {
961     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
962     ASSERT_SERIALIZED(port->mpu_serialize);
963 
964     msg->ms_target_port = port;
965     _lwkt_pushmsg(port, msg);
966     if (port->mp_flags & MSGPORTF_WAITING) {
967 	port->mp_flags &= ~MSGPORTF_WAITING;
968 	wakeup(port);
969     }
970     return (EASYNC);
971 }
972 
/*
 * Wait for a message on a serializer port to be replied, then dequeue it
 * from the reply port if it was queued there.  The caller must hold the
 * reply port's serializer.  Returns the message's ms_error.
 *
 * Synchronous messages are slept on via the message itself; asynchronous
 * ones are slept on via the reply port (with MSGPORTF_WAITING set).
 * Sleeping uses zsleep() on the port's serializer.  With PCATCH the first
 * interrupted sleep triggers lwkt_abortmsg() (with the serializer
 * released around the call), after which the wait continues without
 * PCATCH.  If the reply then carries EINTR, it is replaced by the error
 * the interrupted sleep returned.  Dropable messages may not be waited on.
 */
static
int
lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	port = msg->ms_reply_port;

	ASSERT_SERIALIZED(port->mpu_serialize);

	sentabort = 0;
	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    void *won;

	    /*
	     * If message was sent synchronously from the beginning
	     * the wakeup will be on the message structure, else it
	     * will be on the port structure.
	     */
	    if (msg->ms_flags & MSGF_SYNC) {
		won = msg;
	    } else {
		won = port;
		port->mp_flags |= MSGPORTF_WAITING;
	    }

	    /*
	     * The sleep is interruptible only if the caller passed PCATCH,
	     * and only until the first interruption.  lwkt_abortmsg() then
	     * invokes the abort callback only for MSGF_ABORTABLE messages.
	     * We must still wait for message completion regardless.
	     */
	    if ((flags & PCATCH) && sentabort == 0) {
		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
		if (error) {
		    sentabort = error;
		    /* The abort callback might block: release the serializer. */
		    lwkt_serialize_exit(port->mpu_serialize);
		    lwkt_abortmsg(msg);
		    lwkt_serialize_enter(port->mpu_serialize);
		}
	    } else {
		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
	    }
	    /*
	     * See the note on the MSGPORTF_WAITING flag in the spin port
	     * backend header (this backend is based on it): the waker
	     * clears the flag, not us.
	     */
	}
	/*
	 * Turn EINTR into ERESTART if the signal indicates.
	 */
	if (sentabort && msg->ms_error == EINTR)
	    msg->ms_error = sentabort;
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
    } else {
	/* Already done: only dequeue it if it was queued on the port. */
	if (msg->ms_flags & MSGF_QUEUED) {
	    port = msg->ms_reply_port;

	    ASSERT_SERIALIZED(port->mpu_serialize);
	    _lwkt_pullmsg(port, msg);
	}
    }
    return(msg->ms_error);
}
1039 
1040 static
1041 void *
1042 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1043 {
1044     lwkt_msg_t msg;
1045     int error;
1046 
1047     ASSERT_SERIALIZED(port->mpu_serialize);
1048 
1049     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1050 	port->mp_flags |= MSGPORTF_WAITING;
1051 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1052 	/* see note at the top on the MSGPORTF_WAITING flag */
1053 	if (error)
1054 	    return(NULL);
1055     }
1056     _lwkt_pullmsg(port, msg);
1057     return(msg);
1058 }
1059 
1060 static
1061 void
1062 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1063 {
1064     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1065     ASSERT_SERIALIZED(port->mpu_serialize);
1066 
1067     if (msg->ms_flags & MSGF_SYNC) {
1068 	/*
1069 	 * If a synchronous completion has been requested, just wakeup
1070 	 * the message without bothering to queue it to the target port.
1071 	 */
1072 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1073 	wakeup(msg);
1074     } else {
1075 	/*
1076 	 * If an asynchronous completion has been requested the message
1077 	 * must be queued to the reply port.
1078 	 */
1079 	_lwkt_enqueue_reply(port, msg);
1080 	if (port->mp_flags & MSGPORTF_WAITING) {
1081 	    port->mp_flags &= ~MSGPORTF_WAITING;
1082 	    wakeup(port);
1083 	}
1084     }
1085 }
1086 
1087 /************************************************************************
1088  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1089  ************************************************************************/
1090 
1091 /*
1092  * You can point a port's reply vector at this function if you just want
1093  * the message marked done, without any queueing or signaling.  This is
1094  * often used for structure-embedded messages.
1095  */
1096 static
1097 void
1098 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1099 {
1100     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1101 }
1102 
1103 static
1104 void *
1105 lwkt_panic_getport(lwkt_port_t port)
1106 {
1107     panic("lwkt_getport() illegal on port %p", port);
1108 }
1109 
1110 static
1111 int
1112 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1113 {
1114     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1115 }
1116 
1117 static
1118 int
1119 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1120 {
1121     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1122 }
1123 
1124 static
1125 void *
1126 lwkt_panic_waitport(lwkt_port_t port, int flags)
1127 {
1128     panic("port %p cannot be waited on", port);
1129 }
1130 
1131 static
1132 void
1133 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1134 {
1135     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1136 }
1137 
1138 static
1139 void
1140 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1141 {
1142     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1143 }
1144