xref: /dflybsd-src/sys/kern/lwkt_msgport.c (revision f3adbb3cdddf061a5f284c3d9b33d22fecdfcfaf)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37 
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60 
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65 
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #ifdef SMP
69 #include <machine/smp.h>
70 #endif
71 
72 #include <sys/malloc.h>
/*
 * malloc(9) type for lwkt messages.  Not referenced within this file;
 * presumably used by callers that allocate messages -- confirm.
 */
MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
74 
75 /************************************************************************
76  *				MESSAGE FUNCTIONS			*
77  ************************************************************************/
78 
79 /*
80  * lwkt_sendmsg()
81  *
82  *	Request asynchronous completion and call lwkt_beginmsg().  The
83  *	target port can opt to execute the message synchronously or
84  *	asynchronously and this function will automatically queue the
85  *	response if the target executes the message synchronously.
86  *
87  *	NOTE: The message is in an indeterminant state until this call
88  *	returns.  The caller should not mess with it (e.g. try to abort it)
89  *	until then.
90  */
91 void
92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
93 {
94     int error;
95 
96     KKASSERT(msg->ms_reply_port != NULL &&
97 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
98     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
99     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
100 	/*
101 	 * Target port opted to execute the message synchronously so
102 	 * queue the response.
103 	 */
104 	lwkt_replymsg(msg, error);
105     }
106 }
107 
108 /*
109  * lwkt_domsg()
110  *
111  *	Request synchronous completion and call lwkt_beginmsg().  The
112  *	target port can opt to execute the message synchronously or
113  *	asynchronously and this function will automatically block and
114  *	wait for a response if the target executes the message
115  *	asynchronously.
116  */
117 int
118 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
119 {
120     int error;
121 
122     KKASSERT(msg->ms_reply_port != NULL &&
123 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
124     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
125     msg->ms_flags |= MSGF_SYNC;
126     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
127 	/*
128 	 * Target port opted to execute the message asynchronously so
129 	 * block and wait for a reply.
130 	 */
131 	error = lwkt_waitmsg(msg, flags);
132     } else {
133 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
134     }
135     return(error);
136 }
137 
138 /*
139  * lwkt_forwardmsg()
140  *
141  * Forward a message received on one port to another port.
142  */
143 int
144 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
145 {
146     int error;
147 
148     crit_enter();
149     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
150     if ((error = port->mp_putport(port, msg)) != EASYNC)
151 	lwkt_replymsg(msg, error);
152     crit_exit();
153     return(error);
154 }
155 
156 /*
157  * lwkt_abortmsg()
158  *
159  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
160  * The caller must ensure that the message will not be both replied AND
161  * destroyed while the abort is in progress.
162  *
163  * This function issues a callback which might block!
164  */
165 void
166 lwkt_abortmsg(lwkt_msg_t msg)
167 {
168     /*
169      * A critical section protects us from reply IPIs on this cpu.
170      */
171     crit_enter();
172 
173     /*
174      * Shortcut the operation if the message has already been returned.
175      * The callback typically constructs a lwkt_msg with the abort request,
176      * issues it synchronously, and waits for completion.  The callback
177      * is not required to actually abort the message and the target port,
178      * upon receiving an abort request message generated by the callback
179      * should check whether the original message has already completed or
180      * not.
181      */
182     if (msg->ms_flags & MSGF_ABORTABLE) {
183 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
184 	    msg->ms_abortfn(msg);
185     }
186     crit_exit();
187 }
188 
189 /************************************************************************
190  *			PORT INITIALIZATION API				*
191  ************************************************************************/
192 
/*
 * Backend operation vectors.  Each port flavor provides getport, putport,
 * waitmsg, waitport and replyport implementations.  The thread and spin
 * backends also provide dropmsg; serializer ports install
 * lwkt_panic_dropmsg instead.
 */

