1 /* $OpenBSD: sys_pipe.c,v 1.45 2003/10/03 16:38:01 miod Exp $ */ 2 3 /* 4 * Copyright (c) 1996 John S. Dyson 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice immediately at the beginning of the file, without modification, 12 * this list of conditions, and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Absolutely no warranty of function or purpose is made by the author 17 * John S. Dyson. 18 * 4. Modifications may be freely made to this file if the above conditions 19 * are met. 20 */ 21 22 /* 23 * This file contains a high-performance replacement for the socket-based 24 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support 25 * all features of sockets, but does do everything that pipes normally 26 * do. 27 */ 28 29 #include <sys/param.h> 30 #include <sys/systm.h> 31 #include <sys/proc.h> 32 #include <sys/file.h> 33 #include <sys/filedesc.h> 34 #include <sys/pool.h> 35 #include <sys/ioctl.h> 36 #include <sys/stat.h> 37 #include <sys/signalvar.h> 38 #include <sys/mount.h> 39 #include <sys/syscallargs.h> 40 #include <sys/event.h> 41 #include <sys/lock.h> 42 #include <sys/poll.h> 43 44 #include <uvm/uvm_extern.h> 45 46 #include <sys/pipe.h> 47 48 /* 49 * interfaces to the outside world 50 */ 51 int pipe_read(struct file *, off_t *, struct uio *, struct ucred *); 52 int pipe_write(struct file *, off_t *, struct uio *, struct ucred *); 53 int pipe_close(struct file *, struct proc *); 54 int pipe_poll(struct file *, int events, struct proc *); 55 int pipe_kqfilter(struct file *fp, struct knote *kn); 56 int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *); 57 int pipe_stat(struct file *fp, struct stat *ub, struct proc *p); 58 59 static struct fileops pipeops = { 60 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter, 61 pipe_stat, pipe_close 62 }; 63 64 void filt_pipedetach(struct knote *kn); 65 int filt_piperead(struct knote *kn, long hint); 66 int filt_pipewrite(struct knote *kn, long hint); 67 68 struct filterops pipe_rfiltops = 69 { 1, NULL, filt_pipedetach, filt_piperead }; 70 struct filterops pipe_wfiltops = 71 { 1, NULL, filt_pipedetach, filt_pipewrite }; 72 73 /* 74 * Default pipe buffer size(s), this can be kind-of large now because pipe 75 * space is pageable. The pipe code will try to maintain locality of 76 * reference for performance reasons, so small amounts of outstanding I/O 77 * will not wipe the cache. 78 */ 79 #define MINPIPESIZE (PIPE_SIZE/3) 80 81 /* 82 * Limit the number of "big" pipes 83 */ 84 #define LIMITBIGPIPES 32 85 int nbigpipe; 86 static int amountpipekva; 87 88 struct pool pipe_pool; 89 90 void pipeclose(struct pipe *); 91 void pipe_free_kmem(struct pipe *); 92 int pipe_create(struct pipe *); 93 static __inline int pipelock(struct pipe *); 94 static __inline void pipeunlock(struct pipe *); 95 static __inline void pipeselwakeup(struct pipe *); 96 int pipespace(struct pipe *, u_int); 97 98 /* 99 * The pipe system call for the DTYPE_PIPE type of pipes 100 */ 101 102 /* ARGSUSED */ 103 int 104 sys_opipe(p, v, retval) 105 struct proc *p; 106 void *v; 107 register_t *retval; 108 { 109 struct filedesc *fdp = p->p_fd; 110 struct file *rf, *wf; 111 struct pipe *rpipe, *wpipe; 112 int fd, error; 113 114 rpipe = pool_get(&pipe_pool, PR_WAITOK); 115 error = pipe_create(rpipe); 116 if (error != 0) 117 goto free1; 118 wpipe = pool_get(&pipe_pool, PR_WAITOK); 119 error = pipe_create(wpipe); 120 if (error != 0) 121 goto free2; 122 123 error = falloc(p, &rf, &fd); 124 if (error != 0) 125 goto free2; 126 rf->f_flag = FREAD | FWRITE; 127 rf->f_type = DTYPE_PIPE; 128 rf->f_data = rpipe; 129 rf->f_ops = &pipeops; 130 retval[0] = fd; 131 132 error = falloc(p, &wf, &fd); 133 if (error != 0) 134 goto free3; 135 wf->f_flag = FREAD | FWRITE; 136 wf->f_type = DTYPE_PIPE; 137 wf->f_data = wpipe; 138 wf->f_ops = &pipeops; 139 retval[1] = fd; 140 141 rpipe->pipe_peer = wpipe; 142 wpipe->pipe_peer = rpipe; 143 144 FILE_SET_MATURE(rf); 145 FILE_SET_MATURE(wf); 146 return (0); 147 free3: 148 fdremove(fdp, retval[0]); 149 closef(rf, p); 150 rpipe = NULL; 151 free2: 152 (void)pipeclose(wpipe); 153 free1: 154 if (rpipe != NULL) 155 (void)pipeclose(rpipe); 156 return (error); 157 } 158 159 /* 160 * Allocate kva for pipe circular buffer, the space is pageable 161 * This routine will 'realloc' the size of a pipe safely, if it fails 162 * it will retain the old buffer. 163 * If it fails it will return ENOMEM. 164 */ 165 int 166 pipespace(cpipe, size) 167 struct pipe *cpipe; 168 u_int size; 169 { 170 caddr_t buffer; 171 172 buffer = (caddr_t)uvm_km_valloc(kernel_map, size); 173 if (buffer == NULL) { 174 return (ENOMEM); 175 } 176 177 /* free old resources if we are resizing */ 178 pipe_free_kmem(cpipe); 179 cpipe->pipe_buffer.buffer = buffer; 180 cpipe->pipe_buffer.size = size; 181 cpipe->pipe_buffer.in = 0; 182 cpipe->pipe_buffer.out = 0; 183 cpipe->pipe_buffer.cnt = 0; 184 185 amountpipekva += cpipe->pipe_buffer.size; 186 187 return (0); 188 } 189 190 /* 191 * initialize and allocate VM and memory for pipe 192 */ 193 int 194 pipe_create(cpipe) 195 struct pipe *cpipe; 196 { 197 int error; 198 199 /* so pipe_free_kmem() doesn't follow junk pointer */ 200 cpipe->pipe_buffer.buffer = NULL; 201 /* 202 * protect so pipeclose() doesn't follow a junk pointer 203 * if pipespace() fails. 204 */ 205 bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel); 206 cpipe->pipe_state = 0; 207 cpipe->pipe_peer = NULL; 208 cpipe->pipe_busy = 0; 209 210 error = pipespace(cpipe, PIPE_SIZE); 211 if (error != 0) 212 return (error); 213 214 microtime(&cpipe->pipe_ctime); 215 cpipe->pipe_atime = cpipe->pipe_ctime; 216 cpipe->pipe_mtime = cpipe->pipe_ctime; 217 cpipe->pipe_pgid = NO_PID; 218 219 return (0); 220 } 221 222 223 /* 224 * lock a pipe for I/O, blocking other access 225 */ 226 static __inline int 227 pipelock(cpipe) 228 struct pipe *cpipe; 229 { 230 int error; 231 while (cpipe->pipe_state & PIPE_LOCK) { 232 cpipe->pipe_state |= PIPE_LWANT; 233 if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0))) 234 return error; 235 } 236 cpipe->pipe_state |= PIPE_LOCK; 237 return 0; 238 } 239 240 /* 241 * unlock a pipe I/O lock 242 */ 243 static __inline void 244 pipeunlock(cpipe) 245 struct pipe *cpipe; 246 { 247 cpipe->pipe_state &= ~PIPE_LOCK; 248 if (cpipe->pipe_state & PIPE_LWANT) { 249 cpipe->pipe_state &= ~PIPE_LWANT; 250 wakeup(cpipe); 251 } 252 } 253 254 static __inline void 255 pipeselwakeup(cpipe) 256 struct pipe *cpipe; 257 { 258 if (cpipe->pipe_state & PIPE_SEL) { 259 cpipe->pipe_state &= ~PIPE_SEL; 260 selwakeup(&cpipe->pipe_sel); 261 } 262 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID) 263 gsignal(cpipe->pipe_pgid, SIGIO); 264 KNOTE(&cpipe->pipe_sel.si_note, 0); 265 } 266 267 /* ARGSUSED */ 268 int 269 pipe_read(fp, poff, uio, cred) 270 struct file *fp; 271 off_t *poff; 272 struct uio *uio; 273 struct ucred *cred; 274 { 275 struct pipe *rpipe = (struct pipe *) fp->f_data; 276 int error; 277 int nread = 0; 278 int size; 279 280 error = pipelock(rpipe); 281 if (error) 282 goto unlocked_error; 283 284 ++rpipe->pipe_busy; 285 286 while (uio->uio_resid) { 287 /* 288 * normal pipe buffer receive 289 */ 290 if (rpipe->pipe_buffer.cnt > 0) { 291 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; 292 if (size > rpipe->pipe_buffer.cnt) 293 size = rpipe->pipe_buffer.cnt; 294 if (size > uio->uio_resid) 295 size = uio->uio_resid; 296 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], 297 size, uio); 298 if (error) { 299 break; 300 } 301 rpipe->pipe_buffer.out += size; 302 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size) 303 rpipe->pipe_buffer.out = 0; 304 305 rpipe->pipe_buffer.cnt -= size; 306 /* 307 * If there is no more to read in the pipe, reset 308 * its pointers to the beginning. This improves 309 * cache hit stats. 310 */ 311 if (rpipe->pipe_buffer.cnt == 0) { 312 rpipe->pipe_buffer.in = 0; 313 rpipe->pipe_buffer.out = 0; 314 } 315 nread += size; 316 } else { 317 /* 318 * detect EOF condition 319 * read returns 0 on EOF, no need to set error 320 */ 321 if (rpipe->pipe_state & PIPE_EOF) 322 break; 323 324 /* 325 * If the "write-side" has been blocked, wake it up now. 326 */ 327 if (rpipe->pipe_state & PIPE_WANTW) { 328 rpipe->pipe_state &= ~PIPE_WANTW; 329 wakeup(rpipe); 330 } 331 332 /* 333 * Break if some data was read. 334 */ 335 if (nread > 0) 336 break; 337 338 /* 339 * Unlock the pipe buffer for our remaining processing. 340 * We will either break out with an error or we will 341 * sleep and relock to loop. 342 */ 343 pipeunlock(rpipe); 344 345 /* 346 * Handle non-blocking mode operation or 347 * wait for more data. 348 */ 349 if (fp->f_flag & FNONBLOCK) { 350 error = EAGAIN; 351 } else { 352 rpipe->pipe_state |= PIPE_WANTR; 353 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0) 354 error = pipelock(rpipe); 355 } 356 if (error) 357 goto unlocked_error; 358 } 359 } 360 pipeunlock(rpipe); 361 362 if (error == 0) 363 microtime(&rpipe->pipe_atime); 364 unlocked_error: 365 --rpipe->pipe_busy; 366 367 /* 368 * PIPE_WANT processing only makes sense if pipe_busy is 0. 369 */ 370 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) { 371 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW); 372 wakeup(rpipe); 373 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) { 374 /* 375 * Handle write blocking hysteresis. 376 */ 377 if (rpipe->pipe_state & PIPE_WANTW) { 378 rpipe->pipe_state &= ~PIPE_WANTW; 379 wakeup(rpipe); 380 } 381 } 382 383 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) 384 pipeselwakeup(rpipe); 385 386 return (error); 387 } 388 389 int 390 pipe_write(fp, poff, uio, cred) 391 struct file *fp; 392 off_t *poff; 393 struct uio *uio; 394 struct ucred *cred; 395 { 396 int error = 0; 397 int orig_resid; 398 399 struct pipe *wpipe, *rpipe; 400 401 rpipe = (struct pipe *) fp->f_data; 402 wpipe = rpipe->pipe_peer; 403 404 /* 405 * detect loss of pipe read side, issue SIGPIPE if lost. 406 */ 407 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 408 return (EPIPE); 409 } 410 ++wpipe->pipe_busy; 411 412 /* 413 * If it is advantageous to resize the pipe buffer, do 414 * so. 415 */ 416 if ((uio->uio_resid > PIPE_SIZE) && 417 (nbigpipe < LIMITBIGPIPES) && 418 (wpipe->pipe_buffer.size <= PIPE_SIZE) && 419 (wpipe->pipe_buffer.cnt == 0)) { 420 421 if ((error = pipelock(wpipe)) == 0) { 422 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) 423 nbigpipe++; 424 pipeunlock(wpipe); 425 } 426 } 427 428 /* 429 * If an early error occured unbusy and return, waking up any pending 430 * readers. 431 */ 432 if (error) { 433 --wpipe->pipe_busy; 434 if ((wpipe->pipe_busy == 0) && 435 (wpipe->pipe_state & PIPE_WANT)) { 436 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 437 wakeup(wpipe); 438 } 439 return (error); 440 } 441 442 orig_resid = uio->uio_resid; 443 444 while (uio->uio_resid) { 445 int space; 446 447 retrywrite: 448 if (wpipe->pipe_state & PIPE_EOF) { 449 error = EPIPE; 450 break; 451 } 452 453 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 454 455 /* Writes of size <= PIPE_BUF must be atomic. */ 456 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 457 space = 0; 458 459 if (space > 0) { 460 if ((error = pipelock(wpipe)) == 0) { 461 int size; /* Transfer size */ 462 int segsize; /* first segment to transfer */ 463 464 /* 465 * If a process blocked in uiomove, our 466 * value for space might be bad. 467 * 468 * XXX will we be ok if the reader has gone 469 * away here? 470 */ 471 if (space > wpipe->pipe_buffer.size - 472 wpipe->pipe_buffer.cnt) { 473 pipeunlock(wpipe); 474 goto retrywrite; 475 } 476 477 /* 478 * Transfer size is minimum of uio transfer 479 * and free space in pipe buffer. 480 */ 481 if (space > uio->uio_resid) 482 size = uio->uio_resid; 483 else 484 size = space; 485 /* 486 * First segment to transfer is minimum of 487 * transfer size and contiguous space in 488 * pipe buffer. If first segment to transfer 489 * is less than the transfer size, we've got 490 * a wraparound in the buffer. 491 */ 492 segsize = wpipe->pipe_buffer.size - 493 wpipe->pipe_buffer.in; 494 if (segsize > size) 495 segsize = size; 496 497 /* Transfer first segment */ 498 499 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in], 500 segsize, uio); 501 502 if (error == 0 && segsize < size) { 503 /* 504 * Transfer remaining part now, to 505 * support atomic writes. Wraparound 506 * happened. 507 */ 508 #ifdef DIAGNOSTIC 509 if (wpipe->pipe_buffer.in + segsize != 510 wpipe->pipe_buffer.size) 511 panic("Expected pipe buffer wraparound disappeared"); 512 #endif 513 514 error = uiomove(&wpipe->pipe_buffer.buffer[0], 515 size - segsize, uio); 516 } 517 if (error == 0) { 518 wpipe->pipe_buffer.in += size; 519 if (wpipe->pipe_buffer.in >= 520 wpipe->pipe_buffer.size) { 521 #ifdef DIAGNOSTIC 522 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size) 523 panic("Expected wraparound bad"); 524 #endif 525 wpipe->pipe_buffer.in = size - segsize; 526 } 527 528 wpipe->pipe_buffer.cnt += size; 529 #ifdef DIAGNOSTIC 530 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size) 531 panic("Pipe buffer overflow"); 532 #endif 533 } 534 pipeunlock(wpipe); 535 } 536 if (error) 537 break; 538 } else { 539 /* 540 * If the "read-side" has been blocked, wake it up now. 541 */ 542 if (wpipe->pipe_state & PIPE_WANTR) { 543 wpipe->pipe_state &= ~PIPE_WANTR; 544 wakeup(wpipe); 545 } 546 547 /* 548 * don't block on non-blocking I/O 549 */ 550 if (fp->f_flag & FNONBLOCK) { 551 error = EAGAIN; 552 break; 553 } 554 555 /* 556 * We have no more space and have something to offer, 557 * wake up select/poll. 558 */ 559 pipeselwakeup(wpipe); 560 561 wpipe->pipe_state |= PIPE_WANTW; 562 error = tsleep(wpipe, (PRIBIO + 1)|PCATCH, 563 "pipewr", 0); 564 if (error) 565 break; 566 /* 567 * If read side wants to go away, we just issue a 568 * signal to ourselves. 569 */ 570 if (wpipe->pipe_state & PIPE_EOF) { 571 error = EPIPE; 572 break; 573 } 574 } 575 } 576 577 --wpipe->pipe_busy; 578 579 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) { 580 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 581 wakeup(wpipe); 582 } else if (wpipe->pipe_buffer.cnt > 0) { 583 /* 584 * If we have put any characters in the buffer, we wake up 585 * the reader. 586 */ 587 if (wpipe->pipe_state & PIPE_WANTR) { 588 wpipe->pipe_state &= ~PIPE_WANTR; 589 wakeup(wpipe); 590 } 591 } 592 593 /* 594 * Don't return EPIPE if I/O was successful 595 */ 596 if ((wpipe->pipe_buffer.cnt == 0) && 597 (uio->uio_resid == 0) && 598 (error == EPIPE)) { 599 error = 0; 600 } 601 602 if (error == 0) 603 microtime(&wpipe->pipe_mtime); 604 /* 605 * We have something to offer, wake up select/poll. 606 */ 607 if (wpipe->pipe_buffer.cnt) 608 pipeselwakeup(wpipe); 609 610 return (error); 611 } 612 613 /* 614 * we implement a very minimal set of ioctls for compatibility with sockets. 615 */ 616 int 617 pipe_ioctl(fp, cmd, data, p) 618 struct file *fp; 619 u_long cmd; 620 caddr_t data; 621 struct proc *p; 622 { 623 struct pipe *mpipe = (struct pipe *)fp->f_data; 624 625 switch (cmd) { 626 627 case FIONBIO: 628 return (0); 629 630 case FIOASYNC: 631 if (*(int *)data) { 632 mpipe->pipe_state |= PIPE_ASYNC; 633 } else { 634 mpipe->pipe_state &= ~PIPE_ASYNC; 635 } 636 return (0); 637 638 case FIONREAD: 639 *(int *)data = mpipe->pipe_buffer.cnt; 640 return (0); 641 642 case SIOCSPGRP: 643 mpipe->pipe_pgid = *(int *)data; 644 return (0); 645 646 case SIOCGPGRP: 647 *(int *)data = mpipe->pipe_pgid; 648 return (0); 649 650 } 651 return (ENOTTY); 652 } 653 654 int 655 pipe_poll(fp, events, p) 656 struct file *fp; 657 int events; 658 struct proc *p; 659 { 660 struct pipe *rpipe = (struct pipe *)fp->f_data; 661 struct pipe *wpipe; 662 int revents = 0; 663 664 wpipe = rpipe->pipe_peer; 665 if (events & (POLLIN | POLLRDNORM)) { 666 if ((rpipe->pipe_buffer.cnt > 0) || 667 (rpipe->pipe_state & PIPE_EOF)) 668 revents |= events & (POLLIN | POLLRDNORM); 669 } 670 671 /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */ 672 if ((rpipe->pipe_state & PIPE_EOF) || 673 (wpipe == NULL) || 674 (wpipe->pipe_state & PIPE_EOF)) 675 revents |= POLLHUP; 676 else if (events & (POLLOUT | POLLWRNORM)) { 677 if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF) 678 revents |= events & (POLLOUT | POLLWRNORM); 679 } 680 681 if (revents == 0) { 682 if (events & (POLLIN | POLLRDNORM)) { 683 selrecord(p, &rpipe->pipe_sel); 684 rpipe->pipe_state |= PIPE_SEL; 685 } 686 if (events & (POLLOUT | POLLWRNORM)) { 687 selrecord(p, &wpipe->pipe_sel); 688 wpipe->pipe_state |= PIPE_SEL; 689 } 690 } 691 return (revents); 692 } 693 694 int 695 pipe_stat(fp, ub, p) 696 struct file *fp; 697 struct stat *ub; 698 struct proc *p; 699 { 700 struct pipe *pipe = (struct pipe *)fp->f_data; 701 702 bzero(ub, sizeof(*ub)); 703 ub->st_mode = S_IFIFO; 704 ub->st_blksize = pipe->pipe_buffer.size; 705 ub->st_size = pipe->pipe_buffer.cnt; 706 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 707 TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec); 708 TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec); 709 TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec); 710 ub->st_uid = fp->f_cred->cr_uid; 711 ub->st_gid = fp->f_cred->cr_gid; 712 /* 713 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 714 * XXX (st_dev, st_ino) should be unique. 715 */ 716 return (0); 717 } 718 719 /* ARGSUSED */ 720 int 721 pipe_close(fp, p) 722 struct file *fp; 723 struct proc *p; 724 { 725 struct pipe *cpipe = (struct pipe *)fp->f_data; 726 727 fp->f_ops = NULL; 728 fp->f_data = NULL; 729 pipeclose(cpipe); 730 return (0); 731 } 732 733 void 734 pipe_free_kmem(cpipe) 735 struct pipe *cpipe; 736 { 737 if (cpipe->pipe_buffer.buffer != NULL) { 738 if (cpipe->pipe_buffer.size > PIPE_SIZE) 739 --nbigpipe; 740 amountpipekva -= cpipe->pipe_buffer.size; 741 uvm_km_free(kernel_map, (vaddr_t)cpipe->pipe_buffer.buffer, 742 cpipe->pipe_buffer.size); 743 cpipe->pipe_buffer.buffer = NULL; 744 } 745 } 746 747 /* 748 * shutdown the pipe 749 */ 750 void 751 pipeclose(cpipe) 752 struct pipe *cpipe; 753 { 754 struct pipe *ppipe; 755 if (cpipe) { 756 757 pipeselwakeup(cpipe); 758 759 /* 760 * If the other side is blocked, wake it up saying that 761 * we want to close it down. 762 */ 763 while (cpipe->pipe_busy) { 764 wakeup(cpipe); 765 cpipe->pipe_state |= PIPE_WANT | PIPE_EOF; 766 tsleep(cpipe, PRIBIO, "pipecl", 0); 767 } 768 769 /* 770 * Disconnect from peer 771 */ 772 if ((ppipe = cpipe->pipe_peer) != NULL) { 773 pipeselwakeup(ppipe); 774 775 ppipe->pipe_state |= PIPE_EOF; 776 wakeup(ppipe); 777 KNOTE(&ppipe->pipe_sel.si_note, 0); 778 ppipe->pipe_peer = NULL; 779 } 780 781 /* 782 * free resources 783 */ 784 pipe_free_kmem(cpipe); 785 pool_put(&pipe_pool, cpipe); 786 } 787 } 788 789 int 790 pipe_kqfilter(struct file *fp, struct knote *kn) 791 { 792 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 793 struct pipe *wpipe = rpipe->pipe_peer; 794 795 switch (kn->kn_filter) { 796 case EVFILT_READ: 797 kn->kn_fop = &pipe_rfiltops; 798 SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext); 799 break; 800 case EVFILT_WRITE: 801 if (wpipe == NULL) 802 /* other end of pipe has been closed */ 803 return (1); 804 kn->kn_fop = &pipe_wfiltops; 805 SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext); 806 break; 807 default: 808 return (1); 809 } 810 811 return (0); 812 } 813 814 void 815 filt_pipedetach(struct knote *kn) 816 { 817 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 818 struct pipe *wpipe = rpipe->pipe_peer; 819 820 switch (kn->kn_filter) { 821 case EVFILT_READ: 822 SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext); 823 break; 824 case EVFILT_WRITE: 825 if (wpipe == NULL) 826 return; 827 SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext); 828 break; 829 } 830 } 831 832 /*ARGSUSED*/ 833 int 834 filt_piperead(struct knote *kn, long hint) 835 { 836 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 837 struct pipe *wpipe = rpipe->pipe_peer; 838 839 kn->kn_data = rpipe->pipe_buffer.cnt; 840 841 if ((rpipe->pipe_state & PIPE_EOF) || 842 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 843 kn->kn_flags |= EV_EOF; 844 return (1); 845 } 846 return (kn->kn_data > 0); 847 } 848 849 /*ARGSUSED*/ 850 int 851 filt_pipewrite(struct knote *kn, long hint) 852 { 853 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 854 struct pipe *wpipe = rpipe->pipe_peer; 855 856 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 857 kn->kn_data = 0; 858 kn->kn_flags |= EV_EOF; 859 return (1); 860 } 861 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 862 863 return (kn->kn_data >= PIPE_BUF); 864 } 865 866 void 867 pipe_init() 868 { 869 pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, 0, "pipepl", 870 &pool_allocator_nointr); 871 } 872 873