xref: /openbsd-src/sys/kern/sys_pipe.c (revision a28daedfc357b214be5c701aa8ba8adb29a7f1c2)
1 /*	$OpenBSD: sys_pipe.c,v 1.55 2009/01/29 22:08:45 guenther Exp $	*/
2 
3 /*
4  * Copyright (c) 1996 John S. Dyson
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice immediately at the beginning of the file, without modification,
12  *    this list of conditions, and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Absolutely no warranty of function or purpose is made by the author
17  *    John S. Dyson.
18  * 4. Modifications may be freely made to this file if the above conditions
19  *    are met.
20  */
21 
22 /*
23  * This file contains a high-performance replacement for the socket-based
24  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25  * all features of sockets, but does do everything that pipes normally
26  * do.
27  */
28 
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/proc.h>
32 #include <sys/file.h>
33 #include <sys/filedesc.h>
34 #include <sys/pool.h>
35 #include <sys/ioctl.h>
36 #include <sys/stat.h>
37 #include <sys/signalvar.h>
38 #include <sys/mount.h>
39 #include <sys/syscallargs.h>
40 #include <sys/event.h>
41 #include <sys/lock.h>
42 #include <sys/poll.h>
43 
44 #include <uvm/uvm_extern.h>
45 
46 #include <sys/pipe.h>
47 
/*
 * Interfaces to the outside world: these implement the fileops
 * entry points for DTYPE_PIPE descriptors created by sys_opipe().
 */