/* Thread port backend: port owned by a single thread. */
static void *lwkt_thread_getport(lwkt_port_t port);
static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/* Spin port backend: queues protected by the port spinlock. */
static void *lwkt_spin_getport(lwkt_port_t port);
static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/* Serializer port backend: callers hold the port's serializer. */
static void *lwkt_serialize_getport(lwkt_port_t port);
static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);

/* Special-purpose and panic stubs for restricted port types. */
static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void *lwkt_panic_getport(lwkt_port_t port);
static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
220 
221 /*
222  * Core port initialization (internal)
223  */
224 static __inline
225 void
226 _lwkt_initport(lwkt_port_t port,
227 	       void *(*gportfn)(lwkt_port_t),
228 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
229 	       int (*wmsgfn)(lwkt_msg_t, int),
230 	       void *(*wportfn)(lwkt_port_t, int),
231 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
232 	       void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
233 {
234     bzero(port, sizeof(*port));
235     TAILQ_INIT(&port->mp_msgq);
236     TAILQ_INIT(&port->mp_msgq_prio);
237     port->mp_getport = gportfn;
238     port->mp_putport = pportfn;
239     port->mp_waitmsg =  wmsgfn;
240     port->mp_waitport =  wportfn;
241     port->mp_replyport = rportfn;
242     port->mp_dropmsg = dmsgfn;
243 }
244 
245 /*
246  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
247  * we tell the scheduler not to reschedule if td is at a higher priority.
248  *
249  * This routine is called even if the thread is already scheduled.
250  */
251 static __inline
252 void
253 _lwkt_schedule_msg(thread_t td, int flags)
254 {
255     lwkt_schedule(td);
256 }
257 
258 /*
259  * lwkt_initport_thread()
260  *
261  *	Initialize a port for use by a particular thread.  The port may
262  *	only be used by <td>.
263  */
264 void
265 lwkt_initport_thread(lwkt_port_t port, thread_t td)
266 {
267     _lwkt_initport(port,
268 		   lwkt_thread_getport,
269 		   lwkt_thread_putport,
270 		   lwkt_thread_waitmsg,
271 		   lwkt_thread_waitport,
272 		   lwkt_thread_replyport,
273 		   lwkt_thread_dropmsg);
274     port->mpu_td = td;
275 }
276 
277 /*
278  * lwkt_initport_spin()
279  *
280  *	Initialize a port for use with descriptors that might be accessed
281  *	via multiple LWPs, processes, or threads.  Has somewhat more
282  *	overhead then thread ports.
283  */
284 void
285 lwkt_initport_spin(lwkt_port_t port)
286 {
287     _lwkt_initport(port,
288 		   lwkt_spin_getport,
289 		   lwkt_spin_putport,
290 		   lwkt_spin_waitmsg,
291 		   lwkt_spin_waitport,
292 		   lwkt_spin_replyport,
293 		   lwkt_spin_dropmsg);
294     spin_init(&port->mpu_spin);
295 }
296 
297 /*
298  * lwkt_initport_serialize()
299  *
300  *	Initialize a port for use with descriptors that might be accessed
301  *	via multiple LWPs, processes, or threads.  Callers are assumed to
302  *	have held the serializer (slz).
303  */
304 void
305 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
306 {
307     _lwkt_initport(port,
308 		   lwkt_serialize_getport,
309 		   lwkt_serialize_putport,
310 		   lwkt_serialize_waitmsg,
311 		   lwkt_serialize_waitport,
312 		   lwkt_serialize_replyport,
313 		   lwkt_panic_dropmsg);
314     port->mpu_serialize = slz;
315 }
316 
317 /*
318  * Similar to the standard initport, this function simply marks the message
319  * as being done and does not attempt to return it to an originating port.
320  */
321 void
322 lwkt_initport_replyonly_null(lwkt_port_t port)
323 {
324     _lwkt_initport(port,
325 		   lwkt_panic_getport,
326 		   lwkt_panic_putport,
327 		   lwkt_panic_waitmsg,
328 		   lwkt_panic_waitport,
329 		   lwkt_null_replyport,
330 		   lwkt_panic_dropmsg);
331 }
332 
333 /*
334  * Initialize a reply-only port, typically used as a message sink.  Such
335  * ports can only be used as a reply port.
336  */
337 void
338 lwkt_initport_replyonly(lwkt_port_t port,
339 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
340 {
341     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
342 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
343 			 rportfn, lwkt_panic_dropmsg);
344 }
345 
346 void
347 lwkt_initport_putonly(lwkt_port_t port,
348 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
349 {
350     _lwkt_initport(port, lwkt_panic_getport, pportfn,
351 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
352 			 lwkt_panic_replyport, lwkt_panic_dropmsg);
353 }
354 
355 void
356 lwkt_initport_panic(lwkt_port_t port)
357 {
358     _lwkt_initport(port,
359 		   lwkt_panic_getport, lwkt_panic_putport,
360 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
361 		   lwkt_panic_replyport, lwkt_panic_dropmsg);
362 }
363 
364 static __inline
365 void
366 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
367 {
368     lwkt_msg_queue *queue;
369 
370     /*
371      * normal case, remove and return the message.
372      */
373     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
374 	queue = &port->mp_msgq_prio;
375     else
376 	queue = &port->mp_msgq;
377     TAILQ_REMOVE(queue, msg, ms_node);
378 
379     /*
380      * atomic op needed for spin ports
381      */
382     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
383 }
384 
385 static __inline
386 void
387 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
388 {
389     lwkt_msg_queue *queue;
390 
391     /*
392      * atomic op needed for spin ports
393      */
394     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
395     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
396 	queue = &port->mp_msgq_prio;
397     else
398     	queue = &port->mp_msgq;
399     TAILQ_INSERT_TAIL(queue, msg, ms_node);
400 }
401 
402 static __inline
403 lwkt_msg_t
404 _lwkt_pollmsg(lwkt_port_t port)
405 {
406     lwkt_msg_t msg;
407 
408     msg = TAILQ_FIRST(&port->mp_msgq_prio);
409     if (__predict_false(msg != NULL))
410 	return msg;
411 
412     /*
413      * Priority queue has no message, fallback to non-priority queue.
414      */
415     return TAILQ_FIRST(&port->mp_msgq);
416 }
417 
/*
 * _lwkt_enqueue_reply() - queue a reply on <port> and mark it complete
 *
 * The message is first queued on the reply port (which sets MSGF_QUEUED),
 * and only then are MSGF_REPLY|MSGF_DONE set.  The caller provides the
 * port interlock (critical section, spinlock or serializer).
 */
static __inline
void
_lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
{
    /*
     * atomic op needed for spin ports
     */
    _lwkt_pushmsg(port, msg);
    atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
}
428 
/************************************************************************
 *			THREAD PORT BACKEND				*
 ************************************************************************
 *
 * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
 * correct cpu before being enqueued to a port.  Note that this is fairly
 * optimal since scheduling would have had to do an IPI anyway if the
 * message were headed to a different cpu.
 */
439 
#ifdef SMP

/*
 * lwkt_thread_replyport_remote()
 *
 * IPI target that completes reply processing for the default (thread
 * port) case in the context of the cpu running the reply port's owner.
 * If the owner has migrated again, the IPI is re-sent to its current cpu.
 * Otherwise:
 * - a MSGF_SYNC message is marked MSGF_REPLY|MSGF_DONE;
 * - an asynchronous message is queued on the reply port.
 * In either case the owner is scheduled if it is waiting on the port.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
	lwkt_send_ipiq(port->mpu_td->td_gd,
		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	return;
    }

    /*
     * Cleanup
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /*
     * Snapshot the flags before MSGF_DONE is published; after that the
     * waiter may consider the message complete.  The store fence
     * presumably orders earlier stores to the message (e.g. ms_error)
     * ahead of the DONE bit -- confirm.
     */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
	cpu_sfence();
	msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
	_lwkt_enqueue_reply(port, msg);
    }
    if (port->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(port->mpu_td, flags);
}

#endif
481 
/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.  Completion must occur on the cpu of the reply
 * port's owning thread.  When that is not the current cpu, the message is
 * marked MSGF_REPLY (plus MSGF_INTRANSIT under INVARIANTS) and forwarded
 * by IPI to lwkt_thread_replyport_remote().
 *
 * Handling on the local cpu:
 * - a synchronous (MSGF_SYNC) message is only marked
 *   MSGF_DONE|MSGF_REPLY;
 * - an asynchronous message is queued on the reply port.
 * The owner is scheduled if MSGPORTF_WAITING is set.
 *
 * The critical section protects us from IPIs on this CPU.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * If a synchronous completion has been requested, just wakeup
	 * the message without bothering to queue it to the target port.
	 *
	 * Assume the target thread is non-preemptive, so no critical
	 * section is required.
	 */
#ifdef SMP
	if (port->mpu_td->td_gd == mycpu) {
#endif
	    /*
	     * Snapshot the flags before MSGF_DONE is published.  The
	     * fence presumably orders earlier stores to the message
	     * ahead of the DONE bit.
	     */
	    flags = msg->ms_flags;
	    cpu_sfence();
	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, flags);
#ifdef SMP
	} else {
	    /* Owner is on another cpu: finish the reply there. */
#ifdef INVARIANTS
	    msg->ms_flags |= MSGF_INTRANSIT;
#endif
	    msg->ms_flags |= MSGF_REPLY;
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
#endif
    } else {
	/*
	 * If an asynchronous completion has been requested the message
	 * must be queued to the reply port.
	 *
	 * A critical section is required to interlock the port queue.
	 */
#ifdef SMP
	if (port->mpu_td->td_gd == mycpu) {
#endif
	    crit_enter();
	    _lwkt_enqueue_reply(port, msg);
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
	    crit_exit();
#ifdef SMP
	} else {
	    /* Owner is on another cpu: enqueue the reply there. */
#ifdef INVARIANTS
	    msg->ms_flags |= MSGF_INTRANSIT;
#endif
	    msg->ms_flags |= MSGF_REPLY;
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
#endif
    }
}
552 
553 /*
554  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
555  *
556  * This function could _only_ be used when caller is in the same thread
557  * as the message's target port owner thread.
558  */
559 static void
560 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
561 {
562     KASSERT(port->mpu_td == curthread,
563     	    ("message could only be dropped in the same thread "
564 	     "as the message target port thread"));
565     crit_enter_quick(port->mpu_td);
566     _lwkt_pullmsg(port, msg);
567     msg->ms_flags |= MSGF_DONE;
568     crit_exit_quick(port->mpu_td);
569 }
570 
/*
 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
 *
 * Called with the target port as an argument but in the context of the
 * reply port.  This function always implements an asynchronous put to
 * the target message port, and thus returns EASYNC.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY.
 */

