/*	$OpenBSD: sys_pipe.c,v 1.95 2019/07/16 12:16:58 semarie Exp $	*/

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/pool.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/event.h>
#include <sys/lock.h>
#include <sys/poll.h>
#ifdef KTRACE
#include <sys/ktrace.h>
#endif

#include <uvm/uvm_extern.h>

#include <sys/pipe.h>

/*
 * interfaces to the outside world
 */
int	pipe_read(struct file *, struct uio *, int);
int	pipe_write(struct file *, struct uio *, int);
int	pipe_close(struct file *, struct proc *);
int	pipe_poll(struct file *, int events, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

static struct fileops pipeops = {
	.fo_read	= pipe_read,
	.fo_write	= pipe_write,
	.fo_ioctl	= pipe_ioctl,
	.fo_poll	= pipe_poll,
	.fo_kqfilter	= pipe_kqfilter,
	.fo_stat	= pipe_stat,
	.fo_close	= pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipewrite(struct knote *kn, long hint);

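/*
 * kqueue filter ops for the read and write ends of a pipe.  The leading
 * 1 in each initializer marks the filters as file-descriptor backed
 * (f_isfd); no attach routine is needed since pipe_kqfilter() links the
 * knote itself.
 */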
struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
unsigned int nbigpipe;
static unsigned int amountpipekva;

struct pool pipe_pool;

int	dopipe(struct proc *, int *, int);
int	pipelock(struct pipe *);
void	pipeunlock(struct pipe *);
void	pipeselwakeup(struct pipe *);

struct pipe *pipe_create(void);
void	pipe_destroy(struct pipe *);
int	pipe_buffer_realloc(struct pipe *, u_int);
void	pipe_buffer_free(struct pipe *);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

int
sys_pipe(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe_args /* {
		syscallarg(int *) fdp;
	} */ *uap = v;

	return (dopipe(p, SCARG(uap, fdp), 0));
}

int
sys_pipe2(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe2_args /* {
		syscallarg(int *) fdp;
		syscallarg(int) flags;
	} */ *uap = v;

	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
		return (EINVAL);

	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
}

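/*
 * Common implementation of pipe(2) and pipe2(2): create both halves of
 * the pipe, allocate and fill in a file for each end, and copy the two
 * new descriptors out to userland.
 */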
int
dopipe(struct proc *p, int *ufds, int flags)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe = NULL;
	int fds[2], cloexec, error;

	cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0;

	if (((rpipe = pipe_create()) == NULL) ||
	    ((wpipe = pipe_create()) == NULL)) {
		error = ENOMEM;
		goto free1;
	}

	fdplock(fdp);

	error = falloc(p, &rf, &fds[0]);
	if (error != 0)
		goto free2;
	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;

	error = falloc(p, &wf, &fds[1]);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fdinsert(fdp, fds[0], cloexec, rf);
	fdinsert(fdp, fds[1], cloexec, wf);

	error = copyout(fds, ufds, sizeof(fds));
	if (error == 0) {
		fdpunlock(fdp);
#ifdef KTRACE
		if (KTRPOINT(p, KTR_STRUCT))
			ktrfds(p, fds, 2);
#endif
	} else {
		/* fdrelease() unlocks fdp. */
		fdrelease(p, fds[0]);
		fdplock(fdp);
		fdrelease(p, fds[1]);
	}

	FRELE(rf, p);
	FRELE(wf, p);
	return (error);

free3:
	fdremove(fdp, fds[0]);
	closef(rf, p);
	rpipe = NULL;
free2:
	fdpunlock(fdp);
free1:
	pipe_destroy(wpipe);
	pipe_destroy(rpipe);
	return (error);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if allocation
 * fails, it retains the old buffer and returns ENOMEM.
 */
int
pipe_buffer_realloc(struct pipe *cpipe, u_int size)
{
	caddr_t buffer;

	/* buffer uninitialized or pipe locked */
	KASSERT((cpipe->pipe_buffer.buffer == NULL) ||
	    (cpipe->pipe_state & PIPE_LOCK));

	/* buffer should be empty */
	KASSERT(cpipe->pipe_buffer.cnt == 0);

	KERNEL_LOCK();
	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
	KERNEL_UNLOCK();
	if (buffer == NULL)
		return (ENOMEM);

	/* free old resources if we are resizing */
	pipe_buffer_free(cpipe);

	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;

	atomic_add_int(&amountpipekva, cpipe->pipe_buffer.size);

	return (0);
}

/*
 * initialize and allocate VM and memory for pipe
 */
struct pipe *
pipe_create(void)
{
	struct pipe *cpipe;
	int error;

	cpipe = pool_get(&pipe_pool, PR_WAITOK | PR_ZERO);

	error = pipe_buffer_realloc(cpipe, PIPE_SIZE);
	if (error != 0) {
		pool_put(&pipe_pool, cpipe);
		return (NULL);
	}

	sigio_init(&cpipe->pipe_sigio);

	getnanotime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;

	return (cpipe);
}


/*
 * lock a pipe for I/O, blocking other access
 */
int
pipelock(struct pipe *cpipe)
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
			return error;
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
void
pipeunlock(struct pipe *cpipe)
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

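/*
 * Wake up anyone waiting for pipe activity: select/poll sleepers,
 * registered kqueue notes, and, if the pipe is in async mode, the SIGIO
 * recipients.
 */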
void
pipeselwakeup(struct pipe *cpipe)
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	} else
		KNOTE(&cpipe->pipe_sel.si_note, 0);

	if (cpipe->pipe_state & PIPE_ASYNC)
		pgsigio(&cpipe->pipe_sigio, SIGIO, 0);
}

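/*
 * fileops read routine: copy data out of the circular buffer, waking a
 * blocked writer when space frees up, or sleep (unless FNONBLOCK is set)
 * until data or EOF arrives.
 */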
int
pipe_read(struct file *fp, struct uio *uio, int fflags)
{
	struct pipe *rpipe = fp->f_data;
	int error;
	size_t size, nread = 0;

	KERNEL_LOCK();

	error = pipelock(rpipe);
	if (error)
		goto done;

	++rpipe->pipe_busy;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANTD processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANTD)) {
		rpipe->pipe_state &= ~(PIPE_WANTD|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

done:
	KERNEL_UNLOCK();
	return (error);
}

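/*
 * fileops write routine: copy data into the peer's circular buffer,
 * growing it to BIG_PIPE_SIZE for large writes when possible, keeping
 * writes of size <= PIPE_BUF atomic, and returning EPIPE once the read
 * side is gone.
 */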
int
pipe_write(struct file *fp, struct uio *uio, int fflags)
{
	int error = 0;
	size_t orig_resid;
	struct pipe *wpipe, *rpipe;

	KERNEL_LOCK();

	rpipe = fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		error = EPIPE;
		goto done;
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {
		unsigned int npipe;

		npipe = atomic_inc_int_nv(&nbigpipe);
		if ((npipe <= LIMITBIGPIPES) &&
		    (error = pipelock(wpipe)) == 0) {
			if ((wpipe->pipe_buffer.cnt != 0) ||
			    (pipe_buffer_realloc(wpipe, BIG_PIPE_SIZE) != 0))
				atomic_dec_int(&nbigpipe);
			pipeunlock(wpipe);
		} else
			atomic_dec_int(&nbigpipe);
	}

	/*
	 * If an early error occurred, unbusy and return, waking up any
	 * pending readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANTD)) {
			wpipe->pipe_state &= ~(PIPE_WANTD | PIPE_WANTR);
			wakeup(wpipe);
		}
		goto done;
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		size_t space;

retrywrite:
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			if ((error = pipelock(wpipe)) == 0) {
				size_t size;	/* Transfer size */
				size_t segsize;	/* first segment to transfer */

				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");
#endif

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
#endif
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");
#endif
				}
				pipeunlock(wpipe);
			}
			if (error)
				break;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
			    "pipewr", 0);
			if (error)
				break;
			/*
			 * If read side wants to go away, we just issue a
			 * signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANTD)) {
		wpipe->pipe_state &= ~(PIPE_WANTD | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/*
	 * We have something to offer, wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

done:
	KERNEL_UNLOCK();
	return (error);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
{
	struct pipe *mpipe = fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case TIOCSPGRP:
		/* FALLTHROUGH */
	case SIOCSPGRP:
		return (sigio_setown(&mpipe->pipe_sigio, *(int *)data));

	case SIOCGPGRP:
		*(int *)data = sigio_getown(&mpipe->pipe_sigio);
		return (0);

	case TIOCGPGRP:
		*(int *)data = -sigio_getown(&mpipe->pipe_sigio);
		return (0);

	}
	return (ENOTTY);
}

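/*
 * fileops poll routine: report readability, writability (at least
 * PIPE_BUF bytes of space free), or POLLHUP, and record the selector if
 * no event is pending yet.
 */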
int
pipe_poll(struct file *fp, int events, struct proc *p)
{
	struct pipe *rpipe = fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);
	}

	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;
	else if (events & (POLLOUT | POLLWRNORM)) {
		if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
			revents |= events & (POLLOUT | POLLWRNORM);
	}

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}
	return (revents);
}

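/*
 * fileops stat routine: synthesize FIFO-like attributes from the pipe's
 * buffer state, timestamps, and the opening credentials.
 */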
int
pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
{
	struct pipe *pipe = fp->f_data;

	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atim.tv_sec  = pipe->pipe_atime.tv_sec;
	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
	ub->st_mtim.tv_sec  = pipe->pipe_mtime.tv_sec;
	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
	ub->st_ctim.tv_sec  = pipe->pipe_ctime.tv_sec;
	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

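/*
 * fileops close routine: detach the file from the pipe and tear this
 * pipe end down.
 */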
int
pipe_close(struct file *fp, struct proc *p)
{
	struct pipe *cpipe = fp->f_data;

	fp->f_ops = NULL;
	fp->f_data = NULL;
	KERNEL_LOCK();
	pipe_destroy(cpipe);
	KERNEL_UNLOCK();
	return (0);
}

/*
 * Free kva for the pipe circular buffer.
 * No pipe lock check as it is only called from pipe_buffer_realloc() and
 * pipe_destroy().
 */
void
pipe_buffer_free(struct pipe *cpipe)
{
	u_int size;

	if (cpipe->pipe_buffer.buffer == NULL)
		return;

	size = cpipe->pipe_buffer.size;

	KERNEL_LOCK();
	km_free(cpipe->pipe_buffer.buffer, size, &kv_any, &kp_pageable);
	KERNEL_UNLOCK();

	cpipe->pipe_buffer.buffer = NULL;

	atomic_sub_int(&amountpipekva, size);
	if (size > PIPE_SIZE)
		atomic_dec_int(&nbigpipe);
}

/*
 * shut down the pipe, and free resources.
 */
void
pipe_destroy(struct pipe *cpipe)
{
	struct pipe *ppipe;

	if (cpipe == NULL)
		return;

	pipeselwakeup(cpipe);
	sigio_free(&cpipe->pipe_sigio);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	cpipe->pipe_state |= PIPE_EOF;
	while (cpipe->pipe_busy) {
		wakeup(cpipe);
		cpipe->pipe_state |= PIPE_WANTD;
		tsleep(cpipe, PRIBIO, "pipecl", 0);
	}

	/*
	 * Disconnect from peer
	 */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe);

		ppipe->pipe_state |= PIPE_EOF;
		wakeup(ppipe);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * free resources
	 */
	pipe_buffer_free(cpipe);
	pool_put(&pipe_pool, cpipe);
}

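/*
 * fileops kqfilter routine: EVFILT_READ knotes hang off this end's
 * selinfo, EVFILT_WRITE knotes off the peer's; EPIPE if the peer is
 * already gone.
 */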
int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL) {
			/* other end of pipe has been closed */
			return (EPIPE);
		}
		kn->kn_fop = &pipe_wfiltops;
		SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	default:
		return (EINVAL);
	}

	return (0);
}

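/*
 * Remove a knote from the selinfo list it was attached to by
 * pipe_kqfilter().
 */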
void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL)
			return;
		SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	}
}

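/*
 * EVFILT_READ event: the pipe is readable when it holds data, or at EOF
 * when either end has been closed.
 */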
int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

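/*
 * EVFILT_WRITE event: the pipe is writable when at least PIPE_BUF bytes
 * of buffer space are free, or at EOF when the read side is gone.
 */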
int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	return (kn->kn_data >= PIPE_BUF);
}

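/*
 * Set up the pool from which struct pipe allocations are made.
 */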
void
pipe_init(void)
{
	pool_init(&pipe_pool, sizeof(struct pipe), 0, IPL_MPFLOOR, PR_WAITOK,
	    "pipepl", NULL);
}