int	pipe_read(struct file *, off_t *, struct uio *, struct ucred *);
int	pipe_write(struct file *, off_t *, struct uio *, struct ucred *);
int	pipe_close(struct file *, struct proc *);
int	pipe_poll(struct file *, int events, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

static struct fileops pipeops = {
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipewrite(struct knote *kn, long hint);

/* Knote filter tables for EVFILT_READ/EVFILT_WRITE (first field: f_isfd). */
struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };
72 
/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
int nbigpipe;			/* current count of BIG_PIPE_SIZE pipes */
static int amountpipekva;	/* total kva bytes held by pipe buffers */

struct pool pipe_pool;		/* allocator for struct pipe */

/* Internal helpers (definitions below). */
void	pipeclose(struct pipe *);
void	pipe_free_kmem(struct pipe *);
int	pipe_create(struct pipe *);
int	pipelock(struct pipe *);
void	pipeunlock(struct pipe *);
void	pipeselwakeup(struct pipe *);
int	pipespace(struct pipe *, u_int);
97 
/*
 * The pipe system call for the DTYPE_PIPE type of pipes.
 *
 * Creates two pipe endpoints joined as peers and installs a file
 * descriptor for each: the read end in retval[0], the write end in
 * retval[1].  Returns 0 on success or an errno from pipe_create()
 * or falloc().
 */

/* ARGSUSED */
int
sys_opipe(struct proc *p, void *v, register_t *retval)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	/* Serialize descriptor-table manipulation for this process. */
	fdplock(fdp);

	rpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(rpipe);
	if (error != 0)
		goto free1;
	wpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(wpipe);
	if (error != 0)
		goto free2;

	error = falloc(p, &rf, &fd);
	if (error != 0)
		goto free2;
	/*
	 * NOTE(review): both ends are marked FREAD|FWRITE even though
	 * each is conventionally used in one direction only.
	 */
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;
	retval[0] = fd;

	error = falloc(p, &wf, &fd);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	/* Make the half-constructed files usable by other threads. */
	FILE_SET_MATURE(rf);
	FILE_SET_MATURE(wf);

	fdpunlock(fdp);
	return (0);

free3:
	/*
	 * closef() on rf tears down rpipe as well (via pipe_close),
	 * so clear rpipe to keep free1 from closing it twice.
	 */
	fdremove(fdp, retval[0]);
	closef(rf, p);
	rpipe = NULL;
free2:
	(void)pipeclose(wpipe);
free1:
	if (rpipe != NULL)
		(void)pipeclose(rpipe);
	fdpunlock(fdp);
	return (error);
}
161 
162 /*
163  * Allocate kva for pipe circular buffer, the space is pageable.
164  * This routine will 'realloc' the size of a pipe safely, if it fails
165  * it will retain the old buffer.
166  * If it fails it will return ENOMEM.
167  */
168 int
169 pipespace(struct pipe *cpipe, u_int size)
170 {
171 	caddr_t buffer;
172 
173 	buffer = (caddr_t)uvm_km_valloc(kernel_map, size);
174 	if (buffer == NULL) {
175 		return (ENOMEM);
176 	}
177 
178 	/* free old resources if we are resizing */
179 	pipe_free_kmem(cpipe);
180 	cpipe->pipe_buffer.buffer = buffer;
181 	cpipe->pipe_buffer.size = size;
182 	cpipe->pipe_buffer.in = 0;
183 	cpipe->pipe_buffer.out = 0;
184 	cpipe->pipe_buffer.cnt = 0;
185 
186 	amountpipekva += cpipe->pipe_buffer.size;
187 
188 	return (0);
189 }
190 
191 /*
192  * initialize and allocate VM and memory for pipe
193  */
194 int
195 pipe_create(struct pipe *cpipe)
196 {
197 	int error;
198 
199 	/* so pipe_free_kmem() doesn't follow junk pointer */
200 	cpipe->pipe_buffer.buffer = NULL;
201 	/*
202 	 * protect so pipeclose() doesn't follow a junk pointer
203 	 * if pipespace() fails.
204 	 */
205 	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
206 	cpipe->pipe_state = 0;
207 	cpipe->pipe_peer = NULL;
208 	cpipe->pipe_busy = 0;
209 
210 	error = pipespace(cpipe, PIPE_SIZE);
211 	if (error != 0)
212 		return (error);
213 
214 	getnanotime(&cpipe->pipe_ctime);
215 	cpipe->pipe_atime = cpipe->pipe_ctime;
216 	cpipe->pipe_mtime = cpipe->pipe_ctime;
217 	cpipe->pipe_pgid = NO_PID;
218 
219 	return (0);
220 }
221 
222 
223 /*
224  * lock a pipe for I/O, blocking other access
225  */
226 int
227 pipelock(struct pipe *cpipe)
228 {
229 	int error;
230 	while (cpipe->pipe_state & PIPE_LOCK) {
231 		cpipe->pipe_state |= PIPE_LWANT;
232 		if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
233 			return error;
234 	}
235 	cpipe->pipe_state |= PIPE_LOCK;
236 	return 0;
237 }
238 
239 /*
240  * unlock a pipe I/O lock
241  */
242 void
243 pipeunlock(struct pipe *cpipe)
244 {
245 	cpipe->pipe_state &= ~PIPE_LOCK;
246 	if (cpipe->pipe_state & PIPE_LWANT) {
247 		cpipe->pipe_state &= ~PIPE_LWANT;
248 		wakeup(cpipe);
249 	}
250 }
251 
252 void
253 pipeselwakeup(struct pipe *cpipe)
254 {
255 	if (cpipe->pipe_state & PIPE_SEL) {
256 		cpipe->pipe_state &= ~PIPE_SEL;
257 		selwakeup(&cpipe->pipe_sel);
258 	}
259 	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID)
260 		gsignal(cpipe->pipe_pgid, SIGIO);
261 	KNOTE(&cpipe->pipe_sel.si_note, 0);
262 }
263 
/*
 * Read up to uio->uio_resid bytes out of the pipe's circular buffer.
 *
 * The pipe I/O lock is held across buffer manipulation and dropped
 * while sleeping for data; pipe_busy stays elevated for the whole
 * call so pipeclose() will wait for us to finish.  Returns 0 on
 * success or EOF, EAGAIN for an empty non-blocking read, or an
 * error from tsleep()/pipelock()/uiomove().
 */

