/*	$OpenBSD: sys_pipe.c,v 1.129 2021/10/24 06:59:54 visa Exp $	*/

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipe scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all the features of sockets, but it does everything that pipes
 * normally do.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/pool.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/event.h>
#include <sys/lock.h>
#include <sys/poll.h>
#ifdef KTRACE
#include <sys/ktrace.h>
#endif

#include <uvm/uvm_extern.h>

#include <sys/pipe.h>

struct pipe_pair {
	struct pipe pp_wpipe;
	struct pipe pp_rpipe;
	struct rwlock pp_lock;
};

/*
 * interfaces to the outside world
 */
int	pipe_read(struct file *, struct uio *, int);
int	pipe_write(struct file *, struct uio *, int);
int	pipe_close(struct file *, struct proc *);
int	pipe_poll(struct file *, int events, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

static const struct fileops pipeops = {
	.fo_read	= pipe_read,
	.fo_write	= pipe_write,
	.fo_ioctl	= pipe_ioctl,
	.fo_poll	= pipe_poll,
	.fo_kqfilter	= pipe_kqfilter,
	.fo_stat	= pipe_stat,
	.fo_close	= pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipereadmodify(struct kevent *kev, struct knote *kn);
int	filt_pipereadprocess(struct knote *kn, struct kevent *kev);
int	filt_piperead_common(struct knote *kn, struct pipe *rpipe);
int	filt_pipewrite(struct knote *kn, long hint);
int	filt_pipewritemodify(struct kevent *kev, struct knote *kn);
int	filt_pipewriteprocess(struct knote *kn, struct kevent *kev);
int	filt_pipewrite_common(struct knote *kn, struct pipe *rpipe);

const struct filterops pipe_rfiltops = {
	.f_flags	= FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach	= NULL,
	.f_detach	= filt_pipedetach,
	.f_event	= filt_piperead,
	.f_modify	= filt_pipereadmodify,
	.f_process	= filt_pipereadprocess,
};

const struct filterops pipe_wfiltops = {
	.f_flags	= FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach	= NULL,
	.f_detach	= filt_pipedetach,
	.f_event	= filt_pipewrite,
	.f_modify	= filt_pipewritemodify,
	.f_process	= filt_pipewriteprocess,
};

/*
 * Default pipe buffer size(s); these can be fairly large now because pipe
 * space is pageable.  The pipe code tries to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
unsigned int nbigpipe;
static unsigned int amountpipekva;

struct pool pipe_pair_pool;

int	dopipe(struct proc *, int *, int);
void	pipeselwakeup(struct pipe *);

int	pipe_create(struct pipe *);
void	pipe_destroy(struct pipe *);
int	pipe_rundown(struct pipe *);
struct pipe *pipe_peer(struct pipe *);
int	pipe_buffer_realloc(struct pipe *, u_int);
void	pipe_buffer_free(struct pipe *);

int	pipe_iolock(struct pipe *);
void	pipe_iounlock(struct pipe *);
int	pipe_iosleep(struct pipe *, const char *);

struct pipe_pair *pipe_pair_create(void);
void	pipe_pair_destroy(struct pipe_pair *);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

int
sys_pipe(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe_args /* {
		syscallarg(int *) fdp;
	} */ *uap = v;

	return (dopipe(p, SCARG(uap, fdp), 0));
}

int
sys_pipe2(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe2_args /* {
		syscallarg(int *) fdp;
		syscallarg(int) flags;
	} */ *uap = v;

	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
		return (EINVAL);

	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
}
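
/*
 * Illustrative userland sketch of the entry points above (not kernel
 * code; assumes the libc pipe2(2) stub and that O_NONBLOCK equals
 * FNONBLOCK, as defined in <fcntl.h>):
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
 *		err(1, "pipe2");
 *	(fds[0] is the read end, fds[1] the write end.)
 *
 * Any flag outside O_CLOEXEC and O_NONBLOCK makes sys_pipe2() return
 * EINVAL, per the mask check above.
 */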

int
dopipe(struct proc *p, int *ufds, int flags)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe_pair *pp;
	struct pipe *rpipe, *wpipe = NULL;
	int fds[2], cloexec, error;

	cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0;

	pp = pipe_pair_create();
	if (pp == NULL)
		return (ENOMEM);
	wpipe = &pp->pp_wpipe;
	rpipe = &pp->pp_rpipe;

	fdplock(fdp);

	error = falloc(p, &rf, &fds[0]);
	if (error != 0)
		goto free2;
	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;

	error = falloc(p, &wf, &fds[1]);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;

	fdinsert(fdp, fds[0], cloexec, rf);
	fdinsert(fdp, fds[1], cloexec, wf);

	error = copyout(fds, ufds, sizeof(fds));
	if (error == 0) {
		fdpunlock(fdp);
#ifdef KTRACE
		if (KTRPOINT(p, KTR_STRUCT))
			ktrfds(p, fds, 2);
#endif
	} else {
		/* fdrelease() unlocks fdp. */
		fdrelease(p, fds[0]);
		fdplock(fdp);
		fdrelease(p, fds[1]);
	}

	FRELE(rf, p);
	FRELE(wf, p);
	return (error);

free3:
	fdremove(fdp, fds[0]);
	closef(rf, p);
	rpipe = NULL;
free2:
	fdpunlock(fdp);
	pipe_destroy(wpipe);
	pipe_destroy(rpipe);
	return (error);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will safely 'realloc' the size of a pipe; on failure
 * it retains the old buffer and returns ENOMEM.
 */
int
pipe_buffer_realloc(struct pipe *cpipe, u_int size)
{
	caddr_t buffer;

	/* buffer uninitialized or pipe locked */
	KASSERT((cpipe->pipe_buffer.buffer == NULL) ||
	    (cpipe->pipe_state & PIPE_LOCK));

	/* buffer should be empty */
	KASSERT(cpipe->pipe_buffer.cnt == 0);

	KERNEL_LOCK();
	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
	KERNEL_UNLOCK();
	if (buffer == NULL)
		return (ENOMEM);

	/* free old resources if we are resizing */
	pipe_buffer_free(cpipe);

	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;

	atomic_add_int(&amountpipekva, cpipe->pipe_buffer.size);

	return (0);
}

/*
 * initialize and allocate VM and memory for pipe
 */
int
pipe_create(struct pipe *cpipe)
{
	int error;

	error = pipe_buffer_realloc(cpipe, PIPE_SIZE);
	if (error != 0)
		return (error);

	sigio_init(&cpipe->pipe_sigio);

	getnanotime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;

	return (0);
}

struct pipe *
pipe_peer(struct pipe *cpipe)
{
	struct pipe *peer;

	rw_assert_anylock(cpipe->pipe_lock);

	peer = cpipe->pipe_peer;
	if (peer == NULL || (peer->pipe_state & PIPE_EOF))
		return (NULL);
	return (peer);
}

/*
 * Lock a pipe for exclusive I/O access.
 */
int
pipe_iolock(struct pipe *cpipe)
{
	int error;

	rw_assert_wrlock(cpipe->pipe_lock);

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		error = rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO | PCATCH,
		    "pipeiolk", INFSLP);
		if (error)
			return (error);
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return (0);
}

/*
 * Unlock a pipe I/O lock.
 */
void
pipe_iounlock(struct pipe *cpipe)
{
	rw_assert_wrlock(cpipe->pipe_lock);
	KASSERT(cpipe->pipe_state & PIPE_LOCK);

	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

/*
 * Unlock the pipe I/O lock and go to sleep. Returns 0 on success, with the
 * I/O lock relocked. Otherwise, if a signal was caught, returns non-zero
 * with the I/O lock left unlocked.
 *
 * Any caller must obtain a reference to the pipe by incrementing `pipe_busy'
 * before calling this function, in order to ensure that the pipe is not
 * destroyed while sleeping.
 */
int
pipe_iosleep(struct pipe *cpipe, const char *wmesg)
{
	int error;

	pipe_iounlock(cpipe);
	error = rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO | PCATCH, wmesg,
	    INFSLP);
	if (error)
		return (error);
	return (pipe_iolock(cpipe));
}
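
/*
 * A minimal sketch of the protocol the I/O paths below follow around
 * these helpers (pipe_read() is the canonical caller):
 *
 *	rw_enter_write(cpipe->pipe_lock);
 *	++cpipe->pipe_busy;		(hold a reference across sleeps)
 *	error = pipe_iolock(cpipe);
 *	... transfer data, possibly via pipe_iosleep() ...
 *	pipe_iounlock(cpipe);
 *	--cpipe->pipe_busy;
 *	pipe_rundown(cpipe);		(wake pipe_destroy() if it waits)
 *	rw_exit_write(cpipe->pipe_lock);
 */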

void
pipeselwakeup(struct pipe *cpipe)
{
	rw_assert_wrlock(cpipe->pipe_lock);

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	} else {
		KNOTE(&cpipe->pipe_sel.si_note, 0);
	}

	if (cpipe->pipe_state & PIPE_ASYNC)
		pgsigio(&cpipe->pipe_sigio, SIGIO, 0);
}

int
pipe_read(struct file *fp, struct uio *uio, int fflags)
{
	struct pipe *rpipe = fp->f_data;
	size_t nread = 0, size;
	int error;

	rw_enter_write(rpipe->pipe_lock);
	++rpipe->pipe_busy;
	error = pipe_iolock(rpipe);
	if (error) {
		--rpipe->pipe_busy;
		pipe_rundown(rpipe);
		rw_exit_write(rpipe->pipe_lock);
		return (error);
	}

	while (uio->uio_resid) {
		/* Normal pipe buffer receive. */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			rw_exit_write(rpipe->pipe_lock);
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			rw_enter_write(rpipe->pipe_lock);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/* If the "write-side" has been blocked, wake it up. */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/* Break if some data was read. */
			if (nread > 0)
				break;

			/* Handle non-blocking mode operation. */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/* Wait for more data. */
			rpipe->pipe_state |= PIPE_WANTR;
			error = pipe_iosleep(rpipe, "piperd");
			if (error)
				goto unlocked_error;
		}
	}
	pipe_iounlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	if (pipe_rundown(rpipe) == 0 && rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/* Handle write blocking hysteresis. */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if (rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt >= PIPE_BUF)
		pipeselwakeup(rpipe);

	rw_exit_write(rpipe->pipe_lock);
	return (error);
}
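
/*
 * How the cases above surface through read(2) in userland, as a rough
 * sketch (fd being the read end of a pipe):
 *
 *	ssize_t n = read(fd, buf, sizeof(buf));
 *
 *	n > 0:   data was copied out of the pipe buffer
 *	n == 0:  PIPE_EOF was set, i.e. the write side is gone
 *	n == -1 with errno EAGAIN:  FNONBLOCK is set and the buffer
 *	         was empty
 */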

int
pipe_write(struct file *fp, struct uio *uio, int fflags)
{
	struct pipe *rpipe = fp->f_data, *wpipe;
	struct rwlock *lock = rpipe->pipe_lock;
	size_t orig_resid;
	int error;

	rw_enter_write(lock);
	wpipe = pipe_peer(rpipe);

	/*
	 * Detect loss of the pipe read side. Writes then fail with EPIPE,
	 * which the caller's write path turns into SIGPIPE.
	 */
	if (wpipe == NULL) {
		rw_exit_write(lock);
		return (EPIPE);
	}

	++wpipe->pipe_busy;
	error = pipe_iolock(wpipe);
	if (error) {
		--wpipe->pipe_busy;
		pipe_rundown(wpipe);
		rw_exit_write(lock);
		return (error);
	}

	/* If it is advantageous to resize the pipe buffer, do so. */
	if (uio->uio_resid > PIPE_SIZE &&
	    wpipe->pipe_buffer.size <= PIPE_SIZE &&
	    wpipe->pipe_buffer.cnt == 0) {
		unsigned int npipe;

		npipe = atomic_inc_int_nv(&nbigpipe);
		if (npipe > LIMITBIGPIPES ||
		    pipe_buffer_realloc(wpipe, BIG_PIPE_SIZE) != 0)
			atomic_dec_int(&nbigpipe);
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		size_t space;

		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if (space < uio->uio_resid && orig_resid <= PIPE_BUF)
			space = 0;

		if (space > 0) {
			size_t size;	/* Transfer size */
			size_t segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpipe->pipe_buffer.size -
				wpipe->pipe_buffer.in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */

			rw_exit_write(lock);
			error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					segsize, uio);
			rw_enter_write(lock);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
#ifdef DIAGNOSTIC
				if (wpipe->pipe_buffer.in + segsize !=
				    wpipe->pipe_buffer.size)
					panic("Expected pipe buffer wraparound disappeared");
#endif

				rw_exit_write(lock);
				error = uiomove(&wpipe->pipe_buffer.buffer[0],
						size - segsize, uio);
				rw_enter_write(lock);
			}
			if (error == 0) {
				wpipe->pipe_buffer.in += size;
				if (wpipe->pipe_buffer.in >=
				    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
						panic("Expected wraparound bad");
#endif
					wpipe->pipe_buffer.in = size - segsize;
				}

				wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
				if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
					panic("Pipe buffer overflow");
#endif
			}
			if (error)
				break;
		} else {
			/* If the "read-side" has been blocked, wake it up. */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/* Don't block on non-blocking I/O. */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = pipe_iosleep(wpipe, "pipewr");
			if (error)
				goto unlocked_error;

			/*
			 * If read side wants to go away, we just issue a
			 * signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}
	pipe_iounlock(wpipe);

unlocked_error:
	--wpipe->pipe_busy;

	if (pipe_rundown(wpipe) == 0 && wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/* Don't return EPIPE if I/O was successful. */
	if (wpipe->pipe_buffer.cnt == 0 &&
	    uio->uio_resid == 0 &&
	    error == EPIPE) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/* We have something to offer, wake up select/poll. */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	rw_exit_write(lock);
	return (error);
}
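
/*
 * The "space = 0" step above is what implements the POSIX guarantee
 * that writes of at most PIPE_BUF bytes are atomic: the data is only
 * copied in once the whole request fits, so it is never interleaved
 * with data from other writers.  Rough userland sketch (two processes
 * sharing the write end; `struct rec' is hypothetical):
 *
 *	struct rec r;
 *	...
 *	write(fd, &r, sizeof(r));	assuming sizeof(r) <= PIPE_BUF,
 *					readers always see whole records
 *
 * Larger writes may be split and interleaved arbitrarily.
 */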

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
{
	struct pipe *mpipe = fp->f_data;
	int error = 0;

	switch (cmd) {

	case FIONBIO:
		break;

	case FIOASYNC:
		rw_enter_write(mpipe->pipe_lock);
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		rw_exit_write(mpipe->pipe_lock);
		break;

	case FIONREAD:
		rw_enter_read(mpipe->pipe_lock);
		*(int *)data = mpipe->pipe_buffer.cnt;
		rw_exit_read(mpipe->pipe_lock);
		break;

	case FIOSETOWN:
	case SIOCSPGRP:
	case TIOCSPGRP:
		error = sigio_setown(&mpipe->pipe_sigio, cmd, data);
		break;

	case FIOGETOWN:
	case SIOCGPGRP:
	case TIOCGPGRP:
		sigio_getown(&mpipe->pipe_sigio, cmd, data);
		break;

	default:
		error = ENOTTY;
	}

	return (error);
}
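
/*
 * Userland sketch of the two pipe-specific ioctls handled above
 * (assumes <sys/ioctl.h>, which pulls in the FIO* definitions):
 *
 *	int nbytes, on = 1;
 *
 *	if (ioctl(fd, FIONREAD, &nbytes) == 0)
 *		... nbytes now holds pipe_buffer.cnt ...
 *	if (ioctl(fd, FIOASYNC, &on) == 0)
 *		... PIPE_ASYNC set, SIGIO delivered on activity ...
 */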

int
pipe_poll(struct file *fp, int events, struct proc *p)
{
	struct pipe *rpipe = fp->f_data, *wpipe;
	struct rwlock *lock = rpipe->pipe_lock;
	int revents = 0;

	rw_enter_write(lock);
	wpipe = pipe_peer(rpipe);

	if (events & (POLLIN | POLLRDNORM)) {
		if (rpipe->pipe_buffer.cnt > 0 ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);
	}

	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
	if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL)
		revents |= POLLHUP;
	else if (events & (POLLOUT | POLLWRNORM)) {
		if (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt >= PIPE_BUF)
			revents |= events & (POLLOUT | POLLWRNORM);
	}

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}

	rw_exit_write(lock);

	return (revents);
}

int
pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
{
	struct pipe *pipe = fp->f_data;

	memset(ub, 0, sizeof(*ub));

	rw_enter_read(pipe->pipe_lock);
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atim.tv_sec  = pipe->pipe_atime.tv_sec;
	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
	ub->st_mtim.tv_sec  = pipe->pipe_mtime.tv_sec;
	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
	ub->st_ctim.tv_sec  = pipe->pipe_ctime.tv_sec;
	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	rw_exit_read(pipe->pipe_lock);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

int
pipe_close(struct file *fp, struct proc *p)
{
	struct pipe *cpipe = fp->f_data;

	fp->f_ops = NULL;
	fp->f_data = NULL;
	pipe_destroy(cpipe);
	return (0);
}

/*
 * Free kva for the pipe circular buffer.
 * No pipe lock check, as this is only called from pipe_buffer_realloc()
 * and pipe_destroy().
 */
void
pipe_buffer_free(struct pipe *cpipe)
{
	u_int size;

	if (cpipe->pipe_buffer.buffer == NULL)
		return;

	size = cpipe->pipe_buffer.size;

	KERNEL_LOCK();
	km_free(cpipe->pipe_buffer.buffer, size, &kv_any, &kp_pageable);
	KERNEL_UNLOCK();

	cpipe->pipe_buffer.buffer = NULL;

	atomic_sub_int(&amountpipekva, size);
	if (size > PIPE_SIZE)
		atomic_dec_int(&nbigpipe);
}

/*
 * Shut down the pipe and free its resources.
 */
void
pipe_destroy(struct pipe *cpipe)
{
	struct pipe *ppipe;

	if (cpipe == NULL)
		return;

	rw_enter_write(cpipe->pipe_lock);

	pipeselwakeup(cpipe);
	sigio_free(&cpipe->pipe_sigio);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	cpipe->pipe_state |= PIPE_EOF;
	while (cpipe->pipe_busy) {
		wakeup(cpipe);
		cpipe->pipe_state |= PIPE_WANTD;
		rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO, "pipecl", INFSLP);
	}

	/* Disconnect from peer. */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe);

		ppipe->pipe_state |= PIPE_EOF;
		wakeup(ppipe);
		ppipe->pipe_peer = NULL;
	}

	pipe_buffer_free(cpipe);

	rw_exit_write(cpipe->pipe_lock);

	if (ppipe == NULL)
		pipe_pair_destroy(cpipe->pipe_pair);
}

/*
 * Returns non-zero if a rundown is currently ongoing.
 */
int
pipe_rundown(struct pipe *cpipe)
{
	rw_assert_wrlock(cpipe->pipe_lock);

	if (cpipe->pipe_busy > 0 || (cpipe->pipe_state & PIPE_WANTD) == 0)
		return (0);

	/* Only wakeup pipe_destroy() once the pipe is no longer busy. */
	cpipe->pipe_state &= ~(PIPE_WANTD | PIPE_WANTR | PIPE_WANTW);
	wakeup(cpipe);
	return (1);
}

int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;
	struct rwlock *lock = rpipe->pipe_lock;
	int error = 0;

	rw_enter_write(lock);
	wpipe = pipe_peer(rpipe);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		kn->kn_hook = rpipe;
		klist_insert_locked(&rpipe->pipe_sel.si_note, kn);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL) {
			/* other end of pipe has been closed */
			error = EPIPE;
			break;
		}
		kn->kn_fop = &pipe_wfiltops;
		kn->kn_hook = wpipe;
		klist_insert_locked(&wpipe->pipe_sel.si_note, kn);
		break;
	default:
		error = EINVAL;
	}

	rw_exit_write(lock);

	return (error);
}
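
/*
 * Userland sketch of attaching the filters above through kqueue(2)
 * (fds[] as returned by pipe(2)):
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);
 *
 * An EVFILT_WRITE filter on fds[1] fails with EPIPE once the read end
 * has gone away, matching the wpipe == NULL check above.
 */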

void
filt_pipedetach(struct knote *kn)
{
	struct pipe *cpipe = kn->kn_hook;

	klist_remove(&cpipe->pipe_sel.si_note, kn);
}

int
filt_piperead_common(struct knote *kn, struct pipe *rpipe)
{
	struct pipe *wpipe;

	rw_assert_wrlock(rpipe->pipe_lock);

	wpipe = pipe_peer(rpipe);

	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL) {
		kn->kn_flags |= EV_EOF;
		if (kn->kn_flags & __EV_POLL)
			kn->kn_flags |= __EV_HUP;
		return (1);
	}

	return (kn->kn_data > 0);
}

int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;

	return (filt_piperead_common(kn, rpipe));
}

int
filt_pipereadmodify(struct kevent *kev, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	int active;

	rw_enter_write(rpipe->pipe_lock);
	knote_modify(kev, kn);
	active = filt_piperead_common(kn, rpipe);
	rw_exit_write(rpipe->pipe_lock);

	return (active);
}

int
filt_pipereadprocess(struct knote *kn, struct kevent *kev)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	int active;

	rw_enter_write(rpipe->pipe_lock);
	if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
		active = 1;
	else
		active = filt_piperead_common(kn, rpipe);
	if (active)
		knote_submit(kn, kev);
	rw_exit_write(rpipe->pipe_lock);

	return (active);
}

int
filt_pipewrite_common(struct knote *kn, struct pipe *rpipe)
{
	struct pipe *wpipe;

	rw_assert_wrlock(rpipe->pipe_lock);

	wpipe = pipe_peer(rpipe);

	if (wpipe == NULL) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		if (kn->kn_flags & __EV_POLL)
			kn->kn_flags |= __EV_HUP;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	return (kn->kn_data >= PIPE_BUF);
}

int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;

	return (filt_pipewrite_common(kn, rpipe));
}

int
filt_pipewritemodify(struct kevent *kev, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	int active;

	rw_enter_write(rpipe->pipe_lock);
	knote_modify(kev, kn);
	active = filt_pipewrite_common(kn, rpipe);
	rw_exit_write(rpipe->pipe_lock);

	return (active);
}

int
filt_pipewriteprocess(struct knote *kn, struct kevent *kev)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	int active;

	rw_enter_write(rpipe->pipe_lock);
	if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
		active = 1;
	else
		active = filt_pipewrite_common(kn, rpipe);
	if (active)
		knote_submit(kn, kev);
	rw_exit_write(rpipe->pipe_lock);

	return (active);
}

void
pipe_init(void)
{
	pool_init(&pipe_pair_pool, sizeof(struct pipe_pair), 0, IPL_MPFLOOR,
	    PR_WAITOK, "pipepl", NULL);
}

struct pipe_pair *
pipe_pair_create(void)
{
	struct pipe_pair *pp;

	pp = pool_get(&pipe_pair_pool, PR_WAITOK | PR_ZERO);
	pp->pp_wpipe.pipe_pair = pp;
	pp->pp_rpipe.pipe_pair = pp;
	pp->pp_wpipe.pipe_peer = &pp->pp_rpipe;
	pp->pp_rpipe.pipe_peer = &pp->pp_wpipe;
	/*
	 * One lock is used per pipe pair in order to obtain exclusive access to
	 * the pipe pair.
	 */
	rw_init(&pp->pp_lock, "pipelk");
	pp->pp_wpipe.pipe_lock = &pp->pp_lock;
	pp->pp_rpipe.pipe_lock = &pp->pp_lock;

	klist_init_rwlock(&pp->pp_wpipe.pipe_sel.si_note, &pp->pp_lock);
	klist_init_rwlock(&pp->pp_rpipe.pipe_sel.si_note, &pp->pp_lock);

	if (pipe_create(&pp->pp_wpipe) || pipe_create(&pp->pp_rpipe))
		goto err;
	return (pp);
err:
	pipe_destroy(&pp->pp_wpipe);
	pipe_destroy(&pp->pp_rpipe);
	return (NULL);
}

void
pipe_pair_destroy(struct pipe_pair *pp)
{
	klist_free(&pp->pp_wpipe.pipe_sel.si_note);
	klist_free(&pp->pp_rpipe.pipe_sel.si_note);
	pool_put(&pipe_pair_pool, pp);
}
1094