#ifdef SMP

/*
 * lwkt_thread_putport_remote()
 *
 * IPI target for a put whose destination thread runs on another cpu.  If
 * the owner has migrated again, the IPI is re-sent to its current cpu.
 * Otherwise the message is queued on the target port and the owner is
 * scheduled if it is waiting.
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
	lwkt_send_ipiq(port->mpu_td->td_gd,
		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
	return;
    }

    /*
     * Cleanup
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
}

#endif

static
int
lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);

    /* The remote IPI handler locates the port through ms_target_port. */
    msg->ms_target_port = port;
#ifdef SMP
    if (port->mpu_td->td_gd == mycpu) {
#endif
	/* Local cpu: the critical section interlocks the port queue. */
	crit_enter();
	_lwkt_pushmsg(port, msg);
	if (port->mp_flags & MSGPORTF_WAITING)
	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
	crit_exit();
#ifdef SMP
    } else {
	/* Owner is on another cpu: queue the message there via IPI. */
#ifdef INVARIANTS
	msg->ms_flags |= MSGF_INTRANSIT;
#endif
	lwkt_send_ipiq(port->mpu_td->td_gd,
			(ipifunc1_t)lwkt_thread_putport_remote, msg);
    }
#endif
    return (EASYNC);
}
638 
639 /*
640  * lwkt_thread_getport()
641  *
642  *	Retrieve the next message from the port or NULL if no messages
643  *	are ready.
644  */
645 static
646 void *
647 lwkt_thread_getport(lwkt_port_t port)
648 {
649     lwkt_msg_t msg;
650 
651     KKASSERT(port->mpu_td == curthread);
652 
653     crit_enter_quick(port->mpu_td);
654     if ((msg = _lwkt_pollmsg(port)) != NULL)
655 	_lwkt_pullmsg(port, msg);
656     crit_exit_quick(port->mpu_td);
657     return(msg);
658 }
659 
/*
 * lwkt_thread_waitmsg()
 *
 *	Wait for a particular message to be replied.  We must be the only
 *	thread waiting on the message.  The port must be owned by the
 *	caller.
 *
 *	<flags> is passed to lwkt_sleep() until a sleep returns non-zero.
 *	At that point lwkt_abortmsg() is issued once, and we keep sleeping
 *	with flags 0 until the message is actually done.  If the reply was
 *	queued on the port, it is removed before returning.  Returns
 *	msg->ms_error.
 *
 *	NOTE: unlike the spin and serializer backends, this backend does
 *	not rewrite ms_error from EINTR to the sleep error.
 */
