/*	$NetBSD: sys_pipe.c,v 1.158 2021/10/11 01:07:36 thorpej Exp $	*/

/*-
 * Copyright (c) 2003, 2007, 2008, 2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Paul Kranenburg, and by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used.  It does not support all features of
 * sockets, but does do everything that pipes normally do.
 *
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is mapped read-only into the kernel address
 * space using the UVM page loan facility, so that the receiving process
 * can copy the data directly from the pages of the sending process.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers, so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
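/*
 * Illustrative userland sketch (not part of the kernel): the classic
 * use of pipe(2) that the code below implements.  Error handling is
 * omitted for brevity.
 *
 *	int fds[2];
 *	char buf[64];
 *	ssize_t n;
 *
 *	pipe(fds);
 *	if (fork() == 0) {
 *		close(fds[0]);
 *		write(fds[1], "hello", 5);	-- small write: buffered
 *		_exit(0);
 *	}
 *	close(fds[1]);
 *	n = read(fds[0], buf, sizeof(buf));	-- n == 5; next read returns 0 (EOF)
 */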

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.158 2021/10/11 01:07:36 thorpej Exp $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/kernel.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/signalvar.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/select.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/sysctl.h>
#include <sys/kauth.h>
#include <sys/atomic.h>
#include <sys/pipe.h>

static int	pipe_read(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_write(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_close(file_t *);
static int	pipe_poll(file_t *, int);
static int	pipe_kqfilter(file_t *, struct knote *);
static int	pipe_stat(file_t *, struct stat *);
static int	pipe_ioctl(file_t *, u_long, void *);
static void	pipe_restart(file_t *);

static const struct fileops pipeops = {
	.fo_name = "pipe",
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_fcntl = fnullop_fcntl,
	.fo_poll = pipe_poll,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_kqfilter = pipe_kqfilter,
	.fo_restart = pipe_restart,
};

/*
 * Default pipe buffer size(s); these can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define	MINPIPESIZE	(PIPE_SIZE / 3)
#define	MAXPIPESIZE	(2 * PIPE_SIZE / 3)

/*
 * Limit the number of "big" pipes.
 */
#define	LIMITBIGPIPES	32
static u_int	maxbigpipes = LIMITBIGPIPES;
static u_int	nbigpipe = 0;

/*
 * Amount of KVA consumed by pipe buffers.
 */
static u_int	amountpipekva = 0;

static void	pipeclose(struct pipe *);
static void	pipe_free_kmem(struct pipe *);
static int	pipe_create(struct pipe **, pool_cache_t);
static int	pipelock(struct pipe *, bool);
static inline void pipeunlock(struct pipe *);
static void	pipeselwakeup(struct pipe *, struct pipe *, int);
static int	pipespace(struct pipe *, int);
static int	pipe_ctor(void *, void *, int);
static void	pipe_dtor(void *, void *);

static pool_cache_t	pipe_wr_cache;
static pool_cache_t	pipe_rd_cache;

void
pipe_init(void)
{

	/* Writer side is not automatically allocated KVA. */
	pipe_wr_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "pipewr",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, NULL);
	KASSERT(pipe_wr_cache != NULL);

	/* Reader side gets preallocated KVA. */
	pipe_rd_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "piperd",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, (void *)1);
	KASSERT(pipe_rd_cache != NULL);
}
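/*
 * Design note: both caches share pipe_ctor()/pipe_dtor(); the opaque
 * pool_cache argument (NULL vs. (void *)1) is the only thing that
 * distinguishes them, selecting whether the constructor preallocates
 * PIPE_SIZE of pageable KVA for the object.  Only the reader side owns
 * the data buffer, so only pipe_rd_cache pays for the mapping up front.
 */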
static int
pipe_ctor(void *arg, void *obj, int flags)
{
	struct pipe *pipe;
	vaddr_t va;

	pipe = obj;

	memset(pipe, 0, sizeof(struct pipe));
	if (arg != NULL) {
		/* Preallocate space. */
		va = uvm_km_alloc(kernel_map, PIPE_SIZE, 0,
		    UVM_KMF_PAGEABLE | UVM_KMF_WAITVA);
		KASSERT(va != 0);
		pipe->pipe_kmem = va;
		atomic_add_int(&amountpipekva, PIPE_SIZE);
	}
	cv_init(&pipe->pipe_rcv, "pipe_rd");
	cv_init(&pipe->pipe_wcv, "pipe_wr");
	cv_init(&pipe->pipe_draincv, "pipe_drn");
	cv_init(&pipe->pipe_lkcv, "pipe_lk");
	selinit(&pipe->pipe_sel);
	pipe->pipe_state = PIPE_SIGNALR;

	return 0;
}

static void
pipe_dtor(void *arg, void *obj)
{
	struct pipe *pipe;

	pipe = obj;

	cv_destroy(&pipe->pipe_rcv);
	cv_destroy(&pipe->pipe_wcv);
	cv_destroy(&pipe->pipe_draincv);
	cv_destroy(&pipe->pipe_lkcv);
	seldestroy(&pipe->pipe_sel);
	if (pipe->pipe_kmem != 0) {
		uvm_km_free(kernel_map, pipe->pipe_kmem, PIPE_SIZE,
		    UVM_KMF_PAGEABLE);
		atomic_add_int(&amountpipekva, -PIPE_SIZE);
	}
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes.
 */
int
pipe1(struct lwp *l, int *fildes, int flags)
{
	struct pipe *rpipe, *wpipe;
	file_t *rf, *wf;
	int fd, error;
	proc_t *p;

	if (flags & ~(O_CLOEXEC|O_NONBLOCK|O_NOSIGPIPE))
		return EINVAL;
	p = curproc;
	rpipe = wpipe = NULL;
	if ((error = pipe_create(&rpipe, pipe_rd_cache)) ||
	    (error = pipe_create(&wpipe, pipe_wr_cache))) {
		goto free2;
	}
	rpipe->pipe_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
	wpipe->pipe_lock = rpipe->pipe_lock;
	mutex_obj_hold(wpipe->pipe_lock);

	error = fd_allocfile(&rf, &fd);
	if (error)
		goto free2;
	fildes[0] = fd;

	error = fd_allocfile(&wf, &fd);
	if (error)
		goto free3;
	fildes[1] = fd;

	rf->f_flag = FREAD | flags;
	rf->f_type = DTYPE_PIPE;
	rf->f_pipe = rpipe;
	rf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[0], (flags & O_CLOEXEC) != 0);

	wf->f_flag = FWRITE | flags;
	wf->f_type = DTYPE_PIPE;
	wf->f_pipe = wpipe;
	wf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[1], (flags & O_CLOEXEC) != 0);

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fd_affix(p, rf, fildes[0]);
	fd_affix(p, wf, fildes[1]);
	return (0);
free3:
	fd_abort(p, rf, fildes[0]);
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);

	return (error);
}
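/*
 * Illustrative userland view (assuming the usual syscall plumbing, where
 * both pipe(2) and pipe2(2) land here): the flags argument corresponds
 * to pipe2(2), e.g.
 *
 *	int fds[2];
 *	pipe2(fds, O_CLOEXEC | O_NONBLOCK);
 *
 * Anything outside O_CLOEXEC, O_NONBLOCK and O_NOSIGPIPE is rejected
 * with EINVAL above.
 */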
/*
 * Allocate KVA for the pipe's circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if allocation
 * fails, the old buffer is retained and ENOMEM is returned.
 */
static int
pipespace(struct pipe *pipe, int size)
{
	void *buffer;

	/*
	 * Allocate pageable virtual address space.  Physical memory is
	 * allocated on demand.
	 */
	if (size == PIPE_SIZE && pipe->pipe_kmem != 0) {
		buffer = (void *)pipe->pipe_kmem;
	} else {
		buffer = (void *)uvm_km_alloc(kernel_map, round_page(size),
		    0, UVM_KMF_PAGEABLE);
		if (buffer == NULL)
			return (ENOMEM);
		atomic_add_int(&amountpipekva, size);
	}

	/* Free old resources if we're resizing. */
	pipe_free_kmem(pipe);
	pipe->pipe_buffer.buffer = buffer;
	pipe->pipe_buffer.size = size;
	pipe->pipe_buffer.in = 0;
	pipe->pipe_buffer.out = 0;
	pipe->pipe_buffer.cnt = 0;
	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe.
 */
static int
pipe_create(struct pipe **pipep, pool_cache_t cache)
{
	struct pipe *pipe;
	int error;

	pipe = pool_cache_get(cache, PR_WAITOK);
	KASSERT(pipe != NULL);
	*pipep = pipe;
	error = 0;
	getnanotime(&pipe->pipe_btime);
	pipe->pipe_atime = pipe->pipe_mtime = pipe->pipe_btime;
	pipe->pipe_lock = NULL;
	if (cache == pipe_rd_cache) {
		error = pipespace(pipe, PIPE_SIZE);
	} else {
		pipe->pipe_buffer.buffer = NULL;
		pipe->pipe_buffer.size = 0;
		pipe->pipe_buffer.in = 0;
		pipe->pipe_buffer.out = 0;
		pipe->pipe_buffer.cnt = 0;
	}
	return error;
}

/*
 * Lock a pipe for I/O, blocking other access.
 * Called with pipe spin lock held.
 */
static int
pipelock(struct pipe *pipe, bool catch_p)
{
	int error;

	KASSERT(mutex_owned(pipe->pipe_lock));

	while (pipe->pipe_state & PIPE_LOCKFL) {
		pipe->pipe_waiters++;
		KASSERT(pipe->pipe_waiters != 0); /* just in case */
		if (catch_p) {
			error = cv_wait_sig(&pipe->pipe_lkcv, pipe->pipe_lock);
			if (error != 0) {
				KASSERT(pipe->pipe_waiters > 0);
				pipe->pipe_waiters--;
				return error;
			}
		} else
			cv_wait(&pipe->pipe_lkcv, pipe->pipe_lock);
		KASSERT(pipe->pipe_waiters > 0);
		pipe->pipe_waiters--;
	}

	pipe->pipe_state |= PIPE_LOCKFL;

	return 0;
}

/*
 * Unlock a pipe I/O lock.
 */
static inline void
pipeunlock(struct pipe *pipe)
{

	KASSERT(pipe->pipe_state & PIPE_LOCKFL);

	pipe->pipe_state &= ~PIPE_LOCKFL;
	if (pipe->pipe_waiters > 0) {
		cv_signal(&pipe->pipe_lkcv);
	}
}
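/*
 * Usage pattern for the long-term lock -- a sketch of what pipe_read()
 * and pipe_write() below actually do:
 *
 *	mutex_enter(pipe->pipe_lock);	-- short-term cover for pipe state
 *	error = pipelock(pipe, true);	-- exclude other I/O on this end
 *	...
 *	mutex_exit(pipe->pipe_lock);	-- drop the mutex across uiomove(),
 *	error = uiomove(...);		--  which may fault/sleep in copyin/out
 *	mutex_enter(pipe->pipe_lock);
 *	...
 *	pipeunlock(pipe);		-- wake the next waiter, if any
 *	mutex_exit(pipe->pipe_lock);
 */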
/*
 * Select/poll wakeup.  This also sends SIGIO to peer connected to
 * 'sigpipe' side of pipe.
 */
static void
pipeselwakeup(struct pipe *selp, struct pipe *sigp, int code)
{
	int band;

	switch (code) {
	case POLL_IN:
		band = POLLIN|POLLRDNORM;
		break;
	case POLL_OUT:
		band = POLLOUT|POLLWRNORM;
		break;
	case POLL_HUP:
		band = POLLHUP;
		break;
	case POLL_ERR:
		band = POLLERR;
		break;
	default:
		band = 0;
#ifdef DIAGNOSTIC
		printf("bad siginfo code %d in pipe notification.\n", code);
#endif
		break;
	}

	selnotify(&selp->pipe_sel, band, NOTE_SUBMIT);

	if (sigp == NULL || (sigp->pipe_state & PIPE_ASYNC) == 0)
		return;

	fownsignal(sigp->pipe_pgid, SIGIO, code, band, selp);
}

static int
pipe_read(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipebuf *bp = &rpipe->pipe_buffer;
	kmutex_t *lock = rpipe->pipe_lock;
	int error;
	size_t nread = 0;
	size_t size;
	size_t ocnt;
	unsigned int wakeup_state = 0;

	mutex_enter(lock);
	++rpipe->pipe_busy;
	ocnt = bp->cnt;

again:
	error = pipelock(rpipe, true);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * Normal pipe buffer receive.
		 */
		if (bp->cnt > 0) {
			size = bp->size - bp->out;
			if (size > bp->cnt)
				size = bp->cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;

			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->out, size, uio);
			mutex_enter(lock);
			if (error)
				break;

			bp->out += size;
			if (bp->out >= bp->size)
				bp->out = 0;

			bp->cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (bp->cnt == 0) {
				bp->in = 0;
				bp->out = 0;
			}
			nread += size;
			continue;
		}

		/*
		 * Break if some data was read.
		 */
		if (nread > 0)
			break;

		/*
		 * Detect EOF condition.
		 * Read returns 0 on EOF, no need to set error.
		 */
		if (rpipe->pipe_state & PIPE_EOF)
			break;

		/*
		 * Don't block on non-blocking I/O.
		 */
		if (fp->f_flag & FNONBLOCK) {
			error = EAGAIN;
			break;
		}

		/*
		 * Unlock the pipe buffer for our remaining processing.
		 * We will either break out with an error or we will
		 * sleep and relock to loop.
		 */
		pipeunlock(rpipe);

#if 1	/* XXX (dsl) I'm sure these aren't needed here ... */
		/*
		 * We want to read more, wake up select/poll.
		 */
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);

		/*
		 * If the "write-side" is blocked, wake it up now.
		 */
		cv_broadcast(&rpipe->pipe_wcv);
#endif

		if (wakeup_state & PIPE_RESTART) {
			error = ERESTART;
			goto unlocked_error;
		}

		/* Now wait until the pipe is filled. */
		error = cv_wait_sig(&rpipe->pipe_rcv, lock);
		if (error != 0)
			goto unlocked_error;
		wakeup_state = rpipe->pipe_state;
		goto again;
	}

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
	pipeunlock(rpipe);

unlocked_error:
	--rpipe->pipe_busy;
	if (rpipe->pipe_busy == 0) {
		rpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&rpipe->pipe_draincv);
	}
	if (bp->cnt < MINPIPESIZE) {
		cv_broadcast(&rpipe->pipe_wcv);
	}

	/*
	 * If anything was read off the buffer, signal to the writer that it's
	 * possible to write more data.  Also send a signal if we are here for
	 * the first time after the last write.
	 */
	if ((bp->size - bp->cnt) >= PIPE_BUF
	    && (ocnt != bp->cnt || (rpipe->pipe_state & PIPE_SIGNALR))) {
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);
		rpipe->pipe_state &= ~PIPE_SIGNALR;
	}

	mutex_exit(lock);
	return (error);
}
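/*
 * Worked example of the read-side ring-buffer arithmetic above,
 * assuming bp->size = 16384, bp->out = 16000, bp->cnt = 1000 and a
 * 600 byte read: the first pass copies the 384 contiguous bytes up to
 * the end of the buffer (size = bp->size - bp->out), wraps bp->out
 * back to 0, and the loop's next pass copies the remaining 216 bytes
 * from the start of the buffer.
 */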
static int
pipe_write(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *wpipe, *rpipe;
	struct pipebuf *bp;
	kmutex_t *lock;
	int error;
	unsigned int wakeup_state = 0;

	/* We want to write to our peer. */
	rpipe = fp->f_pipe;
	lock = rpipe->pipe_lock;
	error = 0;

	mutex_enter(lock);
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) != 0) {
		mutex_exit(lock);
		return EPIPE;
	}
	++wpipe->pipe_busy;

	/* Acquire the long-term pipe lock. */
	if ((error = pipelock(wpipe, true)) != 0) {
		--wpipe->pipe_busy;
		if (wpipe->pipe_busy == 0) {
			wpipe->pipe_state &= ~PIPE_RESTART;
			cv_broadcast(&wpipe->pipe_draincv);
		}
		mutex_exit(lock);
		return (error);
	}

	bp = &wpipe->pipe_buffer;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < maxbigpipes) &&
	    (bp->size <= PIPE_SIZE) && (bp->cnt == 0)) {

		if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
			atomic_inc_uint(&nbigpipe);
	}

	while (uio->uio_resid) {
		size_t space;

		space = bp->size - bp->cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (uio->uio_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size;	/* Transfer size */
			int segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = bp->size - bp->in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */
			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->in, segsize,
			    uio);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
				KASSERT(bp->in + segsize == bp->size);
				error = uiomove(bp->buffer,
				    size - segsize, uio);
			}
			mutex_enter(lock);
			if (error)
				break;

			bp->in += size;
			if (bp->in >= bp->size) {
				KASSERT(bp->in == size - segsize + bp->size);
				bp->in = size - segsize;
			}

			bp->cnt += size;
			KASSERT(bp->cnt <= bp->size);
			wakeup_state = 0;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			cv_broadcast(&wpipe->pipe_rcv);

			/*
			 * Don't block on non-blocking I/O.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			if (bp->cnt)
				pipeselwakeup(wpipe, wpipe, POLL_IN);

			if (wakeup_state & PIPE_RESTART) {
				error = ERESTART;
				break;
			}

			/*
			 * If the read side wants to go away, we just issue
			 * a signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}

			pipeunlock(wpipe);
			error = cv_wait_sig(&wpipe->pipe_wcv, lock);
			(void)pipelock(wpipe, false);
			if (error != 0)
				break;
			wakeup_state = wpipe->pipe_state;
		}
	}

	--wpipe->pipe_busy;
	if (wpipe->pipe_busy == 0) {
		wpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&wpipe->pipe_draincv);
	}
	if (bp->cnt > 0) {
		cv_broadcast(&wpipe->pipe_rcv);
	}

	/*
	 * Don't return EPIPE if any I/O was successful.
	 */
	if (error == EPIPE && bp->cnt == 0 && uio->uio_resid == 0)
		error = 0;

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer, wake up select/poll.
	 * wmap->cnt is always 0 at this point (direct write
	 * is only done synchronously), so check only wpipe->pipe_buffer.cnt.
	 */
	if (bp->cnt)
		pipeselwakeup(wpipe, wpipe, POLL_IN);

	/*
	 * Arrange for the next read(2) to do a signal.
	 */
	wpipe->pipe_state |= PIPE_SIGNALR;

	pipeunlock(wpipe);
	mutex_exit(lock);
	return (error);
}
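/*
 * Worked example of the wraparound handling above, assuming
 * bp->size = 16384, bp->in = 16000, bp->cnt = 0 and an atomic 512 byte
 * write: segsize = 16384 - 16000 = 384 bytes go at the end of the
 * buffer, the remaining 128 bytes are written at the start by the
 * second uiomove(), and bp->in becomes 512 - 384 = 128.
 */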
/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(file_t *fp, u_long cmd, void *data)
{
	struct pipe *pipe = fp->f_pipe;
	kmutex_t *lock = pipe->pipe_lock;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		mutex_enter(lock);
		if (*(int *)data) {
			pipe->pipe_state |= PIPE_ASYNC;
		} else {
			pipe->pipe_state &= ~PIPE_ASYNC;
		}
		mutex_exit(lock);
		return (0);

	case FIONREAD:
		mutex_enter(lock);
		*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONWRITE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONSPACE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.size -
			    pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case TIOCSPGRP:
	case FIOSETOWN:
		return fsetown(&pipe->pipe_pgid, cmd, data);

	case TIOCGPGRP:
	case FIOGETOWN:
		return fgetown(pipe->pipe_pgid, cmd, data);

	}
	return (EPASSTHROUGH);
}

int
pipe_poll(file_t *fp, int events)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipe *wpipe;
	int eof = 0;
	int revents = 0;

	mutex_enter(rpipe->pipe_lock);
	wpipe = rpipe->pipe_peer;

	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	eof |= (rpipe->pipe_state & PIPE_EOF);

	if (wpipe == NULL)
		revents |= events & (POLLOUT | POLLWRNORM);
	else {
		if (events & (POLLOUT | POLLWRNORM))
			if ((wpipe->pipe_state & PIPE_EOF) ||
			    ((wpipe->pipe_buffer.size -
			    wpipe->pipe_buffer.cnt) >= PIPE_BUF))
				revents |= events & (POLLOUT | POLLWRNORM);

		eof |= (wpipe->pipe_state & PIPE_EOF);
	}

	if (wpipe == NULL || eof)
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM))
			selrecord(curlwp, &rpipe->pipe_sel);

		if (events & (POLLOUT | POLLWRNORM))
			selrecord(curlwp, &wpipe->pipe_sel);
	}
	mutex_exit(rpipe->pipe_lock);

	return (revents);
}
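/*
 * Illustrative userland sketch of the queries served above (fds[] as in
 * the pipe(2) example near the top of the file):
 *
 *	int n;
 *	struct pollfd pfd = { .fd = fds[0], .events = POLLIN };
 *
 *	ioctl(fds[0], FIONREAD, &n);	-- bytes waiting in the buffer
 *	poll(&pfd, 1, INFTIM);		-- POLLIN once data or EOF arrives
 *
 * POLLHUP is reported once the peer end is gone, regardless of events.
 */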
static int
pipe_stat(file_t *fp, struct stat *ub)
{
	struct pipe *pipe = fp->f_pipe;

	mutex_enter(pipe->pipe_lock);
	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO | S_IRUSR | S_IWUSR;
	ub->st_blksize = pipe->pipe_buffer.size;
	if (ub->st_blksize == 0 && pipe->pipe_peer)
		ub->st_blksize = pipe->pipe_peer->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size) ? 1 : 0;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = ub->st_birthtimespec = pipe->pipe_btime;
	ub->st_uid = kauth_cred_geteuid(fp->f_cred);
	ub->st_gid = kauth_cred_getegid(fp->f_cred);

	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	mutex_exit(pipe->pipe_lock);
	return 0;
}

static int
pipe_close(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	fp->f_pipe = NULL;
	pipeclose(pipe);
	return (0);
}

static void
pipe_restart(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	/*
	 * Unblock blocked reads/writes in order to allow close() to complete.
	 * System calls return ERESTART so that the fd is revalidated.
	 * (Partial writes return the transfer length.)
	 */
	mutex_enter(pipe->pipe_lock);
	pipe->pipe_state |= PIPE_RESTART;
	/*
	 * Wake up both cvs.  Maybe we only need one, but there may be
	 * other paths where a wakeup is needed, and this saves deciding
	 * which!
	 */
	cv_broadcast(&pipe->pipe_rcv);
	cv_broadcast(&pipe->pipe_wcv);
	mutex_exit(pipe->pipe_lock);
}
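/*
 * Design note: the descriptor layer invokes the fo_restart hook (the
 * routine above) when a close races with threads still blocked in I/O
 * on the descriptor.  A blocked reader or writer wakes, sees
 * PIPE_RESTART in wakeup_state, and fails with ERESTART, so the system
 * call is retried and revalidates the fd instead of continuing to
 * sleep on a pipe that is being torn down.
 */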
static void
pipe_free_kmem(struct pipe *pipe)
{

	if (pipe->pipe_buffer.buffer != NULL) {
		if (pipe->pipe_buffer.size > PIPE_SIZE) {
			atomic_dec_uint(&nbigpipe);
		}
		if (pipe->pipe_buffer.buffer != (void *)pipe->pipe_kmem) {
			uvm_km_free(kernel_map,
			    (vaddr_t)pipe->pipe_buffer.buffer,
			    pipe->pipe_buffer.size, UVM_KMF_PAGEABLE);
			atomic_add_int(&amountpipekva,
			    -pipe->pipe_buffer.size);
		}
		pipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * Shutdown the pipe.
 */
static void
pipeclose(struct pipe *pipe)
{
	kmutex_t *lock;
	struct pipe *ppipe;

	if (pipe == NULL)
		return;

	KASSERT(cv_is_valid(&pipe->pipe_rcv));
	KASSERT(cv_is_valid(&pipe->pipe_wcv));
	KASSERT(cv_is_valid(&pipe->pipe_draincv));
	KASSERT(cv_is_valid(&pipe->pipe_lkcv));

	lock = pipe->pipe_lock;
	if (lock == NULL)
		/* Must have failed during create */
		goto free_resources;

	mutex_enter(lock);
	pipeselwakeup(pipe, pipe, POLL_HUP);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	pipe->pipe_state |= PIPE_EOF;
	if (pipe->pipe_busy) {
		while (pipe->pipe_busy) {
			cv_broadcast(&pipe->pipe_wcv);
			cv_wait_sig(&pipe->pipe_draincv, lock);
		}
	}

	/*
	 * Disconnect from peer.
	 */
	if ((ppipe = pipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe, ppipe, POLL_HUP);
		ppipe->pipe_state |= PIPE_EOF;
		cv_broadcast(&ppipe->pipe_rcv);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * Any knote objects still left in the list are
	 * the ones attached by the peer.  Since no one will
	 * traverse this list, we just clear it.
	 *
	 * XXX Exposes select/kqueue internals.
	 */
	SLIST_INIT(&pipe->pipe_sel.sel_klist);

	KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0);
	mutex_exit(lock);
	mutex_obj_free(lock);

	/*
	 * Free resources.
	 */
free_resources:
	pipe->pipe_pgid = 0;
	pipe->pipe_state = PIPE_SIGNALR;
	pipe->pipe_peer = NULL;
	pipe->pipe_lock = NULL;
	pipe_free_kmem(pipe);
	if (pipe->pipe_kmem != 0) {
		pool_cache_put(pipe_rd_cache, pipe);
	} else {
		pool_cache_put(pipe_wr_cache, pipe);
	}
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_WRITE:
		/* Need the peer structure, not our own. */
		pipe = pipe->pipe_peer;

		/* If reader end already closed, just return. */
		if (pipe == NULL) {
			mutex_exit(lock);
			return;
		}

		break;
	default:
		/* Nothing to do. */
		break;
	}

	KASSERT(kn->kn_hook == pipe);
	selremove_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);
}

static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;
	int rv;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;
	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		knote_set_eof(kn, 0);
		rv = 1;
	} else {
		rv = kn->kn_data > 0;
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return rv;
}

static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;
	int rv;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		knote_set_eof(kn, 0);
		rv = 1;
	} else {
		kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
		rv = kn->kn_data >= PIPE_BUF;
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return rv;
}

static const struct filterops pipe_rfiltops = {
	.f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_piperead,
};

static const struct filterops pipe_wfiltops = {
	.f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_pipewrite,
};

static int
pipe_kqfilter(file_t *fp, struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		pipe = pipe->pipe_peer;
		if (pipe == NULL) {
			/* Other end of pipe has been closed. */
			mutex_exit(lock);
			return (EBADF);
		}
		break;
	default:
		mutex_exit(lock);
		return (EINVAL);
	}

	kn->kn_hook = pipe;
	selrecord_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);

	return (0);
}
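/*
 * Illustrative userland sketch of the kqueue path above (fds[] as in
 * the pipe(2) example near the top of the file; error handling omitted):
 *
 *	struct kevent ev;
 *	int kq = kqueue();
 *
 *	EV_SET(&ev, fds[0], EVFILT_READ, EV_ADD, 0, 0, 0);
 *	kevent(kq, &ev, 1, NULL, 0, NULL);	-- attaches filt_piperead
 *	kevent(kq, NULL, 0, &ev, 1, NULL);	-- ev.data = bytes readable
 */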
/*
 * Handle pipe sysctls.
 */
SYSCTL_SETUP(sysctl_kern_pipe_setup, "sysctl kern.pipe subtree setup")
{

	sysctl_createv(clog, 0, NULL, NULL,
		CTLFLAG_PERMANENT,
		CTLTYPE_NODE, "pipe",
		SYSCTL_DESCR("Pipe settings"),
		NULL, 0, NULL, 0,
		CTL_KERN, KERN_PIPE, CTL_EOL);

	sysctl_createv(clog, 0, NULL, NULL,
		CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
		CTLTYPE_INT, "maxbigpipes",
		SYSCTL_DESCR("Maximum number of \"big\" pipes"),
		NULL, 0, &maxbigpipes, 0,
		CTL_KERN, KERN_PIPE, KERN_PIPE_MAXBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
		CTLFLAG_PERMANENT,
		CTLTYPE_INT, "nbigpipes",
		SYSCTL_DESCR("Number of \"big\" pipes"),
		NULL, 0, &nbigpipe, 0,
		CTL_KERN, KERN_PIPE, KERN_PIPE_NBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
		CTLFLAG_PERMANENT,
		CTLTYPE_INT, "kvasize",
		SYSCTL_DESCR("Amount of kernel memory consumed by pipe "
		    "buffers"),
		NULL, 0, &amountpipekva, 0,
		CTL_KERN, KERN_PIPE, KERN_PIPE_KVASIZE, CTL_EOL);
}
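/*
 * Illustrative sysctl(8) usage for the knobs created above (the values
 * shown are examples only; actual output is system dependent):
 *
 *	$ sysctl kern.pipe
 *	kern.pipe.maxbigpipes = 32
 *	kern.pipe.nbigpipes = 0
 *	kern.pipe.kvasize = 262144
 *	$ sysctl -w kern.pipe.maxbigpipes=64
 */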