/*	$OpenBSD: sys_pipe.c,v 1.77 2018/01/02 06:38:45 guenther Exp $	*/

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */
/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but it does everything that pipes normally do.
 */
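
/*
 * Userland sketch (illustrative only, not part of this file): the
 * interface implemented below is the classic pipe(2):
 *
 *	#include <err.h>
 *	#include <unistd.h>
 *
 *	int fds[2];
 *	char buf[4];
 *
 *	if (pipe(fds) == -1)
 *		err(1, "pipe");
 *	write(fds[1], "data", 4);	-- serviced by pipe_write() below
 *	read(fds[0], buf, 4);		-- serviced by pipe_read() below
 */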

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/pool.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/event.h>
#include <sys/lock.h>
#include <sys/poll.h>
#ifdef KTRACE
#include <sys/ktrace.h>
#endif

#include <uvm/uvm_extern.h>

#include <sys/pipe.h>

/*
 * interfaces to the outside world
 */
int	pipe_read(struct file *, off_t *, struct uio *, struct ucred *);
int	pipe_write(struct file *, off_t *, struct uio *, struct ucred *);
int	pipe_close(struct file *, struct proc *);
int	pipe_poll(struct file *, int events, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

static struct fileops pipeops = {
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipewrite(struct knote *kn, long hint);

struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

/*
 * Default pipe buffer size(s).  This can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes.
 */
#define LIMITBIGPIPES	32
int nbigpipe;
static int amountpipekva;

struct pool pipe_pool;

int	dopipe(struct proc *, int *, int);
void	pipeclose(struct pipe *);
void	pipe_free_kmem(struct pipe *);
int	pipe_create(struct pipe *);
int	pipelock(struct pipe *);
void	pipeunlock(struct pipe *);
void	pipeselwakeup(struct pipe *);
int	pipespace(struct pipe *, u_int);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

int
sys_pipe(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe_args /* {
		syscallarg(int *) fdp;
	} */ *uap = v;

	return (dopipe(p, SCARG(uap, fdp), 0));
}

int
sys_pipe2(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe2_args /* {
		syscallarg(int *) fdp;
		syscallarg(int) flags;
	} */ *uap = v;

	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
		return (EINVAL);

	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
}
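
/*
 * Userland sketch (illustrative only): pipe2(2) sets descriptor flags
 * atomically at creation time; any flag outside O_CLOEXEC|O_NONBLOCK
 * is rejected with EINVAL by the check above:
 *
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
 *		err(1, "pipe2");
 */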

int
dopipe(struct proc *p, int *ufds, int flags)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe = NULL;
	int fds[2], cloexec, error;

	cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0;

	rpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(rpipe);
	if (error != 0)
		goto free1;
	wpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(wpipe);
	if (error != 0)
		goto free1;

	fdplock(fdp);

	error = falloc(p, cloexec, &rf, &fds[0]);
	if (error != 0)
		goto free2;
	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;

	error = falloc(p, cloexec, &wf, &fds[1]);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	FILE_SET_MATURE(rf, p);
	FILE_SET_MATURE(wf, p);

	error = copyout(fds, ufds, sizeof(fds));
	if (error != 0) {
		fdrelease(p, fds[0]);
		fdrelease(p, fds[1]);
	}
#ifdef KTRACE
	else if (KTRPOINT(p, KTR_STRUCT))
		ktrfds(p, fds, 2);
#endif
	fdpunlock(fdp);
	return (error);

free3:
	fdremove(fdp, fds[0]);
	closef(rf, p);
	rpipe = NULL;
free2:
	fdpunlock(fdp);
free1:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine safely "reallocs" the size of a pipe buffer: if the
 * allocation fails, the old buffer is retained and ENOMEM is returned.
 */
int
pipespace(struct pipe *cpipe, u_int size)
{
	caddr_t buffer;

	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
	if (buffer == NULL) {
		return (ENOMEM);
	}

	/* free old resources if we are resizing */
	pipe_free_kmem(cpipe);
	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;

	amountpipekva += cpipe->pipe_buffer.size;

	return (0);
}
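
/*
 * A note on the bookkeeping initialized above: "in" is the write index,
 * "out" the read index and "cnt" the number of stored bytes, so free
 * space is always size - cnt regardless of where in/out sit.  For
 * example (hypothetical values), with size = 16384, out = 16300 and
 * cnt = 184, the data occupies 84 bytes at out..size-1 and wraps to
 * 100 bytes at 0..in-1, with in = 100.
 */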

/*
 * initialize and allocate VM and memory for pipe
 */
int
pipe_create(struct pipe *cpipe)
{
	int error;

	/* so pipe_free_kmem() doesn't follow junk pointer */
	cpipe->pipe_buffer.buffer = NULL;
	/*
	 * protect so pipeclose() doesn't follow a junk pointer
	 * if pipespace() fails.
	 */
	memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel));
	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;

	error = pipespace(cpipe, PIPE_SIZE);
	if (error != 0)
		return (error);

	getnanotime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	cpipe->pipe_pgid = NO_PID;

	return (0);
}


/*
 * lock a pipe for I/O, blocking other access
 */
int
pipelock(struct pipe *cpipe)
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
			return error;
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
void
pipeunlock(struct pipe *cpipe)
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}
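
/*
 * The intended usage pattern, as in pipe_read() and pipe_write() below,
 * is to modify the buffer indices only while holding PIPE_LOCK:
 *
 *	if ((error = pipelock(cpipe)) == 0) {
 *		(update cpipe->pipe_buffer here)
 *		pipeunlock(cpipe);
 *	}
 */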

void
pipeselwakeup(struct pipe *cpipe)
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	} else
		KNOTE(&cpipe->pipe_sel.si_note, 0);
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID)
		gsignal(cpipe->pipe_pgid, SIGIO);
}

int
pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
{
	struct pipe *rpipe = fp->f_data;
	int error;
	size_t size, nread = 0;

	error = pipelock(rpipe);
	if (error)
		return (error);

	++rpipe->pipe_busy;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}

int
pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
{
	int error = 0;
	size_t orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of the pipe read side; the resulting EPIPE is
	 * turned into SIGPIPE by the caller.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < LIMITBIGPIPES) &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe)) == 0) {
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				nbigpipe++;
			pipeunlock(wpipe);
		}
	}

	/*
	 * If an early error occurred, unbusy and return, waking up any
	 * pending readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		return (error);
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		size_t space;

retrywrite:
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;
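		/*
		 * Illustration (hypothetical numbers): with PIPE_BUF 512,
		 * a single 512-byte write that finds only 200 free bytes
		 * is not split; space is forced to 0 above, so the writer
		 * blocks (or gets EAGAIN) until the whole request fits.
		 * That is what keeps writes of <= PIPE_BUF bytes atomic.
		 */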

		if (space > 0) {
			if ((error = pipelock(wpipe)) == 0) {
				size_t size;	/* Transfer size */
				size_t segsize;	/* first segment to transfer */

				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");
#endif

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
#endif
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");
#endif
				}
				pipeunlock(wpipe);
			}
			if (error)
				break;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer;
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
			    "pipewr", 0);
			if (error)
				break;
			/*
			 * If the read side wants to go away, return EPIPE;
			 * the caller turns it into a SIGPIPE for us.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/*
	 * We have something to offer, wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
{
	struct pipe *mpipe = fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}
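
/*
 * Userland sketch (illustrative only, fds from pipe(2)): FIONREAD
 * reports how many bytes a read(2) could return without blocking:
 *
 *	#include <sys/ioctl.h>
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		printf("%d bytes buffered\n", nbytes);
 */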

int
pipe_poll(struct file *fp, int events, struct proc *p)
{
	struct pipe *rpipe = fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);
	}

	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;
	else if (events & (POLLOUT | POLLWRNORM)) {
		if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
			revents |= events & (POLLOUT | POLLWRNORM);
	}

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}
	return (revents);
}
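
/*
 * Userland sketch (illustrative only, fds from pipe(2)) of a poll(2)
 * call serviced by the routine above; POLLHUP is reported on a closed
 * peer even when not requested:
 *
 *	#include <poll.h>
 *
 *	struct pollfd pfd;
 *
 *	pfd.fd = fds[0];
 *	pfd.events = POLLIN;
 *	if (poll(&pfd, 1, INFTIM) > 0 && (pfd.revents & POLLHUP))
 *		(the write side has gone away)
 */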

int
pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
{
	struct pipe *pipe = fp->f_data;

	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atim.tv_sec  = pipe->pipe_atime.tv_sec;
	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
	ub->st_mtim.tv_sec  = pipe->pipe_mtime.tv_sec;
	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
	ub->st_ctim.tv_sec  = pipe->pipe_ctime.tv_sec;
	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}
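
/*
 * Userland sketch (illustrative only, fds from pipe(2)): fstat(2) on a
 * pipe descriptor lands here; st_size is the byte count buffered:
 *
 *	#include <sys/stat.h>
 *
 *	struct stat sb;
 *
 *	if (fstat(fds[0], &sb) == 0 && S_ISFIFO(sb.st_mode))
 *		printf("%lld bytes pending\n", (long long)sb.st_size);
 */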

int
pipe_close(struct file *fp, struct proc *p)
{
	struct pipe *cpipe = fp->f_data;

	fp->f_ops = NULL;
	fp->f_data = NULL;
	pipeclose(cpipe);
	return (0);
}

void
pipe_free_kmem(struct pipe *cpipe)
{
	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			--nbigpipe;
		amountpipekva -= cpipe->pipe_buffer.size;
		km_free(cpipe->pipe_buffer.buffer, cpipe->pipe_buffer.size,
		    &kv_any, &kp_pageable);
		cpipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * Shut down the pipe.
 */
void
pipeclose(struct pipe *cpipe)
{
	struct pipe *ppipe;
	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		cpipe->pipe_state |= PIPE_EOF;
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		pipe_free_kmem(cpipe);
		pool_put(&pipe_pool, cpipe);
	}
}

int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL) {
			/* other end of pipe has been closed */
			return (EPIPE);
		}
		kn->kn_fop = &pipe_wfiltops;
		SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	default:
		return (EINVAL);
	}

	return (0);
}
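
/*
 * Userland sketch (illustrative only, fds from pipe(2)): registering
 * the read filter via kqueue(2); filt_piperead() below then computes
 * readiness and kn_data on each kevent(2) poll of the descriptor:
 *
 *	#include <sys/event.h>
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	if (kevent(kq, &kev, 1, NULL, 0, NULL) == -1)
 *		err(1, "kevent");
 */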

void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL)
			return;
		SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	}
}

int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	return (kn->kn_data >= PIPE_BUF);
}

void
pipe_init(void)
{
	pool_init(&pipe_pool, sizeof(struct pipe), 0, IPL_NONE, PR_WAITOK,
	    "pipepl", NULL);
}