1 /* $OpenBSD: sys_pipe.c,v 1.76 2017/02/11 19:51:06 guenther Exp $ */ 2 3 /* 4 * Copyright (c) 1996 John S. Dyson 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice immediately at the beginning of the file, without modification, 12 * this list of conditions, and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Absolutely no warranty of function or purpose is made by the author 17 * John S. Dyson. 18 * 4. Modifications may be freely made to this file if the above conditions 19 * are met. 20 */ 21 22 /* 23 * This file contains a high-performance replacement for the socket-based 24 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support 25 * all features of sockets, but does do everything that pipes normally 26 * do. 27 */ 28 29 #include <sys/param.h> 30 #include <sys/systm.h> 31 #include <sys/proc.h> 32 #include <sys/file.h> 33 #include <sys/filedesc.h> 34 #include <sys/pool.h> 35 #include <sys/ioctl.h> 36 #include <sys/stat.h> 37 #include <sys/signalvar.h> 38 #include <sys/mount.h> 39 #include <sys/syscallargs.h> 40 #include <sys/event.h> 41 #include <sys/lock.h> 42 #include <sys/poll.h> 43 #ifdef KTRACE 44 #include <sys/ktrace.h> 45 #endif 46 47 #include <uvm/uvm_extern.h> 48 49 #include <sys/pipe.h> 50 51 /* 52 * interfaces to the outside world 53 */ 54 int pipe_read(struct file *, off_t *, struct uio *, struct ucred *); 55 int pipe_write(struct file *, off_t *, struct uio *, struct ucred *); 56 int pipe_close(struct file *, struct proc *); 57 int pipe_poll(struct file *, int events, struct proc *); 58 int pipe_kqfilter(struct file *fp, struct knote *kn); 59 int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *); 60 int pipe_stat(struct file *fp, struct stat *ub, struct proc *p); 61 62 static struct fileops pipeops = { 63 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter, 64 pipe_stat, pipe_close 65 }; 66 67 void filt_pipedetach(struct knote *kn); 68 int filt_piperead(struct knote *kn, long hint); 69 int filt_pipewrite(struct knote *kn, long hint); 70 71 struct filterops pipe_rfiltops = 72 { 1, NULL, filt_pipedetach, filt_piperead }; 73 struct filterops pipe_wfiltops = 74 { 1, NULL, filt_pipedetach, filt_pipewrite }; 75 76 /* 77 * Default pipe buffer size(s), this can be kind-of large now because pipe 78 * space is pageable. The pipe code will try to maintain locality of 79 * reference for performance reasons, so small amounts of outstanding I/O 80 * will not wipe the cache. 81 */ 82 #define MINPIPESIZE (PIPE_SIZE/3) 83 84 /* 85 * Limit the number of "big" pipes 86 */ 87 #define LIMITBIGPIPES 32 88 int nbigpipe; 89 static int amountpipekva; 90 91 struct pool pipe_pool; 92 93 int dopipe(struct proc *, int *, int); 94 void pipeclose(struct pipe *); 95 void pipe_free_kmem(struct pipe *); 96 int pipe_create(struct pipe *); 97 int pipelock(struct pipe *); 98 void pipeunlock(struct pipe *); 99 void pipeselwakeup(struct pipe *); 100 int pipespace(struct pipe *, u_int); 101 102 /* 103 * The pipe system call for the DTYPE_PIPE type of pipes 104 */ 105 106 int 107 sys_pipe(struct proc *p, void *v, register_t *retval) 108 { 109 struct sys_pipe_args /* { 110 syscallarg(int *) fdp; 111 } */ *uap = v; 112 113 return (dopipe(p, SCARG(uap, fdp), 0)); 114 } 115 116 int 117 sys_pipe2(struct proc *p, void *v, register_t *retval) 118 { 119 struct sys_pipe2_args /* { 120 syscallarg(int *) fdp; 121 syscallarg(int) flags; 122 } */ *uap = v; 123 124 if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK)) 125 return (EINVAL); 126 127 return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags))); 128 } 129 130 int 131 dopipe(struct proc *p, int *ufds, int flags) 132 { 133 struct filedesc *fdp = p->p_fd; 134 struct file *rf, *wf; 135 struct pipe *rpipe, *wpipe = NULL; 136 int fds[2], cloexec, error; 137 138 cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0; 139 140 rpipe = pool_get(&pipe_pool, PR_WAITOK); 141 error = pipe_create(rpipe); 142 if (error != 0) 143 goto free1; 144 wpipe = pool_get(&pipe_pool, PR_WAITOK); 145 error = pipe_create(wpipe); 146 if (error != 0) 147 goto free1; 148 149 fdplock(fdp); 150 151 error = falloc(p, cloexec, &rf, &fds[0]); 152 if (error != 0) 153 goto free2; 154 rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK); 155 rf->f_type = DTYPE_PIPE; 156 rf->f_data = rpipe; 157 rf->f_ops = &pipeops; 158 159 error = falloc(p, cloexec, &wf, &fds[1]); 160 if (error != 0) 161 goto free3; 162 wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK); 163 wf->f_type = DTYPE_PIPE; 164 wf->f_data = wpipe; 165 wf->f_ops = &pipeops; 166 167 rpipe->pipe_peer = wpipe; 168 wpipe->pipe_peer = rpipe; 169 170 FILE_SET_MATURE(rf, p); 171 FILE_SET_MATURE(wf, p); 172 173 error = copyout(fds, ufds, sizeof(fds)); 174 if (error != 0) { 175 fdrelease(p, fds[0]); 176 fdrelease(p, fds[1]); 177 } 178 #ifdef KTRACE 179 else if (KTRPOINT(p, KTR_STRUCT)) 180 ktrfds(p, fds, 2); 181 #endif 182 fdpunlock(fdp); 183 return (error); 184 185 free3: 186 fdremove(fdp, fds[0]); 187 closef(rf, p); 188 rpipe = NULL; 189 free2: 190 fdpunlock(fdp); 191 free1: 192 pipeclose(wpipe); 193 pipeclose(rpipe); 194 return (error); 195 } 196 197 /* 198 * Allocate kva for pipe circular buffer, the space is pageable. 199 * This routine will 'realloc' the size of a pipe safely, if it fails 200 * it will retain the old buffer. 201 * If it fails it will return ENOMEM. 202 */ 203 int 204 pipespace(struct pipe *cpipe, u_int size) 205 { 206 caddr_t buffer; 207 208 buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok); 209 if (buffer == NULL) { 210 return (ENOMEM); 211 } 212 213 /* free old resources if we are resizing */ 214 pipe_free_kmem(cpipe); 215 cpipe->pipe_buffer.buffer = buffer; 216 cpipe->pipe_buffer.size = size; 217 cpipe->pipe_buffer.in = 0; 218 cpipe->pipe_buffer.out = 0; 219 cpipe->pipe_buffer.cnt = 0; 220 221 amountpipekva += cpipe->pipe_buffer.size; 222 223 return (0); 224 } 225 226 /* 227 * initialize and allocate VM and memory for pipe 228 */ 229 int 230 pipe_create(struct pipe *cpipe) 231 { 232 int error; 233 234 /* so pipe_free_kmem() doesn't follow junk pointer */ 235 cpipe->pipe_buffer.buffer = NULL; 236 /* 237 * protect so pipeclose() doesn't follow a junk pointer 238 * if pipespace() fails. 239 */ 240 memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel)); 241 cpipe->pipe_state = 0; 242 cpipe->pipe_peer = NULL; 243 cpipe->pipe_busy = 0; 244 245 error = pipespace(cpipe, PIPE_SIZE); 246 if (error != 0) 247 return (error); 248 249 getnanotime(&cpipe->pipe_ctime); 250 cpipe->pipe_atime = cpipe->pipe_ctime; 251 cpipe->pipe_mtime = cpipe->pipe_ctime; 252 cpipe->pipe_pgid = NO_PID; 253 254 return (0); 255 } 256 257 258 /* 259 * lock a pipe for I/O, blocking other access 260 */ 261 int 262 pipelock(struct pipe *cpipe) 263 { 264 int error; 265 while (cpipe->pipe_state & PIPE_LOCK) { 266 cpipe->pipe_state |= PIPE_LWANT; 267 if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0))) 268 return error; 269 } 270 cpipe->pipe_state |= PIPE_LOCK; 271 return 0; 272 } 273 274 /* 275 * unlock a pipe I/O lock 276 */ 277 void 278 pipeunlock(struct pipe *cpipe) 279 { 280 cpipe->pipe_state &= ~PIPE_LOCK; 281 if (cpipe->pipe_state & PIPE_LWANT) { 282 cpipe->pipe_state &= ~PIPE_LWANT; 283 wakeup(cpipe); 284 } 285 } 286 287 void 288 pipeselwakeup(struct pipe *cpipe) 289 { 290 if (cpipe->pipe_state & PIPE_SEL) { 291 cpipe->pipe_state &= ~PIPE_SEL; 292 selwakeup(&cpipe->pipe_sel); 293 } else 294 KNOTE(&cpipe->pipe_sel.si_note, 0); 295 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID) 296 gsignal(cpipe->pipe_pgid, SIGIO); 297 } 298 299 int 300 pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 301 { 302 struct pipe *rpipe = fp->f_data; 303 int error; 304 size_t size, nread = 0; 305 306 error = pipelock(rpipe); 307 if (error) 308 return (error); 309 310 ++rpipe->pipe_busy; 311 312 while (uio->uio_resid) { 313 /* 314 * normal pipe buffer receive 315 */ 316 if (rpipe->pipe_buffer.cnt > 0) { 317 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; 318 if (size > rpipe->pipe_buffer.cnt) 319 size = rpipe->pipe_buffer.cnt; 320 if (size > uio->uio_resid) 321 size = uio->uio_resid; 322 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], 323 size, uio); 324 if (error) { 325 break; 326 } 327 rpipe->pipe_buffer.out += size; 328 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size) 329 rpipe->pipe_buffer.out = 0; 330 331 rpipe->pipe_buffer.cnt -= size; 332 /* 333 * If there is no more to read in the pipe, reset 334 * its pointers to the beginning. This improves 335 * cache hit stats. 336 */ 337 if (rpipe->pipe_buffer.cnt == 0) { 338 rpipe->pipe_buffer.in = 0; 339 rpipe->pipe_buffer.out = 0; 340 } 341 nread += size; 342 } else { 343 /* 344 * detect EOF condition 345 * read returns 0 on EOF, no need to set error 346 */ 347 if (rpipe->pipe_state & PIPE_EOF) 348 break; 349 350 /* 351 * If the "write-side" has been blocked, wake it up now. 352 */ 353 if (rpipe->pipe_state & PIPE_WANTW) { 354 rpipe->pipe_state &= ~PIPE_WANTW; 355 wakeup(rpipe); 356 } 357 358 /* 359 * Break if some data was read. 360 */ 361 if (nread > 0) 362 break; 363 364 /* 365 * Unlock the pipe buffer for our remaining processing. 366 * We will either break out with an error or we will 367 * sleep and relock to loop. 368 */ 369 pipeunlock(rpipe); 370 371 /* 372 * Handle non-blocking mode operation or 373 * wait for more data. 374 */ 375 if (fp->f_flag & FNONBLOCK) { 376 error = EAGAIN; 377 } else { 378 rpipe->pipe_state |= PIPE_WANTR; 379 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0) 380 error = pipelock(rpipe); 381 } 382 if (error) 383 goto unlocked_error; 384 } 385 } 386 pipeunlock(rpipe); 387 388 if (error == 0) 389 getnanotime(&rpipe->pipe_atime); 390 unlocked_error: 391 --rpipe->pipe_busy; 392 393 /* 394 * PIPE_WANT processing only makes sense if pipe_busy is 0. 395 */ 396 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) { 397 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW); 398 wakeup(rpipe); 399 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) { 400 /* 401 * Handle write blocking hysteresis. 402 */ 403 if (rpipe->pipe_state & PIPE_WANTW) { 404 rpipe->pipe_state &= ~PIPE_WANTW; 405 wakeup(rpipe); 406 } 407 } 408 409 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) 410 pipeselwakeup(rpipe); 411 412 return (error); 413 } 414 415 int 416 pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 417 { 418 int error = 0; 419 size_t orig_resid; 420 struct pipe *wpipe, *rpipe; 421 422 rpipe = fp->f_data; 423 wpipe = rpipe->pipe_peer; 424 425 /* 426 * detect loss of pipe read side, issue SIGPIPE if lost. 427 */ 428 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 429 return (EPIPE); 430 } 431 ++wpipe->pipe_busy; 432 433 /* 434 * If it is advantageous to resize the pipe buffer, do 435 * so. 436 */ 437 if ((uio->uio_resid > PIPE_SIZE) && 438 (nbigpipe < LIMITBIGPIPES) && 439 (wpipe->pipe_buffer.size <= PIPE_SIZE) && 440 (wpipe->pipe_buffer.cnt == 0)) { 441 442 if ((error = pipelock(wpipe)) == 0) { 443 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) 444 nbigpipe++; 445 pipeunlock(wpipe); 446 } 447 } 448 449 /* 450 * If an early error occurred unbusy and return, waking up any pending 451 * readers. 452 */ 453 if (error) { 454 --wpipe->pipe_busy; 455 if ((wpipe->pipe_busy == 0) && 456 (wpipe->pipe_state & PIPE_WANT)) { 457 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 458 wakeup(wpipe); 459 } 460 return (error); 461 } 462 463 orig_resid = uio->uio_resid; 464 465 while (uio->uio_resid) { 466 size_t space; 467 468 retrywrite: 469 if (wpipe->pipe_state & PIPE_EOF) { 470 error = EPIPE; 471 break; 472 } 473 474 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 475 476 /* Writes of size <= PIPE_BUF must be atomic. */ 477 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 478 space = 0; 479 480 if (space > 0) { 481 if ((error = pipelock(wpipe)) == 0) { 482 size_t size; /* Transfer size */ 483 size_t segsize; /* first segment to transfer */ 484 485 /* 486 * If a process blocked in uiomove, our 487 * value for space might be bad. 488 * 489 * XXX will we be ok if the reader has gone 490 * away here? 491 */ 492 if (space > wpipe->pipe_buffer.size - 493 wpipe->pipe_buffer.cnt) { 494 pipeunlock(wpipe); 495 goto retrywrite; 496 } 497 498 /* 499 * Transfer size is minimum of uio transfer 500 * and free space in pipe buffer. 501 */ 502 if (space > uio->uio_resid) 503 size = uio->uio_resid; 504 else 505 size = space; 506 /* 507 * First segment to transfer is minimum of 508 * transfer size and contiguous space in 509 * pipe buffer. If first segment to transfer 510 * is less than the transfer size, we've got 511 * a wraparound in the buffer. 512 */ 513 segsize = wpipe->pipe_buffer.size - 514 wpipe->pipe_buffer.in; 515 if (segsize > size) 516 segsize = size; 517 518 /* Transfer first segment */ 519 520 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in], 521 segsize, uio); 522 523 if (error == 0 && segsize < size) { 524 /* 525 * Transfer remaining part now, to 526 * support atomic writes. Wraparound 527 * happened. 528 */ 529 #ifdef DIAGNOSTIC 530 if (wpipe->pipe_buffer.in + segsize != 531 wpipe->pipe_buffer.size) 532 panic("Expected pipe buffer wraparound disappeared"); 533 #endif 534 535 error = uiomove(&wpipe->pipe_buffer.buffer[0], 536 size - segsize, uio); 537 } 538 if (error == 0) { 539 wpipe->pipe_buffer.in += size; 540 if (wpipe->pipe_buffer.in >= 541 wpipe->pipe_buffer.size) { 542 #ifdef DIAGNOSTIC 543 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size) 544 panic("Expected wraparound bad"); 545 #endif 546 wpipe->pipe_buffer.in = size - segsize; 547 } 548 549 wpipe->pipe_buffer.cnt += size; 550 #ifdef DIAGNOSTIC 551 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size) 552 panic("Pipe buffer overflow"); 553 #endif 554 } 555 pipeunlock(wpipe); 556 } 557 if (error) 558 break; 559 } else { 560 /* 561 * If the "read-side" has been blocked, wake it up now. 562 */ 563 if (wpipe->pipe_state & PIPE_WANTR) { 564 wpipe->pipe_state &= ~PIPE_WANTR; 565 wakeup(wpipe); 566 } 567 568 /* 569 * don't block on non-blocking I/O 570 */ 571 if (fp->f_flag & FNONBLOCK) { 572 error = EAGAIN; 573 break; 574 } 575 576 /* 577 * We have no more space and have something to offer, 578 * wake up select/poll. 579 */ 580 pipeselwakeup(wpipe); 581 582 wpipe->pipe_state |= PIPE_WANTW; 583 error = tsleep(wpipe, (PRIBIO + 1)|PCATCH, 584 "pipewr", 0); 585 if (error) 586 break; 587 /* 588 * If read side wants to go away, we just issue a 589 * signal to ourselves. 590 */ 591 if (wpipe->pipe_state & PIPE_EOF) { 592 error = EPIPE; 593 break; 594 } 595 } 596 } 597 598 --wpipe->pipe_busy; 599 600 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) { 601 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 602 wakeup(wpipe); 603 } else if (wpipe->pipe_buffer.cnt > 0) { 604 /* 605 * If we have put any characters in the buffer, we wake up 606 * the reader. 607 */ 608 if (wpipe->pipe_state & PIPE_WANTR) { 609 wpipe->pipe_state &= ~PIPE_WANTR; 610 wakeup(wpipe); 611 } 612 } 613 614 /* 615 * Don't return EPIPE if I/O was successful 616 */ 617 if ((wpipe->pipe_buffer.cnt == 0) && 618 (uio->uio_resid == 0) && 619 (error == EPIPE)) { 620 error = 0; 621 } 622 623 if (error == 0) 624 getnanotime(&wpipe->pipe_mtime); 625 /* 626 * We have something to offer, wake up select/poll. 627 */ 628 if (wpipe->pipe_buffer.cnt) 629 pipeselwakeup(wpipe); 630 631 return (error); 632 } 633 634 /* 635 * we implement a very minimal set of ioctls for compatibility with sockets. 636 */ 637 int 638 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p) 639 { 640 struct pipe *mpipe = fp->f_data; 641 642 switch (cmd) { 643 644 case FIONBIO: 645 return (0); 646 647 case FIOASYNC: 648 if (*(int *)data) { 649 mpipe->pipe_state |= PIPE_ASYNC; 650 } else { 651 mpipe->pipe_state &= ~PIPE_ASYNC; 652 } 653 return (0); 654 655 case FIONREAD: 656 *(int *)data = mpipe->pipe_buffer.cnt; 657 return (0); 658 659 case SIOCSPGRP: 660 mpipe->pipe_pgid = *(int *)data; 661 return (0); 662 663 case SIOCGPGRP: 664 *(int *)data = mpipe->pipe_pgid; 665 return (0); 666 667 } 668 return (ENOTTY); 669 } 670 671 int 672 pipe_poll(struct file *fp, int events, struct proc *p) 673 { 674 struct pipe *rpipe = fp->f_data; 675 struct pipe *wpipe; 676 int revents = 0; 677 678 wpipe = rpipe->pipe_peer; 679 if (events & (POLLIN | POLLRDNORM)) { 680 if ((rpipe->pipe_buffer.cnt > 0) || 681 (rpipe->pipe_state & PIPE_EOF)) 682 revents |= events & (POLLIN | POLLRDNORM); 683 } 684 685 /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */ 686 if ((rpipe->pipe_state & PIPE_EOF) || 687 (wpipe == NULL) || 688 (wpipe->pipe_state & PIPE_EOF)) 689 revents |= POLLHUP; 690 else if (events & (POLLOUT | POLLWRNORM)) { 691 if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF) 692 revents |= events & (POLLOUT | POLLWRNORM); 693 } 694 695 if (revents == 0) { 696 if (events & (POLLIN | POLLRDNORM)) { 697 selrecord(p, &rpipe->pipe_sel); 698 rpipe->pipe_state |= PIPE_SEL; 699 } 700 if (events & (POLLOUT | POLLWRNORM)) { 701 selrecord(p, &wpipe->pipe_sel); 702 wpipe->pipe_state |= PIPE_SEL; 703 } 704 } 705 return (revents); 706 } 707 708 int 709 pipe_stat(struct file *fp, struct stat *ub, struct proc *p) 710 { 711 struct pipe *pipe = fp->f_data; 712 713 memset(ub, 0, sizeof(*ub)); 714 ub->st_mode = S_IFIFO; 715 ub->st_blksize = pipe->pipe_buffer.size; 716 ub->st_size = pipe->pipe_buffer.cnt; 717 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 718 ub->st_atim.tv_sec = pipe->pipe_atime.tv_sec; 719 ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec; 720 ub->st_mtim.tv_sec = pipe->pipe_mtime.tv_sec; 721 ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec; 722 ub->st_ctim.tv_sec = pipe->pipe_ctime.tv_sec; 723 ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec; 724 ub->st_uid = fp->f_cred->cr_uid; 725 ub->st_gid = fp->f_cred->cr_gid; 726 /* 727 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 728 * XXX (st_dev, st_ino) should be unique. 729 */ 730 return (0); 731 } 732 733 int 734 pipe_close(struct file *fp, struct proc *p) 735 { 736 struct pipe *cpipe = fp->f_data; 737 738 fp->f_ops = NULL; 739 fp->f_data = NULL; 740 pipeclose(cpipe); 741 return (0); 742 } 743 744 void 745 pipe_free_kmem(struct pipe *cpipe) 746 { 747 if (cpipe->pipe_buffer.buffer != NULL) { 748 if (cpipe->pipe_buffer.size > PIPE_SIZE) 749 --nbigpipe; 750 amountpipekva -= cpipe->pipe_buffer.size; 751 km_free(cpipe->pipe_buffer.buffer, cpipe->pipe_buffer.size, 752 &kv_any, &kp_pageable); 753 cpipe->pipe_buffer.buffer = NULL; 754 } 755 } 756 757 /* 758 * shutdown the pipe 759 */ 760 void 761 pipeclose(struct pipe *cpipe) 762 { 763 struct pipe *ppipe; 764 if (cpipe) { 765 766 pipeselwakeup(cpipe); 767 768 /* 769 * If the other side is blocked, wake it up saying that 770 * we want to close it down. 771 */ 772 cpipe->pipe_state |= PIPE_EOF; 773 while (cpipe->pipe_busy) { 774 wakeup(cpipe); 775 cpipe->pipe_state |= PIPE_WANT; 776 tsleep(cpipe, PRIBIO, "pipecl", 0); 777 } 778 779 /* 780 * Disconnect from peer 781 */ 782 if ((ppipe = cpipe->pipe_peer) != NULL) { 783 pipeselwakeup(ppipe); 784 785 ppipe->pipe_state |= PIPE_EOF; 786 wakeup(ppipe); 787 ppipe->pipe_peer = NULL; 788 } 789 790 /* 791 * free resources 792 */ 793 pipe_free_kmem(cpipe); 794 pool_put(&pipe_pool, cpipe); 795 } 796 } 797 798 int 799 pipe_kqfilter(struct file *fp, struct knote *kn) 800 { 801 struct pipe *rpipe = kn->kn_fp->f_data; 802 struct pipe *wpipe = rpipe->pipe_peer; 803 804 switch (kn->kn_filter) { 805 case EVFILT_READ: 806 kn->kn_fop = &pipe_rfiltops; 807 SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext); 808 break; 809 case EVFILT_WRITE: 810 if (wpipe == NULL) { 811 /* other end of pipe has been closed */ 812 return (EPIPE); 813 } 814 kn->kn_fop = &pipe_wfiltops; 815 SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext); 816 break; 817 default: 818 return (EINVAL); 819 } 820 821 return (0); 822 } 823 824 void 825 filt_pipedetach(struct knote *kn) 826 { 827 struct pipe *rpipe = kn->kn_fp->f_data; 828 struct pipe *wpipe = rpipe->pipe_peer; 829 830 switch (kn->kn_filter) { 831 case EVFILT_READ: 832 SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext); 833 break; 834 case EVFILT_WRITE: 835 if (wpipe == NULL) 836 return; 837 SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext); 838 break; 839 } 840 } 841 842 int 843 filt_piperead(struct knote *kn, long hint) 844 { 845 struct pipe *rpipe = kn->kn_fp->f_data; 846 struct pipe *wpipe = rpipe->pipe_peer; 847 848 kn->kn_data = rpipe->pipe_buffer.cnt; 849 850 if ((rpipe->pipe_state & PIPE_EOF) || 851 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 852 kn->kn_flags |= EV_EOF; 853 return (1); 854 } 855 return (kn->kn_data > 0); 856 } 857 858 int 859 filt_pipewrite(struct knote *kn, long hint) 860 { 861 struct pipe *rpipe = kn->kn_fp->f_data; 862 struct pipe *wpipe = rpipe->pipe_peer; 863 864 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 865 kn->kn_data = 0; 866 kn->kn_flags |= EV_EOF; 867 return (1); 868 } 869 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 870 871 return (kn->kn_data >= PIPE_BUF); 872 } 873 874 void 875 pipe_init(void) 876 { 877 pool_init(&pipe_pool, sizeof(struct pipe), 0, IPL_NONE, PR_WAITOK, 878 "pipepl", NULL); 879 } 880 881