1 /* $OpenBSD: sys_pipe.c,v 1.69 2015/02/10 21:56:10 miod Exp $ */ 2 3 /* 4 * Copyright (c) 1996 John S. Dyson 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice immediately at the beginning of the file, without modification, 12 * this list of conditions, and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Absolutely no warranty of function or purpose is made by the author 17 * John S. Dyson. 18 * 4. Modifications may be freely made to this file if the above conditions 19 * are met. 20 */ 21 22 /* 23 * This file contains a high-performance replacement for the socket-based 24 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support 25 * all features of sockets, but does do everything that pipes normally 26 * do. 27 */ 28 29 #include <sys/param.h> 30 #include <sys/systm.h> 31 #include <sys/proc.h> 32 #include <sys/file.h> 33 #include <sys/filedesc.h> 34 #include <sys/pool.h> 35 #include <sys/ioctl.h> 36 #include <sys/stat.h> 37 #include <sys/signalvar.h> 38 #include <sys/mount.h> 39 #include <sys/syscallargs.h> 40 #include <sys/event.h> 41 #include <sys/lock.h> 42 #include <sys/poll.h> 43 44 #include <uvm/uvm_extern.h> 45 46 #include <sys/pipe.h> 47 48 /* 49 * interfaces to the outside world 50 */ 51 int pipe_read(struct file *, off_t *, struct uio *, struct ucred *); 52 int pipe_write(struct file *, off_t *, struct uio *, struct ucred *); 53 int pipe_close(struct file *, struct proc *); 54 int pipe_poll(struct file *, int events, struct proc *); 55 int pipe_kqfilter(struct file *fp, struct knote *kn); 56 int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *); 57 int pipe_stat(struct file *fp, struct stat *ub, struct proc *p); 58 59 static struct fileops pipeops = { 60 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter, 61 pipe_stat, pipe_close 62 }; 63 64 void filt_pipedetach(struct knote *kn); 65 int filt_piperead(struct knote *kn, long hint); 66 int filt_pipewrite(struct knote *kn, long hint); 67 68 struct filterops pipe_rfiltops = 69 { 1, NULL, filt_pipedetach, filt_piperead }; 70 struct filterops pipe_wfiltops = 71 { 1, NULL, filt_pipedetach, filt_pipewrite }; 72 73 /* 74 * Default pipe buffer size(s), this can be kind-of large now because pipe 75 * space is pageable. The pipe code will try to maintain locality of 76 * reference for performance reasons, so small amounts of outstanding I/O 77 * will not wipe the cache. 78 */ 79 #define MINPIPESIZE (PIPE_SIZE/3) 80 81 /* 82 * Limit the number of "big" pipes 83 */ 84 #define LIMITBIGPIPES 32 85 int nbigpipe; 86 static int amountpipekva; 87 88 struct pool pipe_pool; 89 90 int dopipe(struct proc *, int *, int); 91 void pipeclose(struct pipe *); 92 void pipe_free_kmem(struct pipe *); 93 int pipe_create(struct pipe *); 94 int pipelock(struct pipe *); 95 void pipeunlock(struct pipe *); 96 void pipeselwakeup(struct pipe *); 97 int pipespace(struct pipe *, u_int); 98 99 /* 100 * The pipe system call for the DTYPE_PIPE type of pipes 101 */ 102 103 int 104 sys_pipe(struct proc *p, void *v, register_t *retval) 105 { 106 struct sys_pipe_args /* { 107 syscallarg(int *) fdp; 108 } */ *uap = v; 109 110 return (dopipe(p, SCARG(uap, fdp), 0)); 111 } 112 113 int 114 sys_pipe2(struct proc *p, void *v, register_t *retval) 115 { 116 struct sys_pipe2_args /* { 117 syscallarg(int *) fdp; 118 syscallarg(int) flags; 119 } */ *uap = v; 120 121 if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK)) 122 return (EINVAL); 123 124 return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags))); 125 } 126 127 int 128 dopipe(struct proc *p, int *ufds, int flags) 129 { 130 struct filedesc *fdp = p->p_fd; 131 struct file *rf, *wf; 132 struct pipe *rpipe, *wpipe = NULL; 133 int fds[2], error; 134 135 rpipe = pool_get(&pipe_pool, PR_WAITOK); 136 error = pipe_create(rpipe); 137 if (error != 0) 138 goto free1; 139 wpipe = pool_get(&pipe_pool, PR_WAITOK); 140 error = pipe_create(wpipe); 141 if (error != 0) 142 goto free1; 143 144 fdplock(fdp); 145 146 error = falloc(p, &rf, &fds[0]); 147 if (error != 0) 148 goto free2; 149 rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK); 150 rf->f_type = DTYPE_PIPE; 151 rf->f_data = rpipe; 152 rf->f_ops = &pipeops; 153 154 error = falloc(p, &wf, &fds[1]); 155 if (error != 0) 156 goto free3; 157 wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK); 158 wf->f_type = DTYPE_PIPE; 159 wf->f_data = wpipe; 160 wf->f_ops = &pipeops; 161 162 if (flags & O_CLOEXEC) { 163 fdp->fd_ofileflags[fds[0]] |= UF_EXCLOSE; 164 fdp->fd_ofileflags[fds[1]] |= UF_EXCLOSE; 165 } 166 167 rpipe->pipe_peer = wpipe; 168 wpipe->pipe_peer = rpipe; 169 170 FILE_SET_MATURE(rf, p); 171 FILE_SET_MATURE(wf, p); 172 173 error = copyout(fds, ufds, sizeof(fds)); 174 if (error != 0) { 175 fdrelease(p, fds[0]); 176 fdrelease(p, fds[1]); 177 } 178 fdpunlock(fdp); 179 return (error); 180 181 free3: 182 fdremove(fdp, fds[0]); 183 closef(rf, p); 184 rpipe = NULL; 185 free2: 186 fdpunlock(fdp); 187 free1: 188 pipeclose(wpipe); 189 pipeclose(rpipe); 190 return (error); 191 } 192 193 /* 194 * Allocate kva for pipe circular buffer, the space is pageable. 195 * This routine will 'realloc' the size of a pipe safely, if it fails 196 * it will retain the old buffer. 197 * If it fails it will return ENOMEM. 198 */ 199 int 200 pipespace(struct pipe *cpipe, u_int size) 201 { 202 caddr_t buffer; 203 204 buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok); 205 if (buffer == NULL) { 206 return (ENOMEM); 207 } 208 209 /* free old resources if we are resizing */ 210 pipe_free_kmem(cpipe); 211 cpipe->pipe_buffer.buffer = buffer; 212 cpipe->pipe_buffer.size = size; 213 cpipe->pipe_buffer.in = 0; 214 cpipe->pipe_buffer.out = 0; 215 cpipe->pipe_buffer.cnt = 0; 216 217 amountpipekva += cpipe->pipe_buffer.size; 218 219 return (0); 220 } 221 222 /* 223 * initialize and allocate VM and memory for pipe 224 */ 225 int 226 pipe_create(struct pipe *cpipe) 227 { 228 int error; 229 230 /* so pipe_free_kmem() doesn't follow junk pointer */ 231 cpipe->pipe_buffer.buffer = NULL; 232 /* 233 * protect so pipeclose() doesn't follow a junk pointer 234 * if pipespace() fails. 235 */ 236 memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel)); 237 cpipe->pipe_state = 0; 238 cpipe->pipe_peer = NULL; 239 cpipe->pipe_busy = 0; 240 241 error = pipespace(cpipe, PIPE_SIZE); 242 if (error != 0) 243 return (error); 244 245 getnanotime(&cpipe->pipe_ctime); 246 cpipe->pipe_atime = cpipe->pipe_ctime; 247 cpipe->pipe_mtime = cpipe->pipe_ctime; 248 cpipe->pipe_pgid = NO_PID; 249 250 return (0); 251 } 252 253 254 /* 255 * lock a pipe for I/O, blocking other access 256 */ 257 int 258 pipelock(struct pipe *cpipe) 259 { 260 int error; 261 while (cpipe->pipe_state & PIPE_LOCK) { 262 cpipe->pipe_state |= PIPE_LWANT; 263 if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0))) 264 return error; 265 } 266 cpipe->pipe_state |= PIPE_LOCK; 267 return 0; 268 } 269 270 /* 271 * unlock a pipe I/O lock 272 */ 273 void 274 pipeunlock(struct pipe *cpipe) 275 { 276 cpipe->pipe_state &= ~PIPE_LOCK; 277 if (cpipe->pipe_state & PIPE_LWANT) { 278 cpipe->pipe_state &= ~PIPE_LWANT; 279 wakeup(cpipe); 280 } 281 } 282 283 void 284 pipeselwakeup(struct pipe *cpipe) 285 { 286 if (cpipe->pipe_state & PIPE_SEL) { 287 cpipe->pipe_state &= ~PIPE_SEL; 288 selwakeup(&cpipe->pipe_sel); 289 } else 290 KNOTE(&cpipe->pipe_sel.si_note, 0); 291 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID) 292 gsignal(cpipe->pipe_pgid, SIGIO); 293 } 294 295 /* ARGSUSED */ 296 int 297 pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 298 { 299 struct pipe *rpipe = (struct pipe *) fp->f_data; 300 int error; 301 int nread = 0; 302 int size; 303 304 error = pipelock(rpipe); 305 if (error) 306 return (error); 307 308 ++rpipe->pipe_busy; 309 310 while (uio->uio_resid) { 311 /* 312 * normal pipe buffer receive 313 */ 314 if (rpipe->pipe_buffer.cnt > 0) { 315 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; 316 if (size > rpipe->pipe_buffer.cnt) 317 size = rpipe->pipe_buffer.cnt; 318 if (size > uio->uio_resid) 319 size = uio->uio_resid; 320 error = uiomovei(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], 321 size, uio); 322 if (error) { 323 break; 324 } 325 rpipe->pipe_buffer.out += size; 326 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size) 327 rpipe->pipe_buffer.out = 0; 328 329 rpipe->pipe_buffer.cnt -= size; 330 /* 331 * If there is no more to read in the pipe, reset 332 * its pointers to the beginning. This improves 333 * cache hit stats. 334 */ 335 if (rpipe->pipe_buffer.cnt == 0) { 336 rpipe->pipe_buffer.in = 0; 337 rpipe->pipe_buffer.out = 0; 338 } 339 nread += size; 340 } else { 341 /* 342 * detect EOF condition 343 * read returns 0 on EOF, no need to set error 344 */ 345 if (rpipe->pipe_state & PIPE_EOF) 346 break; 347 348 /* 349 * If the "write-side" has been blocked, wake it up now. 350 */ 351 if (rpipe->pipe_state & PIPE_WANTW) { 352 rpipe->pipe_state &= ~PIPE_WANTW; 353 wakeup(rpipe); 354 } 355 356 /* 357 * Break if some data was read. 358 */ 359 if (nread > 0) 360 break; 361 362 /* 363 * Unlock the pipe buffer for our remaining processing. 364 * We will either break out with an error or we will 365 * sleep and relock to loop. 366 */ 367 pipeunlock(rpipe); 368 369 /* 370 * Handle non-blocking mode operation or 371 * wait for more data. 372 */ 373 if (fp->f_flag & FNONBLOCK) { 374 error = EAGAIN; 375 } else { 376 rpipe->pipe_state |= PIPE_WANTR; 377 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0) 378 error = pipelock(rpipe); 379 } 380 if (error) 381 goto unlocked_error; 382 } 383 } 384 pipeunlock(rpipe); 385 386 if (error == 0) 387 getnanotime(&rpipe->pipe_atime); 388 unlocked_error: 389 --rpipe->pipe_busy; 390 391 /* 392 * PIPE_WANT processing only makes sense if pipe_busy is 0. 393 */ 394 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) { 395 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW); 396 wakeup(rpipe); 397 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) { 398 /* 399 * Handle write blocking hysteresis. 400 */ 401 if (rpipe->pipe_state & PIPE_WANTW) { 402 rpipe->pipe_state &= ~PIPE_WANTW; 403 wakeup(rpipe); 404 } 405 } 406 407 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) 408 pipeselwakeup(rpipe); 409 410 return (error); 411 } 412 413 int 414 pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 415 { 416 int error = 0; 417 int orig_resid; 418 419 struct pipe *wpipe, *rpipe; 420 421 rpipe = (struct pipe *) fp->f_data; 422 wpipe = rpipe->pipe_peer; 423 424 /* 425 * detect loss of pipe read side, issue SIGPIPE if lost. 426 */ 427 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 428 return (EPIPE); 429 } 430 ++wpipe->pipe_busy; 431 432 /* 433 * If it is advantageous to resize the pipe buffer, do 434 * so. 435 */ 436 if ((uio->uio_resid > PIPE_SIZE) && 437 (nbigpipe < LIMITBIGPIPES) && 438 (wpipe->pipe_buffer.size <= PIPE_SIZE) && 439 (wpipe->pipe_buffer.cnt == 0)) { 440 441 if ((error = pipelock(wpipe)) == 0) { 442 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) 443 nbigpipe++; 444 pipeunlock(wpipe); 445 } 446 } 447 448 /* 449 * If an early error occurred unbusy and return, waking up any pending 450 * readers. 451 */ 452 if (error) { 453 --wpipe->pipe_busy; 454 if ((wpipe->pipe_busy == 0) && 455 (wpipe->pipe_state & PIPE_WANT)) { 456 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 457 wakeup(wpipe); 458 } 459 return (error); 460 } 461 462 orig_resid = uio->uio_resid; 463 464 while (uio->uio_resid) { 465 int space; 466 467 retrywrite: 468 if (wpipe->pipe_state & PIPE_EOF) { 469 error = EPIPE; 470 break; 471 } 472 473 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 474 475 /* Writes of size <= PIPE_BUF must be atomic. */ 476 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 477 space = 0; 478 479 if (space > 0) { 480 if ((error = pipelock(wpipe)) == 0) { 481 int size; /* Transfer size */ 482 int segsize; /* first segment to transfer */ 483 484 /* 485 * If a process blocked in uiomove, our 486 * value for space might be bad. 487 * 488 * XXX will we be ok if the reader has gone 489 * away here? 490 */ 491 if (space > wpipe->pipe_buffer.size - 492 wpipe->pipe_buffer.cnt) { 493 pipeunlock(wpipe); 494 goto retrywrite; 495 } 496 497 /* 498 * Transfer size is minimum of uio transfer 499 * and free space in pipe buffer. 500 */ 501 if (space > uio->uio_resid) 502 size = uio->uio_resid; 503 else 504 size = space; 505 /* 506 * First segment to transfer is minimum of 507 * transfer size and contiguous space in 508 * pipe buffer. If first segment to transfer 509 * is less than the transfer size, we've got 510 * a wraparound in the buffer. 511 */ 512 segsize = wpipe->pipe_buffer.size - 513 wpipe->pipe_buffer.in; 514 if (segsize > size) 515 segsize = size; 516 517 /* Transfer first segment */ 518 519 error = uiomovei(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in], 520 segsize, uio); 521 522 if (error == 0 && segsize < size) { 523 /* 524 * Transfer remaining part now, to 525 * support atomic writes. Wraparound 526 * happened. 527 */ 528 #ifdef DIAGNOSTIC 529 if (wpipe->pipe_buffer.in + segsize != 530 wpipe->pipe_buffer.size) 531 panic("Expected pipe buffer wraparound disappeared"); 532 #endif 533 534 error = uiomovei(&wpipe->pipe_buffer.buffer[0], 535 size - segsize, uio); 536 } 537 if (error == 0) { 538 wpipe->pipe_buffer.in += size; 539 if (wpipe->pipe_buffer.in >= 540 wpipe->pipe_buffer.size) { 541 #ifdef DIAGNOSTIC 542 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size) 543 panic("Expected wraparound bad"); 544 #endif 545 wpipe->pipe_buffer.in = size - segsize; 546 } 547 548 wpipe->pipe_buffer.cnt += size; 549 #ifdef DIAGNOSTIC 550 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size) 551 panic("Pipe buffer overflow"); 552 #endif 553 } 554 pipeunlock(wpipe); 555 } 556 if (error) 557 break; 558 } else { 559 /* 560 * If the "read-side" has been blocked, wake it up now. 561 */ 562 if (wpipe->pipe_state & PIPE_WANTR) { 563 wpipe->pipe_state &= ~PIPE_WANTR; 564 wakeup(wpipe); 565 } 566 567 /* 568 * don't block on non-blocking I/O 569 */ 570 if (fp->f_flag & FNONBLOCK) { 571 error = EAGAIN; 572 break; 573 } 574 575 /* 576 * We have no more space and have something to offer, 577 * wake up select/poll. 578 */ 579 pipeselwakeup(wpipe); 580 581 wpipe->pipe_state |= PIPE_WANTW; 582 error = tsleep(wpipe, (PRIBIO + 1)|PCATCH, 583 "pipewr", 0); 584 if (error) 585 break; 586 /* 587 * If read side wants to go away, we just issue a 588 * signal to ourselves. 589 */ 590 if (wpipe->pipe_state & PIPE_EOF) { 591 error = EPIPE; 592 break; 593 } 594 } 595 } 596 597 --wpipe->pipe_busy; 598 599 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) { 600 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 601 wakeup(wpipe); 602 } else if (wpipe->pipe_buffer.cnt > 0) { 603 /* 604 * If we have put any characters in the buffer, we wake up 605 * the reader. 606 */ 607 if (wpipe->pipe_state & PIPE_WANTR) { 608 wpipe->pipe_state &= ~PIPE_WANTR; 609 wakeup(wpipe); 610 } 611 } 612 613 /* 614 * Don't return EPIPE if I/O was successful 615 */ 616 if ((wpipe->pipe_buffer.cnt == 0) && 617 (uio->uio_resid == 0) && 618 (error == EPIPE)) { 619 error = 0; 620 } 621 622 if (error == 0) 623 getnanotime(&wpipe->pipe_mtime); 624 /* 625 * We have something to offer, wake up select/poll. 626 */ 627 if (wpipe->pipe_buffer.cnt) 628 pipeselwakeup(wpipe); 629 630 return (error); 631 } 632 633 /* 634 * we implement a very minimal set of ioctls for compatibility with sockets. 635 */ 636 int 637 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p) 638 { 639 struct pipe *mpipe = (struct pipe *)fp->f_data; 640 641 switch (cmd) { 642 643 case FIONBIO: 644 return (0); 645 646 case FIOASYNC: 647 if (*(int *)data) { 648 mpipe->pipe_state |= PIPE_ASYNC; 649 } else { 650 mpipe->pipe_state &= ~PIPE_ASYNC; 651 } 652 return (0); 653 654 case FIONREAD: 655 *(int *)data = mpipe->pipe_buffer.cnt; 656 return (0); 657 658 case SIOCSPGRP: 659 mpipe->pipe_pgid = *(int *)data; 660 return (0); 661 662 case SIOCGPGRP: 663 *(int *)data = mpipe->pipe_pgid; 664 return (0); 665 666 } 667 return (ENOTTY); 668 } 669 670 int 671 pipe_poll(struct file *fp, int events, struct proc *p) 672 { 673 struct pipe *rpipe = (struct pipe *)fp->f_data; 674 struct pipe *wpipe; 675 int revents = 0; 676 677 wpipe = rpipe->pipe_peer; 678 if (events & (POLLIN | POLLRDNORM)) { 679 if ((rpipe->pipe_buffer.cnt > 0) || 680 (rpipe->pipe_state & PIPE_EOF)) 681 revents |= events & (POLLIN | POLLRDNORM); 682 } 683 684 /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */ 685 if ((rpipe->pipe_state & PIPE_EOF) || 686 (wpipe == NULL) || 687 (wpipe->pipe_state & PIPE_EOF)) 688 revents |= POLLHUP; 689 else if (events & (POLLOUT | POLLWRNORM)) { 690 if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF) 691 revents |= events & (POLLOUT | POLLWRNORM); 692 } 693 694 if (revents == 0) { 695 if (events & (POLLIN | POLLRDNORM)) { 696 selrecord(p, &rpipe->pipe_sel); 697 rpipe->pipe_state |= PIPE_SEL; 698 } 699 if (events & (POLLOUT | POLLWRNORM)) { 700 selrecord(p, &wpipe->pipe_sel); 701 wpipe->pipe_state |= PIPE_SEL; 702 } 703 } 704 return (revents); 705 } 706 707 int 708 pipe_stat(struct file *fp, struct stat *ub, struct proc *p) 709 { 710 struct pipe *pipe = (struct pipe *)fp->f_data; 711 712 memset(ub, 0, sizeof(*ub)); 713 ub->st_mode = S_IFIFO; 714 ub->st_blksize = pipe->pipe_buffer.size; 715 ub->st_size = pipe->pipe_buffer.cnt; 716 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 717 ub->st_atim.tv_sec = pipe->pipe_atime.tv_sec; 718 ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec; 719 ub->st_mtim.tv_sec = pipe->pipe_mtime.tv_sec; 720 ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec; 721 ub->st_ctim.tv_sec = pipe->pipe_ctime.tv_sec; 722 ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec; 723 ub->st_uid = fp->f_cred->cr_uid; 724 ub->st_gid = fp->f_cred->cr_gid; 725 /* 726 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 727 * XXX (st_dev, st_ino) should be unique. 728 */ 729 return (0); 730 } 731 732 /* ARGSUSED */ 733 int 734 pipe_close(struct file *fp, struct proc *p) 735 { 736 struct pipe *cpipe = (struct pipe *)fp->f_data; 737 738 fp->f_ops = NULL; 739 fp->f_data = NULL; 740 pipeclose(cpipe); 741 return (0); 742 } 743 744 void 745 pipe_free_kmem(struct pipe *cpipe) 746 { 747 if (cpipe->pipe_buffer.buffer != NULL) { 748 if (cpipe->pipe_buffer.size > PIPE_SIZE) 749 --nbigpipe; 750 amountpipekva -= cpipe->pipe_buffer.size; 751 km_free(cpipe->pipe_buffer.buffer, cpipe->pipe_buffer.size, 752 &kv_any, &kp_pageable); 753 cpipe->pipe_buffer.buffer = NULL; 754 } 755 } 756 757 /* 758 * shutdown the pipe 759 */ 760 void 761 pipeclose(struct pipe *cpipe) 762 { 763 struct pipe *ppipe; 764 if (cpipe) { 765 766 pipeselwakeup(cpipe); 767 768 /* 769 * If the other side is blocked, wake it up saying that 770 * we want to close it down. 771 */ 772 cpipe->pipe_state |= PIPE_EOF; 773 while (cpipe->pipe_busy) { 774 wakeup(cpipe); 775 cpipe->pipe_state |= PIPE_WANT; 776 tsleep(cpipe, PRIBIO, "pipecl", 0); 777 } 778 779 /* 780 * Disconnect from peer 781 */ 782 if ((ppipe = cpipe->pipe_peer) != NULL) { 783 pipeselwakeup(ppipe); 784 785 ppipe->pipe_state |= PIPE_EOF; 786 wakeup(ppipe); 787 ppipe->pipe_peer = NULL; 788 } 789 790 /* 791 * free resources 792 */ 793 pipe_free_kmem(cpipe); 794 pool_put(&pipe_pool, cpipe); 795 } 796 } 797 798 int 799 pipe_kqfilter(struct file *fp, struct knote *kn) 800 { 801 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 802 struct pipe *wpipe = rpipe->pipe_peer; 803 804 switch (kn->kn_filter) { 805 case EVFILT_READ: 806 kn->kn_fop = &pipe_rfiltops; 807 SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext); 808 break; 809 case EVFILT_WRITE: 810 if (wpipe == NULL) { 811 /* other end of pipe has been closed */ 812 return (EPIPE); 813 } 814 kn->kn_fop = &pipe_wfiltops; 815 SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext); 816 break; 817 default: 818 return (EINVAL); 819 } 820 821 return (0); 822 } 823 824 void 825 filt_pipedetach(struct knote *kn) 826 { 827 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 828 struct pipe *wpipe = rpipe->pipe_peer; 829 830 switch (kn->kn_filter) { 831 case EVFILT_READ: 832 SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext); 833 break; 834 case EVFILT_WRITE: 835 if (wpipe == NULL) 836 return; 837 SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext); 838 break; 839 } 840 } 841 842 /*ARGSUSED*/ 843 int 844 filt_piperead(struct knote *kn, long hint) 845 { 846 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 847 struct pipe *wpipe = rpipe->pipe_peer; 848 849 kn->kn_data = rpipe->pipe_buffer.cnt; 850 851 if ((rpipe->pipe_state & PIPE_EOF) || 852 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 853 kn->kn_flags |= EV_EOF; 854 return (1); 855 } 856 return (kn->kn_data > 0); 857 } 858 859 /*ARGSUSED*/ 860 int 861 filt_pipewrite(struct knote *kn, long hint) 862 { 863 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 864 struct pipe *wpipe = rpipe->pipe_peer; 865 866 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 867 kn->kn_data = 0; 868 kn->kn_flags |= EV_EOF; 869 return (1); 870 } 871 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 872 873 return (kn->kn_data >= PIPE_BUF); 874 } 875 876 void 877 pipe_init(void) 878 { 879 pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, PR_WAITOK, "pipepl", 880 NULL); 881 } 882 883