/*	$OpenBSD: sys_pipe.c,v 1.55 2009/01/29 22:08:45 guenther Exp $	*/

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/pool.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/event.h>
#include <sys/lock.h>
#include <sys/poll.h>

#include <uvm/uvm_extern.h>

#include <sys/pipe.h>

/*
 * interfaces to the outside world
 */
int	pipe_read(struct file *, off_t *, struct uio *, struct ucred *);
int	pipe_write(struct file *, off_t *, struct uio *, struct ucred *);
int	pipe_close(struct file *, struct proc *);
int	pipe_poll(struct file *, int events, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

static struct fileops pipeops = {
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipewrite(struct knote *kn, long hint);

struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
int nbigpipe;
static int amountpipekva;

struct pool pipe_pool;

void	pipeclose(struct pipe *);
void	pipe_free_kmem(struct pipe *);
int	pipe_create(struct pipe *);
int	pipelock(struct pipe *);
void	pipeunlock(struct pipe *);
void	pipeselwakeup(struct pipe *);
int	pipespace(struct pipe *, u_int);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
sys_opipe(struct proc *p, void *v, register_t *retval)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	fdplock(fdp);

	rpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(rpipe);
	if (error != 0)
		goto free1;
	wpipe = pool_get(&pipe_pool, PR_WAITOK);
	error = pipe_create(wpipe);
	if (error != 0)
		goto free2;

	error = falloc(p, &rf, &fd);
	if (error != 0)
		goto free2;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;
	retval[0] = fd;

	error = falloc(p, &wf, &fd);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	FILE_SET_MATURE(rf);
	FILE_SET_MATURE(wf);

	fdpunlock(fdp);
	return (0);

free3:
	fdremove(fdp, retval[0]);
	closef(rf, p);
	rpipe = NULL;
free2:
	pipeclose(wpipe);
free1:
	if (rpipe != NULL)
		pipeclose(rpipe);
	fdpunlock(fdp);
	return (error);
}
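
/*
 * Example (illustrative only, not part of this file): the userland view
 * of the syscall above.  pipe(2) returns the two descriptors that
 * sys_opipe() stores in retval[0]/retval[1]; a typical caller forks and
 * moves data one way.  A minimal sketch using only standard POSIX
 * interfaces, kept under #if 0 so it is never compiled into the kernel.
 */
#if 0
#include <sys/wait.h>

#include <err.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char buf[64];
	ssize_t n;

	if (pipe(fds) == -1)
		err(1, "pipe");

	switch (fork()) {
	case -1:
		err(1, "fork");
	case 0:					/* child: write end only */
		close(fds[0]);
		write(fds[1], "hello\n", 6);
		_exit(0);
	default:				/* parent: read end only */
		close(fds[1]);
		while ((n = read(fds[0], buf, sizeof(buf))) > 0)
			fwrite(buf, 1, n, stdout);
		close(fds[0]);
		wait(NULL);
	}
	return 0;
}
#endif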

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if it fails,
 * it retains the old buffer and returns ENOMEM.
 */
int
pipespace(struct pipe *cpipe, u_int size)
{
	caddr_t buffer;

	buffer = (caddr_t)uvm_km_valloc(kernel_map, size);
	if (buffer == NULL) {
		return (ENOMEM);
	}

	/* free old resources if we are resizing */
	pipe_free_kmem(cpipe);
	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;

	amountpipekva += cpipe->pipe_buffer.size;

	return (0);
}

/*
 * initialize and allocate VM and memory for pipe
 */
int
pipe_create(struct pipe *cpipe)
{
	int error;

	/* so pipe_free_kmem() doesn't follow junk pointer */
	cpipe->pipe_buffer.buffer = NULL;
	/*
	 * protect so pipeclose() doesn't follow a junk pointer
	 * if pipespace() fails.
	 */
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;

	error = pipespace(cpipe, PIPE_SIZE);
	if (error != 0)
		return (error);

	getnanotime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	cpipe->pipe_pgid = NO_PID;

	return (0);
}
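
/*
 * Example (illustrative only): a userland model of the in/out/cnt
 * ring-buffer bookkeeping that pipe_read() and pipe_write() maintain
 * in struct pipebuf.  "in" is where the next write lands, "out" is
 * where the next read starts, and "cnt" is the number of bytes held;
 * both indices wrap to 0 at "size".  Kept under #if 0 so it is never
 * compiled into the kernel.
 */
#if 0
#include <string.h>

struct ring {
	char		*buf;
	unsigned	 size, in, out, cnt;
};

/* Copy up to "len" bytes in, honoring the wraparound at ring->size. */
static unsigned
ring_put(struct ring *r, const char *p, unsigned len)
{
	unsigned space = r->size - r->cnt, seg;

	if (len > space)
		len = space;
	seg = r->size - r->in;			/* contiguous tail space */
	if (seg > len)
		seg = len;
	memcpy(r->buf + r->in, p, seg);
	memcpy(r->buf, p + seg, len - seg);	/* wrapped remainder */
	r->in = (r->in + len) % r->size;
	r->cnt += len;
	return len;
}
#endif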

/*
 * lock a pipe for I/O, blocking other access
 */
int
pipelock(struct pipe *cpipe)
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
			return error;
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
void
pipeunlock(struct pipe *cpipe)
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

void
pipeselwakeup(struct pipe *cpipe)
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID)
		gsignal(cpipe->pipe_pgid, SIGIO);
	KNOTE(&cpipe->pipe_sel.si_note, 0);
}

/* ARGSUSED */
int
pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	int size;

	error = pipelock(rpipe);
	if (error)
		return (error);

	++rpipe->pipe_busy;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}
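
/*
 * Example (illustrative only): the FNONBLOCK path above is what a
 * userland non-blocking reader sees as EAGAIN, and the PIPE_EOF break
 * is what makes read(2) return 0.  A minimal sketch, assuming the
 * caller has already set O_NONBLOCK with fcntl(2); kept under #if 0
 * so it is never compiled into the kernel.
 */
#if 0
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/*
 * Classify one non-blocking read: "data", "empty" (EAGAIN), "eof"
 * (write side closed), or "error".  Requires that O_NONBLOCK was set,
 * e.g. fcntl(fd, F_SETFL, O_NONBLOCK).
 */
static const char *
read_state(int fd, char *buf, size_t len)
{
	ssize_t n = read(fd, buf, len);

	if (n > 0)
		return "data";
	if (n == 0)
		return "eof";
	if (errno == EAGAIN)
		return "empty";
	return "error";
}
#endif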

int
pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred)
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < LIMITBIGPIPES) &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe)) == 0) {
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				nbigpipe++;
			pipeunlock(wpipe);
		}
	}

	/*
	 * If an early error occurred, unbusy and return, waking up any
	 * pending readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		return (error);
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		int space;

retrywrite:
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			if ((error = pipelock(wpipe)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");
#endif

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
#endif
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");
#endif
				}
				pipeunlock(wpipe);
			}
			if (error)
				break;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
			    "pipewr", 0);
			if (error)
				break;
			/*
			 * If read side wants to go away, we just issue a
			 * signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/*
	 * We have something to offer, wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}
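
/*
 * Example (illustrative only): the "space = 0" test above is what makes
 * writes of <= PIPE_BUF bytes atomic -- such a write either fits
 * entirely or the writer sleeps, so two processes writing small records
 * to the same pipe never see their bytes interleaved.  A userland
 * sketch, kept under #if 0 so it is never compiled into the kernel.
 */
#if 0
#include <limits.h>		/* PIPE_BUF */
#include <unistd.h>

/*
 * Emit one record on a shared pipe.  As long as len <= PIPE_BUF the
 * kernel guarantees the record is not interleaved with records from
 * other writers; larger writes would need their own framing protocol.
 */
static ssize_t
write_record(int fd, const char *rec, size_t len)
{
	if (len > PIPE_BUF)
		return -1;	/* atomicity no longer guaranteed */
	return write(fd, rec, len);
}
#endif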

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
{
	struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}
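
/*
 * Example (illustrative only): FIONREAD above simply reports
 * pipe_buffer.cnt, so a userland caller can ask how many bytes a
 * read(2) could return without blocking.  Kept under #if 0 so it is
 * never compiled into the kernel.
 */
#if 0
#include <sys/ioctl.h>

/* Return the number of bytes buffered in the pipe, or -1 on error. */
static int
pipe_pending(int fd)
{
	int n;

	if (ioctl(fd, FIONREAD, &n) == -1)
		return -1;
	return n;
}
#endif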

int
pipe_poll(struct file *fp, int events, struct proc *p)
{
	struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);
	}

	/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;
	else if (events & (POLLOUT | POLLWRNORM)) {
		if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
			revents |= events & (POLLOUT | POLLWRNORM);
	}

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}
		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}
	return (revents);
}

int
pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
{
	struct pipe *pipe = (struct pipe *)fp->f_data;

	bzero(ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atim = pipe->pipe_atime;
	ub->st_mtim = pipe->pipe_mtime;
	ub->st_ctim = pipe->pipe_ctime;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}
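
/*
 * Example (illustrative only): how the pipe_poll() logic above looks
 * from userland.  POLLIN fires once data is buffered or the write side
 * is gone; POLLHUP (reported even when not requested) means the peer
 * has closed.  Note that at EOF both bits can be set at once, so a
 * careful caller checks POLLIN first and lets read(2) return 0.  Kept
 * under #if 0 so it is never compiled into the kernel.
 */
#if 0
#include <poll.h>

/* Wait for readability; returns 1 ready, 0 hung up, -1 error/timeout. */
static int
wait_readable(int fd, int timeout_ms)
{
	struct pollfd pfd;

	pfd.fd = fd;
	pfd.events = POLLIN;
	if (poll(&pfd, 1, timeout_ms) <= 0)
		return -1;
	if (pfd.revents & POLLIN)
		return 1;
	if (pfd.revents & POLLHUP)
		return 0;
	return -1;
}
#endif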

/* ARGSUSED */
int
pipe_close(struct file *fp, struct proc *p)
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	fp->f_ops = NULL;
	fp->f_data = NULL;
	pipeclose(cpipe);
	return (0);
}

void
pipe_free_kmem(struct pipe *cpipe)
{
	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			--nbigpipe;
		amountpipekva -= cpipe->pipe_buffer.size;
		uvm_km_free(kernel_map, (vaddr_t)cpipe->pipe_buffer.buffer,
		    cpipe->pipe_buffer.size);
		cpipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * shutdown the pipe
 */
void
pipeclose(struct pipe *cpipe)
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		cpipe->pipe_state |= PIPE_EOF;
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			KNOTE(&ppipe->pipe_sel.si_note, 0);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		pipe_free_kmem(cpipe);
		pool_put(&pipe_pool, cpipe);
	}
}

int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL)
			/* other end of pipe has been closed */
			return (1);
		kn->kn_fop = &pipe_wfiltops;
		SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
		break;
	default:
		return (1);
	}

	return (0);
}

void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL)
			return;
		SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
		break;
	}
}

/*ARGSUSED*/
int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	return (kn->kn_data >= PIPE_BUF);
}

void
pipe_init(void)
{
	pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, 0, "pipepl",
	    &pool_allocator_nointr);
}
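
/*
 * Example (illustrative only): the knote filters above are what back a
 * userland kevent(2) registration.  filt_piperead() sets kn_data to the
 * buffered byte count and EV_EOF when the writer is gone, and both show
 * up in the returned kevent.  Kept under #if 0 so it is never compiled
 * into the kernel.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#include <err.h>
#include <stdio.h>

static void
watch_pipe(int fd)
{
	struct kevent kev;
	int kq;

	if ((kq = kqueue()) == -1)
		err(1, "kqueue");
	EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) == -1)
		err(1, "kevent: register");

	/* Block until the pipe is readable (or the writer closes it). */
	if (kevent(kq, NULL, 0, &kev, 1, NULL) == -1)
		err(1, "kevent: wait");
	printf("%lld byte(s) ready%s\n", (long long)kev.data,
	    (kev.flags & EV_EOF) ? ", writer closed" : "");
}
#endif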