static
int
lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
{
    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	/*
	 * If the done bit was not set we have to block until it is.
	 */
	lwkt_port_t port = msg->ms_reply_port;
	thread_t td = curthread;
	int sentabort;

	KKASSERT(port->mpu_td == td);
	crit_enter_quick(td);
	sentabort = 0;

	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    /*
	     * MSGPORTF_WAITING asks the reply side to schedule us.  On
	     * thread ports the waiter sets and clears it itself.
	     */
	    port->mp_flags |= MSGPORTF_WAITING;
	    if (sentabort == 0) {
		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
		    lwkt_abortmsg(msg);
		}
	    } else {
		lwkt_sleep("waitabt", 0);
	    }
	    port->mp_flags &= ~MSGPORTF_WAITING;
	}
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
	crit_exit_quick(td);
    } else {
	/*
	 * If the done bit was set we only have to mess around with the
	 * message if it is queued on the reply port.
	 */
	if (msg->ms_flags & MSGF_QUEUED) {
	    lwkt_port_t port = msg->ms_reply_port;
	    thread_t td = curthread;

	    KKASSERT(port->mpu_td == td);
	    crit_enter_quick(td);
	    _lwkt_pullmsg(port, msg);
	    crit_exit_quick(td);
	}
    }
    return(msg->ms_error);
}
717 
718 /*
719  * lwkt_thread_waitport()
720  *
721  *	Wait for a new message to be available on the port.  We must be the
722  *	the only thread waiting on the port.  The port must be owned by caller.
723  */
724 static
725 void *
726 lwkt_thread_waitport(lwkt_port_t port, int flags)
727 {
728     thread_t td = curthread;
729     lwkt_msg_t msg;
730     int error;
731 
732     KKASSERT(port->mpu_td == td);
733     crit_enter_quick(td);
734     while ((msg = _lwkt_pollmsg(port)) == NULL) {
735 	port->mp_flags |= MSGPORTF_WAITING;
736 	error = lwkt_sleep("waitport", flags);
737 	port->mp_flags &= ~MSGPORTF_WAITING;
738 	if (error)
739 	    goto done;
740     }
741     _lwkt_pullmsg(port, msg);
742 done:
743     crit_exit_quick(td);
744     return(msg);
745 }
746 
/************************************************************************
 *			   SPIN PORT BACKEND				*
 ************************************************************************
 *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
 */
