1 /*	$OpenBSD: sys_pipe.c,v 1.78 2018/04/10 09:17:45 mpi Exp $	*/
2 
3 /*
4  * Copyright (c) 1996 John S. Dyson
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice immediately at the beginning of the file, without modification,
12  *    this list of conditions, and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Absolutely no warranty of function or purpose is made by the author
17  *    John S. Dyson.
18  * 4. Modifications may be freely made to this file if the above conditions
19  *    are met.
20  */
21 
22 /*
23  * This file contains a high-performance replacement for the socket-based
24  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25  * all features of sockets, but does do everything that pipes normally
26  * do.
27  */
28 
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/proc.h>
32 #include <sys/fcntl.h>
33 #include <sys/file.h>
34 #include <sys/filedesc.h>
35 #include <sys/pool.h>
36 #include <sys/ioctl.h>
37 #include <sys/stat.h>
38 #include <sys/signalvar.h>
39 #include <sys/mount.h>
40 #include <sys/syscallargs.h>
41 #include <sys/event.h>
42 #include <sys/lock.h>
43 #include <sys/poll.h>
44 #ifdef KTRACE
45 #include <sys/ktrace.h>
46 #endif
47 
48 #include <uvm/uvm_extern.h>
49 
50 #include <sys/pipe.h>
51 
52 /*
53  * interfaces to the outside world
54  */
55 int	pipe_read(struct file *, off_t *, struct uio *, struct ucred *);
56 int	pipe_write(struct file *, off_t *, struct uio *, struct ucred *);
57 int	pipe_close(struct file *, struct proc *);
58 int	pipe_poll(struct file *, int events, struct proc *);
59 int	pipe_kqfilter(struct file *fp, struct knote *kn);
60 int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
61 int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);
62 
63 static struct fileops pipeops = {
64 	.fo_read	= pipe_read,
65 	.fo_write	= pipe_write,
66 	.fo_ioctl	= pipe_ioctl,
67 	.fo_poll	= pipe_poll,
68 	.fo_kqfilter	= pipe_kqfilter,
69 	.fo_stat	= pipe_stat,
70 	.fo_close	= pipe_close
71 };
72 
73 void	filt_pipedetach(struct knote *kn);
74 int	filt_piperead(struct knote *kn, long hint);
75 int	filt_pipewrite(struct knote *kn, long hint);
76 
77 struct filterops pipe_rfiltops =
78 	{ 1, NULL, filt_pipedetach, filt_piperead };
79 struct filterops pipe_wfiltops =
80 	{ 1, NULL, filt_pipedetach, filt_pipewrite };
81 
82 /*
83  * Default pipe buffer size(s); this can be fairly large now because pipe
84  * space is pageable.  The pipe code will try to maintain locality of
85  * reference for performance reasons, so small amounts of outstanding I/O
86  * will not wipe the cache.
87  */
88 #define MINPIPESIZE (PIPE_SIZE/3)
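/*
 * MINPIPESIZE is the wakeup threshold for write-blocking hysteresis:
 * pipe_read() only wakes a writer blocked on a full buffer once fewer
 * than PIPE_SIZE/3 bytes remain unread.  As an illustration (assuming
 * the usual 16384-byte PIPE_SIZE), a writer that filled the pipe is not
 * woken until readers have drained it below roughly 5461 bytes, so the
 * writer refills in large chunks rather than a few bytes at a time.
 */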
89 
90 /*
91  * Limit the number of "big" pipes
92  */
93 #define LIMITBIGPIPES	32
94 int nbigpipe;
95 static int amountpipekva;
96 
97 struct pool pipe_pool;
98 
99 int	dopipe(struct proc *, int *, int);
100 void	pipeclose(struct pipe *);
101 void	pipe_free_kmem(struct pipe *);
102 int	pipe_create(struct pipe *);
103 int	pipelock(struct pipe *);
104 void	pipeunlock(struct pipe *);
105 void	pipeselwakeup(struct pipe *);
106 int	pipespace(struct pipe *, u_int);
107 
108 /*
109  * The pipe system call for the DTYPE_PIPE type of pipes
110  */
111 
112 int
113 sys_pipe(struct proc *p, void *v, register_t *retval)
114 {
115 	struct sys_pipe_args /* {
116 		syscallarg(int *) fdp;
117 	} */ *uap = v;
118 
119 	return (dopipe(p, SCARG(uap, fdp), 0));
120 }
121 
122 int
123 sys_pipe2(struct proc *p, void *v, register_t *retval)
124 {
125 	struct sys_pipe2_args /* {
126 		syscallarg(int *) fdp;
127 		syscallarg(int) flags;
128 	} */ *uap = v;
129 
130 	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
131 		return (EINVAL);
132 
133 	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
134 }
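/*
 * Illustrative userland sketch (not part of this file): the flag check
 * above means pipe2(2) accepts only O_CLOEXEC and O_NONBLOCK and fails
 * with EINVAL for anything else.  A minimal caller, assuming <err.h>,
 * <fcntl.h> and <unistd.h>:
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
 *		err(1, "pipe2");
 *
 * fds[0] is the read end and fds[1] the write end; both descriptors are
 * close-on-exec and non-blocking.
 */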
135 
136 int
137 dopipe(struct proc *p, int *ufds, int flags)
138 {
139 	struct filedesc *fdp = p->p_fd;
140 	struct file *rf, *wf;
141 	struct pipe *rpipe, *wpipe = NULL;
142 	int fds[2], cloexec, error;
143 
144 	cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0;
145 
146 	rpipe = pool_get(&pipe_pool, PR_WAITOK);
147 	error = pipe_create(rpipe);
148 	if (error != 0)
149 		goto free1;
150 	wpipe = pool_get(&pipe_pool, PR_WAITOK);
151 	error = pipe_create(wpipe);
152 	if (error != 0)
153 		goto free1;
154 
155 	fdplock(fdp);
156 
157 	error = falloc(p, cloexec, &rf, &fds[0]);
158 	if (error != 0)
159 		goto free2;
160 	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
161 	rf->f_type = DTYPE_PIPE;
162 	rf->f_data = rpipe;
163 	rf->f_ops = &pipeops;
164 
165 	error = falloc(p, cloexec, &wf, &fds[1]);
166 	if (error != 0)
167 		goto free3;
168 	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
169 	wf->f_type = DTYPE_PIPE;
170 	wf->f_data = wpipe;
171 	wf->f_ops = &pipeops;
172 
173 	rpipe->pipe_peer = wpipe;
174 	wpipe->pipe_peer = rpipe;
175 
176 	FILE_SET_MATURE(rf, p);
177 	FILE_SET_MATURE(wf, p);
178 
179 	error = copyout(fds, ufds, sizeof(fds));
180 	if (error != 0) {
181 		fdrelease(p, fds[0]);
182 		fdrelease(p, fds[1]);
183 	}
184 #ifdef KTRACE
185 	else if (KTRPOINT(p, KTR_STRUCT))
186 		ktrfds(p, fds, 2);
187 #endif
188 	fdpunlock(fdp);
189 	return (error);
190 
191 free3:
192 	fdremove(fdp, fds[0]);
193 	closef(rf, p);
194 	rpipe = NULL;
195 free2:
196 	fdpunlock(fdp);
197 free1:
198 	pipeclose(wpipe);
199 	pipeclose(rpipe);
200 	return (error);
201 }
202 
203 /*
204  * Allocate kva for the pipe circular buffer; the space is pageable.
205  * This routine will 'realloc' the size of a pipe safely: if the new
206  * allocation fails, the old buffer is retained and the routine
207  * returns ENOMEM.
208  */
209 int
210 pipespace(struct pipe *cpipe, u_int size)
211 {
212 	caddr_t buffer;
213 
214 	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
215 	if (buffer == NULL) {
216 		return (ENOMEM);
217 	}
218 
219 	/* free old resources if we are resizing */
220 	pipe_free_kmem(cpipe);
221 	cpipe->pipe_buffer.buffer = buffer;
222 	cpipe->pipe_buffer.size = size;
223 	cpipe->pipe_buffer.in = 0;
224 	cpipe->pipe_buffer.out = 0;
225 	cpipe->pipe_buffer.cnt = 0;
226 
227 	amountpipekva += cpipe->pipe_buffer.size;
228 
229 	return (0);
230 }
231 
232 /*
233  * initialize and allocate VM and memory for pipe
234  */
235 int
236 pipe_create(struct pipe *cpipe)
237 {
238 	int error;
239 
240 	/* so pipe_free_kmem() doesn't follow junk pointer */
241 	cpipe->pipe_buffer.buffer = NULL;
242 	/*
243 	 * protect so pipeclose() doesn't follow a junk pointer
244 	 * if pipespace() fails.
245 	 */
246 	memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel));
247 	cpipe->pipe_state = 0;
248 	cpipe->pipe_peer = NULL;
249 	cpipe->pipe_busy = 0;
250 
251 	error = pipespace(cpipe, PIPE_SIZE);
252 	if (error != 0)
253 		return (error);
254 
255 	getnanotime(&cpipe->pipe_ctime);
256 	cpipe->pipe_atime = cpipe->pipe_ctime;
257 	cpipe->pipe_mtime = cpipe->pipe_ctime;
258 	cpipe->pipe_pgid = NO_PID;
259 
260 	return (0);
261 }
262 
263 
264 /*
265  * lock a pipe for I/O, blocking other access
266  */
267 int
268 pipelock(struct pipe *cpipe)
269 {
270 	int error;
271 	while (cpipe->pipe_state & PIPE_LOCK) {
272 		cpipe->pipe_state |= PIPE_LWANT;
273 		if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
274 			return error;
275 	}
276 	cpipe->pipe_state |= PIPE_LOCK;
277 	return 0;
278 }
279 
280 /*
281  * unlock a pipe I/O lock
282  */
283 void
284 pipeunlock(struct pipe *cpipe)
285 {
286 	cpipe->pipe_state &= ~PIPE_LOCK;
287 	if (cpipe->pipe_state & PIPE_LWANT) {
288 		cpipe->pipe_state &= ~PIPE_LWANT;
289 		wakeup(cpipe);
290 	}
291 }
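/*
 * The canonical usage pattern for the pair above (a sketch of how the
 * read/write paths below use it):
 *
 *	if ((error = pipelock(cpipe)) == 0) {
 *		... manipulate cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 *
 * pipelock() fails only when its tsleep() is interrupted by a signal
 * (PCATCH), in which case the caller bails out with that error.
 */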
292 
293 void
294 pipeselwakeup(struct pipe *cpipe)
295 {
296 	if (cpipe->pipe_state & PIPE_SEL) {
297 		cpipe->pipe_state &= ~PIPE_SEL;
298 		selwakeup(&cpipe->pipe_sel);
299 	} else
300 		KNOTE(&cpipe->pipe_sel.si_note, 0);
301 	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID)
302 		gsignal(cpipe->pipe_pgid, SIGIO);
303 }
304 
305 int
306 pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
307 {
308 	struct pipe *rpipe = fp->f_data;
309 	int error;
310 	size_t size, nread = 0;
311 
312 	error = pipelock(rpipe);
313 	if (error)
314 		return (error);
315 
316 	++rpipe->pipe_busy;
317 
318 	while (uio->uio_resid) {
319 		/*
320 		 * normal pipe buffer receive
321 		 */
322 		if (rpipe->pipe_buffer.cnt > 0) {
323 			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
324 			if (size > rpipe->pipe_buffer.cnt)
325 				size = rpipe->pipe_buffer.cnt;
326 			if (size > uio->uio_resid)
327 				size = uio->uio_resid;
328 			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
329 					size, uio);
330 			if (error) {
331 				break;
332 			}
333 			rpipe->pipe_buffer.out += size;
334 			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
335 				rpipe->pipe_buffer.out = 0;
336 
337 			rpipe->pipe_buffer.cnt -= size;
338 			/*
339 			 * If there is no more to read in the pipe, reset
340 			 * its pointers to the beginning.  This improves
341 			 * cache hit stats.
342 			 */
343 			if (rpipe->pipe_buffer.cnt == 0) {
344 				rpipe->pipe_buffer.in = 0;
345 				rpipe->pipe_buffer.out = 0;
346 			}
347 			nread += size;
348 		} else {
349 			/*
350 			 * detect EOF condition
351 			 * read returns 0 on EOF, no need to set error
352 			 */
353 			if (rpipe->pipe_state & PIPE_EOF)
354 				break;
355 
356 			/*
357 			 * If the "write-side" has been blocked, wake it up now.
358 			 */
359 			if (rpipe->pipe_state & PIPE_WANTW) {
360 				rpipe->pipe_state &= ~PIPE_WANTW;
361 				wakeup(rpipe);
362 			}
363 
364 			/*
365 			 * Break if some data was read.
366 			 */
367 			if (nread > 0)
368 				break;
369 
370 			/*
371 			 * Unlock the pipe buffer for our remaining processing.
372 			 * We will either break out with an error or we will
373 			 * sleep and relock to loop.
374 			 */
375 			pipeunlock(rpipe);
376 
377 			/*
378 			 * Handle non-blocking mode operation or
379 			 * wait for more data.
380 			 */
381 			if (fp->f_flag & FNONBLOCK) {
382 				error = EAGAIN;
383 			} else {
384 				rpipe->pipe_state |= PIPE_WANTR;
385 				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
386 					error = pipelock(rpipe);
387 			}
388 			if (error)
389 				goto unlocked_error;
390 		}
391 	}
392 	pipeunlock(rpipe);
393 
394 	if (error == 0)
395 		getnanotime(&rpipe->pipe_atime);
396 unlocked_error:
397 	--rpipe->pipe_busy;
398 
399 	/*
400 	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
401 	 */
402 	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
403 		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
404 		wakeup(rpipe);
405 	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
406 		/*
407 		 * Handle write blocking hysteresis.
408 		 */
409 		if (rpipe->pipe_state & PIPE_WANTW) {
410 			rpipe->pipe_state &= ~PIPE_WANTW;
411 			wakeup(rpipe);
412 		}
413 	}
414 
415 	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
416 		pipeselwakeup(rpipe);
417 
418 	return (error);
419 }
420 
421 int
422 pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
423 {
424 	int error = 0;
425 	size_t orig_resid;
426 	struct pipe *wpipe, *rpipe;
427 
428 	rpipe = fp->f_data;
429 	wpipe = rpipe->pipe_peer;
430 
431 	/*
432  * detect loss of pipe read side; return EPIPE so the caller issues SIGPIPE.
433 	 */
434 	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
435 		return (EPIPE);
436 	}
437 	++wpipe->pipe_busy;
438 
439 	/*
440 	 * If it is advantageous to resize the pipe buffer, do
441 	 * so.
442 	 */
443 	if ((uio->uio_resid > PIPE_SIZE) &&
444 	    (nbigpipe < LIMITBIGPIPES) &&
445 	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
446 	    (wpipe->pipe_buffer.cnt == 0)) {
447 
448 		if ((error = pipelock(wpipe)) == 0) {
449 			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
450 				nbigpipe++;
451 			pipeunlock(wpipe);
452 		}
453 	}
454 
455 	/*
456  * If an early error occurred, unbusy and return, waking up any pending
457 	 * readers.
458 	 */
459 	if (error) {
460 		--wpipe->pipe_busy;
461 		if ((wpipe->pipe_busy == 0) &&
462 		    (wpipe->pipe_state & PIPE_WANT)) {
463 			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
464 			wakeup(wpipe);
465 		}
466 		return (error);
467 	}
468 
469 	orig_resid = uio->uio_resid;
470 
471 	while (uio->uio_resid) {
472 		size_t space;
473 
474 retrywrite:
475 		if (wpipe->pipe_state & PIPE_EOF) {
476 			error = EPIPE;
477 			break;
478 		}
479 
480 		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
481 
482 		/* Writes of size <= PIPE_BUF must be atomic. */
483 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
484 			space = 0;
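		/*
		 * Example of the rule above: with PIPE_BUF (512 bytes on
		 * OpenBSD) requested in total and only, say, 300 bytes
		 * free, space is forced to 0 so the write blocks below
		 * instead of being split; writes of at most PIPE_BUF
		 * bytes are delivered in one piece or not at all.
		 */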
485 
486 		if (space > 0) {
487 			if ((error = pipelock(wpipe)) == 0) {
488 				size_t size;	/* Transfer size */
489 				size_t segsize;	/* first segment to transfer */
490 
491 				/*
492 				 * If a process blocked in uiomove, our
493 				 * value for space might be bad.
494 				 *
495 				 * XXX will we be ok if the reader has gone
496 				 * away here?
497 				 */
498 				if (space > wpipe->pipe_buffer.size -
499 				    wpipe->pipe_buffer.cnt) {
500 					pipeunlock(wpipe);
501 					goto retrywrite;
502 				}
503 
504 				/*
505 				 * Transfer size is minimum of uio transfer
506 				 * and free space in pipe buffer.
507 				 */
508 				if (space > uio->uio_resid)
509 					size = uio->uio_resid;
510 				else
511 					size = space;
512 				/*
513 				 * First segment to transfer is minimum of
514 				 * transfer size and contiguous space in
515 				 * pipe buffer.  If first segment to transfer
516 				 * is less than the transfer size, we've got
517 				 * a wraparound in the buffer.
518 				 */
519 				segsize = wpipe->pipe_buffer.size -
520 					wpipe->pipe_buffer.in;
521 				if (segsize > size)
522 					segsize = size;
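				/*
				 * Worked example (illustrative numbers):
				 * with a 16384-byte buffer, in == 16000
				 * and size == 1000, segsize becomes 384;
				 * the remaining 616 bytes wrap to offset
				 * 0 in the second uiomove() below and
				 * .in ends up at 616 (size - segsize).
				 */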
523 
524 				/* Transfer first segment */
525 
526 				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
527 						segsize, uio);
528 
529 				if (error == 0 && segsize < size) {
530 					/*
531 					 * Transfer remaining part now, to
532 					 * support atomic writes.  Wraparound
533 					 * happened.
534 					 */
535 #ifdef DIAGNOSTIC
536 					if (wpipe->pipe_buffer.in + segsize !=
537 					    wpipe->pipe_buffer.size)
538 						panic("Expected pipe buffer wraparound disappeared");
539 #endif
540 
541 					error = uiomove(&wpipe->pipe_buffer.buffer[0],
542 							size - segsize, uio);
543 				}
544 				if (error == 0) {
545 					wpipe->pipe_buffer.in += size;
546 					if (wpipe->pipe_buffer.in >=
547 					    wpipe->pipe_buffer.size) {
548 #ifdef DIAGNOSTIC
549 						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
550 							panic("Expected wraparound bad");
551 #endif
552 						wpipe->pipe_buffer.in = size - segsize;
553 					}
554 
555 					wpipe->pipe_buffer.cnt += size;
556 #ifdef DIAGNOSTIC
557 					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
558 						panic("Pipe buffer overflow");
559 #endif
560 				}
561 				pipeunlock(wpipe);
562 			}
563 			if (error)
564 				break;
565 		} else {
566 			/*
567 			 * If the "read-side" has been blocked, wake it up now.
568 			 */
569 			if (wpipe->pipe_state & PIPE_WANTR) {
570 				wpipe->pipe_state &= ~PIPE_WANTR;
571 				wakeup(wpipe);
572 			}
573 
574 			/*
575 			 * don't block on non-blocking I/O
576 			 */
577 			if (fp->f_flag & FNONBLOCK) {
578 				error = EAGAIN;
579 				break;
580 			}
581 
582 			/*
583 			 * We have no more space and have something to offer,
584 			 * wake up select/poll.
585 			 */
586 			pipeselwakeup(wpipe);
587 
588 			wpipe->pipe_state |= PIPE_WANTW;
589 			error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
590 			    "pipewr", 0);
591 			if (error)
592 				break;
593 			/*
594 			 * If read side wants to go away, we just issue a
595 			 * signal to ourselves.
596 			 */
597 			if (wpipe->pipe_state & PIPE_EOF) {
598 				error = EPIPE;
599 				break;
600 			}
601 		}
602 	}
603 
604 	--wpipe->pipe_busy;
605 
606 	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
607 		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
608 		wakeup(wpipe);
609 	} else if (wpipe->pipe_buffer.cnt > 0) {
610 		/*
611 		 * If we have put any characters in the buffer, we wake up
612 		 * the reader.
613 		 */
614 		if (wpipe->pipe_state & PIPE_WANTR) {
615 			wpipe->pipe_state &= ~PIPE_WANTR;
616 			wakeup(wpipe);
617 		}
618 	}
619 
620 	/*
621 	 * Don't return EPIPE if I/O was successful
622 	 */
623 	if ((wpipe->pipe_buffer.cnt == 0) &&
624 	    (uio->uio_resid == 0) &&
625 	    (error == EPIPE)) {
626 		error = 0;
627 	}
628 
629 	if (error == 0)
630 		getnanotime(&wpipe->pipe_mtime);
631 	/*
632 	 * We have something to offer, wake up select/poll.
633 	 */
634 	if (wpipe->pipe_buffer.cnt)
635 		pipeselwakeup(wpipe);
636 
637 	return (error);
638 }
639 
640 /*
641  * we implement a very minimal set of ioctls for compatibility with sockets.
642  */
643 int
644 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
645 {
646 	struct pipe *mpipe = fp->f_data;
647 
648 	switch (cmd) {
649 
650 	case FIONBIO:
651 		return (0);
652 
653 	case FIOASYNC:
654 		if (*(int *)data) {
655 			mpipe->pipe_state |= PIPE_ASYNC;
656 		} else {
657 			mpipe->pipe_state &= ~PIPE_ASYNC;
658 		}
659 		return (0);
660 
661 	case FIONREAD:
662 		*(int *)data = mpipe->pipe_buffer.cnt;
663 		return (0);
664 
665 	case SIOCSPGRP:
666 		mpipe->pipe_pgid = *(int *)data;
667 		return (0);
668 
669 	case SIOCGPGRP:
670 		*(int *)data = mpipe->pipe_pgid;
671 		return (0);
672 
673 	}
674 	return (ENOTTY);
675 }
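/*
 * Illustrative userland sketch (not part of this file): FIONREAD above
 * reports how many bytes are currently buffered in the pipe.  Assuming
 * <err.h>, <sys/ioctl.h> and an open pipe read descriptor rfd:
 *
 *	int nready;
 *
 *	if (ioctl(rfd, FIONREAD, &nready) == -1)
 *		err(1, "ioctl");
 *
 * nready then holds the kernel's pipe_buffer.cnt at the time of the call.
 */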
676 
677 int
678 pipe_poll(struct file *fp, int events, struct proc *p)
679 {
680 	struct pipe *rpipe = fp->f_data;
681 	struct pipe *wpipe;
682 	int revents = 0;
683 
684 	wpipe = rpipe->pipe_peer;
685 	if (events & (POLLIN | POLLRDNORM)) {
686 		if ((rpipe->pipe_buffer.cnt > 0) ||
687 		    (rpipe->pipe_state & PIPE_EOF))
688 			revents |= events & (POLLIN | POLLRDNORM);
689 	}
690 
691 	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
692 	if ((rpipe->pipe_state & PIPE_EOF) ||
693 	    (wpipe == NULL) ||
694 	    (wpipe->pipe_state & PIPE_EOF))
695 		revents |= POLLHUP;
696 	else if (events & (POLLOUT | POLLWRNORM)) {
697 		if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
698 			revents |= events & (POLLOUT | POLLWRNORM);
699 	}
700 
701 	if (revents == 0) {
702 		if (events & (POLLIN | POLLRDNORM)) {
703 			selrecord(p, &rpipe->pipe_sel);
704 			rpipe->pipe_state |= PIPE_SEL;
705 		}
706 		if (events & (POLLOUT | POLLWRNORM)) {
707 			selrecord(p, &wpipe->pipe_sel);
708 			wpipe->pipe_state |= PIPE_SEL;
709 		}
710 	}
711 	return (revents);
712 }
713 
714 int
715 pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
716 {
717 	struct pipe *pipe = fp->f_data;
718 
719 	memset(ub, 0, sizeof(*ub));
720 	ub->st_mode = S_IFIFO;
721 	ub->st_blksize = pipe->pipe_buffer.size;
722 	ub->st_size = pipe->pipe_buffer.cnt;
723 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
724 	ub->st_atim.tv_sec  = pipe->pipe_atime.tv_sec;
725 	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
726 	ub->st_mtim.tv_sec  = pipe->pipe_mtime.tv_sec;
727 	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
728 	ub->st_ctim.tv_sec  = pipe->pipe_ctime.tv_sec;
729 	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
730 	ub->st_uid = fp->f_cred->cr_uid;
731 	ub->st_gid = fp->f_cred->cr_gid;
732 	/*
733 	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
734 	 * XXX (st_dev, st_ino) should be unique.
735 	 */
736 	return (0);
737 }
738 
739 int
740 pipe_close(struct file *fp, struct proc *p)
741 {
742 	struct pipe *cpipe = fp->f_data;
743 
744 	fp->f_ops = NULL;
745 	fp->f_data = NULL;
746 	pipeclose(cpipe);
747 	return (0);
748 }
749 
750 void
751 pipe_free_kmem(struct pipe *cpipe)
752 {
753 	if (cpipe->pipe_buffer.buffer != NULL) {
754 		if (cpipe->pipe_buffer.size > PIPE_SIZE)
755 			--nbigpipe;
756 		amountpipekva -= cpipe->pipe_buffer.size;
757 		km_free(cpipe->pipe_buffer.buffer, cpipe->pipe_buffer.size,
758 		    &kv_any, &kp_pageable);
759 		cpipe->pipe_buffer.buffer = NULL;
760 	}
761 }
762 
763 /*
764  * shutdown the pipe
765  */
766 void
767 pipeclose(struct pipe *cpipe)
768 {
769 	struct pipe *ppipe;
770 	if (cpipe) {
771 
772 		pipeselwakeup(cpipe);
773 
774 		/*
775 		 * If the other side is blocked, wake it up saying that
776 		 * we want to close it down.
777 		 */
778 		cpipe->pipe_state |= PIPE_EOF;
779 		while (cpipe->pipe_busy) {
780 			wakeup(cpipe);
781 			cpipe->pipe_state |= PIPE_WANT;
782 			tsleep(cpipe, PRIBIO, "pipecl", 0);
783 		}
784 
785 		/*
786 		 * Disconnect from peer
787 		 */
788 		if ((ppipe = cpipe->pipe_peer) != NULL) {
789 			pipeselwakeup(ppipe);
790 
791 			ppipe->pipe_state |= PIPE_EOF;
792 			wakeup(ppipe);
793 			ppipe->pipe_peer = NULL;
794 		}
795 
796 		/*
797 		 * free resources
798 		 */
799 		pipe_free_kmem(cpipe);
800 		pool_put(&pipe_pool, cpipe);
801 	}
802 }
803 
804 int
805 pipe_kqfilter(struct file *fp, struct knote *kn)
806 {
807 	struct pipe *rpipe = kn->kn_fp->f_data;
808 	struct pipe *wpipe = rpipe->pipe_peer;
809 
810 	switch (kn->kn_filter) {
811 	case EVFILT_READ:
812 		kn->kn_fop = &pipe_rfiltops;
813 		SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
814 		break;
815 	case EVFILT_WRITE:
816 		if (wpipe == NULL) {
817 			/* other end of pipe has been closed */
818 			return (EPIPE);
819 		}
820 		kn->kn_fop = &pipe_wfiltops;
821 		SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
822 		break;
823 	default:
824 		return (EINVAL);
825 	}
826 
827 	return (0);
828 }
829 
830 void
831 filt_pipedetach(struct knote *kn)
832 {
833 	struct pipe *rpipe = kn->kn_fp->f_data;
834 	struct pipe *wpipe = rpipe->pipe_peer;
835 
836 	switch (kn->kn_filter) {
837 	case EVFILT_READ:
838 		SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
839 		break;
840 	case EVFILT_WRITE:
841 		if (wpipe == NULL)
842 			return;
843 		SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
844 		break;
845 	}
846 }
847 
848 int
849 filt_piperead(struct knote *kn, long hint)
850 {
851 	struct pipe *rpipe = kn->kn_fp->f_data;
852 	struct pipe *wpipe = rpipe->pipe_peer;
853 
854 	kn->kn_data = rpipe->pipe_buffer.cnt;
855 
856 	if ((rpipe->pipe_state & PIPE_EOF) ||
857 	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
858 		kn->kn_flags |= EV_EOF;
859 		return (1);
860 	}
861 	return (kn->kn_data > 0);
862 }
863 
864 int
865 filt_pipewrite(struct knote *kn, long hint)
866 {
867 	struct pipe *rpipe = kn->kn_fp->f_data;
868 	struct pipe *wpipe = rpipe->pipe_peer;
869 
870 	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
871 		kn->kn_data = 0;
872 		kn->kn_flags |= EV_EOF;
873 		return (1);
874 	}
875 	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
876 
877 	return (kn->kn_data >= PIPE_BUF);
878 }
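/*
 * Illustrative userland sketch (not part of this file): the filters
 * above back EVFILT_READ/EVFILT_WRITE for pipes.  EVFILT_READ reports
 * the buffered byte count in kn_data (or EV_EOF once the peer is gone);
 * EVFILT_WRITE fires once at least PIPE_BUF bytes are free.  Assuming
 * <err.h>, <stdio.h>, <sys/event.h> and a pipe read descriptor rfd:
 *
 *	struct kevent ev;
 *	int kq = kqueue();
 *
 *	EV_SET(&ev, rfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	if (kevent(kq, &ev, 1, NULL, 0, NULL) == -1)
 *		err(1, "kevent");
 *	if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1)
 *		printf("%lld bytes readable\n", (long long)ev.data);
 */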
879 
880 void
881 pipe_init(void)
882 {
883 	pool_init(&pipe_pool, sizeof(struct pipe), 0, IPL_NONE, PR_WAITOK,
884 	    "pipepl", NULL);
885 }
886 
887