/* $OpenBSD: sys_pipe.c,v 1.46 2004/01/06 04:18:18 tedu Exp $ */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
27 */ 28 29 #include <sys/param.h> 30 #include <sys/systm.h> 31 #include <sys/proc.h> 32 #include <sys/file.h> 33 #include <sys/filedesc.h> 34 #include <sys/pool.h> 35 #include <sys/ioctl.h> 36 #include <sys/stat.h> 37 #include <sys/signalvar.h> 38 #include <sys/mount.h> 39 #include <sys/syscallargs.h> 40 #include <sys/event.h> 41 #include <sys/lock.h> 42 #include <sys/poll.h> 43 44 #include <uvm/uvm_extern.h> 45 46 #include <sys/pipe.h> 47 48 /* 49 * interfaces to the outside world 50 */ 51 int pipe_read(struct file *, off_t *, struct uio *, struct ucred *); 52 int pipe_write(struct file *, off_t *, struct uio *, struct ucred *); 53 int pipe_close(struct file *, struct proc *); 54 int pipe_poll(struct file *, int events, struct proc *); 55 int pipe_kqfilter(struct file *fp, struct knote *kn); 56 int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *); 57 int pipe_stat(struct file *fp, struct stat *ub, struct proc *p); 58 59 static struct fileops pipeops = { 60 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter, 61 pipe_stat, pipe_close 62 }; 63 64 void filt_pipedetach(struct knote *kn); 65 int filt_piperead(struct knote *kn, long hint); 66 int filt_pipewrite(struct knote *kn, long hint); 67 68 struct filterops pipe_rfiltops = 69 { 1, NULL, filt_pipedetach, filt_piperead }; 70 struct filterops pipe_wfiltops = 71 { 1, NULL, filt_pipedetach, filt_pipewrite }; 72 73 /* 74 * Default pipe buffer size(s), this can be kind-of large now because pipe 75 * space is pageable. The pipe code will try to maintain locality of 76 * reference for performance reasons, so small amounts of outstanding I/O 77 * will not wipe the cache. 
78 */ 79 #define MINPIPESIZE (PIPE_SIZE/3) 80 81 /* 82 * Limit the number of "big" pipes 83 */ 84 #define LIMITBIGPIPES 32 85 int nbigpipe; 86 static int amountpipekva; 87 88 struct pool pipe_pool; 89 90 void pipeclose(struct pipe *); 91 void pipe_free_kmem(struct pipe *); 92 int pipe_create(struct pipe *); 93 static __inline int pipelock(struct pipe *); 94 static __inline void pipeunlock(struct pipe *); 95 static __inline void pipeselwakeup(struct pipe *); 96 int pipespace(struct pipe *, u_int); 97 98 /* 99 * The pipe system call for the DTYPE_PIPE type of pipes 100 */ 101 102 /* ARGSUSED */ 103 int 104 sys_opipe(p, v, retval) 105 struct proc *p; 106 void *v; 107 register_t *retval; 108 { 109 struct filedesc *fdp = p->p_fd; 110 struct file *rf, *wf; 111 struct pipe *rpipe, *wpipe; 112 int fd, error; 113 114 fdplock(fdp, p); 115 116 rpipe = pool_get(&pipe_pool, PR_WAITOK); 117 error = pipe_create(rpipe); 118 if (error != 0) 119 goto free1; 120 wpipe = pool_get(&pipe_pool, PR_WAITOK); 121 error = pipe_create(wpipe); 122 if (error != 0) 123 goto free2; 124 125 error = falloc(p, &rf, &fd); 126 if (error != 0) 127 goto free2; 128 rf->f_flag = FREAD | FWRITE; 129 rf->f_type = DTYPE_PIPE; 130 rf->f_data = rpipe; 131 rf->f_ops = &pipeops; 132 retval[0] = fd; 133 134 error = falloc(p, &wf, &fd); 135 if (error != 0) 136 goto free3; 137 wf->f_flag = FREAD | FWRITE; 138 wf->f_type = DTYPE_PIPE; 139 wf->f_data = wpipe; 140 wf->f_ops = &pipeops; 141 retval[1] = fd; 142 143 rpipe->pipe_peer = wpipe; 144 wpipe->pipe_peer = rpipe; 145 146 FILE_SET_MATURE(rf); 147 FILE_SET_MATURE(wf); 148 149 fdpunlock(fdp); 150 return (0); 151 152 free3: 153 fdremove(fdp, retval[0]); 154 closef(rf, p); 155 rpipe = NULL; 156 free2: 157 (void)pipeclose(wpipe); 158 free1: 159 if (rpipe != NULL) 160 (void)pipeclose(rpipe); 161 fdpunlock(fdp); 162 return (error); 163 } 164 165 /* 166 * Allocate kva for pipe circular buffer, the space is pageable 167 * This routine will 'realloc' the size of a pipe 
safely, if it fails 168 * it will retain the old buffer. 169 * If it fails it will return ENOMEM. 170 */ 171 int 172 pipespace(cpipe, size) 173 struct pipe *cpipe; 174 u_int size; 175 { 176 caddr_t buffer; 177 178 buffer = (caddr_t)uvm_km_valloc(kernel_map, size); 179 if (buffer == NULL) { 180 return (ENOMEM); 181 } 182 183 /* free old resources if we are resizing */ 184 pipe_free_kmem(cpipe); 185 cpipe->pipe_buffer.buffer = buffer; 186 cpipe->pipe_buffer.size = size; 187 cpipe->pipe_buffer.in = 0; 188 cpipe->pipe_buffer.out = 0; 189 cpipe->pipe_buffer.cnt = 0; 190 191 amountpipekva += cpipe->pipe_buffer.size; 192 193 return (0); 194 } 195 196 /* 197 * initialize and allocate VM and memory for pipe 198 */ 199 int 200 pipe_create(cpipe) 201 struct pipe *cpipe; 202 { 203 int error; 204 205 /* so pipe_free_kmem() doesn't follow junk pointer */ 206 cpipe->pipe_buffer.buffer = NULL; 207 /* 208 * protect so pipeclose() doesn't follow a junk pointer 209 * if pipespace() fails. 210 */ 211 bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel); 212 cpipe->pipe_state = 0; 213 cpipe->pipe_peer = NULL; 214 cpipe->pipe_busy = 0; 215 216 error = pipespace(cpipe, PIPE_SIZE); 217 if (error != 0) 218 return (error); 219 220 microtime(&cpipe->pipe_ctime); 221 cpipe->pipe_atime = cpipe->pipe_ctime; 222 cpipe->pipe_mtime = cpipe->pipe_ctime; 223 cpipe->pipe_pgid = NO_PID; 224 225 return (0); 226 } 227 228 229 /* 230 * lock a pipe for I/O, blocking other access 231 */ 232 static __inline int 233 pipelock(cpipe) 234 struct pipe *cpipe; 235 { 236 int error; 237 while (cpipe->pipe_state & PIPE_LOCK) { 238 cpipe->pipe_state |= PIPE_LWANT; 239 if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0))) 240 return error; 241 } 242 cpipe->pipe_state |= PIPE_LOCK; 243 return 0; 244 } 245 246 /* 247 * unlock a pipe I/O lock 248 */ 249 static __inline void 250 pipeunlock(cpipe) 251 struct pipe *cpipe; 252 { 253 cpipe->pipe_state &= ~PIPE_LOCK; 254 if (cpipe->pipe_state & PIPE_LWANT) { 255 
cpipe->pipe_state &= ~PIPE_LWANT; 256 wakeup(cpipe); 257 } 258 } 259 260 static __inline void 261 pipeselwakeup(cpipe) 262 struct pipe *cpipe; 263 { 264 if (cpipe->pipe_state & PIPE_SEL) { 265 cpipe->pipe_state &= ~PIPE_SEL; 266 selwakeup(&cpipe->pipe_sel); 267 } 268 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID) 269 gsignal(cpipe->pipe_pgid, SIGIO); 270 KNOTE(&cpipe->pipe_sel.si_note, 0); 271 } 272 273 /* ARGSUSED */ 274 int 275 pipe_read(fp, poff, uio, cred) 276 struct file *fp; 277 off_t *poff; 278 struct uio *uio; 279 struct ucred *cred; 280 { 281 struct pipe *rpipe = (struct pipe *) fp->f_data; 282 int error; 283 int nread = 0; 284 int size; 285 286 error = pipelock(rpipe); 287 if (error) 288 goto unlocked_error; 289 290 ++rpipe->pipe_busy; 291 292 while (uio->uio_resid) { 293 /* 294 * normal pipe buffer receive 295 */ 296 if (rpipe->pipe_buffer.cnt > 0) { 297 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; 298 if (size > rpipe->pipe_buffer.cnt) 299 size = rpipe->pipe_buffer.cnt; 300 if (size > uio->uio_resid) 301 size = uio->uio_resid; 302 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], 303 size, uio); 304 if (error) { 305 break; 306 } 307 rpipe->pipe_buffer.out += size; 308 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size) 309 rpipe->pipe_buffer.out = 0; 310 311 rpipe->pipe_buffer.cnt -= size; 312 /* 313 * If there is no more to read in the pipe, reset 314 * its pointers to the beginning. This improves 315 * cache hit stats. 316 */ 317 if (rpipe->pipe_buffer.cnt == 0) { 318 rpipe->pipe_buffer.in = 0; 319 rpipe->pipe_buffer.out = 0; 320 } 321 nread += size; 322 } else { 323 /* 324 * detect EOF condition 325 * read returns 0 on EOF, no need to set error 326 */ 327 if (rpipe->pipe_state & PIPE_EOF) 328 break; 329 330 /* 331 * If the "write-side" has been blocked, wake it up now. 
332 */ 333 if (rpipe->pipe_state & PIPE_WANTW) { 334 rpipe->pipe_state &= ~PIPE_WANTW; 335 wakeup(rpipe); 336 } 337 338 /* 339 * Break if some data was read. 340 */ 341 if (nread > 0) 342 break; 343 344 /* 345 * Unlock the pipe buffer for our remaining processing. 346 * We will either break out with an error or we will 347 * sleep and relock to loop. 348 */ 349 pipeunlock(rpipe); 350 351 /* 352 * Handle non-blocking mode operation or 353 * wait for more data. 354 */ 355 if (fp->f_flag & FNONBLOCK) { 356 error = EAGAIN; 357 } else { 358 rpipe->pipe_state |= PIPE_WANTR; 359 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0) 360 error = pipelock(rpipe); 361 } 362 if (error) 363 goto unlocked_error; 364 } 365 } 366 pipeunlock(rpipe); 367 368 if (error == 0) 369 microtime(&rpipe->pipe_atime); 370 unlocked_error: 371 --rpipe->pipe_busy; 372 373 /* 374 * PIPE_WANT processing only makes sense if pipe_busy is 0. 375 */ 376 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) { 377 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW); 378 wakeup(rpipe); 379 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) { 380 /* 381 * Handle write blocking hysteresis. 382 */ 383 if (rpipe->pipe_state & PIPE_WANTW) { 384 rpipe->pipe_state &= ~PIPE_WANTW; 385 wakeup(rpipe); 386 } 387 } 388 389 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) 390 pipeselwakeup(rpipe); 391 392 return (error); 393 } 394 395 int 396 pipe_write(fp, poff, uio, cred) 397 struct file *fp; 398 off_t *poff; 399 struct uio *uio; 400 struct ucred *cred; 401 { 402 int error = 0; 403 int orig_resid; 404 405 struct pipe *wpipe, *rpipe; 406 407 rpipe = (struct pipe *) fp->f_data; 408 wpipe = rpipe->pipe_peer; 409 410 /* 411 * detect loss of pipe read side, issue SIGPIPE if lost. 412 */ 413 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 414 return (EPIPE); 415 } 416 ++wpipe->pipe_busy; 417 418 /* 419 * If it is advantageous to resize the pipe buffer, do 420 * so. 
421 */ 422 if ((uio->uio_resid > PIPE_SIZE) && 423 (nbigpipe < LIMITBIGPIPES) && 424 (wpipe->pipe_buffer.size <= PIPE_SIZE) && 425 (wpipe->pipe_buffer.cnt == 0)) { 426 427 if ((error = pipelock(wpipe)) == 0) { 428 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) 429 nbigpipe++; 430 pipeunlock(wpipe); 431 } 432 } 433 434 /* 435 * If an early error occured unbusy and return, waking up any pending 436 * readers. 437 */ 438 if (error) { 439 --wpipe->pipe_busy; 440 if ((wpipe->pipe_busy == 0) && 441 (wpipe->pipe_state & PIPE_WANT)) { 442 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 443 wakeup(wpipe); 444 } 445 return (error); 446 } 447 448 orig_resid = uio->uio_resid; 449 450 while (uio->uio_resid) { 451 int space; 452 453 retrywrite: 454 if (wpipe->pipe_state & PIPE_EOF) { 455 error = EPIPE; 456 break; 457 } 458 459 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 460 461 /* Writes of size <= PIPE_BUF must be atomic. */ 462 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 463 space = 0; 464 465 if (space > 0) { 466 if ((error = pipelock(wpipe)) == 0) { 467 int size; /* Transfer size */ 468 int segsize; /* first segment to transfer */ 469 470 /* 471 * If a process blocked in uiomove, our 472 * value for space might be bad. 473 * 474 * XXX will we be ok if the reader has gone 475 * away here? 476 */ 477 if (space > wpipe->pipe_buffer.size - 478 wpipe->pipe_buffer.cnt) { 479 pipeunlock(wpipe); 480 goto retrywrite; 481 } 482 483 /* 484 * Transfer size is minimum of uio transfer 485 * and free space in pipe buffer. 486 */ 487 if (space > uio->uio_resid) 488 size = uio->uio_resid; 489 else 490 size = space; 491 /* 492 * First segment to transfer is minimum of 493 * transfer size and contiguous space in 494 * pipe buffer. If first segment to transfer 495 * is less than the transfer size, we've got 496 * a wraparound in the buffer. 
497 */ 498 segsize = wpipe->pipe_buffer.size - 499 wpipe->pipe_buffer.in; 500 if (segsize > size) 501 segsize = size; 502 503 /* Transfer first segment */ 504 505 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in], 506 segsize, uio); 507 508 if (error == 0 && segsize < size) { 509 /* 510 * Transfer remaining part now, to 511 * support atomic writes. Wraparound 512 * happened. 513 */ 514 #ifdef DIAGNOSTIC 515 if (wpipe->pipe_buffer.in + segsize != 516 wpipe->pipe_buffer.size) 517 panic("Expected pipe buffer wraparound disappeared"); 518 #endif 519 520 error = uiomove(&wpipe->pipe_buffer.buffer[0], 521 size - segsize, uio); 522 } 523 if (error == 0) { 524 wpipe->pipe_buffer.in += size; 525 if (wpipe->pipe_buffer.in >= 526 wpipe->pipe_buffer.size) { 527 #ifdef DIAGNOSTIC 528 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size) 529 panic("Expected wraparound bad"); 530 #endif 531 wpipe->pipe_buffer.in = size - segsize; 532 } 533 534 wpipe->pipe_buffer.cnt += size; 535 #ifdef DIAGNOSTIC 536 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size) 537 panic("Pipe buffer overflow"); 538 #endif 539 } 540 pipeunlock(wpipe); 541 } 542 if (error) 543 break; 544 } else { 545 /* 546 * If the "read-side" has been blocked, wake it up now. 547 */ 548 if (wpipe->pipe_state & PIPE_WANTR) { 549 wpipe->pipe_state &= ~PIPE_WANTR; 550 wakeup(wpipe); 551 } 552 553 /* 554 * don't block on non-blocking I/O 555 */ 556 if (fp->f_flag & FNONBLOCK) { 557 error = EAGAIN; 558 break; 559 } 560 561 /* 562 * We have no more space and have something to offer, 563 * wake up select/poll. 564 */ 565 pipeselwakeup(wpipe); 566 567 wpipe->pipe_state |= PIPE_WANTW; 568 error = tsleep(wpipe, (PRIBIO + 1)|PCATCH, 569 "pipewr", 0); 570 if (error) 571 break; 572 /* 573 * If read side wants to go away, we just issue a 574 * signal to ourselves. 
575 */ 576 if (wpipe->pipe_state & PIPE_EOF) { 577 error = EPIPE; 578 break; 579 } 580 } 581 } 582 583 --wpipe->pipe_busy; 584 585 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) { 586 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 587 wakeup(wpipe); 588 } else if (wpipe->pipe_buffer.cnt > 0) { 589 /* 590 * If we have put any characters in the buffer, we wake up 591 * the reader. 592 */ 593 if (wpipe->pipe_state & PIPE_WANTR) { 594 wpipe->pipe_state &= ~PIPE_WANTR; 595 wakeup(wpipe); 596 } 597 } 598 599 /* 600 * Don't return EPIPE if I/O was successful 601 */ 602 if ((wpipe->pipe_buffer.cnt == 0) && 603 (uio->uio_resid == 0) && 604 (error == EPIPE)) { 605 error = 0; 606 } 607 608 if (error == 0) 609 microtime(&wpipe->pipe_mtime); 610 /* 611 * We have something to offer, wake up select/poll. 612 */ 613 if (wpipe->pipe_buffer.cnt) 614 pipeselwakeup(wpipe); 615 616 return (error); 617 } 618 619 /* 620 * we implement a very minimal set of ioctls for compatibility with sockets. 
621 */ 622 int 623 pipe_ioctl(fp, cmd, data, p) 624 struct file *fp; 625 u_long cmd; 626 caddr_t data; 627 struct proc *p; 628 { 629 struct pipe *mpipe = (struct pipe *)fp->f_data; 630 631 switch (cmd) { 632 633 case FIONBIO: 634 return (0); 635 636 case FIOASYNC: 637 if (*(int *)data) { 638 mpipe->pipe_state |= PIPE_ASYNC; 639 } else { 640 mpipe->pipe_state &= ~PIPE_ASYNC; 641 } 642 return (0); 643 644 case FIONREAD: 645 *(int *)data = mpipe->pipe_buffer.cnt; 646 return (0); 647 648 case SIOCSPGRP: 649 mpipe->pipe_pgid = *(int *)data; 650 return (0); 651 652 case SIOCGPGRP: 653 *(int *)data = mpipe->pipe_pgid; 654 return (0); 655 656 } 657 return (ENOTTY); 658 } 659 660 int 661 pipe_poll(fp, events, p) 662 struct file *fp; 663 int events; 664 struct proc *p; 665 { 666 struct pipe *rpipe = (struct pipe *)fp->f_data; 667 struct pipe *wpipe; 668 int revents = 0; 669 670 wpipe = rpipe->pipe_peer; 671 if (events & (POLLIN | POLLRDNORM)) { 672 if ((rpipe->pipe_buffer.cnt > 0) || 673 (rpipe->pipe_state & PIPE_EOF)) 674 revents |= events & (POLLIN | POLLRDNORM); 675 } 676 677 /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */ 678 if ((rpipe->pipe_state & PIPE_EOF) || 679 (wpipe == NULL) || 680 (wpipe->pipe_state & PIPE_EOF)) 681 revents |= POLLHUP; 682 else if (events & (POLLOUT | POLLWRNORM)) { 683 if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF) 684 revents |= events & (POLLOUT | POLLWRNORM); 685 } 686 687 if (revents == 0) { 688 if (events & (POLLIN | POLLRDNORM)) { 689 selrecord(p, &rpipe->pipe_sel); 690 rpipe->pipe_state |= PIPE_SEL; 691 } 692 if (events & (POLLOUT | POLLWRNORM)) { 693 selrecord(p, &wpipe->pipe_sel); 694 wpipe->pipe_state |= PIPE_SEL; 695 } 696 } 697 return (revents); 698 } 699 700 int 701 pipe_stat(fp, ub, p) 702 struct file *fp; 703 struct stat *ub; 704 struct proc *p; 705 { 706 struct pipe *pipe = (struct pipe *)fp->f_data; 707 708 bzero(ub, sizeof(*ub)); 709 ub->st_mode = S_IFIFO; 710 ub->st_blksize = 
pipe->pipe_buffer.size; 711 ub->st_size = pipe->pipe_buffer.cnt; 712 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 713 TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec); 714 TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec); 715 TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec); 716 ub->st_uid = fp->f_cred->cr_uid; 717 ub->st_gid = fp->f_cred->cr_gid; 718 /* 719 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 720 * XXX (st_dev, st_ino) should be unique. 721 */ 722 return (0); 723 } 724 725 /* ARGSUSED */ 726 int 727 pipe_close(fp, p) 728 struct file *fp; 729 struct proc *p; 730 { 731 struct pipe *cpipe = (struct pipe *)fp->f_data; 732 733 fp->f_ops = NULL; 734 fp->f_data = NULL; 735 pipeclose(cpipe); 736 return (0); 737 } 738 739 void 740 pipe_free_kmem(cpipe) 741 struct pipe *cpipe; 742 { 743 if (cpipe->pipe_buffer.buffer != NULL) { 744 if (cpipe->pipe_buffer.size > PIPE_SIZE) 745 --nbigpipe; 746 amountpipekva -= cpipe->pipe_buffer.size; 747 uvm_km_free(kernel_map, (vaddr_t)cpipe->pipe_buffer.buffer, 748 cpipe->pipe_buffer.size); 749 cpipe->pipe_buffer.buffer = NULL; 750 } 751 } 752 753 /* 754 * shutdown the pipe 755 */ 756 void 757 pipeclose(cpipe) 758 struct pipe *cpipe; 759 { 760 struct pipe *ppipe; 761 if (cpipe) { 762 763 pipeselwakeup(cpipe); 764 765 /* 766 * If the other side is blocked, wake it up saying that 767 * we want to close it down. 
768 */ 769 while (cpipe->pipe_busy) { 770 wakeup(cpipe); 771 cpipe->pipe_state |= PIPE_WANT | PIPE_EOF; 772 tsleep(cpipe, PRIBIO, "pipecl", 0); 773 } 774 775 /* 776 * Disconnect from peer 777 */ 778 if ((ppipe = cpipe->pipe_peer) != NULL) { 779 pipeselwakeup(ppipe); 780 781 ppipe->pipe_state |= PIPE_EOF; 782 wakeup(ppipe); 783 KNOTE(&ppipe->pipe_sel.si_note, 0); 784 ppipe->pipe_peer = NULL; 785 } 786 787 /* 788 * free resources 789 */ 790 pipe_free_kmem(cpipe); 791 pool_put(&pipe_pool, cpipe); 792 } 793 } 794 795 int 796 pipe_kqfilter(struct file *fp, struct knote *kn) 797 { 798 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 799 struct pipe *wpipe = rpipe->pipe_peer; 800 801 switch (kn->kn_filter) { 802 case EVFILT_READ: 803 kn->kn_fop = &pipe_rfiltops; 804 SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext); 805 break; 806 case EVFILT_WRITE: 807 if (wpipe == NULL) 808 /* other end of pipe has been closed */ 809 return (1); 810 kn->kn_fop = &pipe_wfiltops; 811 SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext); 812 break; 813 default: 814 return (1); 815 } 816 817 return (0); 818 } 819 820 void 821 filt_pipedetach(struct knote *kn) 822 { 823 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 824 struct pipe *wpipe = rpipe->pipe_peer; 825 826 switch (kn->kn_filter) { 827 case EVFILT_READ: 828 SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext); 829 break; 830 case EVFILT_WRITE: 831 if (wpipe == NULL) 832 return; 833 SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext); 834 break; 835 } 836 } 837 838 /*ARGSUSED*/ 839 int 840 filt_piperead(struct knote *kn, long hint) 841 { 842 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 843 struct pipe *wpipe = rpipe->pipe_peer; 844 845 kn->kn_data = rpipe->pipe_buffer.cnt; 846 847 if ((rpipe->pipe_state & PIPE_EOF) || 848 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 849 kn->kn_flags |= EV_EOF; 850 return (1); 851 } 852 return (kn->kn_data > 0); 853 } 854 855 
/*ARGSUSED*/ 856 int 857 filt_pipewrite(struct knote *kn, long hint) 858 { 859 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 860 struct pipe *wpipe = rpipe->pipe_peer; 861 862 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 863 kn->kn_data = 0; 864 kn->kn_flags |= EV_EOF; 865 return (1); 866 } 867 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 868 869 return (kn->kn_data >= PIPE_BUF); 870 } 871 872 void 873 pipe_init() 874 { 875 pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, 0, "pipepl", 876 &pool_allocator_nointr); 877 } 878 879