xref: /dflybsd-src/sys/kern/sys_pipe.c (revision e436662cfa66ce7f6db187ffb059eea82ab0d1e5)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23 
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
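
/*
 * Illustrative userland usage (a sketch, not part of this file): the code
 * below backs the pipe(2) and pipe2(2) system calls.  kern_pipe() returns
 * fds[0] as the read side and fds[1] as the write side, although both
 * descriptors are created FREAD|FWRITE (DragonFly pipes are full-duplex):
 *
 *	int fds[2];
 *	char c;
 *
 *	if (pipe2(fds, O_CLOEXEC) == 0) {
 *		write(fds[1], "x", 1);		(write side)
 *		read(fds[0], &c, 1);		(read side)
 *	}
 */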
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysmsg.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54 
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64 
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68 
69 #include <machine/cpufunc.h>
70 
71 struct pipegdlock {
72 	struct mtx	mtx;
73 } __cachealign;
74 
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio,
79 		struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio,
81 		struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87 		struct ucred *cred, struct sysmsg *msg);
88 
89 __read_mostly static struct fileops pipeops = {
90 	.fo_read = pipe_read,
91 	.fo_write = pipe_write,
92 	.fo_ioctl = pipe_ioctl,
93 	.fo_kqfilter = pipe_kqfilter,
94 	.fo_stat = pipe_stat,
95 	.fo_close = pipe_close,
96 	.fo_shutdown = pipe_shutdown,
97 	.fo_seek = badfo_seek
98 };
99 
100 static void	filt_pipedetach(struct knote *kn);
101 static int	filt_piperead(struct knote *kn, long hint);
102 static int	filt_pipewrite(struct knote *kn, long hint);
103 
104 __read_mostly static struct filterops pipe_rfiltops =
105 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
106 __read_mostly static struct filterops pipe_wfiltops =
107 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
108 
109 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
110 
111 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
112 
113 __read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
114 __read_mostly static struct pipegdlock *pipe_gdlocks;
115 
116 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
117 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
118         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
119 
120 /*
121  * The pipe buffer size can be changed at any time.  Only new pipe()s
122  * are affected.  Note that due to cpu cache effects, you do not want
123  * to make this value too large.
124  */
125 __read_mostly static int pipe_size = 32768;
126 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
127         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
128 
129 /*
130  * Reader/writer delay loop.  When the reader exhausts the pipe buffer
131  * or the writer completely fills the pipe buffer and would otherwise sleep,
132  * it first busy-loops for a few microseconds waiting for data or buffer
133  * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
134  * and also helps when the user program uses a large data buffer in its
135  * UIOs.
136  *
137  * This defaults to 4uS.
138  */
139 #ifdef _RDTSC_SUPPORTED_
140 __read_mostly static int pipe_delay = 4000;	/* 4uS default */
141 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
142         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
143 #endif
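
/*
 * The delay loop is implemented in pipe_read() and pipe_write() using the
 * tsc helpers, roughly (sketch):
 *
 *	tsc_target = tsc_get_target(pipe_delay);
 *	while (tsc_test_target(tsc_target) == 0) {
 *		cpu_lfence();
 *		if (data or space became available)
 *			break;
 *		cpu_pause();
 *	}
 */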
144 
145 /*
146  * Auto-size pipe cache to reduce kmem allocations and frees.
147  */
148 static
149 void
150 pipeinit(void *dummy)
151 {
152 	size_t mbytes = kmem_lim_size();
153 	int n;
154 
155 	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
156 		if (mbytes >= 7 * 1024)
157 			pipe_maxcache *= 2;
158 		if (mbytes >= 15 * 1024)
159 			pipe_maxcache *= 2;
160 	}
161 
162 	/*
163 	 * Detune the pcpu caching a bit on systems with an insane number
164 	 * of cpu threads to reduce memory waste.
165 	 */
166 	if (ncpus > 64) {
167 		pipe_maxcache = pipe_maxcache * 64 / ncpus;
168 		if (pipe_maxcache < PIPEQ_MAX_CACHE)
169 			pipe_maxcache = PIPEQ_MAX_CACHE;
170 	}
171 
172 	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
173 			     M_PIPE, M_WAITOK | M_ZERO);
174 	for (n = 0; n < ncpus; ++n)
175 		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
176 }
177 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
178 
179 static void pipeclose (struct pipe *pipe,
180 		struct pipebuf *pbr, struct pipebuf *pbw);
181 static void pipe_free_kmem (struct pipebuf *buf);
182 static int pipe_create (struct pipe **pipep);
183 
184 /*
185  * Test and clear the specified flag, wakeup(pb) if it was set.
186  * This function must also act as a memory barrier.
187  */
188 static __inline void
189 pipesignal(struct pipebuf *pb, uint32_t flags)
190 {
191 	uint32_t oflags;
192 	uint32_t nflags;
193 
194 	for (;;) {
195 		oflags = pb->state;
196 		cpu_ccfence();
197 		nflags = oflags & ~flags;
198 		if (atomic_cmpset_int(&pb->state, oflags, nflags))
199 			break;
200 	}
201 	if (oflags & flags)
202 		wakeup(pb);
203 }
204 
205 /*
206  * Issue SIGIO if async mode is set and requested, and wake up select/poll/kq.
207  */
208 static __inline void
209 pipewakeup(struct pipebuf *pb, int dosigio)
210 {
211 	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
212 		lwkt_gettoken(&sigio_token);
213 		pgsigio(pb->sigio, SIGIO, 0);
214 		lwkt_reltoken(&sigio_token);
215 	}
216 	KNOTE(&pb->kq.ki_note, 0);
217 }
218 
219 /*
220  * These routines are called before and after a UIO.  The UIO
221  * may block, causing our held tokens to be lost temporarily.
222  *
223  * We use these routines to serialize reads against other reads
224  * and writes against other writes.
225  *
226  * The appropriate token is held on entry so *ipp does not race.
227  */
228 static __inline int
229 pipe_start_uio(int *ipp)
230 {
231 	int error;
232 
233 	while (*ipp) {
234 		*ipp = -1;
235 		error = tsleep(ipp, PCATCH, "pipexx", 0);
236 		if (error)
237 			return (error);
238 	}
239 	*ipp = 1;
240 	return (0);
241 }
242 
243 static __inline void
244 pipe_end_uio(int *ipp)
245 {
246 	if (*ipp < 0) {
247 		*ipp = 0;
248 		wakeup(ipp);
249 	} else {
250 		KKASSERT(*ipp > 0);
251 		*ipp = 0;
252 	}
253 }
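
/*
 * Typical bracketing, as used by pipe_read() and pipe_write() below
 * (sketch; the matching token must already be held):
 *
 *	lwkt_gettoken(&rpb->rlock);
 *	error = pipe_start_uio(&rpb->rip);
 *	if (error == 0) {
 *		... uiomove() loop, may block and lose tokens temporarily ...
 *		pipe_end_uio(&rpb->rip);
 *	}
 *	lwkt_reltoken(&rpb->rlock);
 */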
254 
255 /*
256  * The pipe system call for the DTYPE_PIPE type of pipes
257  *
258  * pipe_args(int dummy)
259  *
260  * MPSAFE
261  */
262 int
263 sys_pipe(struct sysmsg *sysmsg, const struct pipe_args *uap)
264 {
265 	return kern_pipe(sysmsg->sysmsg_fds, 0);
266 }
267 
268 int
269 sys_pipe2(struct sysmsg *sysmsg, const struct pipe2_args *uap)
270 {
271 	return kern_pipe(sysmsg->sysmsg_fds, uap->flags);
272 }
273 
274 int
275 kern_pipe(long *fds, int flags)
276 {
277 	struct thread *td = curthread;
278 	struct filedesc *fdp = td->td_proc->p_fd;
279 	struct file *rf, *wf;
280 	struct pipe *pipe;
281 	int fd1, fd2, error;
282 
283 	pipe = NULL;
284 	if (pipe_create(&pipe)) {
285 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
286 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
287 		return (ENFILE);
288 	}
289 
290 	error = falloc(td->td_lwp, &rf, &fd1);
291 	if (error) {
292 		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
293 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
294 		return (error);
295 	}
296 	fds[0] = fd1;
297 
298 	/*
299 	 * Warning: once we've gotten past allocation of the fd for the
300 	 * read-side, we can only drop the read side via fdrop() in order
301 	 * to avoid races against processes which manage to dup() the read
302 	 * side while we are blocked trying to allocate the write side.
303 	 */
304 	rf->f_type = DTYPE_PIPE;
305 	rf->f_flag = FREAD | FWRITE;
306 	rf->f_ops = &pipeops;
307 	rf->f_data = (void *)((intptr_t)pipe | 0);
308 	if (flags & O_NONBLOCK)
309 		rf->f_flag |= O_NONBLOCK;
310 	if (flags & O_CLOEXEC)
311 		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
312 
313 	error = falloc(td->td_lwp, &wf, &fd2);
314 	if (error) {
315 		fsetfd(fdp, NULL, fd1);
316 		fdrop(rf);
317 		/* pipeA has been closed by fdrop() */
318 		/* close pipeB here */
319 		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
320 		return (error);
321 	}
322 	wf->f_type = DTYPE_PIPE;
323 	wf->f_flag = FREAD | FWRITE;
324 	wf->f_ops = &pipeops;
325 	wf->f_data = (void *)((intptr_t)pipe | 1);
326 	if (flags & O_NONBLOCK)
327 		wf->f_flag |= O_NONBLOCK;
328 	if (flags & O_CLOEXEC)
329 		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
330 
331 	fds[1] = fd2;
332 
333 	/*
334 	 * Once activated the peer relationship remains valid until
335 	 * both sides are closed.
336 	 */
337 	fsetfd(fdp, rf, fd1);
338 	fsetfd(fdp, wf, fd2);
339 	fdrop(rf);
340 	fdrop(wf);
341 
342 	return (0);
343 }
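
/*
 * NOTE: f_data carries the struct pipe pointer with bit 0 selecting the
 * side (0 for fds[0], 1 for fds[1]).  Every fileop below recovers the
 * pipe and its read/write pipebufs the same way:
 *
 *	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
 *	if ((intptr_t)fp->f_data & 1) {
 *		rpb = &pipe->bufferB;
 *		wpb = &pipe->bufferA;
 *	} else {
 *		rpb = &pipe->bufferA;
 *		wpb = &pipe->bufferB;
 *	}
 */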
344 
345 /*
346  * [re]allocates KVA for the pipe's circular buffer.  The space is
347  * pageable.  Called twice to set up full-duplex communications.
348  *
349  * NOTE: Independent vm_object's are used to improve performance.
350  *
351  * Returns 0 on success, ENOMEM on failure.
352  */
353 static int
354 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
355 {
356 	struct vm_object *object;
357 	caddr_t buffer;
358 	vm_pindex_t npages;
359 	int error;
360 
361 	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
362 	if (size < 16384)
363 		size = 16384;
364 	if (size > 1024*1024)
365 		size = 1024*1024;
366 
367 	npages = round_page(size) / PAGE_SIZE;
368 	object = pb->object;
369 
370 	/*
371 	 * [re]create the object if necessary and reserve space for it
372 	 * in the kernel_map.  The object and memory are pageable.  On
373 	 * success, free the old resources before assigning the new
374 	 * ones.
375 	 */
376 	if (object == NULL || object->size != npages) {
377 		object = vm_object_allocate(OBJT_DEFAULT, npages);
378 		buffer = (caddr_t)vm_map_min(kernel_map);
379 
380 		error = vm_map_find(kernel_map, object, NULL,
381 				    0, (vm_offset_t *)&buffer, size,
382 				    PAGE_SIZE, TRUE,
383 				    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
384 				    VM_PROT_ALL, VM_PROT_ALL, 0);
385 
386 		if (error != KERN_SUCCESS) {
387 			vm_object_deallocate(object);
388 			return (ENOMEM);
389 		}
390 		pipe_free_kmem(pb);
391 		pb->object = object;
392 		pb->buffer = buffer;
393 		pb->size = size;
394 	}
395 	pb->rindex = 0;
396 	pb->windex = 0;
397 
398 	return (0);
399 }
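
/*
 * The pipebuf is used as a ring: rindex and windex only increase, the
 * number of buffered bytes is (windex - rindex), and the physical offset
 * is taken as (index & (size - 1)), which presumes a power-of-two buffer
 * size (the 16384 minimum and the 32768 default both qualify).
 */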
400 
401 /*
402  * Initialize and allocate VM and memory for pipe, pulling the pipe from
403  * our per-cpu cache if possible.
404  *
405  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
406  * must still deallocate the pipe on failure.
407  */
408 static int
409 pipe_create(struct pipe **pipep)
410 {
411 	globaldata_t gd = mycpu;
412 	struct pipe *pipe;
413 	int error;
414 
415 	if ((pipe = gd->gd_pipeq) != NULL) {
416 		gd->gd_pipeq = pipe->next;
417 		--gd->gd_pipeqcount;
418 		pipe->next = NULL;
419 	} else {
420 		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
421 		pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
422 		lwkt_token_init(&pipe->bufferA.rlock, "piper");
423 		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
424 		lwkt_token_init(&pipe->bufferB.rlock, "piper");
425 		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
426 	}
427 	*pipep = pipe;
428 	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
429 		return (error);
430 	}
431 	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
432 		return (error);
433 	}
434 	vfs_timestamp(&pipe->ctime);
435 	pipe->bufferA.atime = pipe->ctime;
436 	pipe->bufferA.mtime = pipe->ctime;
437 	pipe->bufferB.atime = pipe->ctime;
438 	pipe->bufferB.mtime = pipe->ctime;
439 	pipe->open_count = 2;
440 
441 	return (0);
442 }
443 
444 /*
445  * Read data from a pipe
446  */
447 static int
448 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
449 {
450 	struct pipebuf *rpb;
451 	struct pipebuf *wpb;
452 	struct pipe *pipe;
453 	size_t nread = 0;
454 	size_t size;	/* total bytes available */
455 	size_t nsize;	/* total bytes to read */
456 	size_t rindex;	/* contiguous bytes available */
457 	int notify_writer;
458 	int bigread;
459 	int bigcount;
460 	int error;
461 	int nbio;
462 
463 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
464 	if ((intptr_t)fp->f_data & 1) {
465 		rpb = &pipe->bufferB;
466 		wpb = &pipe->bufferA;
467 	} else {
468 		rpb = &pipe->bufferA;
469 		wpb = &pipe->bufferB;
470 	}
471 	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
472 
473 	if (uio->uio_resid == 0)
474 		return(0);
475 
476 	/*
477 	 * Calculate nbio
478 	 */
479 	if (fflags & O_FBLOCKING)
480 		nbio = 0;
481 	else if (fflags & O_FNONBLOCKING)
482 		nbio = 1;
483 	else if (fp->f_flag & O_NONBLOCK)
484 		nbio = 1;
485 	else
486 		nbio = 0;
487 
488 	/*
489 	 * 'quick' NBIO test before things get expensive.
490 	 */
491 	if (nbio && rpb->rindex == rpb->windex &&
492 	    (rpb->state & PIPE_REOF) == 0) {
493 		return EAGAIN;
494 	}
495 
496 	/*
497 	 * Reads are serialized.  Note however that buffer.buffer and
498 	 * buffer.size can change out from under us when the number
499  * of bytes in the buffer is zero due to the write-side doing a
500 	 * pipespace().
501 	 */
502 	lwkt_gettoken(&rpb->rlock);
503 	error = pipe_start_uio(&rpb->rip);
504 	if (error) {
505 		lwkt_reltoken(&rpb->rlock);
506 		return (error);
507 	}
508 	notify_writer = 0;
509 
510 	bigread = (uio->uio_resid > 10 * 1024 * 1024);
511 	bigcount = 10;
512 
513 	while (uio->uio_resid) {
514 		/*
515 		 * Don't hog the cpu.
516 		 */
517 		if (bigread && --bigcount == 0) {
518 			lwkt_user_yield();
519 			bigcount = 10;
520 			if (CURSIG(curthread->td_lwp)) {
521 				error = EINTR;
522 				break;
523 			}
524 		}
525 
526 		/*
527 		 * lfence required to avoid read-reordering of buffer
528 		 * contents prior to validation of size.
529 		 */
530 		size = rpb->windex - rpb->rindex;
531 		cpu_lfence();
532 		if (size) {
533 			rindex = rpb->rindex & (rpb->size - 1);
534 			nsize = size;
535 			if (nsize > rpb->size - rindex)
536 				nsize = rpb->size - rindex;
537 			nsize = szmin(nsize, uio->uio_resid);
538 
539 			/*
540 			 * Limit how much we move in one go so we have a
541 			 * chance to kick the writer while data is still
542 			 * available in the pipe.  This avoids getting into
543 			 * a ping-pong with the writer.
544 			 */
545 			if (nsize > (rpb->size >> 1))
546 				nsize = rpb->size >> 1;
547 
548 			error = uiomove(&rpb->buffer[rindex], nsize, uio);
549 			if (error)
550 				break;
551 			rpb->rindex += nsize;
552 			nread += nsize;
553 
554 			/*
555 			 * If the FIFO is still over half full just continue
556 			 * and do not try to notify the writer yet.  If
557 			 * less than half full notify any waiting writer.
558 			 */
559 			if (size - nsize > (rpb->size >> 1)) {
560 				notify_writer = 0;
561 			} else {
562 				notify_writer = 1;
563 				pipesignal(rpb, PIPE_WANTW);
564 			}
565 			continue;
566 		}
567 
568 		/*
569 		 * If the "write-side" was blocked we wake it up.  This code
570 		 * is reached when the buffer is completely emptied.
571 		 */
572 		pipesignal(rpb, PIPE_WANTW);
573 
574 		/*
575 		 * Pick up our copy loop again if the writer sent data to
576 		 * us while we were messing around.
577 		 *
578 		 * On an SMP box, poll up to pipe_delay nanoseconds for new
579 		 * data.  Typically a value of 2000 to 4000 is sufficient
580 		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
581 		 * is used for synchronous communications with small packets,
582 		 * and 8000 or so (8uS) will pipeline large buffer xfers
583 		 * between cpus over a pipe.
584 		 *
585 		 * For synchronous communications a hit means doing a
586 		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
587 		 * whereas a miss requiring a tsleep/wakeup sequence
588 		 * will take 7uS or more.
589 		 */
590 		if (rpb->windex != rpb->rindex)
591 			continue;
592 
593 #ifdef _RDTSC_SUPPORTED_
594 		if (pipe_delay) {
595 			int64_t tsc_target;
596 			int good = 0;
597 
598 			tsc_target = tsc_get_target(pipe_delay);
599 			while (tsc_test_target(tsc_target) == 0) {
600 				cpu_lfence();
601 				if (rpb->windex != rpb->rindex) {
602 					good = 1;
603 					break;
604 				}
605 				cpu_pause();
606 			}
607 			if (good)
608 				continue;
609 		}
610 #endif
611 
612 		/*
613 		 * Detect EOF condition, do not set error.
614 		 */
615 		if (rpb->state & PIPE_REOF)
616 			break;
617 
618 		/*
619 		 * Break if some data was read, or if this was a non-blocking
620 		 * read.
621 		 */
622 		if (nread > 0)
623 			break;
624 
625 		if (nbio) {
626 			error = EAGAIN;
627 			break;
628 		}
629 
630 		/*
631 		 * Last chance, interlock with WANTR
632 		 */
633 		tsleep_interlock(rpb, PCATCH);
634 		atomic_set_int(&rpb->state, PIPE_WANTR);
635 
636 		/*
637 		 * Retest bytes available after memory barrier above.
638 		 */
639 		size = rpb->windex - rpb->rindex;
640 		if (size)
641 			continue;
642 
643 		/*
644 		 * Retest EOF after memory barrier above.
645 		 */
646 		if (rpb->state & PIPE_REOF)
647 			break;
648 
649 		/*
650 		 * Wait for more data or state change
651 		 */
652 		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
653 		if (error)
654 			break;
655 	}
656 	pipe_end_uio(&rpb->rip);
657 
658 	/*
659 	 * Update the last access time
660 	 */
661 	if (error == 0 && nread && rpb->lticks != ticks) {
662 		vfs_timestamp(&rpb->atime);
663 		rpb->lticks = ticks;
664 	}
665 
666 	/*
667 	 * If we drained the FIFO more than half way, then handle
668 	 * write blocking hysteresis.
669 	 *
670 	 * Note that PIPE_WANTW cannot be set by the writer without
671 	 * it holding both rlock and wlock, so we can test it
672 	 * while holding just rlock.
673 	 */
674 	if (notify_writer) {
675 		/*
676 		 * Synchronous blocking is done on the pipe involved
677 		 */
678 		pipesignal(rpb, PIPE_WANTW);
679 
680 		/*
681 		 * But we may also have to deal with a kqueue which is
682 		 * stored on the same pipe as its descriptor, so an
683 		 * EVFILT_WRITE event waiting for our side to drain will
684 		 * be on the other side.
685 		 */
686 		pipewakeup(wpb, 0);
687 	}
688 	/*size = rpb->windex - rpb->rindex;*/
689 	lwkt_reltoken(&rpb->rlock);
690 
691 	return (error);
692 }
693 
694 static int
695 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
696 {
697 	struct pipebuf *rpb;
698 	struct pipebuf *wpb;
699 	struct pipe *pipe;
700 	size_t windex;
701 	size_t space;
702 	size_t wcount;
703 	size_t orig_resid;
704 	int bigwrite;
705 	int bigcount;
706 	int error;
707 	int nbio;
708 
709 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
710 	if ((intptr_t)fp->f_data & 1) {
711 		rpb = &pipe->bufferB;
712 		wpb = &pipe->bufferA;
713 	} else {
714 		rpb = &pipe->bufferA;
715 		wpb = &pipe->bufferB;
716 	}
717 
718 	/*
719 	 * Calculate nbio
720 	 */
721 	if (fflags & O_FBLOCKING)
722 		nbio = 0;
723 	else if (fflags & O_FNONBLOCKING)
724 		nbio = 1;
725 	else if (fp->f_flag & O_NONBLOCK)
726 		nbio = 1;
727 	else
728 		nbio = 0;
729 
730 	/*
731 	 * 'quick' NBIO test before things get expensive.
732 	 */
733 	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
734 	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
735 		return EAGAIN;
736 	}
737 
738 	/*
739 	 * Writes go to the peer.  The peer will always exist.
740 	 */
741 	lwkt_gettoken(&wpb->wlock);
742 	if (wpb->state & PIPE_WEOF) {
743 		lwkt_reltoken(&wpb->wlock);
744 		return (EPIPE);
745 	}
746 
747 	/*
748 	 * Degenerate case (EPIPE takes precedence)
749 	 */
750 	if (uio->uio_resid == 0) {
751 		lwkt_reltoken(&wpb->wlock);
752 		return(0);
753 	}
754 
755 	/*
756 	 * Writes are serialized (start_uio must be called with wlock)
757 	 */
758 	error = pipe_start_uio(&wpb->wip);
759 	if (error) {
760 		lwkt_reltoken(&wpb->wlock);
761 		return (error);
762 	}
763 
764 	orig_resid = uio->uio_resid;
765 	wcount = 0;
766 
767 	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
768 	bigcount = 10;
769 
770 	while (uio->uio_resid) {
771 		if (wpb->state & PIPE_WEOF) {
772 			error = EPIPE;
773 			break;
774 		}
775 
776 		/*
777 		 * Don't hog the cpu.
778 		 */
779 		if (bigwrite && --bigcount == 0) {
780 			lwkt_user_yield();
781 			bigcount = 10;
782 			if (CURSIG(curthread->td_lwp)) {
783 				error = EINTR;
784 				break;
785 			}
786 		}
787 
788 		windex = wpb->windex & (wpb->size - 1);
789 		space = wpb->size - (wpb->windex - wpb->rindex);
790 
791 		/*
792 		 * Writes of size <= PIPE_BUF must be atomic.
793 		 */
794 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
795 			space = 0;
796 
797 		/*
798 		 * Write to fill, read size handles write hysteresis.  Also
799 		 * additional restrictions can cause select-based non-blocking
800 		 * writes to spin.
801 		 */
802 		if (space > 0) {
803 			size_t segsize;
804 
805 			/*
806 			 * We want to notify a potentially waiting reader
807 			 * before we exhaust the write buffer for SMP
808 			 * pipelining.  Otherwise the write/read will begin
809 			 * to ping-pong.
810 			 */
811 			space = szmin(space, uio->uio_resid);
812 			if (space > (wpb->size >> 1))
813 				space = (wpb->size >> 1);
814 
815 			/*
816 			 * First segment to transfer is minimum of
817 			 * transfer size and contiguous space in
818 			 * pipe buffer.  If first segment to transfer
819 			 * is less than the transfer size, we've got
820 			 * a wraparound in the buffer.
821 			 */
822 			segsize = wpb->size - windex;
823 			if (segsize > space)
824 				segsize = space;
825 
826 			/*
827 			 * If this is the first loop and the reader is
828 			 * blocked, do a preemptive wakeup of the reader.
829 			 *
830 			 * On SMP the IPI latency plus the wlock interlock
831 			 * on the reader side is the fastest way to get the
832 			 * reader going.  (The scheduler will hard loop on
833 			 * lock tokens).
834 			 */
835 			if (wcount == 0)
836 				pipesignal(wpb, PIPE_WANTR);
837 
838 			/*
839 			 * Transfer segment, which may include a wrap-around.
840 			 * Update windex to account for both segments in one go
841 			 * so the reader can read() the data atomically.
842 			 */
843 			error = uiomove(&wpb->buffer[windex], segsize, uio);
844 			if (error == 0 && segsize < space) {
845 				segsize = space - segsize;
846 				error = uiomove(&wpb->buffer[0], segsize, uio);
847 			}
848 			if (error)
849 				break;
850 
851 			/*
852 			 * Memory fence prior to updating windex (note: not
853 			 * needed on Intel, where this is a NOP).
854 			 */
855 			cpu_sfence();
856 			wpb->windex += space;
857 
858 			/*
859 			 * Signal reader
860 			 */
861 			if (wcount != 0)
862 				pipesignal(wpb, PIPE_WANTR);
863 			wcount += space;
864 			continue;
865 		}
866 
867 		/*
868 		 * Wakeup any pending reader
869 		 */
870 		pipesignal(wpb, PIPE_WANTR);
871 
872 		/*
873 		 * don't block on non-blocking I/O
874 		 */
875 		if (nbio) {
876 			error = EAGAIN;
877 			break;
878 		}
879 
880 #ifdef _RDTSC_SUPPORTED_
881 		if (pipe_delay) {
882 			int64_t tsc_target;
883 			int good = 0;
884 
885 			tsc_target = tsc_get_target(pipe_delay);
886 			while (tsc_test_target(tsc_target) == 0) {
887 				cpu_lfence();
888 				space = wpb->size - (wpb->windex - wpb->rindex);
889 				if ((space < uio->uio_resid) &&
890 				    (orig_resid <= PIPE_BUF)) {
891 					space = 0;
892 				}
893 				if (space) {
894 					good = 1;
895 					break;
896 				}
897 				cpu_pause();
898 			}
899 			if (good)
900 				continue;
901 		}
902 #endif
903 
904 		/*
905 		 * Interlocked test.   Atomic op enforces the memory barrier.
906 		 */
907 		tsleep_interlock(wpb, PCATCH);
908 		atomic_set_int(&wpb->state, PIPE_WANTW);
909 
910 		/*
911 		 * Retest space available after memory barrier above.
912 		 * Writes of size <= PIPE_BUF must be atomic.
913 		 */
914 		space = wpb->size - (wpb->windex - wpb->rindex);
915 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
916 			space = 0;
917 
918 		/*
919 		 * Retest EOF after memory barrier above.
920 		 */
921 		if (wpb->state & PIPE_WEOF) {
922 			error = EPIPE;
923 			break;
924 		}
925 
926 		/*
927 		 * We have no more space and have something to offer,
928 		 * wake up select/poll/kq.
929 		 */
930 		if (space == 0) {
931 			pipewakeup(wpb, 1);
932 			error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
933 		}
934 
935 		/*
936 		 * Break out if we errored or the read side wants us to go
937 		 * away.
938 		 */
939 		if (error)
940 			break;
941 		if (wpb->state & PIPE_WEOF) {
942 			error = EPIPE;
943 			break;
944 		}
945 	}
946 	pipe_end_uio(&wpb->wip);
947 
948 	/*
949 	 * If we have put any characters in the buffer, we wake up
950 	 * the reader.
951 	 *
952 	 * Both rlock and wlock are required to be able to modify pipe_state.
953 	 */
954 	if (wpb->windex != wpb->rindex) {
955 		pipesignal(wpb, PIPE_WANTR);
956 		pipewakeup(wpb, 1);
957 	}
958 
959 	/*
960 	 * Don't return EPIPE if I/O was successful
961 	 */
962 	if ((wpb->rindex == wpb->windex) &&
963 	    (uio->uio_resid == 0) &&
964 	    (error == EPIPE)) {
965 		error = 0;
966 	}
967 
968 	if (error == 0 && wpb->lticks != ticks) {
969 		vfs_timestamp(&wpb->mtime);
970 		wpb->lticks = ticks;
971 	}
972 
973 	/*
974 	 * We have something to offer,
975 	 * wake up select/poll/kq.
976 	 */
977 	/*space = wpb->windex - wpb->rindex;*/
978 	lwkt_reltoken(&wpb->wlock);
979 
980 	return (error);
981 }
982 
983 /*
984  * We implement a very minimal set of ioctls for compatibility with sockets.
985  */
986 static int
987 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
988 	   struct ucred *cred, struct sysmsg *msg)
989 {
990 	struct pipebuf *rpb;
991 	struct pipe *pipe;
992 	int error;
993 
994 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
995 	if ((intptr_t)fp->f_data & 1) {
996 		rpb = &pipe->bufferB;
997 	} else {
998 		rpb = &pipe->bufferA;
999 	}
1000 
1001 	lwkt_gettoken(&rpb->rlock);
1002 	lwkt_gettoken(&rpb->wlock);
1003 
1004 	switch (cmd) {
1005 	case FIOASYNC:
1006 		if (*(int *)data) {
1007 			atomic_set_int(&rpb->state, PIPE_ASYNC);
1008 		} else {
1009 			atomic_clear_int(&rpb->state, PIPE_ASYNC);
1010 		}
1011 		error = 0;
1012 		break;
1013 	case FIONREAD:
1014 		*(int *)data = (int)(rpb->windex - rpb->rindex);
1015 		error = 0;
1016 		break;
1017 	case FIOSETOWN:
1018 		error = fsetown(*(int *)data, &rpb->sigio);
1019 		break;
1020 	case FIOGETOWN:
1021 		*(int *)data = fgetown(&rpb->sigio);
1022 		error = 0;
1023 		break;
1024 	case TIOCSPGRP:
1025 		/* This is deprecated, FIOSETOWN should be used instead. */
1026 		error = fsetown(-(*(int *)data), &rpb->sigio);
1027 		break;
1028 
1029 	case TIOCGPGRP:
1030 		/* This is deprecated, FIOGETOWN should be used instead. */
1031 		*(int *)data = -fgetown(&rpb->sigio);
1032 		error = 0;
1033 		break;
1034 	default:
1035 		error = ENOTTY;
1036 		break;
1037 	}
1038 	lwkt_reltoken(&rpb->wlock);
1039 	lwkt_reltoken(&rpb->rlock);
1040 
1041 	return (error);
1042 }
1043 
1044 /*
1045  * MPSAFE
1046  */
1047 static int
1048 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1049 {
1050 	struct pipebuf *rpb;
1051 	struct pipe *pipe;
1052 
1053 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1054 	if ((intptr_t)fp->f_data & 1) {
1055 		rpb = &pipe->bufferB;
1056 	} else {
1057 		rpb = &pipe->bufferA;
1058 	}
1059 
1060 	bzero((caddr_t)ub, sizeof(*ub));
1061 	ub->st_mode = S_IFIFO;
1062 	ub->st_blksize = rpb->size;
1063 	ub->st_size = rpb->windex - rpb->rindex;
1064 	ub->st_blocks = howmany(ub->st_size, ub->st_blksize);
1065 	ub->st_atimespec = rpb->atime;
1066 	ub->st_mtimespec = rpb->mtime;
1067 	ub->st_ctimespec = pipe->ctime;
1068 	ub->st_uid = fp->f_cred->cr_uid;
1069 	ub->st_gid = fp->f_cred->cr_gid;
1070 	ub->st_ino = pipe->inum;
1071 	/*
1072 	 * Left as 0: st_dev, st_nlink, st_rdev,
1073 	 * st_flags, st_gen.
1074 	 * XXX (st_dev, st_ino) should be unique.
1075 	 */
1076 
1077 	return (0);
1078 }
1079 
1080 static int
1081 pipe_close(struct file *fp)
1082 {
1083 	struct pipebuf *rpb;
1084 	struct pipebuf *wpb;
1085 	struct pipe *pipe;
1086 
1087 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1088 	if ((intptr_t)fp->f_data & 1) {
1089 		rpb = &pipe->bufferB;
1090 		wpb = &pipe->bufferA;
1091 	} else {
1092 		rpb = &pipe->bufferA;
1093 		wpb = &pipe->bufferB;
1094 	}
1095 
1096 	fp->f_ops = &badfileops;
1097 	fp->f_data = NULL;
1098 	funsetown(&rpb->sigio);
1099 	pipeclose(pipe, rpb, wpb);
1100 
1101 	return (0);
1102 }
1103 
1104 /*
1105  * Shut down one or both directions of a full-duplex pipe.
1106  */
1107 static int
1108 pipe_shutdown(struct file *fp, int how)
1109 {
1110 	struct pipebuf *rpb;
1111 	struct pipebuf *wpb;
1112 	struct pipe *pipe;
1113 	int error = EPIPE;
1114 
1115 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1116 	if ((intptr_t)fp->f_data & 1) {
1117 		rpb = &pipe->bufferB;
1118 		wpb = &pipe->bufferA;
1119 	} else {
1120 		rpb = &pipe->bufferA;
1121 		wpb = &pipe->bufferB;
1122 	}
1123 
1124 	/*
1125 	 * We modify pipe_state on both pipes, which means we need
1126 	 * all four tokens!
1127 	 */
1128 	lwkt_gettoken(&rpb->rlock);
1129 	lwkt_gettoken(&rpb->wlock);
1130 	lwkt_gettoken(&wpb->rlock);
1131 	lwkt_gettoken(&wpb->wlock);
1132 
1133 	switch(how) {
1134 	case SHUT_RDWR:
1135 	case SHUT_RD:
1136 		/*
1137 		 * EOF on my reads and peer writes
1138 		 */
1139 		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1140 		if (rpb->state & PIPE_WANTR) {
1141 			rpb->state &= ~PIPE_WANTR;
1142 			wakeup(rpb);
1143 		}
1144 		if (rpb->state & PIPE_WANTW) {
1145 			rpb->state &= ~PIPE_WANTW;
1146 			wakeup(rpb);
1147 		}
1148 		error = 0;
1149 		if (how == SHUT_RD)
1150 			break;
1151 		/* fall through */
1152 	case SHUT_WR:
1153 		/*
1154 		 * EOF on peer reads and my writes
1155 		 */
1156 		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1157 		if (wpb->state & PIPE_WANTR) {
1158 			wpb->state &= ~PIPE_WANTR;
1159 			wakeup(wpb);
1160 		}
1161 		if (wpb->state & PIPE_WANTW) {
1162 			wpb->state &= ~PIPE_WANTW;
1163 			wakeup(wpb);
1164 		}
1165 		error = 0;
1166 		break;
1167 	}
1168 	pipewakeup(rpb, 1);
1169 	pipewakeup(wpb, 1);
1170 
1171 	lwkt_reltoken(&wpb->wlock);
1172 	lwkt_reltoken(&wpb->rlock);
1173 	lwkt_reltoken(&rpb->wlock);
1174 	lwkt_reltoken(&rpb->rlock);
1175 
1176 	return (error);
1177 }
1178 
1179 /*
1180  * Destroy the pipe buffer.
1181  */
1182 static void
1183 pipe_free_kmem(struct pipebuf *pb)
1184 {
1185 	if (pb->buffer != NULL) {
1186 		kmem_free(kernel_map, (vm_offset_t)pb->buffer, pb->size);
1187 		pb->buffer = NULL;
1188 		pb->object = NULL;
1189 	}
1190 }
1191 
1192 /*
1193  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1194  * and writing on wpb.  This routine must be called twice with the pipebufs
1195  * reversed to close both directions.
1196  */
1197 static void
1198 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1199 {
1200 	globaldata_t gd;
1201 
1202 	if (pipe == NULL)
1203 		return;
1204 
1205 	/*
1206 	 * We need both the read and write tokens to modify pipe_state.
1207 	 */
1208 	lwkt_gettoken(&rpb->rlock);
1209 	lwkt_gettoken(&rpb->wlock);
1210 
1211 	/*
1212 	 * Set our state, wakeup anyone waiting in select/poll/kq, and
1213 	 * wakeup anyone blocked on our pipe.  No action if our side
1214 	 * is already closed.
1215 	 */
1216 	if (rpb->state & PIPE_CLOSED) {
1217 		lwkt_reltoken(&rpb->wlock);
1218 		lwkt_reltoken(&rpb->rlock);
1219 		return;
1220 	}
1221 
1222 	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1223 	pipewakeup(rpb, 1);
1224 	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1225 		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1226 		wakeup(rpb);
1227 	}
1228 	lwkt_reltoken(&rpb->wlock);
1229 	lwkt_reltoken(&rpb->rlock);
1230 
1231 	/*
1232 	 * Disconnect from peer.
1233 	 */
1234 	lwkt_gettoken(&wpb->rlock);
1235 	lwkt_gettoken(&wpb->wlock);
1236 
1237 	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1238 	pipewakeup(wpb, 1);
1239 	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1240 		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1241 		wakeup(wpb);
1242 	}
1243 	if (SLIST_FIRST(&wpb->kq.ki_note))
1244 		KNOTE(&wpb->kq.ki_note, 0);
1245 	lwkt_reltoken(&wpb->wlock);
1246 	lwkt_reltoken(&wpb->rlock);
1247 
1248 	/*
1249 	 * Free resources once both sides are closed.  We maintain a pcpu
1250 	 * cache to improve performance, so the actual tear-down case is
1251 	 * limited to bulk situations.
1252 	 *
1253 	 * However, the bulk tear-down case can cause intense contention
1254 	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
1255 	 * of processes are killed at the same time.  To deal with this we
1256 	 * use a pcpu mutex to maintain concurrency but also limit the
1257 	 * number of threads banging on the map and pmap.
1258 	 *
1259 	 * We use the mtx mechanism instead of the lockmgr mechanism because
1260 	 * the mtx mechanism utilizes a queued design which will not break
1261 	 * down in the face of thousands to hundreds of thousands of
1262 	 * processes trying to free pipes simultaneously.  The lockmgr
1263 	 * mechanism will wind up waking them all up each time a lock
1264 	 * cycles.
1265 	 */
1266 	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1267 		gd = mycpu;
1268 		if (gd->gd_pipeqcount >= pipe_maxcache) {
1269 			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1270 			pipe_free_kmem(rpb);
1271 			pipe_free_kmem(wpb);
1272 			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1273 			kfree(pipe, M_PIPE);
1274 		} else {
1275 			rpb->state = 0;
1276 			wpb->state = 0;
1277 			pipe->next = gd->gd_pipeq;
1278 			gd->gd_pipeq = pipe;
1279 			++gd->gd_pipeqcount;
1280 		}
1281 	}
1282 }
1283 
1284 static int
1285 pipe_kqfilter(struct file *fp, struct knote *kn)
1286 {
1287 	struct pipebuf *rpb;
1288 	struct pipebuf *wpb;
1289 	struct pipe *pipe;
1290 
1291 	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1292 	if ((intptr_t)fp->f_data & 1) {
1293 		rpb = &pipe->bufferB;
1294 		wpb = &pipe->bufferA;
1295 	} else {
1296 		rpb = &pipe->bufferA;
1297 		wpb = &pipe->bufferB;
1298 	}
1299 
1300 	switch (kn->kn_filter) {
1301 	case EVFILT_READ:
1302 		kn->kn_fop = &pipe_rfiltops;
1303 		break;
1304 	case EVFILT_WRITE:
1305 		kn->kn_fop = &pipe_wfiltops;
1306 		break;
1307 	default:
1308 		return (EOPNOTSUPP);
1309 	}
1310 
1311 	if (rpb == &pipe->bufferA)
1312 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1313 	else
1314 		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1315 
1316 	knote_insert(&rpb->kq.ki_note, kn);
1317 
1318 	return (0);
1319 }
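
/*
 * Note that both EVFILT_READ and EVFILT_WRITE knotes hang off the
 * registering descriptor's own pipebuf (rpb); the peer's pipe_read()/
 * pipe_write() paths (and shutdown/close) deliver the matching KNOTE()
 * on that buffer via pipewakeup().
 */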
1320 
1321 static void
1322 filt_pipedetach(struct knote *kn)
1323 {
1324 	struct pipebuf *rpb;
1325 	struct pipebuf *wpb;
1326 	struct pipe *pipe;
1327 
1328 	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1329 	if ((intptr_t)kn->kn_hook & 1) {
1330 		rpb = &pipe->bufferB;
1331 		wpb = &pipe->bufferA;
1332 	} else {
1333 		rpb = &pipe->bufferA;
1334 		wpb = &pipe->bufferB;
1335 	}
1336 	knote_remove(&rpb->kq.ki_note, kn);
1337 }
1338 
1339 /*ARGSUSED*/
1340 static int
1341 filt_piperead(struct knote *kn, long hint)
1342 {
1343 	struct pipebuf *rpb;
1344 	struct pipebuf *wpb;
1345 	struct pipe *pipe;
1346 	int ready = 0;
1347 
1348 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1349 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1350 		rpb = &pipe->bufferB;
1351 		wpb = &pipe->bufferA;
1352 	} else {
1353 		rpb = &pipe->bufferA;
1354 		wpb = &pipe->bufferB;
1355 	}
1356 
1357 	/*
1358 	 * We shouldn't need the pipe locks because the knote itself is
1359 	 * locked via KN_PROCESSING.  If we lose a race against the writer,
1360 	 * the writer will just issue a KNOTE() after us.
1361 	 */
1362 #if 0
1363 	lwkt_gettoken(&rpb->rlock);
1364 	lwkt_gettoken(&rpb->wlock);
1365 #endif
1366 
1367 	kn->kn_data = rpb->windex - rpb->rindex;
1368 	if (kn->kn_data < 0)
1369 		kn->kn_data = 0;
1370 
1371 	if (rpb->state & PIPE_REOF) {
1372 		/*
1373 		 * Only set NODATA if all data has been exhausted
1374 		 */
1375 		if (kn->kn_data == 0)
1376 			kn->kn_flags |= EV_NODATA;
1377 		kn->kn_flags |= EV_EOF;
1378 
1379 		/*
1380 		 * Only set HUP if the pipe is completely closed.
1381 		 * Half-closed does not count (to make the behavior
1382 		 * the same as Linux).
1383 		 */
1384 		if (wpb->state & PIPE_CLOSED) {
1385 			kn->kn_flags |= EV_HUP;
1386 			ready = 1;
1387 		}
1388 	}
1389 
1390 #if 0
1391 	lwkt_reltoken(&rpb->wlock);
1392 	lwkt_reltoken(&rpb->rlock);
1393 #endif
1394 
1395 	if (!ready && (kn->kn_sfflags & NOTE_HUPONLY) == 0)
1396 		ready = kn->kn_data > 0;
1397 
1398 	return (ready);
1399 }
1400 
1401 /*ARGSUSED*/
1402 static int
1403 filt_pipewrite(struct knote *kn, long hint)
1404 {
1405 	struct pipebuf *rpb;
1406 	struct pipebuf *wpb;
1407 	struct pipe *pipe;
1408 	int ready = 0;
1409 
1410 	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1411 	if ((intptr_t)kn->kn_fp->f_data & 1) {
1412 		rpb = &pipe->bufferB;
1413 		wpb = &pipe->bufferA;
1414 	} else {
1415 		rpb = &pipe->bufferA;
1416 		wpb = &pipe->bufferB;
1417 	}
1418 
1419 	kn->kn_data = 0;
1420 	if (wpb->state & PIPE_CLOSED) {
1421 		kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
1422 		return (1);
1423 	}
1424 
1425 	/*
1426 	 * We shouldn't need the pipe locks because the knote itself is
1427 	 * locked via KN_PROCESSING.  If we lose a race against the reader,
1428 	 * the reader will just issue a KNOTE() after us.
1429 	 */
1430 #if 0
1431 	lwkt_gettoken(&wpb->rlock);
1432 	lwkt_gettoken(&wpb->wlock);
1433 #endif
1434 
1435 	if (wpb->state & PIPE_WEOF) {
1436 		kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
1437 		ready = 1;
1438 	}
1439 
1440 	if (!ready) {
1441 		kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1442 		if (kn->kn_data < 0)
1443 			kn->kn_data = 0;
1444 	}
1445 
1446 #if 0
1447 	lwkt_reltoken(&wpb->wlock);
1448 	lwkt_reltoken(&wpb->rlock);
1449 #endif
1450 
1451 	if (!ready)
1452 		ready = kn->kn_data >= PIPE_BUF;
1453 
1454 	return (ready);
1455 }
1456