xref: /dflybsd-src/sys/kern/sys_pipe.c (revision bd4539cc23771f3c0b3fae4ecf80e725b613b305)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.14 2004/02/20 17:11:07 dillon Exp $
21  */
22 
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 
30 /*
31  * This code has two modes of operation, a small write mode and a large
32  * write mode.  The small write mode acts like conventional pipes with
33  * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
34  * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
35  * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
36  * the receiving process can copy it directly from the pages in the sending
37  * process.
38  *
39  * If the sending process receives a signal, it is possible that it will
40  * go away, and certainly its address space can change, because control
41  * is returned back to the user-mode side.  In that case, the pipe code
42  * arranges to copy the buffer supplied by the user process, to a pageable
43  * kernel buffer, and the receiving process will grab the data from the
44  * pageable kernel buffer.  Since signals don't happen all that often,
45  * the copy operation is normally eliminated.
46  *
47  * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
48  * happen for small transfers so that the system will not spend all of
49  * its time context switching.  PIPE_SIZE is constrained by the
50  * amount of kernel virtual memory.
51  */
52 
53 #include <sys/param.h>
54 #include <sys/systm.h>
55 #include <sys/kernel.h>
56 #include <sys/proc.h>
57 #include <sys/fcntl.h>
58 #include <sys/file.h>
59 #include <sys/filedesc.h>
60 #include <sys/filio.h>
61 #include <sys/ttycom.h>
62 #include <sys/stat.h>
63 #include <sys/poll.h>
64 #include <sys/select.h>
65 #include <sys/signalvar.h>
66 #include <sys/sysproto.h>
67 #include <sys/pipe.h>
68 #include <sys/vnode.h>
69 #include <sys/uio.h>
70 #include <sys/event.h>
71 #include <sys/globaldata.h>
72 #include <sys/module.h>
73 #include <sys/malloc.h>
74 #include <sys/sysctl.h>
75 
76 #include <vm/vm.h>
77 #include <vm/vm_param.h>
78 #include <sys/lock.h>
79 #include <vm/vm_object.h>
80 #include <vm/vm_kern.h>
81 #include <vm/vm_extern.h>
82 #include <vm/pmap.h>
83 #include <vm/vm_map.h>
84 #include <vm/vm_page.h>
85 #include <vm/vm_zone.h>
86 
87 #include <sys/file2.h>
88 
89 /*
90  * Use this define if you want to disable *fancy* VM things.  Expect an
91  * approx 30% decrease in transfer rate.  This could be useful for
92  * NetBSD or OpenBSD.
93  */
94 /* #define PIPE_NODIRECT */
95 
96 /*
97  * interfaces to the outside world
98  */
99 static int pipe_read (struct file *fp, struct uio *uio,
100 		struct ucred *cred, int flags, struct thread *td);
101 static int pipe_write (struct file *fp, struct uio *uio,
102 		struct ucred *cred, int flags, struct thread *td);
103 static int pipe_close (struct file *fp, struct thread *td);
104 static int pipe_poll (struct file *fp, int events, struct ucred *cred,
105 		struct thread *td);
106 static int pipe_kqfilter (struct file *fp, struct knote *kn);
107 static int pipe_stat (struct file *fp, struct stat *sb, struct thread *td);
108 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data, struct thread *td);
109 
110 static struct fileops pipeops = {
111 	NULL,	/* port */
112 	0,	/* autoq */
113 	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
114 	pipe_stat, pipe_close
115 };
116 
117 static void	filt_pipedetach(struct knote *kn);
118 static int	filt_piperead(struct knote *kn, long hint);
119 static int	filt_pipewrite(struct knote *kn, long hint);
120 
121 static struct filterops pipe_rfiltops =
122 	{ 1, NULL, filt_pipedetach, filt_piperead };
123 static struct filterops pipe_wfiltops =
124 	{ 1, NULL, filt_pipedetach, filt_pipewrite };
125 
126 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
127 
128 /*
129  * Default pipe buffer size(s); this can be kind-of large now because pipe
130  * space is pageable.  The pipe code will try to maintain locality of
131  * reference for performance reasons, so small amounts of outstanding I/O
132  * will not wipe the cache.
133  */
134 #define MINPIPESIZE (PIPE_SIZE/3)
135 #define MAXPIPESIZE (2*PIPE_SIZE/3)
136 
137 /*
138  * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
139  * is there so that on large systems, we don't exhaust it.
140  */
141 #define MAXPIPEKVA (8*1024*1024)
142 
143 /*
144  * Limit for direct transfers; we cannot, of course, limit
145  * the amount of kva for pipes in general, though.
146  */
147 #define LIMITPIPEKVA (16*1024*1024)
148 
149 /*
150  * Limit the number of "big" pipes
151  */
152 #define LIMITBIGPIPES	32
153 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
154 
155 static int pipe_maxbig = LIMITBIGPIPES;
156 static int pipe_maxcache = PIPEQ_MAX_CACHE;
157 static int pipe_nbig;
158 static int pipe_kva;
159 static int pipe_bcache_alloc;
160 static int pipe_bkmem_alloc;
161 static int pipe_dcache_alloc;
162 static int pipe_dkmem_alloc;
163 
164 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
165 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
166         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
167 SYSCTL_INT(_kern_pipe, OID_AUTO, kva,
168         CTLFLAG_RD, &pipe_kva, 0, "kva reserved by pipes");
169 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
170         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
171 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
172         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
173 #if !defined(NO_PIPE_SYSCTL_STATS)
174 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
175         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
176 SYSCTL_INT(_kern_pipe, OID_AUTO, dcache_alloc,
177         CTLFLAG_RW, &pipe_dcache_alloc, 0, "pipe direct buf from pcpu cache");
178 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
179         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
180 SYSCTL_INT(_kern_pipe, OID_AUTO, dkmem_alloc,
181         CTLFLAG_RW, &pipe_dkmem_alloc, 0, "pipe direct buf from kmem");
182 #endif
183 
184 static void pipeclose (struct pipe *cpipe);
185 static void pipe_free_kmem (struct pipe *cpipe);
186 static int pipe_create (struct pipe **cpipep);
187 static __inline int pipelock (struct pipe *cpipe, int catch);
188 static __inline void pipeunlock (struct pipe *cpipe);
189 static __inline void pipeselwakeup (struct pipe *cpipe);
190 #ifndef PIPE_NODIRECT
191 static int pipe_build_write_buffer (struct pipe *wpipe, struct uio *uio);
192 static void pipe_destroy_write_buffer (struct pipe *wpipe);
193 static int pipe_direct_write (struct pipe *wpipe, struct uio *uio);
194 static void pipe_clone_write_buffer (struct pipe *wpipe);
195 #endif
196 static int pipespace (struct pipe *cpipe, int size);
197 
198 /*
199  * The pipe system call for the DTYPE_PIPE type of pipes
200  *
201  * pipe_ARgs(int dummy)
202  */
203 
204 /* ARGSUSED */
205 int
206 pipe(struct pipe_args *uap)
207 {
208 	struct thread *td = curthread;
209 	struct proc *p = td->td_proc;
210 	struct filedesc *fdp;
211 	struct file *rf, *wf;
212 	struct pipe *rpipe, *wpipe;
213 	int fd1, fd2, error;
214 
215 	KKASSERT(p);
216 	fdp = p->p_fd;
217 
218 	rpipe = wpipe = NULL;
219 	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
220 		pipeclose(rpipe);
221 		pipeclose(wpipe);
222 		return (ENFILE);
223 	}
224 
225 	rpipe->pipe_state |= PIPE_DIRECTOK;
226 	wpipe->pipe_state |= PIPE_DIRECTOK;
227 
228 	error = falloc(p, &rf, &fd1);
229 	if (error) {
230 		pipeclose(rpipe);
231 		pipeclose(wpipe);
232 		return (error);
233 	}
234 	fhold(rf);
235 	uap->sysmsg_fds[0] = fd1;
236 
237 	/*
238 	 * Warning: once we've gotten past allocation of the fd for the
239 	 * read-side, we can only drop the read side via fdrop() in order
240 	 * to avoid races against processes which manage to dup() the read
241 	 * side while we are blocked trying to allocate the write side.
242 	 */
243 	rf->f_flag = FREAD | FWRITE;
244 	rf->f_type = DTYPE_PIPE;
245 	rf->f_data = (caddr_t)rpipe;
246 	rf->f_ops = &pipeops;
247 	error = falloc(p, &wf, &fd2);
248 	if (error) {
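		/*
		 * Note: two references may exist on rf at this point -- the
		 * descriptor table's (if fd1 still points at rf) and the one
		 * taken with fhold() above -- hence the possible double
		 * fdrop() below.
		 */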
249 		if (fdp->fd_ofiles[fd1] == rf) {
250 			fdp->fd_ofiles[fd1] = NULL;
251 			fdrop(rf, td);
252 		}
253 		fdrop(rf, td);
254 		/* rpipe has been closed by fdrop(). */
255 		pipeclose(wpipe);
256 		return (error);
257 	}
258 	wf->f_flag = FREAD | FWRITE;
259 	wf->f_type = DTYPE_PIPE;
260 	wf->f_data = (caddr_t)wpipe;
261 	wf->f_ops = &pipeops;
262 	uap->sysmsg_fds[1] = fd2;
263 
264 	rpipe->pipe_peer = wpipe;
265 	wpipe->pipe_peer = rpipe;
266 	fdrop(rf, td);
267 
268 	return (0);
269 }
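
/*
 * Illustrative userland sketch of the pipe(2) interface implemented above
 * (wrapped in #if 0 since it is not part of the kernel build): the syscall
 * returns two descriptors, data written to fds[1] becomes readable on
 * fds[0], and small transfers such as this one take the buffered path
 * described at the top of this file.
 */
#if 0
#include <sys/types.h>
#include <unistd.h>
#include <err.h>

int
main(void)
{
	int fds[2];
	char buf[16];
	ssize_t n;

	if (pipe(fds) < 0)
		err(1, "pipe");
	if (write(fds[1], "hello", 5) != 5)
		err(1, "write");
	n = read(fds[0], buf, sizeof(buf));	/* returns 5 ("hello") */
	close(fds[0]);
	close(fds[1]);
	return (n == 5 ? 0 : 1);
}
#endif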
270 
271 /*
272  * Allocate kva for the pipe circular buffer; the space is pageable.
273  * This routine will 'realloc' the size of a pipe safely: if it fails,
274  * the old buffer is retained and ENOMEM is returned, leaving the pipe
275  * contents intact.
276  */
277 static int
278 pipespace(struct pipe *cpipe, int size)
279 {
280 	struct vm_object *object;
281 	caddr_t buffer;
282 	int npages, error;
283 
284 	npages = round_page(size) / PAGE_SIZE;
285 	object = cpipe->pipe_buffer.object;
286 
287 	/*
288 	 * [re]create the object if necessary and reserve space for it
289 	 * in the kernel_map.  The object and memory are pageable.  On
290 	 * success, free the old resources before assigning the new
291 	 * ones.
292 	 */
293 	if (object == NULL || object->size != npages) {
294 		object = vm_object_allocate(OBJT_DEFAULT, npages);
295 		buffer = (caddr_t) vm_map_min(kernel_map);
296 
297 		error = vm_map_find(kernel_map, object, 0,
298 			(vm_offset_t *) &buffer, size, 1,
299 			VM_PROT_ALL, VM_PROT_ALL, 0);
300 
301 		if (error != KERN_SUCCESS) {
302 			vm_object_deallocate(object);
303 			return (ENOMEM);
304 		}
305 		pipe_kva += size;
306 		pipe_free_kmem(cpipe);
307 		cpipe->pipe_buffer.object = object;
308 		cpipe->pipe_buffer.buffer = buffer;
309 		cpipe->pipe_buffer.size = size;
310 		++pipe_bkmem_alloc;
311 	} else {
312 		++pipe_bcache_alloc;
313 		if (cpipe->pipe_map.kva)
314 			++pipe_dcache_alloc;
315 	}
316 	cpipe->pipe_buffer.in = 0;
317 	cpipe->pipe_buffer.out = 0;
318 	cpipe->pipe_buffer.cnt = 0;
319 	return (0);
320 }
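
/*
 * Note on the fields initialized above: pipe_buffer is used as a simple
 * circular buffer.  'in' is the offset at which the writer deposits the
 * next byte, 'out' is the offset from which the reader takes the next
 * byte, and 'cnt' is the number of bytes currently queued.  Both offsets
 * wrap back to 0 when they reach pipe_buffer.size; pipe_read() and
 * pipe_write() below maintain these invariants.
 */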
321 
322 /*
323  * Initialize and allocate VM and memory for pipe, pulling the pipe from
324  * our per-cpu cache if possible.  For now make sure it is sized for the
325  * smaller PIPE_SIZE default.
326  */
327 static int
328 pipe_create(struct pipe **cpipep)
329 {
331 	globaldata_t gd = mycpu;
332 	struct pipe *cpipe;
333 	int error;
334 
335 	if ((cpipe = gd->gd_pipeq) != NULL) {
336 		gd->gd_pipeq = cpipe->pipe_peer;
337 		--gd->gd_pipeqcount;
338 		cpipe->pipe_peer = NULL;
339 	} else {
340 		cpipe = malloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
341 	}
342 	*cpipep = cpipe;
343 	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
344 		return (error);
345 	vfs_timestamp(&cpipe->pipe_ctime);
346 	cpipe->pipe_atime = cpipe->pipe_ctime;
347 	cpipe->pipe_mtime = cpipe->pipe_ctime;
348 	return (0);
349 }
350 
351 
352 /*
353  * lock a pipe for I/O, blocking other access
354  */
355 static __inline int
356 pipelock(struct pipe *cpipe, int catch)
357 {
360 	int error;
361 
362 	while (cpipe->pipe_state & PIPE_LOCK) {
363 		cpipe->pipe_state |= PIPE_LWANT;
364 		error = tsleep(cpipe, (catch ? PCATCH : 0), "pipelk", 0);
365 		if (error != 0)
366 			return (error);
367 	}
368 	cpipe->pipe_state |= PIPE_LOCK;
369 	return (0);
370 }
371 
372 /*
373  * unlock a pipe I/O lock
374  */
375 static __inline void
376 pipeunlock(struct pipe *cpipe)
377 {
379 
380 	cpipe->pipe_state &= ~PIPE_LOCK;
381 	if (cpipe->pipe_state & PIPE_LWANT) {
382 		cpipe->pipe_state &= ~PIPE_LWANT;
383 		wakeup(cpipe);
384 	}
385 }
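
/*
 * The usual calling pattern for the I/O lock, as used by pipe_read() and
 * pipe_write() below, is roughly:
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		... manipulate cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 *
 * Sleeps that wait for the peer are done with the lock released so the
 * other side can make progress.
 */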
386 
387 static __inline void
388 pipeselwakeup(struct pipe *cpipe)
389 {
391 
392 	if (cpipe->pipe_state & PIPE_SEL) {
393 		cpipe->pipe_state &= ~PIPE_SEL;
394 		selwakeup(&cpipe->pipe_sel);
395 	}
396 	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
397 		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
398 	KNOTE(&cpipe->pipe_sel.si_note, 0);
399 }
400 
401 /* ARGSUSED */
402 static int
403 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred,
404 	int flags, struct thread *td)
405 {
406 	struct pipe *rpipe = (struct pipe *) fp->f_data;
407 	int error;
408 	int nread = 0;
409 	u_int size;
410 
411 	++rpipe->pipe_busy;
412 	error = pipelock(rpipe, 1);
413 	if (error)
414 		goto unlocked_error;
415 
416 	while (uio->uio_resid) {
417 		/*
418 		 * normal pipe buffer receive
419 		 */
420 		if (rpipe->pipe_buffer.cnt > 0) {
421 			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
422 			if (size > rpipe->pipe_buffer.cnt)
423 				size = rpipe->pipe_buffer.cnt;
424 			if (size > (u_int) uio->uio_resid)
425 				size = (u_int) uio->uio_resid;
426 
427 			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
428 					size, uio);
429 			if (error)
430 				break;
431 
432 			rpipe->pipe_buffer.out += size;
433 			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
434 				rpipe->pipe_buffer.out = 0;
435 
436 			rpipe->pipe_buffer.cnt -= size;
437 
438 			/*
439 			 * If there is no more to read in the pipe, reset
440 			 * its pointers to the beginning.  This improves
441 			 * cache hit stats.
442 			 */
443 			if (rpipe->pipe_buffer.cnt == 0) {
444 				rpipe->pipe_buffer.in = 0;
445 				rpipe->pipe_buffer.out = 0;
446 			}
447 			nread += size;
448 #ifndef PIPE_NODIRECT
449 		/*
450 		 * Direct copy, bypassing a kernel buffer.
451 		 */
452 		} else if ((size = rpipe->pipe_map.cnt) &&
453 			   (rpipe->pipe_state & PIPE_DIRECTW)) {
454 			caddr_t	va;
455 			if (size > (u_int) uio->uio_resid)
456 				size = (u_int) uio->uio_resid;
457 
458 			va = (caddr_t) rpipe->pipe_map.kva +
459 			    rpipe->pipe_map.pos;
460 			error = uiomove(va, size, uio);
461 			if (error)
462 				break;
463 			nread += size;
464 			rpipe->pipe_map.pos += size;
465 			rpipe->pipe_map.cnt -= size;
466 			if (rpipe->pipe_map.cnt == 0) {
467 				rpipe->pipe_state &= ~PIPE_DIRECTW;
468 				wakeup(rpipe);
469 			}
470 #endif
471 		} else {
472 			/*
473 			 * detect EOF condition
474 			 * read returns 0 on EOF, no need to set error
475 			 */
476 			if (rpipe->pipe_state & PIPE_EOF)
477 				break;
478 
479 			/*
480 			 * If the "write-side" has been blocked, wake it up now.
481 			 */
482 			if (rpipe->pipe_state & PIPE_WANTW) {
483 				rpipe->pipe_state &= ~PIPE_WANTW;
484 				wakeup(rpipe);
485 			}
486 
487 			/*
488 			 * Break if some data was read.
489 			 */
490 			if (nread > 0)
491 				break;
492 
493 			/*
494 			 * Unlock the pipe buffer for our remaining processing.  We
495 			 * will either break out with an error or we will sleep and
496 			 * relock to loop.
497 			 */
498 			pipeunlock(rpipe);
499 
500 			/*
501 			 * Handle non-blocking mode operation or
502 			 * wait for more data.
503 			 */
504 			if (fp->f_flag & FNONBLOCK) {
505 				error = EAGAIN;
506 			} else {
507 				rpipe->pipe_state |= PIPE_WANTR;
508 				if ((error = tsleep(rpipe, PCATCH,
509 				    "piperd", 0)) == 0) {
510 					error = pipelock(rpipe, 1);
511 				}
512 			}
513 			if (error)
514 				goto unlocked_error;
515 		}
516 	}
517 	pipeunlock(rpipe);
518 
519 	if (error == 0)
520 		vfs_timestamp(&rpipe->pipe_atime);
521 unlocked_error:
522 	--rpipe->pipe_busy;
523 
524 	/*
525 	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
526 	 */
527 	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
528 		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
529 		wakeup(rpipe);
530 	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
531 		/*
532 		 * Handle write blocking hysteresis.
533 		 */
534 		if (rpipe->pipe_state & PIPE_WANTW) {
535 			rpipe->pipe_state &= ~PIPE_WANTW;
536 			wakeup(rpipe);
537 		}
538 	}
539 
540 	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
541 		pipeselwakeup(rpipe);
542 
543 	return (error);
544 }
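
/*
 * Illustrative userland sketch (wrapped in #if 0, not compiled) of the EOF
 * behaviour implemented in pipe_read() above: once the write side has been
 * closed and the buffer has drained, read() returns 0 rather than blocking.
 */
#if 0
#include <unistd.h>
#include <err.h>

int
main(void)
{
	int fds[2];
	char c;

	if (pipe(fds) < 0)
		err(1, "pipe");
	write(fds[1], "x", 1);
	close(fds[1]);			/* close the write side */
	read(fds[0], &c, 1);		/* returns 1: the queued byte */
	return (read(fds[0], &c, 1));	/* returns 0: EOF */
}
#endif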
545 
546 #ifndef PIPE_NODIRECT
547 /*
548  * Map the sending process's buffer into kernel space and wire it.
549  * This is similar to a physical write operation.
550  */
551 static int
552 pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio)
553 {
556 	u_int size;
557 	int i;
558 	vm_offset_t addr, endaddr;
559 	vm_paddr_t paddr;
560 
561 	size = (u_int) uio->uio_iov->iov_len;
562 	if (size > wpipe->pipe_buffer.size)
563 		size = wpipe->pipe_buffer.size;
564 
565 	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
566 	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
567 	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
568 		vm_page_t m;
569 
570 		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
571 		    (paddr = pmap_kextract(addr)) == 0) {
572 			int j;
573 
574 			for (j = 0; j < i; j++)
575 				vm_page_unhold(wpipe->pipe_map.ms[j]);
576 			return (EFAULT);
577 		}
578 
579 		m = PHYS_TO_VM_PAGE(paddr);
580 		vm_page_hold(m);
581 		wpipe->pipe_map.ms[i] = m;
582 	}
583 
584 	/*
585 	 * set up the control block
586 	 */
587 	wpipe->pipe_map.npages = i;
588 	wpipe->pipe_map.pos =
589 	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
590 	wpipe->pipe_map.cnt = size;
591 
592 	/*
593 	 * and map the buffer
594 	 */
595 	if (wpipe->pipe_map.kva == 0) {
596 		/*
597 		 * We need to allocate space for an extra page because the
598 		 * address range might (will) span pages at times.
599 		 */
600 		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
601 			wpipe->pipe_buffer.size + PAGE_SIZE);
602 		pipe_kva += wpipe->pipe_buffer.size + PAGE_SIZE;
603 		++pipe_dkmem_alloc;
604 	}
605 	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
606 		wpipe->pipe_map.npages);
607 
608 	/*
609 	 * and update the uio data
610 	 */
611 
612 	uio->uio_iov->iov_len -= size;
613 	uio->uio_iov->iov_base += size;
614 	if (uio->uio_iov->iov_len == 0)
615 		uio->uio_iov++;
616 	uio->uio_resid -= size;
617 	uio->uio_offset += size;
618 	return (0);
619 }
620 
621 /*
622  * unmap and unwire the process buffer
623  */
624 static void
625 pipe_destroy_write_buffer(struct pipe *wpipe)
626 {
628 	int i;
629 
630 	if (wpipe->pipe_map.kva) {
631 		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
632 
633 		if (pipe_kva > MAXPIPEKVA) {
634 			vm_offset_t kva = wpipe->pipe_map.kva;
635 			wpipe->pipe_map.kva = 0;
636 			kmem_free(kernel_map, kva,
637 				wpipe->pipe_buffer.size + PAGE_SIZE);
638 			pipe_kva -= wpipe->pipe_buffer.size + PAGE_SIZE;
639 		}
640 	}
641 	for (i = 0; i < wpipe->pipe_map.npages; i++)
642 		vm_page_unhold(wpipe->pipe_map.ms[i]);
643 	wpipe->pipe_map.npages = 0;
644 }
645 
646 /*
647  * In the case of a signal, the writing process might go away.  This
648  * code copies the data into the circular buffer so that the source
649  * pages can be freed without loss of data.
650  */
651 static void
652 pipe_clone_write_buffer(struct pipe *wpipe)
653 {
655 	int size;
656 	int pos;
657 
658 	size = wpipe->pipe_map.cnt;
659 	pos = wpipe->pipe_map.pos;
660 	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
661 	    (caddr_t) wpipe->pipe_buffer.buffer, size);
662 
663 	wpipe->pipe_buffer.in = size;
664 	wpipe->pipe_buffer.out = 0;
665 	wpipe->pipe_buffer.cnt = size;
666 	wpipe->pipe_state &= ~PIPE_DIRECTW;
667 
668 	pipe_destroy_write_buffer(wpipe);
669 }
670 
671 /*
672  * This implements the pipe buffer write mechanism.  Note that only
673  * a direct write OR a normal pipe write can be pending at any given time.
674  * If there are any characters in the pipe buffer, the direct write will
675  * be deferred until the receiving process grabs all of the bytes from
676  * the pipe buffer.  Then the direct mapping write is set up.
677  */
678 static int
679 pipe_direct_write(struct pipe *wpipe, struct uio *uio)
680 {
683 	int error;
684 
685 retry:
686 	while (wpipe->pipe_state & PIPE_DIRECTW) {
687 		if (wpipe->pipe_state & PIPE_WANTR) {
688 			wpipe->pipe_state &= ~PIPE_WANTR;
689 			wakeup(wpipe);
690 		}
691 		wpipe->pipe_state |= PIPE_WANTW;
692 		error = tsleep(wpipe, PCATCH, "pipdww", 0);
693 		if (error)
694 			goto error1;
695 		if (wpipe->pipe_state & PIPE_EOF) {
696 			error = EPIPE;
697 			goto error1;
698 		}
699 	}
700 	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
701 	if (wpipe->pipe_buffer.cnt > 0) {
702 		if (wpipe->pipe_state & PIPE_WANTR) {
703 			wpipe->pipe_state &= ~PIPE_WANTR;
704 			wakeup(wpipe);
705 		}
706 
707 		wpipe->pipe_state |= PIPE_WANTW;
708 		error = tsleep(wpipe, PCATCH, "pipdwc", 0);
709 		if (error)
710 			goto error1;
711 		if (wpipe->pipe_state & PIPE_EOF) {
712 			error = EPIPE;
713 			goto error1;
714 		}
715 		goto retry;
716 	}
717 
718 	wpipe->pipe_state |= PIPE_DIRECTW;
719 
720 	error = pipe_build_write_buffer(wpipe, uio);
721 	if (error) {
722 		wpipe->pipe_state &= ~PIPE_DIRECTW;
723 		goto error1;
724 	}
725 
726 	error = 0;
727 	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
728 		if (wpipe->pipe_state & PIPE_EOF) {
729 			pipelock(wpipe, 0);
730 			pipe_destroy_write_buffer(wpipe);
731 			pipeunlock(wpipe);
732 			pipeselwakeup(wpipe);
733 			error = EPIPE;
734 			goto error1;
735 		}
736 		if (wpipe->pipe_state & PIPE_WANTR) {
737 			wpipe->pipe_state &= ~PIPE_WANTR;
738 			wakeup(wpipe);
739 		}
740 		pipeselwakeup(wpipe);
741 		error = tsleep(wpipe, PCATCH, "pipdwt", 0);
742 	}
743 
744 	pipelock(wpipe,0);
745 	if (wpipe->pipe_state & PIPE_DIRECTW) {
746 		/*
747 		 * this bit of trickery substitutes a kernel buffer for
748 		 * the process that might be going away.
749 		 */
750 		pipe_clone_write_buffer(wpipe);
751 	} else {
752 		pipe_destroy_write_buffer(wpipe);
753 	}
754 	pipeunlock(wpipe);
755 	return (error);
756 
757 error1:
758 	wakeup(wpipe);
759 	return (error);
760 }
761 #endif
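
/*
 * Summary of the direct-write handshake implemented above: the writer sets
 * PIPE_DIRECTW and publishes its held user pages via pipe_map; the reader
 * in pipe_read() copies straight out of pipe_map and clears PIPE_DIRECTW
 * once pipe_map.cnt drains to zero; the writer then tears the mapping down
 * (or, if it was interrupted by a signal, clones the data into the regular
 * buffer first).  Only one direct write can be outstanding on a pipe at
 * any given time.
 */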
762 
763 static int
764 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred,
765 	int flags, struct thread *td)
766 {
767 	int error = 0;
768 	int orig_resid;
769 	struct pipe *wpipe, *rpipe;
770 
771 	rpipe = (struct pipe *) fp->f_data;
772 	wpipe = rpipe->pipe_peer;
773 
774 	/*
775 	 * detect loss of pipe read side, issue SIGPIPE if lost.
776 	 */
777 	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
778 		return (EPIPE);
779 	}
780 	++wpipe->pipe_busy;
781 
782 	/*
783 	 * If it is advantageous to resize the pipe buffer, do
784 	 * so.
785 	 */
786 	if ((uio->uio_resid > PIPE_SIZE) &&
787 		(pipe_nbig < pipe_maxbig) &&
788 		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
789 		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
790 		(wpipe->pipe_buffer.cnt == 0)) {
791 
792 		if ((error = pipelock(wpipe,1)) == 0) {
793 			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
794 				pipe_nbig++;
795 			pipeunlock(wpipe);
796 		}
797 	}
798 
799 	/*
800 	 * If an early error occurred, unbusy and return, waking up any pending
801 	 * readers.
802 	 */
803 	if (error) {
804 		--wpipe->pipe_busy;
805 		if ((wpipe->pipe_busy == 0) &&
806 		    (wpipe->pipe_state & PIPE_WANT)) {
807 			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
808 			wakeup(wpipe);
809 		}
810 		return(error);
811 	}
812 
813 	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
814 
815 	orig_resid = uio->uio_resid;
816 
817 	while (uio->uio_resid) {
818 		int space;
819 
820 #ifndef PIPE_NODIRECT
821 		/*
822 		 * If the transfer is large, we can gain performance if
823 		 * we do process-to-process copies directly.
824 		 * If the write is non-blocking, we don't use the
825 		 * direct write mechanism.
826 		 *
827 		 * The direct write mechanism will detect the reader going
828 		 * away on us.
829 		 */
830 		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
831 		    (fp->f_flag & FNONBLOCK) == 0 &&
832 		    (wpipe->pipe_map.kva || (pipe_kva < LIMITPIPEKVA))) {
833 			error = pipe_direct_write(wpipe, uio);
835 			if (error)
836 				break;
837 			continue;
838 		}
839 #endif
840 
841 		/*
842 		 * Pipe buffered writes cannot be coincident with
843 		 * direct writes.  We wait until the currently executing
844 		 * direct write is completed before we start filling the
845 		 * pipe buffer.  We break out if a signal occurs or the
846 		 * reader goes away.
847 		 */
848 	retrywrite:
849 		while (wpipe->pipe_state & PIPE_DIRECTW) {
850 			if (wpipe->pipe_state & PIPE_WANTR) {
851 				wpipe->pipe_state &= ~PIPE_WANTR;
852 				wakeup(wpipe);
853 			}
854 			error = tsleep(wpipe, PCATCH, "pipbww", 0);
855 			if (wpipe->pipe_state & PIPE_EOF)
856 				break;
857 			if (error)
858 				break;
859 		}
860 		if (wpipe->pipe_state & PIPE_EOF) {
861 			error = EPIPE;
862 			break;
863 		}
864 
865 		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
866 
867 		/* Writes of size <= PIPE_BUF must be atomic. */
868 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
869 			space = 0;
870 
871 		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
872 			if ((error = pipelock(wpipe,1)) == 0) {
873 				int size;	/* Transfer size */
874 				int segsize;	/* first segment to transfer */
875 
876 				/*
877 				 * It is possible for a direct write to
878 				 * slip in on us... handle it here...
879 				 */
880 				if (wpipe->pipe_state & PIPE_DIRECTW) {
881 					pipeunlock(wpipe);
882 					goto retrywrite;
883 				}
884 				/*
885 				 * If a process blocked in uiomove, our
886 				 * value for space might be bad.
887 				 *
888 				 * XXX will we be ok if the reader has gone
889 				 * away here?
890 				 */
891 				if (space > wpipe->pipe_buffer.size -
892 				    wpipe->pipe_buffer.cnt) {
893 					pipeunlock(wpipe);
894 					goto retrywrite;
895 				}
896 
897 				/*
898 				 * Transfer size is minimum of uio transfer
899 				 * and free space in pipe buffer.
900 				 */
901 				if (space > uio->uio_resid)
902 					size = uio->uio_resid;
903 				else
904 					size = space;
905 				/*
906 				 * First segment to transfer is minimum of
907 				 * transfer size and contiguous space in
908 				 * pipe buffer.  If first segment to transfer
909 				 * is less than the transfer size, we've got
910 				 * a wraparound in the buffer.
911 				 */
912 				segsize = wpipe->pipe_buffer.size -
913 					wpipe->pipe_buffer.in;
914 				if (segsize > size)
915 					segsize = size;
916 
917 				/* Transfer first segment */
918 
919 				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
920 						segsize, uio);
921 
922 				if (error == 0 && segsize < size) {
923 					/*
924 					 * Transfer remaining part now, to
925 					 * support atomic writes.  Wraparound
926 					 * happened.
927 					 */
928 					if (wpipe->pipe_buffer.in + segsize !=
929 					    wpipe->pipe_buffer.size)
930 						panic("Expected pipe buffer wraparound disappeared");
931 
932 					error = uiomove(&wpipe->pipe_buffer.buffer[0],
933 							size - segsize, uio);
934 				}
935 				if (error == 0) {
936 					wpipe->pipe_buffer.in += size;
937 					if (wpipe->pipe_buffer.in >=
938 					    wpipe->pipe_buffer.size) {
939 						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
940 							panic("Expected wraparound bad");
941 						wpipe->pipe_buffer.in = size - segsize;
942 					}
943 
944 					wpipe->pipe_buffer.cnt += size;
945 					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
946 						panic("Pipe buffer overflow");
947 
948 				}
949 				pipeunlock(wpipe);
950 			}
951 			if (error)
952 				break;
953 
954 		} else {
955 			/*
956 			 * If the "read-side" has been blocked, wake it up now.
957 			 */
958 			if (wpipe->pipe_state & PIPE_WANTR) {
959 				wpipe->pipe_state &= ~PIPE_WANTR;
960 				wakeup(wpipe);
961 			}
962 
963 			/*
964 			 * don't block on non-blocking I/O
965 			 */
966 			if (fp->f_flag & FNONBLOCK) {
967 				error = EAGAIN;
968 				break;
969 			}
970 
971 			/*
972 			 * We have no more space and have something to offer,
973 			 * wake up select/poll.
974 			 */
975 			pipeselwakeup(wpipe);
976 
977 			wpipe->pipe_state |= PIPE_WANTW;
978 			error = tsleep(wpipe, PCATCH, "pipewr", 0);
979 			if (error != 0)
980 				break;
981 			/*
982 			 * If read side wants to go away, we just issue a signal
983 			 * to ourselves.
984 			 */
985 			if (wpipe->pipe_state & PIPE_EOF) {
986 				error = EPIPE;
987 				break;
988 			}
989 		}
990 	}
991 
992 	--wpipe->pipe_busy;
993 
994 	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
995 		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
996 		wakeup(wpipe);
997 	} else if (wpipe->pipe_buffer.cnt > 0) {
998 		/*
999 		 * If we have put any characters in the buffer, we wake up
1000 		 * the reader.
1001 		 */
1002 		if (wpipe->pipe_state & PIPE_WANTR) {
1003 			wpipe->pipe_state &= ~PIPE_WANTR;
1004 			wakeup(wpipe);
1005 		}
1006 	}
1007 
1008 	/*
1009 	 * Don't return EPIPE if I/O was successful
1010 	 */
1011 	if ((wpipe->pipe_buffer.cnt == 0) &&
1012 	    (uio->uio_resid == 0) &&
1013 	    (error == EPIPE)) {
1014 		error = 0;
1015 	}
1016 
1017 	if (error == 0)
1018 		vfs_timestamp(&wpipe->pipe_mtime);
1019 
1020 	/*
1021 	 * We have something to offer,
1022 	 * wake up select/poll.
1023 	 */
1024 	if (wpipe->pipe_buffer.cnt)
1025 		pipeselwakeup(wpipe);
1026 
1027 	return (error);
1028 }
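
/*
 * Illustrative userland sketch (wrapped in #if 0, not compiled) of two
 * properties enforced by pipe_write() above: writes of at most PIPE_BUF
 * bytes are atomic (never split), and with O_NONBLOCK set a write that
 * cannot be satisfied returns EAGAIN instead of blocking.  PIPE_BUF here
 * is the userland constant from <limits.h>.
 */
#if 0
#include <sys/types.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <unistd.h>
#include <err.h>

int
main(void)
{
	char rec[PIPE_BUF] = { 0 };
	int fds[2];
	ssize_t n;

	if (pipe(fds) < 0)
		err(1, "pipe");
	fcntl(fds[1], F_SETFL, O_NONBLOCK);
	for (;;) {
		n = write(fds[1], rec, sizeof(rec));
		if (n < 0) {
			if (errno == EAGAIN)
				break;		/* buffer full, no partial record */
			err(1, "write");
		} else if (n != (ssize_t)sizeof(rec))
			errx(1, "unexpected short write");
	}
	return (0);
}
#endif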
1029 
1030 /*
1031  * we implement a very minimal set of ioctls for compatibility with sockets.
1032  */
1033 int
1034 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td)
1035 {
1036 	struct pipe *mpipe = (struct pipe *)fp->f_data;
1037 
1038 	switch (cmd) {
1039 
1040 	case FIONBIO:
1041 		return (0);
1042 
1043 	case FIOASYNC:
1044 		if (*(int *)data) {
1045 			mpipe->pipe_state |= PIPE_ASYNC;
1046 		} else {
1047 			mpipe->pipe_state &= ~PIPE_ASYNC;
1048 		}
1049 		return (0);
1050 
1051 	case FIONREAD:
1052 		if (mpipe->pipe_state & PIPE_DIRECTW)
1053 			*(int *)data = mpipe->pipe_map.cnt;
1054 		else
1055 			*(int *)data = mpipe->pipe_buffer.cnt;
1056 		return (0);
1057 
1058 	case FIOSETOWN:
1059 		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1060 
1061 	case FIOGETOWN:
1062 		*(int *)data = fgetown(mpipe->pipe_sigio);
1063 		return (0);
1064 
1065 	/* This is deprecated, FIOSETOWN should be used instead. */
1066 	case TIOCSPGRP:
1067 		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1068 
1069 	/* This is deprecated, FIOGETOWN should be used instead. */
1070 	case TIOCGPGRP:
1071 		*(int *)data = -fgetown(mpipe->pipe_sigio);
1072 		return (0);
1073 
1074 	}
1075 	return (ENOTTY);
1076 }
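
/*
 * Illustrative userland sketch (wrapped in #if 0, not compiled) of the
 * FIONREAD ioctl handled above, which reports the number of bytes
 * currently queued in the pipe.
 */
#if 0
#include <sys/ioctl.h>
#include <unistd.h>
#include <err.h>

int
main(void)
{
	int fds[2];
	int pending;

	if (pipe(fds) < 0)
		err(1, "pipe");
	write(fds[1], "abc", 3);
	if (ioctl(fds[0], FIONREAD, &pending) < 0)
		err(1, "ioctl");
	return (pending == 3 ? 0 : 1);	/* pending is 3 here */
}
#endif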
1077 
1078 int
1079 pipe_poll(struct file *fp, int events, struct ucred *cred, struct thread *td)
1080 {
1081 	struct pipe *rpipe = (struct pipe *)fp->f_data;
1082 	struct pipe *wpipe;
1083 	int revents = 0;
1084 
1085 	wpipe = rpipe->pipe_peer;
1086 	if (events & (POLLIN | POLLRDNORM))
1087 		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1088 		    (rpipe->pipe_buffer.cnt > 0) ||
1089 		    (rpipe->pipe_state & PIPE_EOF))
1090 			revents |= events & (POLLIN | POLLRDNORM);
1091 
1092 	if (events & (POLLOUT | POLLWRNORM))
1093 		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1094 		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1095 		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1096 			revents |= events & (POLLOUT | POLLWRNORM);
1097 
1098 	if ((rpipe->pipe_state & PIPE_EOF) ||
1099 	    (wpipe == NULL) ||
1100 	    (wpipe->pipe_state & PIPE_EOF))
1101 		revents |= POLLHUP;
1102 
1103 	if (revents == 0) {
1104 		if (events & (POLLIN | POLLRDNORM)) {
1105 			selrecord(td, &rpipe->pipe_sel);
1106 			rpipe->pipe_state |= PIPE_SEL;
1107 		}
1108 
1109 		if (events & (POLLOUT | POLLWRNORM)) {
1110 			selrecord(td, &wpipe->pipe_sel);
1111 			wpipe->pipe_state |= PIPE_SEL;
1112 		}
1113 	}
1114 
1115 	return (revents);
1116 }
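
/*
 * Illustrative userland sketch (wrapped in #if 0, not compiled) of polling
 * a pipe as implemented above: POLLIN is reported once data is buffered,
 * and POLLHUP once the write side has gone away.
 */
#if 0
#include <poll.h>
#include <unistd.h>
#include <err.h>

int
main(void)
{
	struct pollfd pfd;
	int fds[2];

	if (pipe(fds) < 0)
		err(1, "pipe");
	write(fds[1], "x", 1);

	pfd.fd = fds[0];
	pfd.events = POLLIN;
	if (poll(&pfd, 1, 0) < 0)
		err(1, "poll");
	return ((pfd.revents & POLLIN) ? 0 : 1);	/* POLLIN is set */
}
#endif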
1117 
1118 static int
1119 pipe_stat(struct file *fp, struct stat *ub, struct thread *td)
1120 {
1121 	struct pipe *pipe = (struct pipe *)fp->f_data;
1122 
1123 	bzero((caddr_t)ub, sizeof(*ub));
1124 	ub->st_mode = S_IFIFO;
1125 	ub->st_blksize = pipe->pipe_buffer.size;
1126 	ub->st_size = pipe->pipe_buffer.cnt;
1127 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1128 	ub->st_atimespec = pipe->pipe_atime;
1129 	ub->st_mtimespec = pipe->pipe_mtime;
1130 	ub->st_ctimespec = pipe->pipe_ctime;
1131 	/*
1132 	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1133 	 * st_flags, st_gen.
1134 	 * XXX (st_dev, st_ino) should be unique.
1135 	 */
1136 	return (0);
1137 }
1138 
1139 /* ARGSUSED */
1140 static int
1141 pipe_close(struct file *fp, struct thread *td)
1142 {
1143 	struct pipe *cpipe = (struct pipe *)fp->f_data;
1144 
1145 	fp->f_ops = &badfileops;
1146 	fp->f_data = NULL;
1147 	funsetown(cpipe->pipe_sigio);
1148 	pipeclose(cpipe);
1149 	return (0);
1150 }
1151 
1152 static void
1153 pipe_free_kmem(struct pipe *cpipe)
1154 {
1155 	if (cpipe->pipe_buffer.buffer != NULL) {
1156 		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1157 			--pipe_nbig;
1158 		pipe_kva -= cpipe->pipe_buffer.size;
1159 		kmem_free(kernel_map,
1160 			(vm_offset_t)cpipe->pipe_buffer.buffer,
1161 			cpipe->pipe_buffer.size);
1162 		cpipe->pipe_buffer.buffer = NULL;
1163 		cpipe->pipe_buffer.object = NULL;
1164 	}
1165 #ifndef PIPE_NODIRECT
1166 	if (cpipe->pipe_map.kva != 0) {
1167 		pipe_kva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1168 		kmem_free(kernel_map,
1169 			cpipe->pipe_map.kva,
1170 			cpipe->pipe_buffer.size + PAGE_SIZE);
1171 		cpipe->pipe_map.cnt = 0;
1172 		cpipe->pipe_map.kva = 0;
1173 		cpipe->pipe_map.pos = 0;
1174 		cpipe->pipe_map.npages = 0;
1175 	}
1176 #endif
1177 }
1178 
1179 /*
1180  * shut down the pipe
1181  */
1182 static void
1183 pipeclose(struct pipe *cpipe)
1184 {
1185 	globaldata_t gd;
1186 	struct pipe *ppipe;
1187 
1188 	if (cpipe == NULL)
1189 		return;
1190 
1191 	pipeselwakeup(cpipe);
1192 
1193 	/*
1194 	 * If the other side is blocked, wake it up saying that
1195 	 * we want to close it down.
1196 	 */
1197 	while (cpipe->pipe_busy) {
1198 		wakeup(cpipe);
1199 		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1200 		tsleep(cpipe, 0, "pipecl", 0);
1201 	}
1202 
1203 	/*
1204 	 * Disconnect from peer
1205 	 */
1206 	if ((ppipe = cpipe->pipe_peer) != NULL) {
1207 		pipeselwakeup(ppipe);
1208 
1209 		ppipe->pipe_state |= PIPE_EOF;
1210 		wakeup(ppipe);
1211 		KNOTE(&ppipe->pipe_sel.si_note, 0);
1212 		ppipe->pipe_peer = NULL;
1213 	}
1214 
1215 	/*
1216 	 * free or cache resources
1217 	 */
1218 	gd = mycpu;
1219 	if (gd->gd_pipeqcount >= pipe_maxcache ||
1220 	    cpipe->pipe_buffer.size != PIPE_SIZE
1221 	) {
1222 		pipe_free_kmem(cpipe);
1223 		free(cpipe, M_PIPE);
1224 	} else {
1225 		KKASSERT(cpipe->pipe_map.npages == 0);
1226 
1227 		cpipe->pipe_state = 0;
1228 		cpipe->pipe_busy = 0;
1229 		cpipe->pipe_map.cnt = 0;
1230 		cpipe->pipe_map.pos = 0;
1231 		cpipe->pipe_peer = gd->gd_pipeq;
1232 		gd->gd_pipeq = cpipe;
1233 		++gd->gd_pipeqcount;
1234 	}
1235 }
1236 
1237 /*ARGSUSED*/
1238 static int
1239 pipe_kqfilter(struct file *fp, struct knote *kn)
1240 {
1241 	struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data;
1242 
1243 	switch (kn->kn_filter) {
1244 	case EVFILT_READ:
1245 		kn->kn_fop = &pipe_rfiltops;
1246 		break;
1247 	case EVFILT_WRITE:
1248 		kn->kn_fop = &pipe_wfiltops;
1249 		cpipe = cpipe->pipe_peer;
1250 		if (cpipe == NULL)
1251 			/* other end of pipe has been closed */
1252 			return (EPIPE);
1253 		break;
1254 	default:
1255 		return (1);
1256 	}
1257 	kn->kn_hook = (caddr_t)cpipe;
1258 
1259 	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1260 	return (0);
1261 }
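
/*
 * Illustrative userland sketch (wrapped in #if 0, not compiled) of
 * attaching an EVFILT_READ knote to a pipe via kqueue(2); filt_piperead()
 * below reports the number of buffered bytes in kn_data.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#include <err.h>

int
main(void)
{
	struct kevent kev;
	int fds[2], kq;

	if (pipe(fds) < 0 || (kq = kqueue()) < 0)
		err(1, "setup");
	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
		err(1, "kevent register");
	write(fds[1], "x", 1);
	if (kevent(kq, NULL, 0, &kev, 1, NULL) < 1)
		err(1, "kevent wait");
	return (kev.data >= 1 ? 0 : 1);	/* kev.data: bytes readable */
}
#endif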
1262 
1263 static void
1264 filt_pipedetach(struct knote *kn)
1265 {
1266 	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1267 
1268 	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1269 }
1270 
1271 /*ARGSUSED*/
1272 static int
1273 filt_piperead(struct knote *kn, long hint)
1274 {
1275 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1276 	struct pipe *wpipe = rpipe->pipe_peer;
1277 
1278 	kn->kn_data = rpipe->pipe_buffer.cnt;
1279 	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1280 		kn->kn_data = rpipe->pipe_map.cnt;
1281 
1282 	if ((rpipe->pipe_state & PIPE_EOF) ||
1283 	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1284 		kn->kn_flags |= EV_EOF;
1285 		return (1);
1286 	}
1287 	return (kn->kn_data > 0);
1288 }
1289 
1290 /*ARGSUSED*/
1291 static int
1292 filt_pipewrite(struct knote *kn, long hint)
1293 {
1294 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1295 	struct pipe *wpipe = rpipe->pipe_peer;
1296 
1297 	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1298 		kn->kn_data = 0;
1299 		kn->kn_flags |= EV_EOF;
1300 		return (1);
1301 	}
1302 	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1303 	if (wpipe->pipe_state & PIPE_DIRECTW)
1304 		kn->kn_data = 0;
1305 
1306 	return (kn->kn_data >= PIPE_BUF);
1307 }
1308