763 
764 static
765 void *
766 lwkt_spin_getport(lwkt_port_t port)
767 {
768     lwkt_msg_t msg;
769 
770     spin_lock(&port->mpu_spin);
771     if ((msg = _lwkt_pollmsg(port)) != NULL)
772 	_lwkt_pullmsg(port, msg);
773     spin_unlock(&port->mpu_spin);
774     return(msg);
775 }
776 
777 static
778 int
779 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
780 {
781     int dowakeup;
782 
783     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
784 
785     msg->ms_target_port = port;
786     spin_lock(&port->mpu_spin);
787     _lwkt_pushmsg(port, msg);
788     dowakeup = 0;
789     if (port->mp_flags & MSGPORTF_WAITING) {
790 	port->mp_flags &= ~MSGPORTF_WAITING;
791 	dowakeup = 1;
792     }
793     spin_unlock(&port->mpu_spin);
794     if (dowakeup)
795 	wakeup(port);
796     return (EASYNC);
797 }
798 
/*
 * lwkt_spin_waitmsg() - Backend to lwkt_waitmsg() for spin ports
 *
 * Block, under the reply port's spinlock, until the message is MSGF_DONE.
 * Where we sleep depends on how the message was sent:
 * - MSGF_SYNC messages: sleep on the message, after setting MSGF_WAITING;
 * - other messages: sleep on the port, after setting MSGPORTF_WAITING.
 * If PCATCH is in <flags> and a sleep is interrupted, lwkt_abortmsg() is
 * issued once (with the spinlock dropped, since the abort callback may
 * block) and we keep waiting uninterruptibly.  A queued reply is removed
 * before returning.  Returns msg->ms_error.
 */