/* ARGSUSED */
int
pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	int size;

	error = pipelock(rpipe);
	if (error)
		return (error);

	/* Keep pipeclose() from freeing the pipe out from under us. */
	++rpipe->pipe_busy;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			/*
			 * size = contiguous run from 'out' to the end of
			 * the buffer, clamped to the bytes available and
			 * to what the caller still wants.
			 */
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe);
			}
			/* Lock is NOT held here on error, hence the label. */
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		/* pipeclose() is waiting for us to drain out. */
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	/* Tell select/poll/kevent writers there is room for PIPE_BUF bytes. */
	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}
381 
/*
 * Write the caller's data into the peer's circular buffer.
 *
 * May first grow the buffer to BIG_PIPE_SIZE for large writes.
 * Writes of total size <= PIPE_BUF are atomic: they are never split
 * across other writers' data (enforced by forcing space to 0 until
 * the whole write fits).  Returns EPIPE if the read side is gone,
 * EAGAIN for a non-blocking write that would block, or an error
 * from tsleep()/pipelock()/uiomove().
 */
int
pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
{
	int error = 0;
	int orig_resid;

	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}
	/* Keep pipeclose() from freeing wpipe while we use it. */
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < LIMITBIGPIPES) &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe)) == 0) {
			/* Resize failure is non-fatal; keep the old buffer. */
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				nbigpipe++;
			pipeunlock(wpipe);
		}
	}

	/*
	 * If an early error occured unbusy and return, waking up any pending
	 * readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		return (error);
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		int space;

retrywrite:
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			if ((error = pipelock(wpipe)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");
#endif

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					/* Advance 'in', wrapping if needed. */
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
#endif
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");
#endif
				}
				pipeunlock(wpipe);
			}
			if (error)
				break;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
			    "pipewr", 0);
			if (error)
				break;
			/*
			 * If read side wants to go away, we just issue a
			 * signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		/* pipeclose() is waiting for the busy count to drain. */
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/*
	 * We have something to offer, wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}
601 
602 /*
603  * we implement a very minimal set of ioctls for compatibility with sockets.
604  */
605 int
606 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
607 {
608 	struct pipe *mpipe = (struct pipe *)fp->f_data;
609 
610 	switch (cmd) {
611 
612 	case FIONBIO:
613 		return (0);
614 
615 	case FIOASYNC:
616 		if (*(int *)data) {
617 			mpipe->pipe_state |= PIPE_ASYNC;
618 		} else {
619 			mpipe->pipe_state &= ~PIPE_ASYNC;
620 		}
621 		return (0);
622 
623 	case FIONREAD:
624 		*(int *)data = mpipe->pipe_buffer.cnt;
625 		return (0);
626 
627 	case SIOCSPGRP:
628 		mpipe->pipe_pgid = *(int *)data;
629 		return (0);
630 
631 	case SIOCGPGRP:
632 		*(int *)data = mpipe->pipe_pgid;
633 		return (0);
634 
635 	}
636 	return (ENOTTY);
637 }
638 
/*
 * Poll support.  Readability: data buffered or EOF seen.
 * Writability: at least PIPE_BUF bytes of room on the peer.
 * POLLHUP is reported when either end has seen EOF or the peer is
 * gone, and is never combined with POLLOUT/POLLWRNORM.
 */
