/*	$OpenBSD: sys_pipe.c,v 1.87 2018/11/13 13:02:20 visa Exp $	*/

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */
/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but provides everything that pipes normally do.
 */
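
/*
 * Userland view (illustrative sketch only, not part of this file): the
 * entry points below are reached through pipe(2) and pipe2(2), e.g.
 *
 *	int fds[2];
 *	char c;
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
 *		err(1, "pipe2");
 *	write(fds[1], "x", 1);	(serviced by pipe_write() below)
 *	read(fds[0], &c, 1);	(serviced by pipe_read() below)
 */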

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/pool.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/event.h>
#include <sys/lock.h>
#include <sys/poll.h>
#ifdef KTRACE
#include <sys/ktrace.h>
#endif

#include <uvm/uvm_extern.h>

#include <sys/pipe.h>

/*
 * interfaces to the outside world
 */
int	pipe_read(struct file *, struct uio *, int);
int	pipe_write(struct file *, struct uio *, int);
int	pipe_close(struct file *, struct proc *);
int	pipe_poll(struct file *, int events, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

static struct fileops pipeops = {
	.fo_read	= pipe_read,
	.fo_write	= pipe_write,
	.fo_ioctl	= pipe_ioctl,
	.fo_poll	= pipe_poll,
	.fo_kqfilter	= pipe_kqfilter,
	.fo_stat	= pipe_stat,
	.fo_close	= pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipewrite(struct knote *kn, long hint);

struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

/*
 * Default pipe buffer size(s); these can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
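
/*
 * Worked example (assuming PIPE_SIZE is 16384, as defined in sys/pipe.h):
 * MINPIPESIZE is then 5461 bytes.  pipe_read() uses it as a hysteresis
 * threshold: a blocked writer is only woken once the buffer has drained
 * below a third of its size, instead of on every byte read.
 */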

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
unsigned int nbigpipe;
static unsigned int amountpipekva;
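
/*
 * Accounting note: nbigpipe counts buffers grown to BIG_PIPE_SIZE and
 * amountpipekva tracks the kva consumed by all pipe buffers.  Both are
 * shared across pipes, so they are only manipulated with atomic ops
 * (see pipespace(), pipe_write() and pipe_free_kmem()).
 */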

struct pool pipe_pool;

int	dopipe(struct proc *, int *, int);
void	pipeclose(struct pipe *);
void	pipe_free_kmem(struct pipe *);
int	pipe_create(struct pipe *);
int	pipelock(struct pipe *);
void	pipeunlock(struct pipe *);
void	pipeselwakeup(struct pipe *);
int	pipespace(struct pipe *, u_int);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

int
sys_pipe(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe_args /* {
		syscallarg(int *) fdp;
	} */ *uap = v;

	return (dopipe(p, SCARG(uap, fdp), 0));
}

int
sys_pipe2(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe2_args /* {
		syscallarg(int *) fdp;
		syscallarg(int) flags;
	} */ *uap = v;

	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
		return (EINVAL);

	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
}

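/*
 * Common body for pipe(2) and pipe2(2).  The cleanup labels unwind in
 * reverse order of construction: free3 after the read-side file was
 * allocated (closef(rf) also closes rpipe via pipe_close(), so rpipe
 * is NULLed to make the later pipeclose(rpipe) a no-op), free2 after
 * the fd table was locked, and free1 for the bare pipe structures.
 */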
int
dopipe(struct proc *p, int *ufds, int flags)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe = NULL;
	int fds[2], cloexec, error;

	cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0;

	rpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(rpipe);
	if (error != 0)
		goto free1;
	wpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(wpipe);
	if (error != 0)
		goto free1;

	fdplock(fdp);

	error = falloc(p, &rf, &fds[0]);
	if (error != 0)
		goto free2;
	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;

	error = falloc(p, &wf, &fds[1]);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fdinsert(fdp, fds[0], cloexec, rf);
	fdinsert(fdp, fds[1], cloexec, wf);

	error = copyout(fds, ufds, sizeof(fds));
	if (error != 0) {
		fdrelease(p, fds[0]);
		fdrelease(p, fds[1]);
	}
#ifdef KTRACE
	else if (KTRPOINT(p, KTR_STRUCT))
		ktrfds(p, fds, 2);
#endif
	fdpunlock(fdp);

	FRELE(rf, p);
	FRELE(wf, p);
	return (error);

free3:
	fdremove(fdp, fds[0]);
	closef(rf, p);
	rpipe = NULL;
free2:
	fdpunlock(fdp);
free1:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if allocation
 * fails it returns ENOMEM and the old buffer is retained.
 */
int
pipespace(struct pipe *cpipe, u_int size)
{
	caddr_t buffer;

	KERNEL_LOCK();
	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
	KERNEL_UNLOCK();
	if (buffer == NULL) {
		return (ENOMEM);
	}

	/* free old resources if we are resizing */
	pipe_free_kmem(cpipe);
	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;

	atomic_add_int(&amountpipekva, cpipe->pipe_buffer.size);

	return (0);
}
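
/*
 * Resize note (sketch of the call graph in this file): pipe_create()
 * allocates the initial PIPE_SIZE buffer, and the only grow path is in
 * pipe_write(), which switches an *empty* buffer to BIG_PIPE_SIZE for
 * writes larger than PIPE_SIZE, subject to the LIMITBIGPIPES cap.
 */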

/*
 * initialize and allocate VM and memory for pipe
 */
int
pipe_create(struct pipe *cpipe)
{
	int error;

	/* so pipe_free_kmem() doesn't follow junk pointer */
	cpipe->pipe_buffer.buffer = NULL;
	/*
	 * protect so pipeclose() doesn't follow a junk pointer
	 * if pipespace() fails.
	 */
	memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel));
	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	sigio_init(&cpipe->pipe_sigio);

	error = pipespace(cpipe, PIPE_SIZE);
	if (error != 0)
		return (error);

	getnanotime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;

	return (0);
}


/*
 * lock a pipe for I/O, blocking other access
 */
int
pipelock(struct pipe *cpipe)
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
			return error;
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}
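
/*
 * Typical locking pattern (sketch): the PIPE_LOCK bit serializes buffer
 * access, and since pipelock() can fail on a caught signal, callers
 * check the return value:
 *
 *	if ((error = pipelock(cpipe)) == 0) {
 *		... manipulate cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 */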

/*
 * unlock a pipe I/O lock
 */
void
pipeunlock(struct pipe *cpipe)
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

void
pipeselwakeup(struct pipe *cpipe)
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	} else
		KNOTE(&cpipe->pipe_sel.si_note, 0);
	if (cpipe->pipe_state & PIPE_ASYNC)
		pgsigio(&cpipe->pipe_sigio, SIGIO, 0);
}
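
/*
 * pipeselwakeup() fans out to the notification mechanisms: select/poll
 * sleepers (PIPE_SEL is set by pipe_poll() when a waiter is recorded),
 * kqueue knotes on pipe_sel.si_note, and SIGIO for async consumers
 * (PIPE_ASYNC, toggled via FIOASYNC in pipe_ioctl()).
 */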

int
pipe_read(struct file *fp, struct uio *uio, int fflags)
{
	struct pipe *rpipe = fp->f_data;
	int error;
	size_t size, nread = 0;

	error = pipelock(rpipe);
	if (error)
		return (error);

	++rpipe->pipe_busy;

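	/*
	 * Ring buffer invariant (for reference): "out" is the read index,
	 * "in" the write index and "cnt" the bytes in flight, so
	 * 0 <= cnt <= size and in == (out + cnt) % size.
	 */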
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}

int
pipe_write(struct file *fp, struct uio *uio, int fflags)
{
	int error = 0;
	size_t orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of the pipe read side; return EPIPE so the caller
	 * can issue SIGPIPE.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {
		unsigned int npipe;

		npipe = atomic_inc_int_nv(&nbigpipe);
		if ((npipe <= LIMITBIGPIPES) &&
		    (error = pipelock(wpipe)) == 0) {
			if (pipespace(wpipe, BIG_PIPE_SIZE) != 0)
				atomic_dec_int(&nbigpipe);
			pipeunlock(wpipe);
		} else
			atomic_dec_int(&nbigpipe);
	}

	/*
	 * If an early error occurred, unbusy and return, waking up any
	 * pending readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		return (error);
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		size_t space;

retrywrite:
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;
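		/*
		 * Worked example (assuming PIPE_BUF is 512, as in
		 * sys/syslimits.h): a 300-byte write finding only 200
		 * free bytes forces space to 0 and sleeps below rather
		 * than splitting the write, so small writes are never
		 * interleaved with other writers' data.
		 */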

		if (space > 0) {
			if ((error = pipelock(wpipe)) == 0) {
				size_t size;	/* Transfer size */
				size_t segsize;	/* first segment to transfer */

				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;
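
				/*
				 * Worked example (assuming a 16384-byte
				 * buffer): with in == 16000 and size ==
				 * 1000, segsize is 384; the first uiomove
				 * fills buffer[16000..16383] and the
				 * second copies the remaining 616 bytes
				 * to buffer[0..615], leaving in == 616.
				 */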

				/* Transfer first segment */

				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");
#endif

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
#endif
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");
#endif
				}
				pipeunlock(wpipe);
			}
			if (error)
				break;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
			    "pipewr", 0);
			if (error)
				break;
			/*
			 * If the read side went away while we slept,
			 * return EPIPE; the caller turns it into SIGPIPE.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/*
	 * We have something to offer, wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
{
	struct pipe *mpipe = fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case TIOCSPGRP:
		/* FALLTHROUGH */
	case SIOCSPGRP:
		return (sigio_setown(&mpipe->pipe_sigio, *(int *)data));

	case SIOCGPGRP:
		*(int *)data = sigio_getown(&mpipe->pipe_sigio);
		return (0);

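	/*
	 * sigio_getown() returns the owner in F{GET,SET}OWN convention
	 * (a pid, or a negative pgid); TIOCGPGRP wants a plain process
	 * group id, hence the sign flip below.
	 */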
	case TIOCGPGRP:
		*(int *)data = -sigio_getown(&mpipe->pipe_sigio);
		return (0);

	}
	return (ENOTTY);
}

int
pipe_poll(struct file *fp, int events, struct proc *p)
{
	struct pipe *rpipe = fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);
	}

	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;
	else if (events & (POLLOUT | POLLWRNORM)) {
		if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
			revents |= events & (POLLOUT | POLLWRNORM);
	}

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}
	return (revents);
}

int
pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
{
	struct pipe *pipe = fp->f_data;

	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atim.tv_sec  = pipe->pipe_atime.tv_sec;
	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
	ub->st_mtim.tv_sec  = pipe->pipe_mtime.tv_sec;
	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
	ub->st_ctim.tv_sec  = pipe->pipe_ctime.tv_sec;
	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

int
pipe_close(struct file *fp, struct proc *p)
{
	struct pipe *cpipe = fp->f_data;

	fp->f_ops = NULL;
	fp->f_data = NULL;
	KERNEL_LOCK();
	pipeclose(cpipe);
	KERNEL_UNLOCK();
	return (0);
}

void
pipe_free_kmem(struct pipe *cpipe)
{
	u_int size = cpipe->pipe_buffer.size;

	if (cpipe->pipe_buffer.buffer != NULL) {
		KERNEL_LOCK();
		km_free(cpipe->pipe_buffer.buffer, size, &kv_any, &kp_pageable);
		KERNEL_UNLOCK();
		atomic_sub_int(&amountpipekva, size);
		cpipe->pipe_buffer.buffer = NULL;
		if (size > PIPE_SIZE)
			atomic_dec_int(&nbigpipe);
	}
}

/*
 * shutdown the pipe
 */
void
pipeclose(struct pipe *cpipe)
{
	struct pipe *ppipe;
	if (cpipe) {
		pipeselwakeup(cpipe);
		sigio_free(&cpipe->pipe_sigio);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		cpipe->pipe_state |= PIPE_EOF;
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		pipe_free_kmem(cpipe);
		pool_put(&pipe_pool, cpipe);
	}
}

int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL) {
			/* other end of pipe has been closed */
			return (EPIPE);
		}
		kn->kn_fop = &pipe_wfiltops;
		SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	default:
		return (EINVAL);
	}

	return (0);
}
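
/*
 * Readiness thresholds (for reference): filt_piperead() below fires as
 * soon as any bytes are buffered (kn_data > 0) or on EOF, while
 * filt_pipewrite() requires at least PIPE_BUF bytes free, matching the
 * POLLOUT test in pipe_poll() above so an atomic write can proceed.
 */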

void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL)
			return;
		SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	}
}

int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	return (kn->kn_data >= PIPE_BUF);
}

void
pipe_init(void)
{
	pool_init(&pipe_pool, sizeof(struct pipe), 0, IPL_MPFLOOR, PR_WAITOK,
	    "pipepl", NULL);
}