static
int
lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	port = msg->ms_reply_port;
	sentabort = 0;
	spin_lock(&port->mpu_spin);
	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    void *won;

	    /*
	     * If message was sent synchronously from the beginning
	     * the wakeup will be on the message structure, else it
	     * will be on the port structure.
	     */
	    if (msg->ms_flags & MSGF_SYNC) {
		won = msg;
		atomic_set_int(&msg->ms_flags, MSGF_WAITING);
	    } else {
		won = port;
		port->mp_flags |= MSGPORTF_WAITING;
	    }

	    /*
	     * Only messages which support abort can be interrupted.
	     * We must still wait for message completion regardless.
	     * (lwkt_abortmsg() itself ignores messages without
	     * MSGF_ABORTABLE.)
	     */
	    if ((flags & PCATCH) && sentabort == 0) {
		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
		if (error) {
		    sentabort = error;
		    spin_unlock(&port->mpu_spin);
		    lwkt_abortmsg(msg);
		    spin_lock(&port->mpu_spin);
		}
	    } else {
		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
	    }
	    /* see note at the top on the MSGPORTF_WAITING flag */
	}
	/*
	 * Turn EINTR into ERESTART if the signal indicates.
	 * (sentabort holds the error returned by the interrupted sleep.)
	 */
	if (sentabort && msg->ms_error == EINTR)
	    msg->ms_error = sentabort;
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
	spin_unlock(&port->mpu_spin);
    } else {
	/* Already done; only dequeue the reply if it is still queued. */
	if (msg->ms_flags & MSGF_QUEUED) {
	    port = msg->ms_reply_port;
	    spin_lock(&port->mpu_spin);
	    _lwkt_pullmsg(port, msg);
	    spin_unlock(&port->mpu_spin);
	}
    }
    return(msg->ms_error);
}
865 
866 static
867 void *
868 lwkt_spin_waitport(lwkt_port_t port, int flags)
869 {
870     lwkt_msg_t msg;
871     int error;
872 
873     spin_lock(&port->mpu_spin);
874     while ((msg = _lwkt_pollmsg(port)) == NULL) {
875 	port->mp_flags |= MSGPORTF_WAITING;
876 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
877 	/* see note at the top on the MSGPORTF_WAITING flag */
878 	if (error) {
879 	    spin_unlock(&port->mpu_spin);
880 	    return(NULL);
881 	}
882     }
883     _lwkt_pullmsg(port, msg);
884     spin_unlock(&port->mpu_spin);
885     return(msg);
886 }
887 
/*
 * lwkt_spin_replyport() - Backend to lwkt_replymsg() for spin ports
 *
 * MSGF_SYNC messages: mark MSGF_DONE|MSGF_REPLY and, if a waiter set
 * MSGF_WAITING, clear it and wakeup() the message.
 *
 * Other messages: queue the reply on the port and, if MSGPORTF_WAITING
 * is set, clear it and wakeup() the port.
 *
 * Flag updates happen under the port spinlock; the wakeup is issued
 * after the lock is released.
 */