int
pipe_poll(struct file *fp, int events, struct proc *p)
{
	struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);
	}

	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;
	else if (events & (POLLOUT | POLLWRNORM)) {
		if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
			revents |= events & (POLLOUT | POLLWRNORM);
	}

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}
		/*
		 * wpipe cannot be NULL here: a NULL wpipe forced POLLHUP
		 * above, which makes revents nonzero and skips this block.
		 */
		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}
	return (revents);
}
675 
676 int
677 pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
678 {
679 	struct pipe *pipe = (struct pipe *)fp->f_data;
680 
681 	bzero(ub, sizeof(*ub));
682 	ub->st_mode = S_IFIFO;
683 	ub->st_blksize = pipe->pipe_buffer.size;
684 	ub->st_size = pipe->pipe_buffer.cnt;
685 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
686 	ub->st_atim = pipe->pipe_atime;
687 	ub->st_mtim = pipe->pipe_mtime;
688 	ub->st_ctim = pipe->pipe_ctime;
689 	ub->st_uid = fp->f_cred->cr_uid;
690 	ub->st_gid = fp->f_cred->cr_gid;
691 	/*
692 	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
693 	 * XXX (st_dev, st_ino) should be unique.
694 	 */
695 	return (0);
696 }
697 
698 /* ARGSUSED */
699 int
700 pipe_close(struct file *fp, struct proc *p)
701 {
702 	struct pipe *cpipe = (struct pipe *)fp->f_data;
703 
704 	fp->f_ops = NULL;
705 	fp->f_data = NULL;
706 	pipeclose(cpipe);
707 	return (0);
708 }
709 
710 void
711 pipe_free_kmem(struct pipe *cpipe)
712 {
713 	if (cpipe->pipe_buffer.buffer != NULL) {
714 		if (cpipe->pipe_buffer.size > PIPE_SIZE)
715 			--nbigpipe;
716 		amountpipekva -= cpipe->pipe_buffer.size;
717 		uvm_km_free(kernel_map, (vaddr_t)cpipe->pipe_buffer.buffer,
718 		    cpipe->pipe_buffer.size);
719 		cpipe->pipe_buffer.buffer = NULL;
720 	}
721 }
722 
/*
 * Shut down one end of a pipe: mark it EOF, wait for any thread
 * still busy in pipe_read()/pipe_write() on it to drain out, wake
 * and detach the peer, then free the buffer and the pipe itself.
 * Safe to call with a NULL pointer.
 */
void
pipeclose(struct pipe *cpipe)
{
	struct pipe *ppipe;
	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		cpipe->pipe_state |= PIPE_EOF;
		while (cpipe->pipe_busy) {
			/* PIPE_WANT asks the busy thread to wake us back. */
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			/* Peer now reads EOF and its writes return EPIPE. */
			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			KNOTE(&ppipe->pipe_sel.si_note, 0);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		pipe_free_kmem(cpipe);
		pool_put(&pipe_pool, cpipe);
	}
}
764 
765 int
766 pipe_kqfilter(struct file *fp, struct knote *kn)
767 {
768 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
769 	struct pipe *wpipe = rpipe->pipe_peer;
770 
771 	switch (kn->kn_filter) {
772 	case EVFILT_READ:
773 		kn->kn_fop = &pipe_rfiltops;
774 		SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
775 		break;
776 	case EVFILT_WRITE:
777 		if (wpipe == NULL)
778 			/* other end of pipe has been closed */
779 			return (1);
780 		kn->kn_fop = &pipe_wfiltops;
781 		SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
782 		break;
783 	default:
784 		return (1);
785 	}
786 
787 	return (0);
788 }
789 
790 void
791 filt_pipedetach(struct knote *kn)
792 {
793 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
794 	struct pipe *wpipe = rpipe->pipe_peer;
795 
796 	switch (kn->kn_filter) {
797 	case EVFILT_READ:
798 		SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
799 		break;
800 	case EVFILT_WRITE:
801 		if (wpipe == NULL)
802 			return;
803 		SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
804 		break;
805 	}
806 }
807 
808 /*ARGSUSED*/
809 int
810 filt_piperead(struct knote *kn, long hint)
811 {
812 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
813 	struct pipe *wpipe = rpipe->pipe_peer;
814 
815 	kn->kn_data = rpipe->pipe_buffer.cnt;
816 
817 	if ((rpipe->pipe_state & PIPE_EOF) ||
818 	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
819 		kn->kn_flags |= EV_EOF;
820 		return (1);
821 	}
822 	return (kn->kn_data > 0);
823 }
824 
825 /*ARGSUSED*/
826 int
827 filt_pipewrite(struct knote *kn, long hint)
828 {
829 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
830 	struct pipe *wpipe = rpipe->pipe_peer;
831 
832 	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
833 		kn->kn_data = 0;
834 		kn->kn_flags |= EV_EOF;
835 		return (1);
836 	}
837 	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
838 
839 	return (kn->kn_data >= PIPE_BUF);
840 }
841 
842 void
843 pipe_init(void)
844 {
845 	pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, 0, "pipepl",
846 	    &pool_allocator_nointr);
847 }
848 
849