1 /*	$OpenBSD: sys_pipe.c,v 1.139 2022/05/30 14:06:16 visa Exp $	*/
2 
3 /*
4  * Copyright (c) 1996 John S. Dyson
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice immediately at the beginning of the file, without modification,
12  *    this list of conditions, and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Absolutely no warranty of function or purpose is made by the author
17  *    John S. Dyson.
18  * 4. Modifications may be freely made to this file if the above conditions
19  *    are met.
20  */
21 
22 /*
23  * This file contains a high-performance replacement for the socket-based
24  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25  * all features of sockets, but it does everything that pipes normally
26  * do.
27  */
28 
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/proc.h>
32 #include <sys/fcntl.h>
33 #include <sys/file.h>
34 #include <sys/filedesc.h>
35 #include <sys/pool.h>
36 #include <sys/ioctl.h>
37 #include <sys/stat.h>
38 #include <sys/signalvar.h>
39 #include <sys/mount.h>
40 #include <sys/syscallargs.h>
41 #include <sys/event.h>
42 #include <sys/lock.h>
43 #include <sys/poll.h>
44 #ifdef KTRACE
45 #include <sys/ktrace.h>
46 #endif
47 
48 #include <uvm/uvm_extern.h>
49 
50 #include <sys/pipe.h>
51 
52 struct pipe_pair {
53 	struct pipe pp_wpipe;
54 	struct pipe pp_rpipe;
55 	struct rwlock pp_lock;
56 };
57 
58 /*
59  * interfaces to the outside world
60  */
61 int	pipe_read(struct file *, struct uio *, int);
62 int	pipe_write(struct file *, struct uio *, int);
63 int	pipe_close(struct file *, struct proc *);
64 int	pipe_poll(struct file *, int events, struct proc *);
65 int	pipe_kqfilter(struct file *fp, struct knote *kn);
66 int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
67 int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);
68 
69 static const struct fileops pipeops = {
70 	.fo_read	= pipe_read,
71 	.fo_write	= pipe_write,
72 	.fo_ioctl	= pipe_ioctl,
73 	.fo_poll	= pipe_poll,
74 	.fo_kqfilter	= pipe_kqfilter,
75 	.fo_stat	= pipe_stat,
76 	.fo_close	= pipe_close
77 };
78 
79 void	filt_pipedetach(struct knote *kn);
80 int	filt_piperead(struct knote *kn, long hint);
81 int	filt_pipewrite(struct knote *kn, long hint);
82 int	filt_pipeexcept(struct knote *kn, long hint);
83 int	filt_pipemodify(struct kevent *kev, struct knote *kn);
84 int	filt_pipeprocess(struct knote *kn, struct kevent *kev);
85 
86 const struct filterops pipe_rfiltops = {
87 	.f_flags	= FILTEROP_ISFD | FILTEROP_MPSAFE,
88 	.f_attach	= NULL,
89 	.f_detach	= filt_pipedetach,
90 	.f_event	= filt_piperead,
91 	.f_modify	= filt_pipemodify,
92 	.f_process	= filt_pipeprocess,
93 };
94 
95 const struct filterops pipe_wfiltops = {
96 	.f_flags	= FILTEROP_ISFD | FILTEROP_MPSAFE,
97 	.f_attach	= NULL,
98 	.f_detach	= filt_pipedetach,
99 	.f_event	= filt_pipewrite,
100 	.f_modify	= filt_pipemodify,
101 	.f_process	= filt_pipeprocess,
102 };
103 
104 const struct filterops pipe_efiltops = {
105 	.f_flags	= FILTEROP_ISFD | FILTEROP_MPSAFE,
106 	.f_attach	= NULL,
107 	.f_detach	= filt_pipedetach,
108 	.f_event	= filt_pipeexcept,
109 	.f_modify	= filt_pipemodify,
110 	.f_process	= filt_pipeprocess,
111 };
112 
113 /*
114  * Default pipe buffer size(s); this can be fairly large now because pipe
115  * space is pageable.  The pipe code will try to maintain locality of
116  * reference for performance reasons, so small amounts of outstanding I/O
117  * will not wipe the cache.
118  */
119 #define MINPIPESIZE (PIPE_SIZE/3)
120 
121 /*
122  * Limit the number of "big" pipes
123  */
124 #define LIMITBIGPIPES	32
125 unsigned int nbigpipe;
126 static unsigned int amountpipekva;
127 
128 struct pool pipe_pair_pool;
129 
130 int	dopipe(struct proc *, int *, int);
131 void	pipeselwakeup(struct pipe *);
132 
133 int	pipe_create(struct pipe *);
134 void	pipe_destroy(struct pipe *);
135 int	pipe_rundown(struct pipe *);
136 struct pipe *pipe_peer(struct pipe *);
137 int	pipe_buffer_realloc(struct pipe *, u_int);
138 void	pipe_buffer_free(struct pipe *);
139 
140 int	pipe_iolock(struct pipe *);
141 void	pipe_iounlock(struct pipe *);
142 int	pipe_iosleep(struct pipe *, const char *);
143 
144 struct pipe_pair *pipe_pair_create(void);
145 void	pipe_pair_destroy(struct pipe_pair *);
146 
147 /*
148  * The pipe system call for the DTYPE_PIPE type of pipes
149  */
150 
151 int
152 sys_pipe(struct proc *p, void *v, register_t *retval)
153 {
154 	struct sys_pipe_args /* {
155 		syscallarg(int *) fdp;
156 	} */ *uap = v;
157 
158 	return (dopipe(p, SCARG(uap, fdp), 0));
159 }
160 
161 int
162 sys_pipe2(struct proc *p, void *v, register_t *retval)
163 {
164 	struct sys_pipe2_args /* {
165 		syscallarg(int *) fdp;
166 		syscallarg(int) flags;
167 	} */ *uap = v;
168 
169 	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
170 		return (EINVAL);
171 
172 	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
173 }
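/*
 * Illustrative only (not part of this file): from userland, pipe2(2) lands
 * in sys_pipe2() above; the only flags accepted are O_CLOEXEC and O_NONBLOCK,
 * anything else fails with EINVAL.  A minimal sketch:
 *
 *	int fds[2];
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
 *		err(1, "pipe2");
 */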
174 
175 int
176 dopipe(struct proc *p, int *ufds, int flags)
177 {
178 	struct filedesc *fdp = p->p_fd;
179 	struct file *rf, *wf;
180 	struct pipe_pair *pp;
181 	struct pipe *rpipe, *wpipe = NULL;
182 	int fds[2], cloexec, error;
183 
184 	cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0;
185 
186 	pp = pipe_pair_create();
187 	if (pp == NULL)
188 		return (ENOMEM);
189 	wpipe = &pp->pp_wpipe;
190 	rpipe = &pp->pp_rpipe;
191 
192 	fdplock(fdp);
193 
194 	error = falloc(p, &rf, &fds[0]);
195 	if (error != 0)
196 		goto free2;
197 	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
198 	rf->f_type = DTYPE_PIPE;
199 	rf->f_data = rpipe;
200 	rf->f_ops = &pipeops;
201 
202 	error = falloc(p, &wf, &fds[1]);
203 	if (error != 0)
204 		goto free3;
205 	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
206 	wf->f_type = DTYPE_PIPE;
207 	wf->f_data = wpipe;
208 	wf->f_ops = &pipeops;
209 
210 	fdinsert(fdp, fds[0], cloexec, rf);
211 	fdinsert(fdp, fds[1], cloexec, wf);
212 
213 	error = copyout(fds, ufds, sizeof(fds));
214 	if (error == 0) {
215 		fdpunlock(fdp);
216 #ifdef KTRACE
217 		if (KTRPOINT(p, KTR_STRUCT))
218 			ktrfds(p, fds, 2);
219 #endif
220 	} else {
221 		/* fdrelease() unlocks fdp. */
222 		fdrelease(p, fds[0]);
223 		fdplock(fdp);
224 		fdrelease(p, fds[1]);
225 	}
226 
227 	FRELE(rf, p);
228 	FRELE(wf, p);
229 	return (error);
230 
231 free3:
232 	fdremove(fdp, fds[0]);
233 	closef(rf, p);
234 	rpipe = NULL;
235 free2:
236 	fdpunlock(fdp);
237 	pipe_destroy(wpipe);
238 	pipe_destroy(rpipe);
239 	return (error);
240 }
241 
242 /*
243  * Allocate kva for the pipe circular buffer; the space is pageable.
244  * This routine will 'realloc' the size of a pipe safely: if the new
245  * allocation fails, the old buffer is retained and ENOMEM is
246  * returned.
247  */
248 int
249 pipe_buffer_realloc(struct pipe *cpipe, u_int size)
250 {
251 	caddr_t buffer;
252 
253 	/* buffer uninitialized or pipe locked */
254 	KASSERT((cpipe->pipe_buffer.buffer == NULL) ||
255 	    (cpipe->pipe_state & PIPE_LOCK));
256 
257 	/* buffer should be empty */
258 	KASSERT(cpipe->pipe_buffer.cnt == 0);
259 
260 	KERNEL_LOCK();
261 	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
262 	KERNEL_UNLOCK();
263 	if (buffer == NULL)
264 		return (ENOMEM);
265 
266 	/* free old resources if we are resizing */
267 	pipe_buffer_free(cpipe);
268 
269 	cpipe->pipe_buffer.buffer = buffer;
270 	cpipe->pipe_buffer.size = size;
271 	cpipe->pipe_buffer.in = 0;
272 	cpipe->pipe_buffer.out = 0;
273 
274 	atomic_add_int(&amountpipekva, cpipe->pipe_buffer.size);
275 
276 	return (0);
277 }
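/*
 * Descriptive note: within this file, pipe_buffer_realloc() is called with
 * PIPE_SIZE from pipe_create() and with BIG_PIPE_SIZE from pipe_write(),
 * the latter only when a large write arrives on an empty, default-sized
 * buffer and the LIMITBIGPIPES cap on nbigpipe has not been reached.
 */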
278 
279 /*
280  * initialize and allocate VM and memory for pipe
281  */
282 int
283 pipe_create(struct pipe *cpipe)
284 {
285 	int error;
286 
287 	error = pipe_buffer_realloc(cpipe, PIPE_SIZE);
288 	if (error != 0)
289 		return (error);
290 
291 	sigio_init(&cpipe->pipe_sigio);
292 
293 	getnanotime(&cpipe->pipe_ctime);
294 	cpipe->pipe_atime = cpipe->pipe_ctime;
295 	cpipe->pipe_mtime = cpipe->pipe_ctime;
296 
297 	return (0);
298 }
299 
300 struct pipe *
301 pipe_peer(struct pipe *cpipe)
302 {
303 	struct pipe *peer;
304 
305 	rw_assert_anylock(cpipe->pipe_lock);
306 
307 	peer = cpipe->pipe_peer;
308 	if (peer == NULL || (peer->pipe_state & PIPE_EOF))
309 		return (NULL);
310 	return (peer);
311 }
312 
313 /*
314  * Lock a pipe for exclusive I/O access.
315  */
316 int
317 pipe_iolock(struct pipe *cpipe)
318 {
319 	int error;
320 
321 	rw_assert_wrlock(cpipe->pipe_lock);
322 
323 	while (cpipe->pipe_state & PIPE_LOCK) {
324 		cpipe->pipe_state |= PIPE_LWANT;
325 		error = rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO | PCATCH,
326 		    "pipeiolk", INFSLP);
327 		if (error)
328 			return (error);
329 	}
330 	cpipe->pipe_state |= PIPE_LOCK;
331 	return (0);
332 }
333 
334 /*
335  * Unlock a pipe I/O lock.
336  */
337 void
338 pipe_iounlock(struct pipe *cpipe)
339 {
340 	rw_assert_wrlock(cpipe->pipe_lock);
341 	KASSERT(cpipe->pipe_state & PIPE_LOCK);
342 
343 	cpipe->pipe_state &= ~PIPE_LOCK;
344 	if (cpipe->pipe_state & PIPE_LWANT) {
345 		cpipe->pipe_state &= ~PIPE_LWANT;
346 		wakeup(cpipe);
347 	}
348 }
349 
350 /*
351  * Unlock the pipe I/O lock and go to sleep.  Returns 0 on success, with the
352  * I/O lock relocked.  Otherwise, if a signal was caught, non-zero is
353  * returned and the I/O lock is not held.
354  *
355  * Any caller must obtain a reference to the pipe by incrementing `pipe_busy'
356  * before calling this function in order to ensure that the same pipe is not
357  * destroyed while sleeping.
358  */
359 int
360 pipe_iosleep(struct pipe *cpipe, const char *wmesg)
361 {
362 	int error;
363 
364 	pipe_iounlock(cpipe);
365 	error = rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO | PCATCH, wmesg,
366 	    INFSLP);
367 	if (error)
368 		return (error);
369 	return (pipe_iolock(cpipe));
370 }
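/*
 * Illustrative only (not part of this file): the caller pattern used by
 * pipe_read() and pipe_write() around the I/O lock and sleep primitives
 * above, sketched without error handling:
 *
 *	rw_enter_write(cpipe->pipe_lock);
 *	++cpipe->pipe_busy;		(hold a reference across sleeps)
 *	pipe_iolock(cpipe);
 *	...
 *	pipe_iosleep(cpipe, "piperd");	(drops and retakes the I/O lock)
 *	...
 *	pipe_iounlock(cpipe);
 *	--cpipe->pipe_busy;
 *	pipe_rundown(cpipe);		(wake pipe_destroy() if it is waiting)
 *	rw_exit_write(cpipe->pipe_lock);
 */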
371 
372 void
373 pipeselwakeup(struct pipe *cpipe)
374 {
375 	rw_assert_wrlock(cpipe->pipe_lock);
376 
377 	KNOTE(&cpipe->pipe_sel.si_note, 0);
378 
379 	if (cpipe->pipe_state & PIPE_ASYNC)
380 		pgsigio(&cpipe->pipe_sigio, SIGIO, 0);
381 }
382 
383 int
384 pipe_read(struct file *fp, struct uio *uio, int fflags)
385 {
386 	struct pipe *rpipe = fp->f_data;
387 	size_t nread = 0, size;
388 	int error;
389 
390 	rw_enter_write(rpipe->pipe_lock);
391 	++rpipe->pipe_busy;
392 	error = pipe_iolock(rpipe);
393 	if (error) {
394 		--rpipe->pipe_busy;
395 		pipe_rundown(rpipe);
396 		rw_exit_write(rpipe->pipe_lock);
397 		return (error);
398 	}
399 
400 	while (uio->uio_resid) {
401 		/* Normal pipe buffer receive. */
402 		if (rpipe->pipe_buffer.cnt > 0) {
403 			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
404 			if (size > rpipe->pipe_buffer.cnt)
405 				size = rpipe->pipe_buffer.cnt;
406 			if (size > uio->uio_resid)
407 				size = uio->uio_resid;
408 			rw_exit_write(rpipe->pipe_lock);
409 			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
410 					size, uio);
411 			rw_enter_write(rpipe->pipe_lock);
412 			if (error) {
413 				break;
414 			}
415 			rpipe->pipe_buffer.out += size;
416 			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
417 				rpipe->pipe_buffer.out = 0;
418 
419 			rpipe->pipe_buffer.cnt -= size;
420 			/*
421 			 * If there is no more to read in the pipe, reset
422 			 * its pointers to the beginning.  This improves
423 			 * cache hit stats.
424 			 */
425 			if (rpipe->pipe_buffer.cnt == 0) {
426 				rpipe->pipe_buffer.in = 0;
427 				rpipe->pipe_buffer.out = 0;
428 			}
429 			nread += size;
430 		} else {
431 			/*
432 			 * detect EOF condition
433 			 * read returns 0 on EOF, no need to set error
434 			 */
435 			if (rpipe->pipe_state & PIPE_EOF)
436 				break;
437 
438 			/* If the "write-side" has been blocked, wake it up. */
439 			if (rpipe->pipe_state & PIPE_WANTW) {
440 				rpipe->pipe_state &= ~PIPE_WANTW;
441 				wakeup(rpipe);
442 			}
443 
444 			/* Break if some data was read. */
445 			if (nread > 0)
446 				break;
447 
448 			/* Handle non-blocking mode operation. */
449 			if (fp->f_flag & FNONBLOCK) {
450 				error = EAGAIN;
451 				break;
452 			}
453 
454 			/* Wait for more data. */
455 			rpipe->pipe_state |= PIPE_WANTR;
456 			error = pipe_iosleep(rpipe, "piperd");
457 			if (error)
458 				goto unlocked_error;
459 		}
460 	}
461 	pipe_iounlock(rpipe);
462 
463 	if (error == 0)
464 		getnanotime(&rpipe->pipe_atime);
465 unlocked_error:
466 	--rpipe->pipe_busy;
467 
468 	if (pipe_rundown(rpipe) == 0 && rpipe->pipe_buffer.cnt < MINPIPESIZE) {
469 		/* Handle write blocking hysteresis (buffer drained below MINPIPESIZE). */
470 		if (rpipe->pipe_state & PIPE_WANTW) {
471 			rpipe->pipe_state &= ~PIPE_WANTW;
472 			wakeup(rpipe);
473 		}
474 	}
475 
476 	if (rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt >= PIPE_BUF)
477 		pipeselwakeup(rpipe);
478 
479 	rw_exit_write(rpipe->pipe_lock);
480 	return (error);
481 }
482 
483 int
484 pipe_write(struct file *fp, struct uio *uio, int fflags)
485 {
486 	struct pipe *rpipe = fp->f_data, *wpipe;
487 	struct rwlock *lock = rpipe->pipe_lock;
488 	size_t orig_resid;
489 	int error;
490 
491 	rw_enter_write(lock);
492 	wpipe = pipe_peer(rpipe);
493 
494 	/* Detect loss of the pipe read side; the caller turns EPIPE into SIGPIPE. */
495 	if (wpipe == NULL) {
496 		rw_exit_write(lock);
497 		return (EPIPE);
498 	}
499 
500 	++wpipe->pipe_busy;
501 	error = pipe_iolock(wpipe);
502 	if (error) {
503 		--wpipe->pipe_busy;
504 		pipe_rundown(wpipe);
505 		rw_exit_write(lock);
506 		return (error);
507 	}
508 
509 
510 	/* If it is advantageous to resize the pipe buffer, do so. */
511 	if (uio->uio_resid > PIPE_SIZE &&
512 	    wpipe->pipe_buffer.size <= PIPE_SIZE &&
513 	    wpipe->pipe_buffer.cnt == 0) {
514 	    	unsigned int npipe;
515 
516 		npipe = atomic_inc_int_nv(&nbigpipe);
517 		if (npipe > LIMITBIGPIPES ||
518 		    pipe_buffer_realloc(wpipe, BIG_PIPE_SIZE) != 0)
519 			atomic_dec_int(&nbigpipe);
520 	}
521 
522 	orig_resid = uio->uio_resid;
523 
524 	while (uio->uio_resid) {
525 		size_t space;
526 
527 		if (wpipe->pipe_state & PIPE_EOF) {
528 			error = EPIPE;
529 			break;
530 		}
531 
532 		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
533 
534 		/* Writes of size <= PIPE_BUF must be atomic. */
535 		if (space < uio->uio_resid && orig_resid <= PIPE_BUF)
536 			space = 0;
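		/*
		 * (Descriptive note: forcing `space' to 0 here means a write
		 * of at most PIPE_BUF bytes that does not fit entirely is not
		 * split; the writer blocks below, or fails with EAGAIN when
		 * non-blocking, until the whole request fits, which is what
		 * keeps such writes atomic.)
		 */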
537 
538 		if (space > 0) {
539 			size_t size;	/* Transfer size */
540 			size_t segsize;	/* first segment to transfer */
541 
542 			/*
543 			 * Transfer size is minimum of uio transfer
544 			 * and free space in pipe buffer.
545 			 */
546 			if (space > uio->uio_resid)
547 				size = uio->uio_resid;
548 			else
549 				size = space;
550 			/*
551 			 * First segment to transfer is minimum of
552 			 * transfer size and contiguous space in
553 			 * pipe buffer.  If first segment to transfer
554 			 * is less than the transfer size, we've got
555 			 * a wraparound in the buffer.
556 			 */
557 			segsize = wpipe->pipe_buffer.size -
558 				wpipe->pipe_buffer.in;
559 			if (segsize > size)
560 				segsize = size;
561 
562 			/* Transfer first segment */
563 
564 			rw_exit_write(lock);
565 			error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
566 					segsize, uio);
567 			rw_enter_write(lock);
568 
569 			if (error == 0 && segsize < size) {
570 				/*
571 				 * Transfer remaining part now, to
572 				 * support atomic writes.  Wraparound
573 				 * happened.
574 				 */
575 #ifdef DIAGNOSTIC
576 				if (wpipe->pipe_buffer.in + segsize !=
577 				    wpipe->pipe_buffer.size)
578 					panic("Expected pipe buffer wraparound disappeared");
579 #endif
580 
581 				rw_exit_write(lock);
582 				error = uiomove(&wpipe->pipe_buffer.buffer[0],
583 						size - segsize, uio);
584 				rw_enter_write(lock);
585 			}
586 			if (error == 0) {
587 				wpipe->pipe_buffer.in += size;
588 				if (wpipe->pipe_buffer.in >=
589 				    wpipe->pipe_buffer.size) {
590 #ifdef DIAGNOSTIC
591 					if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
592 						panic("Expected wraparound bad");
593 #endif
594 					wpipe->pipe_buffer.in = size - segsize;
595 				}
596 
597 				wpipe->pipe_buffer.cnt += size;
598 #ifdef DIAGNOSTIC
599 				if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
600 					panic("Pipe buffer overflow");
601 #endif
602 			}
603 			if (error)
604 				break;
605 		} else {
606 			/* If the "read-side" has been blocked, wake it up. */
607 			if (wpipe->pipe_state & PIPE_WANTR) {
608 				wpipe->pipe_state &= ~PIPE_WANTR;
609 				wakeup(wpipe);
610 			}
611 
612 			/* Don't block on non-blocking I/O. */
613 			if (fp->f_flag & FNONBLOCK) {
614 				error = EAGAIN;
615 				break;
616 			}
617 
618 			/*
619 			 * We have no more space and have something to offer,
620 			 * wake up select/poll.
621 			 */
622 			pipeselwakeup(wpipe);
623 
624 			wpipe->pipe_state |= PIPE_WANTW;
625 			error = pipe_iosleep(wpipe, "pipewr");
626 			if (error)
627 				goto unlocked_error;
628 
629 			/*
630 			 * If read side wants to go away, we just issue a
631 			 * signal to ourselves.
632 			 */
633 			if (wpipe->pipe_state & PIPE_EOF) {
634 				error = EPIPE;
635 				break;
636 			}
637 		}
638 	}
639 	pipe_iounlock(wpipe);
640 
641 unlocked_error:
642 	--wpipe->pipe_busy;
643 
644 	if (pipe_rundown(wpipe) == 0 && wpipe->pipe_buffer.cnt > 0) {
645 		/*
646 		 * If we have put any characters in the buffer, we wake up
647 		 * the reader.
648 		 */
649 		if (wpipe->pipe_state & PIPE_WANTR) {
650 			wpipe->pipe_state &= ~PIPE_WANTR;
651 			wakeup(wpipe);
652 		}
653 	}
654 
655 	/* Don't return EPIPE if I/O was successful. */
656 	if (wpipe->pipe_buffer.cnt == 0 &&
657 	    uio->uio_resid == 0 &&
658 	    error == EPIPE) {
659 		error = 0;
660 	}
661 
662 	if (error == 0)
663 		getnanotime(&wpipe->pipe_mtime);
664 	/* We have something to offer, wake up select/poll. */
665 	if (wpipe->pipe_buffer.cnt)
666 		pipeselwakeup(wpipe);
667 
668 	rw_exit_write(lock);
669 	return (error);
670 }
671 
672 /*
673  * we implement a very minimal set of ioctls for compatibility with sockets.
674  */
675 int
676 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
677 {
678 	struct pipe *mpipe = fp->f_data;
679 	int error = 0;
680 
681 	switch (cmd) {
682 
683 	case FIONBIO:
684 		break;
685 
686 	case FIOASYNC:
687 		rw_enter_write(mpipe->pipe_lock);
688 		if (*(int *)data) {
689 			mpipe->pipe_state |= PIPE_ASYNC;
690 		} else {
691 			mpipe->pipe_state &= ~PIPE_ASYNC;
692 		}
693 		rw_exit_write(mpipe->pipe_lock);
694 		break;
695 
696 	case FIONREAD:
697 		rw_enter_read(mpipe->pipe_lock);
698 		*(int *)data = mpipe->pipe_buffer.cnt;
699 		rw_exit_read(mpipe->pipe_lock);
700 		break;
701 
702 	case FIOSETOWN:
703 	case SIOCSPGRP:
704 	case TIOCSPGRP:
705 		error = sigio_setown(&mpipe->pipe_sigio, cmd, data);
706 		break;
707 
708 	case FIOGETOWN:
709 	case SIOCGPGRP:
710 	case TIOCGPGRP:
711 		sigio_getown(&mpipe->pipe_sigio, cmd, data);
712 		break;
713 
714 	default:
715 		error = ENOTTY;
716 	}
717 
718 	return (error);
719 }
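/*
 * Illustrative only (not part of this file): FIONREAD from userland reports
 * how many bytes are currently buffered in the pipe, as handled above.
 * A minimal sketch:
 *
 *	int n;
 *	if (ioctl(fds[0], FIONREAD, &n) == 0)
 *		printf("%d bytes buffered\n", n);
 */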
720 
721 int
722 pipe_poll(struct file *fp, int events, struct proc *p)
723 {
724 	struct pipe *rpipe = fp->f_data, *wpipe;
725 	struct rwlock *lock = rpipe->pipe_lock;
726 	int revents = 0;
727 
728 	rw_enter_write(lock);
729 	wpipe = pipe_peer(rpipe);
730 
731 	if (events & (POLLIN | POLLRDNORM)) {
732 		if (rpipe->pipe_buffer.cnt > 0 ||
733 		    (rpipe->pipe_state & PIPE_EOF))
734 			revents |= events & (POLLIN | POLLRDNORM);
735 	}
736 
737 	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
738 	if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL)
739 		revents |= POLLHUP;
740 	else if (events & (POLLOUT | POLLWRNORM)) {
741 		if (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt >= PIPE_BUF)
742 			revents |= events & (POLLOUT | POLLWRNORM);
743 	}
744 
745 	if (revents == 0) {
746 		if (events & (POLLIN | POLLRDNORM)) {
747 			selrecord(p, &rpipe->pipe_sel);
748 			rpipe->pipe_state |= PIPE_SEL;
749 		}
750 		if (events & (POLLOUT | POLLWRNORM)) {
751 			selrecord(p, &wpipe->pipe_sel);
752 			wpipe->pipe_state |= PIPE_SEL;
753 		}
754 	}
755 
756 	rw_exit_write(lock);
757 
758 	return (revents);
759 }
760 
761 int
762 pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
763 {
764 	struct pipe *pipe = fp->f_data;
765 
766 	memset(ub, 0, sizeof(*ub));
767 
768 	rw_enter_read(pipe->pipe_lock);
769 	ub->st_mode = S_IFIFO;
770 	ub->st_blksize = pipe->pipe_buffer.size;
771 	ub->st_size = pipe->pipe_buffer.cnt;
772 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
773 	ub->st_atim.tv_sec  = pipe->pipe_atime.tv_sec;
774 	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
775 	ub->st_mtim.tv_sec  = pipe->pipe_mtime.tv_sec;
776 	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
777 	ub->st_ctim.tv_sec  = pipe->pipe_ctime.tv_sec;
778 	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
779 	ub->st_uid = fp->f_cred->cr_uid;
780 	ub->st_gid = fp->f_cred->cr_gid;
781 	rw_exit_read(pipe->pipe_lock);
782 	/*
783 	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
784 	 * XXX (st_dev, st_ino) should be unique.
785 	 */
786 	return (0);
787 }
788 
789 int
790 pipe_close(struct file *fp, struct proc *p)
791 {
792 	struct pipe *cpipe = fp->f_data;
793 
794 	fp->f_ops = NULL;
795 	fp->f_data = NULL;
796 	pipe_destroy(cpipe);
797 	return (0);
798 }
799 
800 /*
801  * Free kva for pipe circular buffer.
802  * No pipe lock check as only called from pipe_buffer_realloc() and pipe_destroy().
803  */
804 void
805 pipe_buffer_free(struct pipe *cpipe)
806 {
807 	u_int size;
808 
809 	if (cpipe->pipe_buffer.buffer == NULL)
810 		return;
811 
812 	size = cpipe->pipe_buffer.size;
813 
814 	KERNEL_LOCK();
815 	km_free(cpipe->pipe_buffer.buffer, size, &kv_any, &kp_pageable);
816 	KERNEL_UNLOCK();
817 
818 	cpipe->pipe_buffer.buffer = NULL;
819 
820 	atomic_sub_int(&amountpipekva, size);
821 	if (size > PIPE_SIZE)
822 		atomic_dec_int(&nbigpipe);
823 }
824 
825 /*
826  * Shut down the pipe and free resources.
827  */
828 void
829 pipe_destroy(struct pipe *cpipe)
830 {
831 	struct pipe *ppipe;
832 
833 	if (cpipe == NULL)
834 		return;
835 
836 	rw_enter_write(cpipe->pipe_lock);
837 
838 	pipeselwakeup(cpipe);
839 	sigio_free(&cpipe->pipe_sigio);
840 
841 	/*
842 	 * If the other side is blocked, wake it up saying that
843 	 * we want to close it down.
844 	 */
845 	cpipe->pipe_state |= PIPE_EOF;
846 	while (cpipe->pipe_busy) {
847 		wakeup(cpipe);
848 		cpipe->pipe_state |= PIPE_WANTD;
849 		rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO, "pipecl", INFSLP);
850 	}
851 
852 	/* Disconnect from peer. */
853 	if ((ppipe = cpipe->pipe_peer) != NULL) {
854 		pipeselwakeup(ppipe);
855 
856 		ppipe->pipe_state |= PIPE_EOF;
857 		wakeup(ppipe);
858 		ppipe->pipe_peer = NULL;
859 	}
860 
861 	pipe_buffer_free(cpipe);
862 
863 	rw_exit_write(cpipe->pipe_lock);
864 
865 	if (ppipe == NULL)
866 		pipe_pair_destroy(cpipe->pipe_pair);
867 }
868 
869 /*
870  * Returns non-zero if a rundown is currently ongoing.
871  */
872 int
873 pipe_rundown(struct pipe *cpipe)
874 {
875 	rw_assert_wrlock(cpipe->pipe_lock);
876 
877 	if (cpipe->pipe_busy > 0 || (cpipe->pipe_state & PIPE_WANTD) == 0)
878 		return (0);
879 
880 	/* Only wakeup pipe_destroy() once the pipe is no longer busy. */
881 	cpipe->pipe_state &= ~(PIPE_WANTD | PIPE_WANTR | PIPE_WANTW);
882 	wakeup(cpipe);
883 	return (1);
884 }
885 
886 int
887 pipe_kqfilter(struct file *fp, struct knote *kn)
888 {
889 	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;
890 	struct rwlock *lock = rpipe->pipe_lock;
891 	int error = 0;
892 
893 	rw_enter_write(lock);
894 	wpipe = pipe_peer(rpipe);
895 
896 	switch (kn->kn_filter) {
897 	case EVFILT_READ:
898 		kn->kn_fop = &pipe_rfiltops;
899 		kn->kn_hook = rpipe;
900 		klist_insert_locked(&rpipe->pipe_sel.si_note, kn);
901 		break;
902 	case EVFILT_WRITE:
903 		if (wpipe == NULL) {
904 			/* other end of pipe has been closed */
905 			error = EPIPE;
906 			break;
907 		}
908 		kn->kn_fop = &pipe_wfiltops;
909 		kn->kn_hook = wpipe;
910 		klist_insert_locked(&wpipe->pipe_sel.si_note, kn);
911 		break;
912 	case EVFILT_EXCEPT:
913 		if (kn->kn_flags & __EV_SELECT) {
914 			/* Prevent triggering exceptfds. */
915 			error = EPERM;
916 			break;
917 		}
918 		if ((kn->kn_flags & __EV_POLL) == 0) {
919 			/* Disallow usage through kevent(2). */
920 			error = EINVAL;
921 			break;
922 		}
923 		kn->kn_fop = &pipe_efiltops;
924 		kn->kn_hook = rpipe;
925 		klist_insert_locked(&rpipe->pipe_sel.si_note, kn);
926 		break;
927 	default:
928 		error = EINVAL;
929 	}
930 
931 	rw_exit_write(lock);
932 
933 	return (error);
934 }
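/*
 * Illustrative only (not part of this file): registering a read filter on a
 * pipe from userland ends up in pipe_kqfilter() above.  A minimal sketch:
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	if (kevent(kq, &kev, 1, NULL, 0, NULL) == -1)
 *		err(1, "kevent");
 */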
935 
936 void
937 filt_pipedetach(struct knote *kn)
938 {
939 	struct pipe *cpipe = kn->kn_hook;
940 
941 	klist_remove(&cpipe->pipe_sel.si_note, kn);
942 }
943 
944 int
945 filt_piperead(struct knote *kn, long hint)
946 {
947 	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;
948 
949 	rw_assert_wrlock(rpipe->pipe_lock);
950 
951 	wpipe = pipe_peer(rpipe);
952 
953 	kn->kn_data = rpipe->pipe_buffer.cnt;
954 
955 	if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL) {
956 		kn->kn_flags |= EV_EOF;
957 		if (kn->kn_flags & __EV_POLL)
958 			kn->kn_flags |= __EV_HUP;
959 		return (1);
960 	}
961 
962 	return (kn->kn_data > 0);
963 }
964 
965 int
966 filt_pipewrite(struct knote *kn, long hint)
967 {
968 	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;
969 
970 	rw_assert_wrlock(rpipe->pipe_lock);
971 
972 	wpipe = pipe_peer(rpipe);
973 
974 	if (wpipe == NULL) {
975 		kn->kn_data = 0;
976 		kn->kn_flags |= EV_EOF;
977 		if (kn->kn_flags & __EV_POLL)
978 			kn->kn_flags |= __EV_HUP;
979 		return (1);
980 	}
981 	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
982 
983 	return (kn->kn_data >= PIPE_BUF);
984 }
985 
986 int
987 filt_pipeexcept(struct knote *kn, long hint)
988 {
989 	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;
990 	int active = 0;
991 
992 	rw_assert_wrlock(rpipe->pipe_lock);
993 
994 	wpipe = pipe_peer(rpipe);
995 
996 	if (kn->kn_flags & __EV_POLL) {
997 		if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL) {
998 			kn->kn_flags |= __EV_HUP;
999 			active = 1;
1000 		}
1001 	}
1002 
1003 	return (active);
1004 }
1005 
1006 int
1007 filt_pipemodify(struct kevent *kev, struct knote *kn)
1008 {
1009 	struct pipe *rpipe = kn->kn_fp->f_data;
1010 	int active;
1011 
1012 	rw_enter_write(rpipe->pipe_lock);
1013 	active = knote_modify(kev, kn);
1014 	rw_exit_write(rpipe->pipe_lock);
1015 
1016 	return (active);
1017 }
1018 
1019 int
1020 filt_pipeprocess(struct knote *kn, struct kevent *kev)
1021 {
1022 	struct pipe *rpipe = kn->kn_fp->f_data;
1023 	int active;
1024 
1025 	rw_enter_write(rpipe->pipe_lock);
1026 	active = knote_process(kn, kev);
1027 	rw_exit_write(rpipe->pipe_lock);
1028 
1029 	return (active);
1030 }
1031 
1032 void
1033 pipe_init(void)
1034 {
1035 	pool_init(&pipe_pair_pool, sizeof(struct pipe_pair), 0, IPL_MPFLOOR,
1036 	    PR_WAITOK, "pipepl", NULL);
1037 }
1038 
1039 struct pipe_pair *
1040 pipe_pair_create(void)
1041 {
1042 	struct pipe_pair *pp;
1043 
1044 	pp = pool_get(&pipe_pair_pool, PR_WAITOK | PR_ZERO);
1045 	pp->pp_wpipe.pipe_pair = pp;
1046 	pp->pp_rpipe.pipe_pair = pp;
1047 	pp->pp_wpipe.pipe_peer = &pp->pp_rpipe;
1048 	pp->pp_rpipe.pipe_peer = &pp->pp_wpipe;
1049 	/*
1050 	 * A single lock is shared by both ends so that either side can obtain
1051 	 * exclusive access to the whole pipe pair.
1052 	 */
1053 	rw_init(&pp->pp_lock, "pipelk");
1054 	pp->pp_wpipe.pipe_lock = &pp->pp_lock;
1055 	pp->pp_rpipe.pipe_lock = &pp->pp_lock;
1056 
1057 	klist_init_rwlock(&pp->pp_wpipe.pipe_sel.si_note, &pp->pp_lock);
1058 	klist_init_rwlock(&pp->pp_rpipe.pipe_sel.si_note, &pp->pp_lock);
1059 
1060 	if (pipe_create(&pp->pp_wpipe) || pipe_create(&pp->pp_rpipe))
1061 		goto err;
1062 	return (pp);
1063 err:
1064 	pipe_destroy(&pp->pp_wpipe);
1065 	pipe_destroy(&pp->pp_rpipe);
1066 	return (NULL);
1067 }
1068 
1069 void
1070 pipe_pair_destroy(struct pipe_pair *pp)
1071 {
1072 	klist_free(&pp->pp_wpipe.pipe_sel.si_note);
1073 	klist_free(&pp->pp_rpipe.pipe_sel.si_note);
1074 	pool_put(&pipe_pair_pool, pp);
1075 }
1076