static
void
lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int dowakeup;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * If a synchronous completion has been requested, just wakeup
	 * the message without bothering to queue it to the target port.
	 */
	spin_lock(&port->mpu_spin);
	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
	dowakeup = 0;
	if (msg->ms_flags & MSGF_WAITING) {
		atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
		dowakeup = 1;
	}
	spin_unlock(&port->mpu_spin);
	if (dowakeup)
		wakeup(msg);
    } else {
	/*
	 * If an asynchronous completion has been requested the message
	 * must be queued to the reply port.
	 */
	spin_lock(&port->mpu_spin);
	_lwkt_enqueue_reply(port, msg);
	dowakeup = 0;
	if (port->mp_flags & MSGPORTF_WAITING) {
	    port->mp_flags &= ~MSGPORTF_WAITING;
	    dowakeup = 1;
	}
	spin_unlock(&port->mpu_spin);
	if (dowakeup)
	    wakeup(port);
    }
}
928 
929 /*
930  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
931  *
932  * This function could _only_ be used when caller is in the same thread
933  * as the message's target port owner thread.
934  */
935 static void
936 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
937 {
938     KASSERT(port->mpu_td == curthread,
939     	    ("message could only be dropped in the same thread "
940 	     "as the message target port thread\n"));
941     spin_lock(&port->mpu_spin);
942     _lwkt_pullmsg(port, msg);
943     msg->ms_flags |= MSGF_DONE;
944     spin_unlock(&port->mpu_spin);
945 }
946 
947 /************************************************************************
948  *			  SERIALIZER PORT BACKEND			*
949  ************************************************************************
950  *
951  * This backend uses serializer to protect port accessing.  Callers are
952  * assumed to have serializer held.  This kind of port is usually created
953  * by network device driver along with _one_ lwkt thread to pipeline
954  * operations which may temporarily release serializer.
955  *
956  * Implementation is based on SPIN PORT BACKEND.
957  */
958 
959 static
960 void *
961 lwkt_serialize_getport(lwkt_port_t port)
962 {
963     lwkt_msg_t msg;
964 
965     ASSERT_SERIALIZED(port->mpu_serialize);
966 
967     if ((msg = _lwkt_pollmsg(port)) != NULL)
968 	_lwkt_pullmsg(port, msg);
969     return(msg);
970 }
971 
972 static
973 int
974 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
975 {
976     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
977     ASSERT_SERIALIZED(port->mpu_serialize);
978 
979     msg->ms_target_port = port;
980     _lwkt_pushmsg(port, msg);
981     if (port->mp_flags & MSGPORTF_WAITING) {
982 	port->mp_flags &= ~MSGPORTF_WAITING;
983 	wakeup(port);
984     }
985     return (EASYNC);
986 }
987 
/*
 * lwkt_serialize_waitmsg() - Backend to lwkt_waitmsg() for serializer ports
 *
 * The caller must hold the reply port's serializer.  zsleep() waits until
 * the message is MSGF_DONE:
 * - MSGF_SYNC messages: sleep on the message (lwkt_serialize_replyport()
 *   always wakes it, so no MSGF_WAITING flag is needed);
 * - other messages: sleep on the port, after setting MSGPORTF_WAITING.
 * If PCATCH is in <flags> and a sleep is interrupted, lwkt_abortmsg() is
 * issued once with the serializer released (the abort callback may
 * block), and we keep waiting uninterruptibly.  A queued reply is removed
 * before returning.  Returns msg->ms_error.
 */
