/*	$OpenBSD: sys_pipe.c,v 1.69 2015/02/10 21:56:10 miod Exp $	*/

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but it does everything that pipes normally do.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/pool.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/event.h>
#include <sys/lock.h>
#include <sys/poll.h>

#include <uvm/uvm_extern.h>

#include <sys/pipe.h>

/*
 * interfaces to the outside world
 */
int	pipe_read(struct file *, off_t *, struct uio *, struct ucred *);
int	pipe_write(struct file *, off_t *, struct uio *, struct ucred *);
int	pipe_close(struct file *, struct proc *);
int	pipe_poll(struct file *, int events, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

static struct fileops pipeops = {
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipewrite(struct knote *kn, long hint);

struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes.
 */
#define LIMITBIGPIPES	32
int nbigpipe;
static int amountpipekva;

struct pool pipe_pool;

int	dopipe(struct proc *, int *, int);
void	pipeclose(struct pipe *);
void	pipe_free_kmem(struct pipe *);
int	pipe_create(struct pipe *);
int	pipelock(struct pipe *);
void	pipeunlock(struct pipe *);
void	pipeselwakeup(struct pipe *);
int	pipespace(struct pipe *, u_int);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

int
sys_pipe(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe_args /* {
		syscallarg(int *) fdp;
	} */ *uap = v;

	return (dopipe(p, SCARG(uap, fdp), 0));
}

int
sys_pipe2(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe2_args /* {
		syscallarg(int *) fdp;
		syscallarg(int) flags;
	} */ *uap = v;

	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
		return (EINVAL);

	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
}

int
dopipe(struct proc *p, int *ufds, int flags)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe = NULL;
	int fds[2], error;

	rpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(rpipe);
	if (error != 0)
		goto free1;
	wpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(wpipe);
	if (error != 0)
		goto free1;

	fdplock(fdp);

	error = falloc(p, &rf, &fds[0]);
	if (error != 0)
		goto free2;
	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;

	error = falloc(p, &wf, &fds[1]);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;

	if (flags & O_CLOEXEC) {
		fdp->fd_ofileflags[fds[0]] |= UF_EXCLOSE;
		fdp->fd_ofileflags[fds[1]] |= UF_EXCLOSE;
	}

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	FILE_SET_MATURE(rf, p);
	FILE_SET_MATURE(wf, p);

	error = copyout(fds, ufds, sizeof(fds));
	if (error != 0) {
		fdrelease(p, fds[0]);
		fdrelease(p, fds[1]);
	}
	fdpunlock(fdp);
	return (error);

free3:
	fdremove(fdp, fds[0]);
	closef(rf, p);
	rpipe = NULL;
free2:
	fdpunlock(fdp);
free1:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}

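/*
 * Example (illustrative, not kernel code): the flag validation above as
 * seen from userland.  pipe2(2) accepts only O_CLOEXEC and O_NONBLOCK
 * (FNONBLOCK is the kernel spelling of O_NONBLOCK); anything else fails
 * with EINVAL before any pipe is created.
 *
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *	#include <err.h>
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
 *		err(1, "pipe2");
 *	// fds[0] is the read end, fds[1] the write end; both carry
 *	// close-on-exec and non-blocking from the start.
 */
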
/*
 * Allocate KVA for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if allocation
 * fails it keeps the old buffer intact and returns ENOMEM.
 */
int
pipespace(struct pipe *cpipe, u_int size)
{
	caddr_t buffer;

	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
	if (buffer == NULL) {
		return (ENOMEM);
	}

	/* free old resources if we are resizing */
	pipe_free_kmem(cpipe);
	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;

	amountpipekva += cpipe->pipe_buffer.size;

	return (0);
}

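/*
 * Sketch of the ring-buffer bookkeeping reset above (the numbers are
 * illustrative, not taken from the code): 'in' is where the next write
 * lands, 'out' where the next read starts, and 'cnt' the bytes currently
 * buffered, so 0 <= cnt <= size always holds.  E.g. with size = 16384,
 * writing 100 bytes into a fresh pipe gives in = 100, out = 0, cnt = 100;
 * reading 40 of them gives in = 100, out = 40, cnt = 60.  Both indices
 * wrap back to 0 on reaching 'size', which is why pipe_read() and
 * pipe_write() below compare them against pipe_buffer.size after every
 * transfer.
 */
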
/*
 * initialize and allocate VM and memory for pipe
 */
int
pipe_create(struct pipe *cpipe)
{
	int error;

	/* so pipe_free_kmem() doesn't follow junk pointer */
	cpipe->pipe_buffer.buffer = NULL;
	/*
	 * protect so pipeclose() doesn't follow a junk pointer
	 * if pipespace() fails.
	 */
	memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel));
	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;

	error = pipespace(cpipe, PIPE_SIZE);
	if (error != 0)
		return (error);

	getnanotime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	cpipe->pipe_pgid = NO_PID;

	return (0);
}


/*
 * lock a pipe for I/O, blocking other access
 */
int
pipelock(struct pipe *cpipe)
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
			return error;
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
void
pipeunlock(struct pipe *cpipe)
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

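/*
 * Typical caller pattern (a sketch of how the two routines above pair
 * up; see pipe_read()/pipe_write() below for the real call sites):
 *
 *	if ((error = pipelock(cpipe)) == 0) {
 *		... manipulate cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 *
 * PIPE_LOCK is a sleep lock built on the pipe_state flags: contenders
 * set PIPE_LWANT and tsleep() on the pipe, and the unlock side wakes
 * them.  Because the tsleep() uses PCATCH, pipelock() can fail with
 * EINTR/ERESTART and callers must be prepared to bail out.
 */
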
void
pipeselwakeup(struct pipe *cpipe)
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	} else
		KNOTE(&cpipe->pipe_sel.si_note, 0);
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID)
		gsignal(cpipe->pipe_pgid, SIGIO);
}

/* ARGSUSED */
int
pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	int size;

	error = pipelock(rpipe);
	if (error)
		return (error);

	++rpipe->pipe_busy;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			error = uiomovei(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}

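/*
 * From userland, the cases above map onto read(2) as follows (a sketch,
 * not kernel code): data present -> partial reads are fine; empty pipe
 * with the write side closed -> read() returns 0 (EOF); empty pipe with
 * O_NONBLOCK set -> -1/EAGAIN; otherwise the reader sleeps in "piperd".
 *
 *	char buf[512];
 *	ssize_t n = read(fds[0], buf, sizeof(buf));
 *
 *	if (n == 0)
 *		;	// writer closed: the PIPE_EOF break above
 *	else if (n == -1 && errno == EAGAIN)
 *		;	// non-blocking and nothing buffered
 */
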
int
pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
{
	int error = 0;
	int orig_resid;

	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of the pipe read side; the EPIPE return makes
	 * the caller deliver SIGPIPE.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < LIMITBIGPIPES) &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe)) == 0) {
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				nbigpipe++;
			pipeunlock(wpipe);
		}
	}

	/*
	 * If an early error occurred, unbusy and return, waking up any
	 * pending readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		return (error);
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		int space;

retrywrite:
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;
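
		/*
		 * Worked example of the check above (the numbers are
		 * illustrative): with PIPE_BUF = 512, a 400-byte write
		 * into a pipe with only 300 bytes free pretends that
		 * space = 0, so we fall through to the sleep below
		 * instead of splitting the write; once 400 bytes fit,
		 * the whole write goes in one piece.  Writes larger
		 * than PIPE_BUF take space as-is and may be split.
		 */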

		if (space > 0) {
			if ((error = pipelock(wpipe)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				error = uiomovei(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");
#endif

					error = uiomovei(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
#endif
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");
#endif
				}
				pipeunlock(wpipe);
			}
			if (error)
				break;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * so wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
			    "pipewr", 0);
			if (error)
				break;
			/*
			 * If the read side wants to go away, return EPIPE;
			 * the caller turns that into SIGPIPE.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/*
	 * We have something to offer, wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}

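/*
 * Userland view of the error paths above (a sketch, not kernel code):
 * a write to a pipe whose read end is closed fails with EPIPE and
 * raises SIGPIPE, and a full pipe with O_NONBLOCK set fails with EAGAIN.
 *
 *	#include <signal.h>
 *
 *	signal(SIGPIPE, SIG_IGN);	// see the EPIPE return, not the signal
 *	if (write(fds[1], buf, n) == -1) {
 *		if (errno == EPIPE)
 *			;	// reader gone: PIPE_EOF above
 *		else if (errno == EAGAIN)
 *			;	// non-blocking, and no room for an
 *				// atomic write
 *	}
 */
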
/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
{
	struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}

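/*
 * Example (illustrative): querying how many bytes are buffered in a
 * pipe via the FIONREAD case handled above.
 *
 *	#include <sys/ioctl.h>
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		printf("%d bytes ready\n", nbytes);
 */
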
int
pipe_poll(struct file *fp, int events, struct proc *p)
{
	struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);
	}

	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;
	else if (events & (POLLOUT | POLLWRNORM)) {
		if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
			revents |= events & (POLLOUT | POLLWRNORM);
	}

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}
	return (revents);
}

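/*
 * Example (illustrative): waiting for a pipe to become readable with
 * poll(2), serviced by pipe_poll() above.  POLLHUP is reported once
 * either side has seen PIPE_EOF.
 *
 *	#include <poll.h>
 *
 *	struct pollfd pfd = { .fd = fds[0], .events = POLLIN };
 *
 *	if (poll(&pfd, 1, INFTIM) == 1) {
 *		if (pfd.revents & POLLHUP)
 *			;	// peer closed
 *		else if (pfd.revents & POLLIN)
 *			;	// data buffered, or EOF pending
 *	}
 */
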
int
pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
{
	struct pipe *pipe = (struct pipe *)fp->f_data;

	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atim.tv_sec  = pipe->pipe_atime.tv_sec;
	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
	ub->st_mtim.tv_sec  = pipe->pipe_mtime.tv_sec;
	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
	ub->st_ctim.tv_sec  = pipe->pipe_ctime.tv_sec;
	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

/* ARGSUSED */
int
pipe_close(struct file *fp, struct proc *p)
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	fp->f_ops = NULL;
	fp->f_data = NULL;
	pipeclose(cpipe);
	return (0);
}

void
pipe_free_kmem(struct pipe *cpipe)
{
	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			--nbigpipe;
		amountpipekva -= cpipe->pipe_buffer.size;
		km_free(cpipe->pipe_buffer.buffer, cpipe->pipe_buffer.size,
		    &kv_any, &kp_pageable);
		cpipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * shut down the pipe
 */
void
pipeclose(struct pipe *cpipe)
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		cpipe->pipe_state |= PIPE_EOF;
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		pipe_free_kmem(cpipe);
		pool_put(&pipe_pool, cpipe);
	}
}

int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL) {
			/* other end of pipe has been closed */
			return (EPIPE);
		}
		kn->kn_fop = &pipe_wfiltops;
		SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	default:
		return (EINVAL);
	}

	return (0);
}

void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL)
			return;
		SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	}
}

/*ARGSUSED*/
int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	return (kn->kn_data >= PIPE_BUF);
}

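/*
 * Example (illustrative): registering the EVFILT_READ filter above via
 * kevent(2).  kn_data surfaces as kev.data, i.e. the byte count in the
 * pipe buffer, and EV_EOF shows up once the write side closes.
 *
 *	#include <sys/event.h>
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);
 *	if (kevent(kq, NULL, 0, &kev, 1, NULL) == 1)
 *		printf("%lld bytes readable%s\n", (long long)kev.data,
 *		    (kev.flags & EV_EOF) ? " (EOF)" : "");
 */
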
void
pipe_init(void)
{
	pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, PR_WAITOK, "pipepl",
	    NULL);
}