/*	$OpenBSD: sys_pipe.c,v 1.105 2019/12/27 09:29:50 anton Exp $	*/

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipe scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but it does everything that pipes normally
 * do.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/pool.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/event.h>
#include <sys/lock.h>
#include <sys/poll.h>
#ifdef KTRACE
#include <sys/ktrace.h>
#endif

#include <uvm/uvm_extern.h>

#include <sys/pipe.h>

/*
 * interfaces to the outside world
 */
int	pipe_read(struct file *, struct uio *, int);
int	pipe_write(struct file *, struct uio *, int);
int	pipe_close(struct file *, struct proc *);
int	pipe_poll(struct file *, int events, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

static struct fileops pipeops = {
	.fo_read	= pipe_read,
	.fo_write	= pipe_write,
	.fo_ioctl	= pipe_ioctl,
	.fo_poll	= pipe_poll,
	.fo_kqfilter	= pipe_kqfilter,
	.fo_stat	= pipe_stat,
	.fo_close	= pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipewrite(struct knote *kn, long hint);

struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
unsigned int nbigpipe;
static unsigned int amountpipekva;

struct pool pipe_pool;

/*
 * Global lock protecting fields of `struct pipe'.
 */
struct rwlock pipe_lock = RWLOCK_INITIALIZER("pipeglk");

int	dopipe(struct proc *, int *, int);
int	pipelock(struct pipe *);
void	pipeunlock(struct pipe *);
void	pipeselwakeup(struct pipe *);

struct pipe *pipe_create(void);
void	pipe_destroy(struct pipe *);
int	pipe_rundown(struct pipe *);
struct pipe *pipe_peer(struct pipe *);
int	pipe_buffer_realloc(struct pipe *, u_int);
void	pipe_buffer_free(struct pipe *);

int	pipe_sleep(struct pipe *, const char *);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

int
sys_pipe(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe_args /* {
		syscallarg(int *) fdp;
	} */ *uap = v;

	return (dopipe(p, SCARG(uap, fdp), 0));
}

int
sys_pipe2(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe2_args /* {
		syscallarg(int *) fdp;
		syscallarg(int) flags;
	} */ *uap = v;

	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
		return (EINVAL);

	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
}
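
/*
 * Editor's sketch (userland, not part of the kernel build): minimal use of
 * the pipe2() syscall handled above.  Only O_CLOEXEC and O_NONBLOCK
 * (FNONBLOCK is the kernel name for O_NONBLOCK) are accepted; any other
 * flag fails with EINVAL before dopipe() runs.  By convention fds[0] is
 * used for reading and fds[1] for writing, although dopipe() opens both
 * file entries FREAD|FWRITE.
 *
 *	#include <err.h>
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		char c;
 *
 *		if (pipe2(fds, O_CLOEXEC) == -1)
 *			err(1, "pipe2");
 *		if (write(fds[1], "x", 1) != 1)
 *			err(1, "write");
 *		if (read(fds[0], &c, 1) != 1)
 *			err(1, "read");
 *		close(fds[0]);
 *		close(fds[1]);
 *		return 0;
 *	}
 */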

int
dopipe(struct proc *p, int *ufds, int flags)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe = NULL;
	int fds[2], cloexec, error;

	cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0;

	if (((rpipe = pipe_create()) == NULL) ||
	    ((wpipe = pipe_create()) == NULL)) {
		error = ENOMEM;
		goto free1;
	}

	fdplock(fdp);

	error = falloc(p, &rf, &fds[0]);
	if (error != 0)
		goto free2;
	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;

	error = falloc(p, &wf, &fds[1]);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fdinsert(fdp, fds[0], cloexec, rf);
	fdinsert(fdp, fds[1], cloexec, wf);

	error = copyout(fds, ufds, sizeof(fds));
	if (error == 0) {
		fdpunlock(fdp);
#ifdef KTRACE
		if (KTRPOINT(p, KTR_STRUCT))
			ktrfds(p, fds, 2);
#endif
	} else {
		/* fdrelease() unlocks fdp. */
		fdrelease(p, fds[0]);
		fdplock(fdp);
		fdrelease(p, fds[1]);
	}

	FRELE(rf, p);
	FRELE(wf, p);
	return (error);

free3:
	fdremove(fdp, fds[0]);
	closef(rf, p);
	rpipe = NULL;
free2:
	fdpunlock(fdp);
free1:
	pipe_destroy(wpipe);
	pipe_destroy(rpipe);
	return (error);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine 'reallocs' the size of a pipe safely: on failure it
 * retains the old buffer and returns ENOMEM.
 */
int
pipe_buffer_realloc(struct pipe *cpipe, u_int size)
{
	caddr_t buffer;

	/* buffer uninitialized or pipe locked */
	KASSERT((cpipe->pipe_buffer.buffer == NULL) ||
	    (cpipe->pipe_state & PIPE_LOCK));

	/* buffer should be empty */
	KASSERT(cpipe->pipe_buffer.cnt == 0);

	KERNEL_LOCK();
	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
	KERNEL_UNLOCK();
	if (buffer == NULL)
		return (ENOMEM);

	/* free old resources if we are resizing */
	pipe_buffer_free(cpipe);

	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;

	atomic_add_int(&amountpipekva, cpipe->pipe_buffer.size);

	return (0);
}

/*
 * initialize and allocate VM and memory for pipe
 */
struct pipe *
pipe_create(void)
{
	struct pipe *cpipe;
	int error;

	cpipe = pool_get(&pipe_pool, PR_WAITOK | PR_ZERO);

	error = pipe_buffer_realloc(cpipe, PIPE_SIZE);
	if (error != 0) {
		pool_put(&pipe_pool, cpipe);
		return (NULL);
	}

	sigio_init(&cpipe->pipe_sigio);

	getnanotime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;

	return (cpipe);
}

struct pipe *
pipe_peer(struct pipe *cpipe)
{
	struct pipe *peer;

	rw_assert_anylock(&pipe_lock);

	peer = cpipe->pipe_peer;
	if (peer == NULL || (peer->pipe_state & PIPE_EOF))
		return (NULL);
	return (peer);
}

/*
 * lock a pipe for I/O, blocking other access
 */
int
pipelock(struct pipe *cpipe)
{
	int error;

	rw_assert_wrlock(&pipe_lock);

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		error = rwsleep_nsec(cpipe, &pipe_lock, PRIBIO | PCATCH,
		    "pipelk", INFSLP);
		if (error)
			return (error);
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return (0);
}

/*
 * unlock a pipe I/O lock
 */
void
pipeunlock(struct pipe *cpipe)
{
	rw_assert_wrlock(&pipe_lock);
	KASSERT(cpipe->pipe_state & PIPE_LOCK);

	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

/*
 * Unlock the pipe I/O lock and go to sleep. Returns 0 on success, in which
 * case the I/O lock has been relocked. Otherwise, if a signal was caught,
 * non-zero is returned and the I/O lock is not held.
 *
 * Any caller must obtain a reference to the pipe by incrementing `pipe_busy'
 * before calling this function in order to ensure that the same pipe is not
 * destroyed while sleeping.
 */
int
pipe_sleep(struct pipe *cpipe, const char *wmesg)
{
	int error;

	pipeunlock(cpipe);
	error = rwsleep_nsec(cpipe, &pipe_lock, PRIBIO | PCATCH, wmesg, INFSLP);
	if (error)
		return (error);
	return (pipelock(cpipe));
}
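
/*
 * Editor's sketch (hypothetical helper, not in this file): the reference
 * and locking protocol described above, as followed by pipe_read() and
 * pipe_write().  `pipe_busy' keeps pipe_destroy() from freeing the pipe
 * while the I/O lock is dropped inside pipe_sleep(), and pipe_rundown()
 * wakes the destroyer once the last reference goes away.
 *
 *	int
 *	example_wait_for_data(struct pipe *cpipe)
 *	{
 *		int error;
 *
 *		rw_enter_write(&pipe_lock);
 *		++cpipe->pipe_busy;
 *		error = pipelock(cpipe);
 *		if (error == 0) {
 *			while (cpipe->pipe_buffer.cnt == 0 &&
 *			    (cpipe->pipe_state & PIPE_EOF) == 0) {
 *				cpipe->pipe_state |= PIPE_WANTR;
 *				error = pipe_sleep(cpipe, "example");
 *				if (error)
 *					break;
 *			}
 *			if (error == 0)
 *				pipeunlock(cpipe);
 *		}
 *		--cpipe->pipe_busy;
 *		pipe_rundown(cpipe);
 *		rw_exit_write(&pipe_lock);
 *		return (error);
 *	}
 */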

void
pipeselwakeup(struct pipe *cpipe)
{
	rw_assert_wrlock(&pipe_lock);

	KERNEL_LOCK();

	/* Kernel lock needed in order to prevent race with kevent. */
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	} else
		KNOTE(&cpipe->pipe_sel.si_note, NOTE_SUBMIT);

	/* Kernel lock needed since pgsigio() calls ptsignal(). */
	if (cpipe->pipe_state & PIPE_ASYNC)
		pgsigio(&cpipe->pipe_sigio, SIGIO, 0);

	KERNEL_UNLOCK();
}

int
pipe_read(struct file *fp, struct uio *uio, int fflags)
{
	struct pipe *rpipe = fp->f_data;
	int error;
	size_t size, nread = 0;

	rw_enter_write(&pipe_lock);
	++rpipe->pipe_busy;
	error = pipelock(rpipe);
	if (error) {
		--rpipe->pipe_busy;
		pipe_rundown(rpipe);
		rw_exit_write(&pipe_lock);
		return (error);
	}

	while (uio->uio_resid) {
		/* Normal pipe buffer receive. */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			rw_exit_write(&pipe_lock);
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			rw_enter_write(&pipe_lock);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/* If the "write-side" has been blocked, wake it up. */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/* Break if some data was read. */
			if (nread > 0)
				break;

			/* Handle non-blocking mode operation. */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/* Wait for more data. */
			rpipe->pipe_state |= PIPE_WANTR;
			error = pipe_sleep(rpipe, "piperd");
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	if (pipe_rundown(rpipe) == 0 && rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/* Handle write blocking hysteresis. */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	rw_exit_write(&pipe_lock);
	return (error);
}
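
/*
 * Editor's sketch (userland, not part of this file) of the read-side
 * behaviour implemented above: with O_NONBLOCK set, an empty pipe yields
 * EAGAIN, and once the write end has been closed (PIPE_EOF) read()
 * returns 0 rather than an error.
 *
 *	#include <err.h>
 *	#include <errno.h>
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		char buf[64];
 *		ssize_t n;
 *
 *		if (pipe2(fds, O_NONBLOCK) == -1)
 *			err(1, "pipe2");
 *
 *		n = read(fds[0], buf, sizeof(buf));	// empty pipe
 *		if (n == -1 && errno == EAGAIN)
 *			warnx("no data yet");
 *
 *		close(fds[1]);				// drop the write end
 *		n = read(fds[0], buf, sizeof(buf));	// now EOF
 *		if (n == 0)
 *			warnx("EOF");
 *		close(fds[0]);
 *		return 0;
 *	}
 */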

int
pipe_write(struct file *fp, struct uio *uio, int fflags)
{
	int error = 0;
	size_t orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = fp->f_data;

	rw_enter_write(&pipe_lock);
	wpipe = pipe_peer(rpipe);

	/* Detect loss of the read side; EPIPE makes the caller raise SIGPIPE. */
	if (wpipe == NULL) {
		rw_exit_write(&pipe_lock);
		return (EPIPE);
	}

	++wpipe->pipe_busy;
	error = pipelock(wpipe);
	if (error) {
		--wpipe->pipe_busy;
		pipe_rundown(wpipe);
		rw_exit_write(&pipe_lock);
		return (error);
	}


	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {
	    	unsigned int npipe;

		npipe = atomic_inc_int_nv(&nbigpipe);
		if (npipe > LIMITBIGPIPES ||
		    pipe_buffer_realloc(wpipe, BIG_PIPE_SIZE) != 0)
			atomic_dec_int(&nbigpipe);
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		size_t space;

		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			size_t size;	/* Transfer size */
			size_t segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpipe->pipe_buffer.size -
				wpipe->pipe_buffer.in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */

			rw_exit_write(&pipe_lock);
			error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					segsize, uio);
			rw_enter_write(&pipe_lock);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
#ifdef DIAGNOSTIC
				if (wpipe->pipe_buffer.in + segsize !=
				    wpipe->pipe_buffer.size)
					panic("Expected pipe buffer wraparound disappeared");
#endif

				rw_exit_write(&pipe_lock);
				error = uiomove(&wpipe->pipe_buffer.buffer[0],
						size - segsize, uio);
				rw_enter_write(&pipe_lock);
			}
			if (error == 0) {
				wpipe->pipe_buffer.in += size;
				if (wpipe->pipe_buffer.in >=
				    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
						panic("Expected wraparound bad");
#endif
					wpipe->pipe_buffer.in = size - segsize;
				}

				wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
				if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
					panic("Pipe buffer overflow");
#endif
			}
			if (error)
				break;
		} else {
			/* If the "read-side" has been blocked, wake it up. */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/* Don't block on non-blocking I/O. */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = pipe_sleep(wpipe, "pipewr");
			if (error)
				goto unlocked_error;

			/*
			 * If read side wants to go away, we just issue a
			 * signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}
	pipeunlock(wpipe);

unlocked_error:
	--wpipe->pipe_busy;

	if (pipe_rundown(wpipe) == 0 && wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/* Don't return EPIPE if I/O was successful. */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/* We have something to offer, wake up select/poll. */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	rw_exit_write(&pipe_lock);
	return (error);
}
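
/*
 * Editor's sketch (userland, not part of this file) of two write-side
 * properties implemented above: writes of at most PIPE_BUF bytes are
 * atomic (the buffer is only used once it can take the whole request, so
 * such writes are never interleaved with other writers), and writing
 * after the read end has gone away fails with EPIPE, with SIGPIPE
 * delivered to the writer.
 *
 *	#include <err.h>
 *	#include <errno.h>
 *	#include <limits.h>
 *	#include <signal.h>
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		char msg[PIPE_BUF];
 *
 *		if (pipe(fds) == -1)
 *			err(1, "pipe");
 *		memset(msg, 'a', sizeof(msg));
 *		// At most PIPE_BUF bytes: all-or-nothing, never interleaved.
 *		if (write(fds[1], msg, sizeof(msg)) != (ssize_t)sizeof(msg))
 *			err(1, "write");
 *
 *		signal(SIGPIPE, SIG_IGN);	// so we can observe EPIPE
 *		close(fds[0]);			// reader goes away
 *		if (write(fds[1], msg, 1) == -1 && errno == EPIPE)
 *			warnx("EPIPE after read end closed");
 *		close(fds[1]);
 *		return 0;
 *	}
 */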

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
{
	struct pipe *mpipe = fp->f_data;
	int error = 0;

	rw_enter_write(&pipe_lock);

	switch (cmd) {

	case FIONBIO:
		break;

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		break;

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		break;

	case TIOCSPGRP:
		/* FALLTHROUGH */
	case SIOCSPGRP:
		error = sigio_setown(&mpipe->pipe_sigio, *(int *)data);
		break;

	case SIOCGPGRP:
		*(int *)data = sigio_getown(&mpipe->pipe_sigio);
		break;

	case TIOCGPGRP:
		*(int *)data = -sigio_getown(&mpipe->pipe_sigio);
		break;

	default:
		error = ENOTTY;
	}

	rw_exit_write(&pipe_lock);

	return (error);
}
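
/*
 * Editor's sketch (userland, not part of this file) of the ioctls handled
 * above: FIONREAD reports how many bytes are sitting in the pipe buffer,
 * and FIOASYNC turns on PIPE_ASYNC so that pipeselwakeup() raises SIGIO.
 * Note that a recipient must also be registered (SIOCSPGRP above, or
 * fcntl(F_SETOWN)) before any signal is actually delivered.
 *
 *	#include <sys/ioctl.h>
 *	#include <err.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2], nbytes, on = 1;
 *
 *		if (pipe(fds) == -1)
 *			err(1, "pipe");
 *		if (write(fds[1], "hello", 5) != 5)
 *			err(1, "write");
 *
 *		if (ioctl(fds[0], FIONREAD, &nbytes) == -1)
 *			err(1, "FIONREAD");
 *		warnx("%d bytes buffered", nbytes);	// prints 5
 *
 *		if (ioctl(fds[0], FIOASYNC, &on) == -1)
 *			err(1, "FIOASYNC");
 *		close(fds[0]);
 *		close(fds[1]);
 *		return 0;
 *	}
 */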

int
pipe_poll(struct file *fp, int events, struct proc *p)
{
	struct pipe *rpipe = fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	rw_enter_write(&pipe_lock);
	wpipe = pipe_peer(rpipe);

	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);
	}

	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
	if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL)
		revents |= POLLHUP;
	else if (events & (POLLOUT | POLLWRNORM)) {
		if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
			revents |= events & (POLLOUT | POLLWRNORM);
	}

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}

	rw_exit_write(&pipe_lock);

	return (revents);
}
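
/*
 * Editor's sketch (userland, not part of this file) matching pipe_poll()
 * above: the read end reports POLLIN once data is buffered or the peer is
 * gone, the write end reports POLLOUT while at least PIPE_BUF bytes of
 * space remain, and POLLHUP is reported instead of POLLOUT once the other
 * end has been closed.
 *
 *	#include <err.h>
 *	#include <poll.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		struct pollfd pfd[2];
 *
 *		if (pipe(fds) == -1)
 *			err(1, "pipe");
 *		if (write(fds[1], "x", 1) != 1)
 *			err(1, "write");
 *
 *		pfd[0].fd = fds[0];
 *		pfd[0].events = POLLIN;
 *		pfd[1].fd = fds[1];
 *		pfd[1].events = POLLOUT;
 *		if (poll(pfd, 2, 0) == -1)
 *			err(1, "poll");
 *		if (pfd[0].revents & POLLIN)
 *			warnx("read end is readable");
 *		if (pfd[1].revents & POLLOUT)
 *			warnx("write end has room");
 *		close(fds[0]);
 *		close(fds[1]);
 *		return 0;
 *	}
 */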

int
pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
{
	struct pipe *pipe = fp->f_data;

	memset(ub, 0, sizeof(*ub));

	rw_enter_read(&pipe_lock);
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atim.tv_sec  = pipe->pipe_atime.tv_sec;
	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
	ub->st_mtim.tv_sec  = pipe->pipe_mtime.tv_sec;
	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
	ub->st_ctim.tv_sec  = pipe->pipe_ctime.tv_sec;
	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	rw_exit_read(&pipe_lock);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}
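
/*
 * Editor's sketch (userland, not part of this file): what pipe_stat()
 * above exposes through fstat(2).  st_mode identifies a FIFO, st_blksize
 * is the pipe buffer size and st_size is the number of bytes currently
 * buffered; st_dev and st_ino are left zero.
 *
 *	#include <sys/stat.h>
 *	#include <err.h>
 *	#include <stdio.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		struct stat sb;
 *
 *		if (pipe(fds) == -1)
 *			err(1, "pipe");
 *		if (write(fds[1], "abc", 3) != 3)
 *			err(1, "write");
 *		if (fstat(fds[0], &sb) == -1)
 *			err(1, "fstat");
 *		printf("fifo=%d buffered=%lld bufsize=%d\n",
 *		    S_ISFIFO(sb.st_mode), (long long)sb.st_size,
 *		    (int)sb.st_blksize);
 *		close(fds[0]);
 *		close(fds[1]);
 *		return 0;
 *	}
 */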

int
pipe_close(struct file *fp, struct proc *p)
{
	struct pipe *cpipe = fp->f_data;

	fp->f_ops = NULL;
	fp->f_data = NULL;
	pipe_destroy(cpipe);
	return (0);
}

/*
 * Free kva for pipe circular buffer.
 * No pipe lock check as only called from pipe_buffer_realloc() and
 * pipe_destroy().
 */
void
pipe_buffer_free(struct pipe *cpipe)
{
	u_int size;

	if (cpipe->pipe_buffer.buffer == NULL)
		return;

	size = cpipe->pipe_buffer.size;

	KERNEL_LOCK();
	km_free(cpipe->pipe_buffer.buffer, size, &kv_any, &kp_pageable);
	KERNEL_UNLOCK();

	cpipe->pipe_buffer.buffer = NULL;

	atomic_sub_int(&amountpipekva, size);
	if (size > PIPE_SIZE)
		atomic_dec_int(&nbigpipe);
}

/*
 * shutdown the pipe, and free resources.
 */
void
pipe_destroy(struct pipe *cpipe)
{
	struct pipe *ppipe;

	if (cpipe == NULL)
		return;

	rw_enter_write(&pipe_lock);

	pipeselwakeup(cpipe);
	sigio_free(&cpipe->pipe_sigio);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	cpipe->pipe_state |= PIPE_EOF;
	while (cpipe->pipe_busy) {
		wakeup(cpipe);
		cpipe->pipe_state |= PIPE_WANTD;
		rwsleep_nsec(cpipe, &pipe_lock, PRIBIO, "pipecl", INFSLP);
	}

	/* Disconnect from peer. */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe);

		ppipe->pipe_state |= PIPE_EOF;
		wakeup(ppipe);
		ppipe->pipe_peer = NULL;
	}

	rw_exit_write(&pipe_lock);

	pipe_buffer_free(cpipe);
	pool_put(&pipe_pool, cpipe);
}

/*
 * Returns non-zero if a rundown is currently ongoing.
 */
int
pipe_rundown(struct pipe *cpipe)
{
	rw_assert_wrlock(&pipe_lock);

	if (cpipe->pipe_busy > 0 || (cpipe->pipe_state & PIPE_WANTD) == 0)
		return (0);

	/* Only wakeup pipe_destroy() once the pipe is no longer busy. */
	cpipe->pipe_state &= ~(PIPE_WANTD | PIPE_WANTR | PIPE_WANTW);
	wakeup(cpipe);
	return (1);
}

int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe;
	int error = 0;

	rw_enter_write(&pipe_lock);
	wpipe = pipe_peer(rpipe);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL) {
			/* other end of pipe has been closed */
			error = EPIPE;
			break;
		}
		kn->kn_fop = &pipe_wfiltops;
		SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	default:
		error = EINVAL;
	}

	rw_exit_write(&pipe_lock);

	return (error);
}

void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe;

	rw_enter_write(&pipe_lock);
	wpipe = pipe_peer(rpipe);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL)
			break;
		SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	}

	rw_exit_write(&pipe_lock);
}

int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe;

	if ((hint & NOTE_SUBMIT) == 0)
		rw_enter_read(&pipe_lock);
	wpipe = pipe_peer(rpipe);

	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL) {
		if ((hint & NOTE_SUBMIT) == 0)
			rw_exit_read(&pipe_lock);
		kn->kn_flags |= EV_EOF;
		return (1);
	}

	if ((hint & NOTE_SUBMIT) == 0)
		rw_exit_read(&pipe_lock);

	return (kn->kn_data > 0);
}

int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe;

	if ((hint & NOTE_SUBMIT) == 0)
		rw_enter_read(&pipe_lock);
	wpipe = pipe_peer(rpipe);

	if (wpipe == NULL) {
		if ((hint & NOTE_SUBMIT) == 0)
			rw_exit_read(&pipe_lock);
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	if ((hint & NOTE_SUBMIT) == 0)
		rw_exit_read(&pipe_lock);

	return (kn->kn_data >= PIPE_BUF);
}
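
/*
 * Editor's sketch (userland, not part of this file) of the kqueue filters
 * above: EVFILT_READ reports the number of buffered bytes in kn_data and
 * sets EV_EOF once the write end is gone; EVFILT_WRITE fires while at
 * least PIPE_BUF bytes of buffer space remain.
 *
 *	#include <sys/types.h>
 *	#include <sys/event.h>
 *	#include <sys/time.h>
 *	#include <err.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2], kq, n;
 *		struct kevent ev;
 *		struct timespec ts = { 0, 0 };
 *
 *		if (pipe(fds) == -1)
 *			err(1, "pipe");
 *		if ((kq = kqueue()) == -1)
 *			err(1, "kqueue");
 *
 *		EV_SET(&ev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *		if (kevent(kq, &ev, 1, NULL, 0, NULL) == -1)
 *			err(1, "kevent: register");
 *
 *		if (write(fds[1], "xyz", 3) != 3)
 *			err(1, "write");
 *		n = kevent(kq, NULL, 0, &ev, 1, &ts);
 *		if (n == 1)
 *			warnx("%lld byte(s) readable", (long long)ev.data);
 *		close(fds[0]);
 *		close(fds[1]);
 *		return 0;
 *	}
 */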

void
pipe_init(void)
{
	pool_init(&pipe_pool, sizeof(struct pipe), 0, IPL_MPFLOOR, PR_WAITOK,
	    "pipepl", NULL);
}
990