static
int
lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	port = msg->ms_reply_port;

	ASSERT_SERIALIZED(port->mpu_serialize);

	sentabort = 0;
	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    void *won;

	    /*
	     * If message was sent synchronously from the beginning
	     * the wakeup will be on the message structure, else it
	     * will be on the port structure.
	     */
	    if (msg->ms_flags & MSGF_SYNC) {
		won = msg;
	    } else {
		won = port;
		port->mp_flags |= MSGPORTF_WAITING;
	    }

	    /*
	     * Only messages which support abort can be interrupted.
	     * We must still wait for message completion regardless.
	     */
	    if ((flags & PCATCH) && sentabort == 0) {
		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
		if (error) {
		    sentabort = error;
		    lwkt_serialize_exit(port->mpu_serialize);
		    lwkt_abortmsg(msg);
		    lwkt_serialize_enter(port->mpu_serialize);
		}
	    } else {
		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
	    }
	    /*
	     * As on spin ports, the waker (not the sleeper) clears
	     * MSGPORTF_WAITING.
	     */
	}
	/*
	 * Turn EINTR into ERESTART if the signal indicates.
	 * (sentabort holds the error returned by the interrupted sleep.)
	 */
	if (sentabort && msg->ms_error == EINTR)
	    msg->ms_error = sentabort;
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
    } else {
	/* Already done; only dequeue the reply if it is still queued. */
	if (msg->ms_flags & MSGF_QUEUED) {
	    port = msg->ms_reply_port;

	    ASSERT_SERIALIZED(port->mpu_serialize);
	    _lwkt_pullmsg(port, msg);
	}
    }
    return(msg->ms_error);
}
1054 
1055 static
1056 void *
1057 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1058 {
1059     lwkt_msg_t msg;
1060     int error;
1061 
1062     ASSERT_SERIALIZED(port->mpu_serialize);
1063 
1064     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1065 	port->mp_flags |= MSGPORTF_WAITING;
1066 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1067 	/* see note at the top on the MSGPORTF_WAITING flag */
1068 	if (error)
1069 	    return(NULL);
1070     }
1071     _lwkt_pullmsg(port, msg);
1072     return(msg);
1073 }
1074 
1075 static
1076 void
1077 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1078 {
1079     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1080     ASSERT_SERIALIZED(port->mpu_serialize);
1081 
1082     if (msg->ms_flags & MSGF_SYNC) {
1083 	/*
1084 	 * If a synchronous completion has been requested, just wakeup
1085 	 * the message without bothering to queue it to the target port.
1086 	 */
1087 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1088 	wakeup(msg);
1089     } else {
1090 	/*
1091 	 * If an asynchronous completion has been requested the message
1092 	 * must be queued to the reply port.
1093 	 */
1094 	_lwkt_enqueue_reply(port, msg);
1095 	if (port->mp_flags & MSGPORTF_WAITING) {
1096 	    port->mp_flags &= ~MSGPORTF_WAITING;
1097 	    wakeup(port);
1098 	}
1099     }
1100 }
1101 
1102 /************************************************************************
1103  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1104  ************************************************************************/
1105 
1106 /*
1107  * You can point a port's reply vector at this function if you just want
1108  * the message marked done, without any queueing or signaling.  This is
1109  * often used for structure-embedded messages.
1110  */
1111 static
1112 void
1113 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1114 {
1115     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1116 }
1117 
1118 static
1119 void *
1120 lwkt_panic_getport(lwkt_port_t port)
1121 {
1122     panic("lwkt_getport() illegal on port %p", port);
1123 }
1124 
1125 static
1126 int
1127 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1128 {
1129     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1130 }
1131 
1132 static
1133 int
1134 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1135 {
1136     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1137 }
1138 
1139 static
1140 void *
1141 lwkt_panic_waitport(lwkt_port_t port, int flags)
1142 {
1143     panic("port %p cannot be waited on", port);
1144 }
1145 
1146 static
1147 void
1148 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1149 {
1150     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1151 }
1152 
1153 static
1154 void
1155 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1156 {
1157     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1158 }
1159