/*	$NetBSD: sys_pipe.c,v 1.150 2020/06/25 16:19:07 maxv Exp $	*/

/*-
 * Copyright (c) 2003, 2007, 2008, 2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Paul Kranenburg, and by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used.  It does not support all features of
 * sockets, but does do everything that pipes normally do.
 *
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is mapped read-only into the kernel address
 * space using the UVM page loan facility, so that the receiving process
 * can copy the data directly from the pages of the sending process.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
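/*
 * For reference, the userland interface implemented here is the familiar
 * pipe(2) API.  A minimal usage sketch (standard POSIX calls, not part of
 * this file):
 *
 *	int fds[2];
 *	char buf[64];
 *
 *	if (pipe(fds) == -1)
 *		err(EXIT_FAILURE, "pipe");
 *	(void)write(fds[1], "hello", 5);       - fds[1] is the write side
 *	(void)read(fds[0], buf, sizeof(buf));  - fds[0] is the read side
 */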

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.150 2020/06/25 16:19:07 maxv Exp $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/kernel.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/signalvar.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/select.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/sysctl.h>
#include <sys/kauth.h>
#include <sys/atomic.h>
#include <sys/pipe.h>

static int	pipe_read(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_write(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_close(file_t *);
static int	pipe_poll(file_t *, int);
static int	pipe_kqfilter(file_t *, struct knote *);
static int	pipe_stat(file_t *, struct stat *);
static int	pipe_ioctl(file_t *, u_long, void *);
static void	pipe_restart(file_t *);

static const struct fileops pipeops = {
	.fo_name = "pipe",
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_fcntl = fnullop_fcntl,
	.fo_poll = pipe_poll,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_kqfilter = pipe_kqfilter,
	.fo_restart = pipe_restart,
};

/*
 * Default pipe buffer size(s).  These can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define	MINPIPESIZE	(PIPE_SIZE / 3)
#define	MAXPIPESIZE	(2 * PIPE_SIZE / 3)

/*
 * Limit the number of "big" pipes.
 */
#define	LIMITBIGPIPES	32
static u_int	maxbigpipes = LIMITBIGPIPES;
static u_int	nbigpipe = 0;

/*
 * Amount of KVA consumed by pipe buffers.
 */
static u_int	amountpipekva = 0;

static void	pipeclose(struct pipe *);
static void	pipe_free_kmem(struct pipe *);
static int	pipe_create(struct pipe **, pool_cache_t);
static int	pipelock(struct pipe *, bool);
static inline void pipeunlock(struct pipe *);
static void	pipeselwakeup(struct pipe *, struct pipe *, int);
static int	pipespace(struct pipe *, int);
static int	pipe_ctor(void *, void *, int);
static void	pipe_dtor(void *, void *);

static pool_cache_t	pipe_wr_cache;
static pool_cache_t	pipe_rd_cache;

void
pipe_init(void)
{

	/* Writer side is not automatically allocated KVA. */
	pipe_wr_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "pipewr",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, NULL);
	KASSERT(pipe_wr_cache != NULL);

	/* Reader side gets preallocated KVA. */
	pipe_rd_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "piperd",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, (void *)1);
	KASSERT(pipe_rd_cache != NULL);
}

static int
pipe_ctor(void *arg, void *obj, int flags)
{
	struct pipe *pipe;
	vaddr_t va;

	pipe = obj;

	memset(pipe, 0, sizeof(struct pipe));
	if (arg != NULL) {
		/* Preallocate space. */
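		/*
		 * UVM_KMF_PAGEABLE reserves only virtual address space;
		 * physical pages are faulted in on demand.  UVM_KMF_WAITVA
		 * sleeps until VA is available, so the allocation below
		 * cannot fail, hence the KASSERT rather than error handling.
		 */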
		va = uvm_km_alloc(kernel_map, PIPE_SIZE, 0,
		    UVM_KMF_PAGEABLE | UVM_KMF_WAITVA);
		KASSERT(va != 0);
		pipe->pipe_kmem = va;
		atomic_add_int(&amountpipekva, PIPE_SIZE);
	}
	cv_init(&pipe->pipe_rcv, "pipe_rd");
	cv_init(&pipe->pipe_wcv, "pipe_wr");
	cv_init(&pipe->pipe_draincv, "pipe_drn");
	cv_init(&pipe->pipe_lkcv, "pipe_lk");
	selinit(&pipe->pipe_sel);
	pipe->pipe_state = PIPE_SIGNALR;

	return 0;
}

static void
pipe_dtor(void *arg, void *obj)
{
	struct pipe *pipe;

	pipe = obj;

	cv_destroy(&pipe->pipe_rcv);
	cv_destroy(&pipe->pipe_wcv);
	cv_destroy(&pipe->pipe_draincv);
	cv_destroy(&pipe->pipe_lkcv);
	seldestroy(&pipe->pipe_sel);
	if (pipe->pipe_kmem != 0) {
		uvm_km_free(kernel_map, pipe->pipe_kmem, PIPE_SIZE,
		    UVM_KMF_PAGEABLE);
		atomic_add_int(&amountpipekva, -PIPE_SIZE);
	}
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes.
 */
int
pipe1(struct lwp *l, int *fildes, int flags)
{
	struct pipe *rpipe, *wpipe;
	file_t *rf, *wf;
	int fd, error;
	proc_t *p;

	if (flags & ~(O_CLOEXEC|O_NONBLOCK|O_NOSIGPIPE))
		return EINVAL;
	p = curproc;
	rpipe = wpipe = NULL;
	if ((error = pipe_create(&rpipe, pipe_rd_cache)) ||
	    (error = pipe_create(&wpipe, pipe_wr_cache))) {
		goto free2;
	}
	rpipe->pipe_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
	wpipe->pipe_lock = rpipe->pipe_lock;
	mutex_obj_hold(wpipe->pipe_lock);

	error = fd_allocfile(&rf, &fd);
	if (error)
		goto free2;
	fildes[0] = fd;

	error = fd_allocfile(&wf, &fd);
	if (error)
		goto free3;
	fildes[1] = fd;

	rf->f_flag = FREAD | flags;
	rf->f_type = DTYPE_PIPE;
	rf->f_pipe = rpipe;
	rf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[0], (flags & O_CLOEXEC) != 0);

	wf->f_flag = FWRITE | flags;
	wf->f_type = DTYPE_PIPE;
	wf->f_pipe = wpipe;
	wf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[1], (flags & O_CLOEXEC) != 0);

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fd_affix(p, rf, fildes[0]);
	fd_affix(p, wf, fildes[1]);
	return (0);
free3:
	fd_abort(p, rf, fildes[0]);
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);

	return (error);
}

/*
 * Allocate KVA for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if it fails,
 * it retains the old buffer and returns ENOMEM.
 */
static int
pipespace(struct pipe *pipe, int size)
{
	void *buffer;

	/*
	 * Allocate pageable virtual address space.  Physical memory is
	 * allocated on demand.
	 */
	if (size == PIPE_SIZE && pipe->pipe_kmem != 0) {
		buffer = (void *)pipe->pipe_kmem;
	} else {
		buffer = (void *)uvm_km_alloc(kernel_map, round_page(size),
		    0, UVM_KMF_PAGEABLE);
		if (buffer == NULL)
			return (ENOMEM);
		atomic_add_int(&amountpipekva, size);
	}

	/* Free old resources if we're resizing. */
	pipe_free_kmem(pipe);
	pipe->pipe_buffer.buffer = buffer;
	pipe->pipe_buffer.size = size;
	pipe->pipe_buffer.in = 0;
	pipe->pipe_buffer.out = 0;
	pipe->pipe_buffer.cnt = 0;
	return (0);
}
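/*
 * A short worked example of the pipebuf ring discipline used throughout
 * this file (illustrative only): 'in' is the index where the writer
 * deposits the next byte, 'out' is the index where the reader fetches
 * the next byte, and 'cnt' is the number of bytes currently buffered.
 * With size = 16, out = 12 and cnt = 6, the live data occupies indexes
 * 12..15 and 0..1 (so in = 2): it has wrapped around the end of the
 * buffer.  Readers and writers therefore copy at most two contiguous
 * segments per pass, as can be seen in pipe_read() and pipe_write().
 */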
/*
 * Initialize and allocate VM and memory for pipe.
 */
static int
pipe_create(struct pipe **pipep, pool_cache_t cache)
{
	struct pipe *pipe;
	int error;

	pipe = pool_cache_get(cache, PR_WAITOK);
	KASSERT(pipe != NULL);
	*pipep = pipe;
	error = 0;
	getnanotime(&pipe->pipe_btime);
	pipe->pipe_atime = pipe->pipe_mtime = pipe->pipe_btime;
	pipe->pipe_lock = NULL;
	if (cache == pipe_rd_cache) {
		error = pipespace(pipe, PIPE_SIZE);
	} else {
		pipe->pipe_buffer.buffer = NULL;
		pipe->pipe_buffer.size = 0;
		pipe->pipe_buffer.in = 0;
		pipe->pipe_buffer.out = 0;
		pipe->pipe_buffer.cnt = 0;
	}
	return error;
}

/*
 * Lock a pipe for I/O, blocking other access.
 * Called with the pipe mutex held.
 */
static int
pipelock(struct pipe *pipe, bool catch_p)
{
	int error;

	KASSERT(mutex_owned(pipe->pipe_lock));

	while (pipe->pipe_state & PIPE_LOCKFL) {
		pipe->pipe_state |= PIPE_LWANT;
		if (catch_p) {
			error = cv_wait_sig(&pipe->pipe_lkcv, pipe->pipe_lock);
			if (error != 0)
				return error;
		} else
			cv_wait(&pipe->pipe_lkcv, pipe->pipe_lock);
	}

	pipe->pipe_state |= PIPE_LOCKFL;

	return 0;
}

/*
 * Unlock a pipe I/O lock.
 */
static inline void
pipeunlock(struct pipe *pipe)
{

	KASSERT(pipe->pipe_state & PIPE_LOCKFL);

	pipe->pipe_state &= ~PIPE_LOCKFL;
	if (pipe->pipe_state & PIPE_LWANT) {
		pipe->pipe_state &= ~PIPE_LWANT;
		cv_broadcast(&pipe->pipe_lkcv);
	}
}

/*
 * Select/poll wakeup.  This also sends SIGIO to the peer connected to
 * the 'sigpipe' side of the pipe.
 */
static void
pipeselwakeup(struct pipe *selp, struct pipe *sigp, int code)
{
	int band;

	switch (code) {
	case POLL_IN:
		band = POLLIN|POLLRDNORM;
		break;
	case POLL_OUT:
		band = POLLOUT|POLLWRNORM;
		break;
	case POLL_HUP:
		band = POLLHUP;
		break;
	case POLL_ERR:
		band = POLLERR;
		break;
	default:
		band = 0;
#ifdef DIAGNOSTIC
		printf("bad siginfo code %d in pipe notification.\n", code);
#endif
		break;
	}

	selnotify(&selp->pipe_sel, band, NOTE_SUBMIT);

	if (sigp == NULL || (sigp->pipe_state & PIPE_ASYNC) == 0)
		return;

	fownsignal(sigp->pipe_pgid, SIGIO, code, band, selp);
}
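/*
 * The locking discipline that the I/O paths below follow, sketched for
 * clarity (pseudocode, not a function in this file): the short-term
 * mutex (pipe_lock, shared by both ends) protects the state flags and
 * condition variables, while PIPE_LOCKFL taken via pipelock() is the
 * long-term lock held across uiomove(), which may sleep:
 *
 *	mutex_enter(pipe->pipe_lock);
 *	error = pipelock(pipe, true);     - take long-term lock
 *	mutex_exit(pipe->pipe_lock);      - drop mutex for the copy
 *	error = uiomove(...);             - may fault/sleep
 *	mutex_enter(pipe->pipe_lock);
 *	pipeunlock(pipe);
 *	mutex_exit(pipe->pipe_lock);
 */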
static int
pipe_read(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipebuf *bp = &rpipe->pipe_buffer;
	kmutex_t *lock = rpipe->pipe_lock;
	int error;
	size_t nread = 0;
	size_t size;
	size_t ocnt;
	unsigned int wakeup_state = 0;

	mutex_enter(lock);
	++rpipe->pipe_busy;
	ocnt = bp->cnt;

again:
	error = pipelock(rpipe, true);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * Normal pipe buffer receive.
		 */
		if (bp->cnt > 0) {
			size = bp->size - bp->out;
			if (size > bp->cnt)
				size = bp->cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;

			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->out, size,
			    uio);
			mutex_enter(lock);
			if (error)
				break;

			bp->out += size;
			if (bp->out >= bp->size)
				bp->out = 0;

			bp->cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (bp->cnt == 0) {
				bp->in = 0;
				bp->out = 0;
			}
			nread += size;
			continue;
		}

		/*
		 * Break if some data was read.
		 */
		if (nread > 0)
			break;

		/*
		 * Detect EOF condition.
		 * Read returns 0 on EOF, no need to set error.
		 */
		if (rpipe->pipe_state & PIPE_EOF)
			break;

		/*
		 * Don't block on non-blocking I/O.
		 */
		if (fp->f_flag & FNONBLOCK) {
			error = EAGAIN;
			break;
		}

		/*
		 * Unlock the pipe buffer for our remaining processing.
		 * We will either break out with an error or we will
		 * sleep and relock to loop.
		 */
		pipeunlock(rpipe);

#if 1	/* XXX (dsl) I'm sure these aren't needed here ... */
		/*
		 * We want to read more, wake up select/poll.
		 */
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);

		/*
		 * If the "write-side" is blocked, wake it up now.
		 */
		cv_broadcast(&rpipe->pipe_wcv);
#endif

		if (wakeup_state & PIPE_RESTART) {
			error = ERESTART;
			goto unlocked_error;
		}

		/* Now wait until the pipe is filled. */
		error = cv_wait_sig(&rpipe->pipe_rcv, lock);
		if (error != 0)
			goto unlocked_error;
		wakeup_state = rpipe->pipe_state;
		goto again;
	}

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
	pipeunlock(rpipe);

unlocked_error:
	--rpipe->pipe_busy;
	if (rpipe->pipe_busy == 0) {
		rpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&rpipe->pipe_draincv);
	}
	if (bp->cnt < MINPIPESIZE) {
		cv_broadcast(&rpipe->pipe_wcv);
	}

	/*
	 * If anything was read off the buffer, signal to the writer that
	 * it is now possible to write more data.  Also send a signal if we
	 * are here for the first time after the last write.
	 */
	if ((bp->size - bp->cnt) >= PIPE_BUF
	    && (ocnt != bp->cnt || (rpipe->pipe_state & PIPE_SIGNALR))) {
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);
		rpipe->pipe_state &= ~PIPE_SIGNALR;
	}

	mutex_exit(lock);
	return (error);
}
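/*
 * To summarize the reader-visible semantics implemented above
 * (illustrative userland sketch, not code from this file): a read on an
 * empty pipe whose write side is closed returns 0 (EOF); with O_NONBLOCK
 * set, it fails with EAGAIN instead of sleeping:
 *
 *	ssize_t n = read(fds[0], buf, sizeof(buf));
 *	if (n == 0)
 *		...                  - writer closed its end (EOF)
 *	else if (n == -1 && errno == EAGAIN)
 *		...                  - pipe empty and O_NONBLOCK set
 */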
static int
pipe_write(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *wpipe, *rpipe;
	struct pipebuf *bp;
	kmutex_t *lock;
	int error;
	unsigned int wakeup_state = 0;

	/* We want to write to our peer. */
	rpipe = fp->f_pipe;
	lock = rpipe->pipe_lock;
	error = 0;

	mutex_enter(lock);
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) != 0) {
		mutex_exit(lock);
		return EPIPE;
	}
	++wpipe->pipe_busy;

	/* Acquire the long-term pipe lock. */
	if ((error = pipelock(wpipe, true)) != 0) {
		--wpipe->pipe_busy;
		if (wpipe->pipe_busy == 0) {
			wpipe->pipe_state &= ~PIPE_RESTART;
			cv_broadcast(&wpipe->pipe_draincv);
		}
		mutex_exit(lock);
		return (error);
	}

	bp = &wpipe->pipe_buffer;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < maxbigpipes) &&
	    (bp->size <= PIPE_SIZE) && (bp->cnt == 0)) {

		if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
			atomic_inc_uint(&nbigpipe);
	}

	while (uio->uio_resid) {
		size_t space;

		space = bp->size - bp->cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (uio->uio_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size;	/* Transfer size */
			int segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = bp->size - bp->in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */
			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->in, segsize,
			    uio);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
				KASSERT(bp->in + segsize == bp->size);
				error = uiomove(bp->buffer,
				    size - segsize, uio);
			}
			mutex_enter(lock);
			if (error)
				break;

			bp->in += size;
			if (bp->in >= bp->size) {
				KASSERT(bp->in == size - segsize + bp->size);
				bp->in = size - segsize;
			}

			bp->cnt += size;
			KASSERT(bp->cnt <= bp->size);
			wakeup_state = 0;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up
			 * now.
			 */
			cv_broadcast(&wpipe->pipe_rcv);

			/*
			 * Don't block on non-blocking I/O.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			if (bp->cnt)
				pipeselwakeup(wpipe, wpipe, POLL_IN);

			if (wakeup_state & PIPE_RESTART) {
				error = ERESTART;
				break;
			}

			pipeunlock(wpipe);
			error = cv_wait_sig(&wpipe->pipe_wcv, lock);
			(void)pipelock(wpipe, false);
			if (error != 0)
				break;
			/*
			 * If the read side wants to go away, we just issue
			 * a signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
			wakeup_state = wpipe->pipe_state;
		}
	}

	--wpipe->pipe_busy;
	if (wpipe->pipe_busy == 0) {
		wpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&wpipe->pipe_draincv);
	}
	if (bp->cnt > 0) {
		cv_broadcast(&wpipe->pipe_rcv);
	}

	/*
	 * Don't return EPIPE if the I/O was successful.
	 */
	if (error == EPIPE && bp->cnt == 0 && uio->uio_resid == 0)
		error = 0;

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer, wake up select/poll.  Direct writes
	 * are only done synchronously, so checking wpipe->pipe_buffer.cnt
	 * is sufficient at this point.
	 */
	if (bp->cnt)
		pipeselwakeup(wpipe, wpipe, POLL_IN);

	/*
	 * Arrange for next read(2) to do a signal.
	 */
	wpipe->pipe_state |= PIPE_SIGNALR;

	pipeunlock(wpipe);
	mutex_exit(lock);
	return (error);
}
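/*
 * The PIPE_BUF handling above gives the atomicity POSIX requires: a write
 * of at most PIPE_BUF bytes is never interleaved with writes from other
 * processes, because the writer sleeps until the whole request fits rather
 * than copying a partial chunk.  An illustrative consequence (userland
 * sketch, not code from this file; format_record is a hypothetical
 * helper): multiple producers can safely share one pipe as long as each
 * record is at most PIPE_BUF bytes:
 *
 *	char rec[PIPE_BUF];
 *	size_t len = format_record(rec);   - hypothetical helper
 *	(void)write(logfd, rec, len);      - never interleaved while
 *	                                     len <= PIPE_BUF
 */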
/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(file_t *fp, u_long cmd, void *data)
{
	struct pipe *pipe = fp->f_pipe;
	kmutex_t *lock = pipe->pipe_lock;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		mutex_enter(lock);
		if (*(int *)data) {
			pipe->pipe_state |= PIPE_ASYNC;
		} else {
			pipe->pipe_state &= ~PIPE_ASYNC;
		}
		mutex_exit(lock);
		return (0);

	case FIONREAD:
		mutex_enter(lock);
		*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONWRITE:
		/* Look at other side. */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONSPACE:
		/* Look at other side. */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.size -
			    pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case TIOCSPGRP:
	case FIOSETOWN:
		return fsetown(&pipe->pipe_pgid, cmd, data);

	case TIOCGPGRP:
	case FIOGETOWN:
		return fgetown(pipe->pipe_pgid, cmd, data);

	}
	return (EPASSTHROUGH);
}
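/*
 * Userland can use these ioctls to peek at buffer occupancy without
 * reading.  A minimal sketch (standard FIONREAD usage, not code from
 * this file):
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		printf("%d bytes buffered\n", nbytes);
 */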
int
pipe_poll(file_t *fp, int events)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipe *wpipe;
	int eof = 0;
	int revents = 0;

	mutex_enter(rpipe->pipe_lock);
	wpipe = rpipe->pipe_peer;

	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	eof |= (rpipe->pipe_state & PIPE_EOF);

	if (wpipe == NULL)
		revents |= events & (POLLOUT | POLLWRNORM);
	else {
		if (events & (POLLOUT | POLLWRNORM))
			if ((wpipe->pipe_state & PIPE_EOF) ||
			    ((wpipe->pipe_buffer.size -
			      wpipe->pipe_buffer.cnt) >= PIPE_BUF))
				revents |= events & (POLLOUT | POLLWRNORM);

		eof |= (wpipe->pipe_state & PIPE_EOF);
	}

	if (wpipe == NULL || eof)
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM))
			selrecord(curlwp, &rpipe->pipe_sel);

		if (events & (POLLOUT | POLLWRNORM))
			selrecord(curlwp, &wpipe->pipe_sel);
	}
	mutex_exit(rpipe->pipe_lock);

	return (revents);
}

static int
pipe_stat(file_t *fp, struct stat *ub)
{
	struct pipe *pipe = fp->f_pipe;

	mutex_enter(pipe->pipe_lock);
	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO | S_IRUSR | S_IWUSR;
	ub->st_blksize = pipe->pipe_buffer.size;
	if (ub->st_blksize == 0 && pipe->pipe_peer)
		ub->st_blksize = pipe->pipe_peer->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size) ? 1 : 0;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = ub->st_birthtimespec = pipe->pipe_btime;
	ub->st_uid = kauth_cred_geteuid(fp->f_cred);
	ub->st_gid = kauth_cred_getegid(fp->f_cred);

	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	mutex_exit(pipe->pipe_lock);
	return 0;
}

static int
pipe_close(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	fp->f_pipe = NULL;
	pipeclose(pipe);
	return (0);
}

static void
pipe_restart(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	/*
	 * Unblock blocked reads/writes in order to allow close() to complete.
	 * System calls return ERESTART so that the fd is revalidated.
	 * (Partial writes return the transfer length.)
	 */
	mutex_enter(pipe->pipe_lock);
	pipe->pipe_state |= PIPE_RESTART;
	/*
	 * Wake up both cvs.  Maybe we only need one, but there may be
	 * other paths where a wakeup is needed, and it saves deciding which!
	 */
	cv_broadcast(&pipe->pipe_rcv);
	cv_broadcast(&pipe->pipe_wcv);
	mutex_exit(pipe->pipe_lock);
}

static void
pipe_free_kmem(struct pipe *pipe)
{

	if (pipe->pipe_buffer.buffer != NULL) {
		if (pipe->pipe_buffer.size > PIPE_SIZE) {
			atomic_dec_uint(&nbigpipe);
		}
		if (pipe->pipe_buffer.buffer != (void *)pipe->pipe_kmem) {
			uvm_km_free(kernel_map,
			    (vaddr_t)pipe->pipe_buffer.buffer,
			    pipe->pipe_buffer.size, UVM_KMF_PAGEABLE);
			atomic_add_int(&amountpipekva,
			    -pipe->pipe_buffer.size);
		}
		pipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * Shut down the pipe.
 */
static void
pipeclose(struct pipe *pipe)
{
	kmutex_t *lock;
	struct pipe *ppipe;

	if (pipe == NULL)
		return;

	KASSERT(cv_is_valid(&pipe->pipe_rcv));
	KASSERT(cv_is_valid(&pipe->pipe_wcv));
	KASSERT(cv_is_valid(&pipe->pipe_draincv));
	KASSERT(cv_is_valid(&pipe->pipe_lkcv));

	lock = pipe->pipe_lock;
	if (lock == NULL)
		/* Must have failed during create. */
		goto free_resources;

	mutex_enter(lock);
	pipeselwakeup(pipe, pipe, POLL_HUP);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	pipe->pipe_state |= PIPE_EOF;
	if (pipe->pipe_busy) {
		while (pipe->pipe_busy) {
			cv_broadcast(&pipe->pipe_wcv);
			cv_wait_sig(&pipe->pipe_draincv, lock);
		}
	}

	/*
	 * Disconnect from peer.
	 */
	if ((ppipe = pipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe, ppipe, POLL_HUP);
		ppipe->pipe_state |= PIPE_EOF;
		cv_broadcast(&ppipe->pipe_rcv);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * Any knote objects still left in the list are
	 * the ones attached by the peer.  Since no one will
	 * traverse this list, we just clear it.
	 */
	SLIST_INIT(&pipe->pipe_sel.sel_klist);

	KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0);
	mutex_exit(lock);
	mutex_obj_free(lock);

	/*
	 * Free resources.
	 */
free_resources:
	pipe->pipe_pgid = 0;
	pipe->pipe_state = PIPE_SIGNALR;
	pipe->pipe_peer = NULL;
	pipe->pipe_lock = NULL;
	pipe_free_kmem(pipe);
	if (pipe->pipe_kmem != 0) {
		pool_cache_put(pipe_rd_cache, pipe);
	} else {
		pool_cache_put(pipe_wr_cache, pipe);
	}
}
static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_WRITE:
		/* Need the peer structure, not our own. */
		pipe = pipe->pipe_peer;

		/* If reader end already closed, just return. */
		if (pipe == NULL) {
			mutex_exit(lock);
			return;
		}

		break;
	default:
		/* Nothing to do. */
		break;
	}

	KASSERT(kn->kn_hook == pipe);
	SLIST_REMOVE(&pipe->pipe_sel.sel_klist, kn, knote, kn_selnext);
	mutex_exit(lock);
}

static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;
	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		if ((hint & NOTE_SUBMIT) == 0) {
			mutex_exit(rpipe->pipe_lock);
		}
		return (1);
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return (kn->kn_data > 0);
}

static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		if ((hint & NOTE_SUBMIT) == 0) {
			mutex_exit(rpipe->pipe_lock);
		}
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return (kn->kn_data >= PIPE_BUF);
}

static const struct filterops pipe_rfiltops = {
	.f_isfd = 1,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_piperead,
};

static const struct filterops pipe_wfiltops = {
	.f_isfd = 1,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_pipewrite,
};

static int
pipe_kqfilter(file_t *fp, struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		pipe = pipe->pipe_peer;
		if (pipe == NULL) {
			/* Other end of pipe has been closed. */
			mutex_exit(lock);
			return (EBADF);
		}
		break;
	default:
		mutex_exit(lock);
		return (EINVAL);
	}

	kn->kn_hook = pipe;
	SLIST_INSERT_HEAD(&pipe->pipe_sel.sel_klist, kn, kn_selnext);
	mutex_exit(lock);

	return (0);
}
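/*
 * A minimal sketch of how these filters are reached from userland
 * (standard kqueue usage, not code from this file): registering
 * EVFILT_READ on a pipe fd goes through pipe_kqfilter() above, and the
 * kn_data reported back is the byte count computed in filt_piperead():
 *
 *	struct kevent ev;
 *	int kq = kqueue();
 *
 *	EV_SET(&ev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	(void)kevent(kq, &ev, 1, NULL, 0, NULL);
 *	(void)kevent(kq, NULL, 0, &ev, 1, NULL);  - ev.data = bytes readable
 */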
/*
 * Handle pipe sysctls.
 */
SYSCTL_SETUP(sysctl_kern_pipe_setup, "sysctl kern.pipe subtree setup")
{

	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_NODE, "pipe",
	    SYSCTL_DESCR("Pipe settings"),
	    NULL, 0, NULL, 0,
	    CTL_KERN, KERN_PIPE, CTL_EOL);

	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
	    CTLTYPE_INT, "maxbigpipes",
	    SYSCTL_DESCR("Maximum number of \"big\" pipes"),
	    NULL, 0, &maxbigpipes, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_MAXBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_INT, "nbigpipes",
	    SYSCTL_DESCR("Number of \"big\" pipes"),
	    NULL, 0, &nbigpipe, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_NBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_INT, "kvasize",
	    SYSCTL_DESCR("Amount of kernel memory consumed by pipe "
		"buffers"),
	    NULL, 0, &amountpipekva, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_KVASIZE, CTL_EOL);
}
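/*
 * These nodes appear under kern.pipe; for example (illustrative shell
 * usage, not code from this file, output assumes the default
 * LIMITBIGPIPES of 32):
 *
 *	$ sysctl kern.pipe.maxbigpipes
 *	kern.pipe.maxbigpipes = 32
 */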