1 /* $NetBSD: sys_pipe.c,v 1.160 2023/04/22 13:53:02 riastradh Exp $ */ 2 3 /*- 4 * Copyright (c) 2003, 2007, 2008, 2009 The NetBSD Foundation, Inc. 5 * All rights reserved. 6 * 7 * This code is derived from software contributed to The NetBSD Foundation 8 * by Paul Kranenburg, and by Andrew Doran. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 1. Redistributions of source code must retain the above copyright 14 * notice, this list of conditions and the following disclaimer. 15 * 2. Redistributions in binary form must reproduce the above copyright 16 * notice, this list of conditions and the following disclaimer in the 17 * documentation and/or other materials provided with the distribution. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 * POSSIBILITY OF SUCH DAMAGE. 30 */ 31 32 /* 33 * Copyright (c) 1996 John S. Dyson 34 * All rights reserved. 35 * 36 * Redistribution and use in source and binary forms, with or without 37 * modification, are permitted provided that the following conditions 38 * are met: 39 * 1. Redistributions of source code must retain the above copyright 40 * notice immediately at the beginning of the file, without modification, 41 * this list of conditions, and the following disclaimer. 42 * 2. Redistributions in binary form must reproduce the above copyright 43 * notice, this list of conditions and the following disclaimer in the 44 * documentation and/or other materials provided with the distribution. 45 * 3. Absolutely no warranty of function or purpose is made by the author 46 * John S. Dyson. 47 * 4. Modifications may be freely made to this file if the above conditions 48 * are met. 49 */ 50 51 /* 52 * This file contains a high-performance replacement for the socket-based 53 * pipes scheme originally used. It does not support all features of 54 * sockets, but does do everything that pipes normally do. 55 * 56 * This code has two modes of operation, a small write mode and a large 57 * write mode. The small write mode acts like conventional pipes with 58 * a kernel buffer. If the buffer is less than PIPE_MINDIRECT, then the 59 * "normal" pipe buffering is done. If the buffer is between PIPE_MINDIRECT 60 * and PIPE_SIZE in size it is mapped read-only into the kernel address space 61 * using the UVM page loan facility from where the receiving process can copy 62 * the data directly from the pages in the sending process. 63 * 64 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will 65 * happen for small transfers so that the system will not spend all of 66 * its time context switching. PIPE_SIZE is constrained by the 67 * amount of kernel virtual memory. 68 */ 69 70 #include <sys/cdefs.h> 71 __KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.160 2023/04/22 13:53:02 riastradh Exp $"); 72 73 #include <sys/param.h> 74 #include <sys/systm.h> 75 #include <sys/proc.h> 76 #include <sys/fcntl.h> 77 #include <sys/file.h> 78 #include <sys/filedesc.h> 79 #include <sys/filio.h> 80 #include <sys/kernel.h> 81 #include <sys/ttycom.h> 82 #include <sys/stat.h> 83 #include <sys/poll.h> 84 #include <sys/signalvar.h> 85 #include <sys/vnode.h> 86 #include <sys/uio.h> 87 #include <sys/select.h> 88 #include <sys/mount.h> 89 #include <sys/syscallargs.h> 90 #include <sys/sysctl.h> 91 #include <sys/kauth.h> 92 #include <sys/atomic.h> 93 #include <sys/pipe.h> 94 95 static int pipe_read(file_t *, off_t *, struct uio *, kauth_cred_t, int); 96 static int pipe_write(file_t *, off_t *, struct uio *, kauth_cred_t, int); 97 static int pipe_close(file_t *); 98 static int pipe_poll(file_t *, int); 99 static int pipe_kqfilter(file_t *, struct knote *); 100 static int pipe_stat(file_t *, struct stat *); 101 static int pipe_ioctl(file_t *, u_long, void *); 102 static void pipe_restart(file_t *); 103 static int pipe_fpathconf(file_t *, int, register_t *); 104 static int pipe_posix_fadvise(file_t *, off_t, off_t, int); 105 106 static const struct fileops pipeops = { 107 .fo_name = "pipe", 108 .fo_read = pipe_read, 109 .fo_write = pipe_write, 110 .fo_ioctl = pipe_ioctl, 111 .fo_fcntl = fnullop_fcntl, 112 .fo_poll = pipe_poll, 113 .fo_stat = pipe_stat, 114 .fo_close = pipe_close, 115 .fo_kqfilter = pipe_kqfilter, 116 .fo_restart = pipe_restart, 117 .fo_fpathconf = pipe_fpathconf, 118 .fo_posix_fadvise = pipe_posix_fadvise, 119 }; 120 121 /* 122 * Default pipe buffer size(s), this can be kind-of large now because pipe 123 * space is pageable. The pipe code will try to maintain locality of 124 * reference for performance reasons, so small amounts of outstanding I/O 125 * will not wipe the cache. 126 */ 127 #define MINPIPESIZE (PIPE_SIZE / 3) 128 #define MAXPIPESIZE (2 * PIPE_SIZE / 3) 129 130 /* 131 * Limit the number of "big" pipes 132 */ 133 #define LIMITBIGPIPES 32 134 static u_int maxbigpipes = LIMITBIGPIPES; 135 static u_int nbigpipe = 0; 136 137 /* 138 * Amount of KVA consumed by pipe buffers. 139 */ 140 static u_int amountpipekva = 0; 141 142 static void pipeclose(struct pipe *); 143 static void pipe_free_kmem(struct pipe *); 144 static int pipe_create(struct pipe **, pool_cache_t); 145 static int pipelock(struct pipe *, bool); 146 static inline void pipeunlock(struct pipe *); 147 static void pipeselwakeup(struct pipe *, struct pipe *, int); 148 static int pipespace(struct pipe *, int); 149 static int pipe_ctor(void *, void *, int); 150 static void pipe_dtor(void *, void *); 151 152 static pool_cache_t pipe_wr_cache; 153 static pool_cache_t pipe_rd_cache; 154 155 void 156 pipe_init(void) 157 { 158 159 /* Writer side is not automatically allocated KVA. */ 160 pipe_wr_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "pipewr", 161 NULL, IPL_NONE, pipe_ctor, pipe_dtor, NULL); 162 KASSERT(pipe_wr_cache != NULL); 163 164 /* Reader side gets preallocated KVA. */ 165 pipe_rd_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "piperd", 166 NULL, IPL_NONE, pipe_ctor, pipe_dtor, (void *)1); 167 KASSERT(pipe_rd_cache != NULL); 168 } 169 170 static int 171 pipe_ctor(void *arg, void *obj, int flags) 172 { 173 struct pipe *pipe; 174 vaddr_t va; 175 176 pipe = obj; 177 178 memset(pipe, 0, sizeof(struct pipe)); 179 if (arg != NULL) { 180 /* Preallocate space. */ 181 va = uvm_km_alloc(kernel_map, PIPE_SIZE, 0, 182 UVM_KMF_PAGEABLE | UVM_KMF_WAITVA); 183 KASSERT(va != 0); 184 pipe->pipe_kmem = va; 185 atomic_add_int(&amountpipekva, PIPE_SIZE); 186 } 187 cv_init(&pipe->pipe_rcv, "pipe_rd"); 188 cv_init(&pipe->pipe_wcv, "pipe_wr"); 189 cv_init(&pipe->pipe_draincv, "pipe_drn"); 190 cv_init(&pipe->pipe_lkcv, "pipe_lk"); 191 selinit(&pipe->pipe_sel); 192 pipe->pipe_state = PIPE_SIGNALR; 193 194 return 0; 195 } 196 197 static void 198 pipe_dtor(void *arg, void *obj) 199 { 200 struct pipe *pipe; 201 202 pipe = obj; 203 204 cv_destroy(&pipe->pipe_rcv); 205 cv_destroy(&pipe->pipe_wcv); 206 cv_destroy(&pipe->pipe_draincv); 207 cv_destroy(&pipe->pipe_lkcv); 208 seldestroy(&pipe->pipe_sel); 209 if (pipe->pipe_kmem != 0) { 210 uvm_km_free(kernel_map, pipe->pipe_kmem, PIPE_SIZE, 211 UVM_KMF_PAGEABLE); 212 atomic_add_int(&amountpipekva, -PIPE_SIZE); 213 } 214 } 215 216 /* 217 * The pipe system call for the DTYPE_PIPE type of pipes 218 */ 219 int 220 pipe1(struct lwp *l, int *fildes, int flags) 221 { 222 struct pipe *rpipe, *wpipe; 223 file_t *rf, *wf; 224 int fd, error; 225 proc_t *p; 226 227 if (flags & ~(O_CLOEXEC|O_NONBLOCK|O_NOSIGPIPE)) 228 return EINVAL; 229 p = curproc; 230 rpipe = wpipe = NULL; 231 if ((error = pipe_create(&rpipe, pipe_rd_cache)) || 232 (error = pipe_create(&wpipe, pipe_wr_cache))) { 233 goto free2; 234 } 235 rpipe->pipe_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE); 236 wpipe->pipe_lock = rpipe->pipe_lock; 237 mutex_obj_hold(wpipe->pipe_lock); 238 239 error = fd_allocfile(&rf, &fd); 240 if (error) 241 goto free2; 242 fildes[0] = fd; 243 244 error = fd_allocfile(&wf, &fd); 245 if (error) 246 goto free3; 247 fildes[1] = fd; 248 249 rf->f_flag = FREAD | flags; 250 rf->f_type = DTYPE_PIPE; 251 rf->f_pipe = rpipe; 252 rf->f_ops = &pipeops; 253 fd_set_exclose(l, fildes[0], (flags & O_CLOEXEC) != 0); 254 255 wf->f_flag = FWRITE | flags; 256 wf->f_type = DTYPE_PIPE; 257 wf->f_pipe = wpipe; 258 wf->f_ops = &pipeops; 259 fd_set_exclose(l, fildes[1], (flags & O_CLOEXEC) != 0); 260 261 rpipe->pipe_peer = wpipe; 262 wpipe->pipe_peer = rpipe; 263 264 fd_affix(p, rf, fildes[0]); 265 fd_affix(p, wf, fildes[1]); 266 return (0); 267 free3: 268 fd_abort(p, rf, fildes[0]); 269 free2: 270 pipeclose(wpipe); 271 pipeclose(rpipe); 272 273 return (error); 274 } 275 276 /* 277 * Allocate kva for pipe circular buffer, the space is pageable 278 * This routine will 'realloc' the size of a pipe safely, if it fails 279 * it will retain the old buffer. 280 * If it fails it will return ENOMEM. 281 */ 282 static int 283 pipespace(struct pipe *pipe, int size) 284 { 285 void *buffer; 286 287 /* 288 * Allocate pageable virtual address space. Physical memory is 289 * allocated on demand. 290 */ 291 if (size == PIPE_SIZE && pipe->pipe_kmem != 0) { 292 buffer = (void *)pipe->pipe_kmem; 293 } else { 294 buffer = (void *)uvm_km_alloc(kernel_map, round_page(size), 295 0, UVM_KMF_PAGEABLE); 296 if (buffer == NULL) 297 return (ENOMEM); 298 atomic_add_int(&amountpipekva, size); 299 } 300 301 /* free old resources if we're resizing */ 302 pipe_free_kmem(pipe); 303 pipe->pipe_buffer.buffer = buffer; 304 pipe->pipe_buffer.size = size; 305 pipe->pipe_buffer.in = 0; 306 pipe->pipe_buffer.out = 0; 307 pipe->pipe_buffer.cnt = 0; 308 return (0); 309 } 310 311 /* 312 * Initialize and allocate VM and memory for pipe. 313 */ 314 static int 315 pipe_create(struct pipe **pipep, pool_cache_t cache) 316 { 317 struct pipe *pipe; 318 int error; 319 320 pipe = pool_cache_get(cache, PR_WAITOK); 321 KASSERT(pipe != NULL); 322 *pipep = pipe; 323 error = 0; 324 getnanotime(&pipe->pipe_btime); 325 pipe->pipe_atime = pipe->pipe_mtime = pipe->pipe_btime; 326 pipe->pipe_lock = NULL; 327 if (cache == pipe_rd_cache) { 328 error = pipespace(pipe, PIPE_SIZE); 329 } else { 330 pipe->pipe_buffer.buffer = NULL; 331 pipe->pipe_buffer.size = 0; 332 pipe->pipe_buffer.in = 0; 333 pipe->pipe_buffer.out = 0; 334 pipe->pipe_buffer.cnt = 0; 335 } 336 return error; 337 } 338 339 /* 340 * Lock a pipe for I/O, blocking other access 341 * Called with pipe spin lock held. 342 */ 343 static int 344 pipelock(struct pipe *pipe, bool catch_p) 345 { 346 int error; 347 348 KASSERT(mutex_owned(pipe->pipe_lock)); 349 350 while (pipe->pipe_state & PIPE_LOCKFL) { 351 pipe->pipe_waiters++; 352 KASSERT(pipe->pipe_waiters != 0); /* just in case */ 353 if (catch_p) { 354 error = cv_wait_sig(&pipe->pipe_lkcv, pipe->pipe_lock); 355 if (error != 0) { 356 KASSERT(pipe->pipe_waiters > 0); 357 pipe->pipe_waiters--; 358 return error; 359 } 360 } else 361 cv_wait(&pipe->pipe_lkcv, pipe->pipe_lock); 362 KASSERT(pipe->pipe_waiters > 0); 363 pipe->pipe_waiters--; 364 } 365 366 pipe->pipe_state |= PIPE_LOCKFL; 367 368 return 0; 369 } 370 371 /* 372 * unlock a pipe I/O lock 373 */ 374 static inline void 375 pipeunlock(struct pipe *pipe) 376 { 377 378 KASSERT(pipe->pipe_state & PIPE_LOCKFL); 379 380 pipe->pipe_state &= ~PIPE_LOCKFL; 381 if (pipe->pipe_waiters > 0) { 382 cv_signal(&pipe->pipe_lkcv); 383 } 384 } 385 386 /* 387 * Select/poll wakup. This also sends SIGIO to peer connected to 388 * 'sigpipe' side of pipe. 389 */ 390 static void 391 pipeselwakeup(struct pipe *selp, struct pipe *sigp, int code) 392 { 393 int band; 394 395 switch (code) { 396 case POLL_IN: 397 band = POLLIN|POLLRDNORM; 398 break; 399 case POLL_OUT: 400 band = POLLOUT|POLLWRNORM; 401 break; 402 case POLL_HUP: 403 band = POLLHUP; 404 break; 405 case POLL_ERR: 406 band = POLLERR; 407 break; 408 default: 409 band = 0; 410 #ifdef DIAGNOSTIC 411 printf("bad siginfo code %d in pipe notification.\n", code); 412 #endif 413 break; 414 } 415 416 selnotify(&selp->pipe_sel, band, NOTE_SUBMIT); 417 418 if (sigp == NULL || (sigp->pipe_state & PIPE_ASYNC) == 0) 419 return; 420 421 fownsignal(sigp->pipe_pgid, SIGIO, code, band, selp); 422 } 423 424 static int 425 pipe_read(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred, 426 int flags) 427 { 428 struct pipe *rpipe = fp->f_pipe; 429 struct pipebuf *bp = &rpipe->pipe_buffer; 430 kmutex_t *lock = rpipe->pipe_lock; 431 int error; 432 size_t nread = 0; 433 size_t size; 434 size_t ocnt; 435 unsigned int wakeup_state = 0; 436 437 mutex_enter(lock); 438 ++rpipe->pipe_busy; 439 ocnt = bp->cnt; 440 441 again: 442 error = pipelock(rpipe, true); 443 if (error) 444 goto unlocked_error; 445 446 while (uio->uio_resid) { 447 /* 448 * Normal pipe buffer receive. 449 */ 450 if (bp->cnt > 0) { 451 size = bp->size - bp->out; 452 if (size > bp->cnt) 453 size = bp->cnt; 454 if (size > uio->uio_resid) 455 size = uio->uio_resid; 456 457 mutex_exit(lock); 458 error = uiomove((char *)bp->buffer + bp->out, size, uio); 459 mutex_enter(lock); 460 if (error) 461 break; 462 463 bp->out += size; 464 if (bp->out >= bp->size) 465 bp->out = 0; 466 467 bp->cnt -= size; 468 469 /* 470 * If there is no more to read in the pipe, reset 471 * its pointers to the beginning. This improves 472 * cache hit stats. 473 */ 474 if (bp->cnt == 0) { 475 bp->in = 0; 476 bp->out = 0; 477 } 478 nread += size; 479 continue; 480 } 481 482 /* 483 * Break if some data was read. 484 */ 485 if (nread > 0) 486 break; 487 488 /* 489 * Detect EOF condition. 490 * Read returns 0 on EOF, no need to set error. 491 */ 492 if (rpipe->pipe_state & PIPE_EOF) 493 break; 494 495 /* 496 * Don't block on non-blocking I/O. 497 */ 498 if (fp->f_flag & FNONBLOCK) { 499 error = EAGAIN; 500 break; 501 } 502 503 /* 504 * Unlock the pipe buffer for our remaining processing. 505 * We will either break out with an error or we will 506 * sleep and relock to loop. 507 */ 508 pipeunlock(rpipe); 509 510 #if 1 /* XXX (dsl) I'm sure these aren't needed here ... */ 511 /* 512 * We want to read more, wake up select/poll. 513 */ 514 pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT); 515 516 /* 517 * If the "write-side" is blocked, wake it up now. 518 */ 519 cv_broadcast(&rpipe->pipe_wcv); 520 #endif 521 522 if (wakeup_state & PIPE_RESTART) { 523 error = ERESTART; 524 goto unlocked_error; 525 } 526 527 /* Now wait until the pipe is filled */ 528 error = cv_wait_sig(&rpipe->pipe_rcv, lock); 529 if (error != 0) 530 goto unlocked_error; 531 wakeup_state = rpipe->pipe_state; 532 goto again; 533 } 534 535 if (error == 0) 536 getnanotime(&rpipe->pipe_atime); 537 pipeunlock(rpipe); 538 539 unlocked_error: 540 --rpipe->pipe_busy; 541 if (rpipe->pipe_busy == 0) { 542 rpipe->pipe_state &= ~PIPE_RESTART; 543 cv_broadcast(&rpipe->pipe_draincv); 544 } 545 if (bp->cnt < MINPIPESIZE) { 546 cv_broadcast(&rpipe->pipe_wcv); 547 } 548 549 /* 550 * If anything was read off the buffer, signal to the writer it's 551 * possible to write more data. Also send signal if we are here for the 552 * first time after last write. 553 */ 554 if ((bp->size - bp->cnt) >= PIPE_BUF 555 && (ocnt != bp->cnt || (rpipe->pipe_state & PIPE_SIGNALR))) { 556 pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT); 557 rpipe->pipe_state &= ~PIPE_SIGNALR; 558 } 559 560 mutex_exit(lock); 561 return (error); 562 } 563 564 static int 565 pipe_write(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred, 566 int flags) 567 { 568 struct pipe *wpipe, *rpipe; 569 struct pipebuf *bp; 570 kmutex_t *lock; 571 int error; 572 unsigned int wakeup_state = 0; 573 574 /* We want to write to our peer */ 575 rpipe = fp->f_pipe; 576 lock = rpipe->pipe_lock; 577 error = 0; 578 579 mutex_enter(lock); 580 wpipe = rpipe->pipe_peer; 581 582 /* 583 * Detect loss of pipe read side, issue SIGPIPE if lost. 584 */ 585 if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) != 0) { 586 mutex_exit(lock); 587 return EPIPE; 588 } 589 ++wpipe->pipe_busy; 590 591 /* Acquire the long-term pipe lock */ 592 if ((error = pipelock(wpipe, true)) != 0) { 593 --wpipe->pipe_busy; 594 if (wpipe->pipe_busy == 0) { 595 wpipe->pipe_state &= ~PIPE_RESTART; 596 cv_broadcast(&wpipe->pipe_draincv); 597 } 598 mutex_exit(lock); 599 return (error); 600 } 601 602 bp = &wpipe->pipe_buffer; 603 604 /* 605 * If it is advantageous to resize the pipe buffer, do so. 606 */ 607 if ((uio->uio_resid > PIPE_SIZE) && 608 (nbigpipe < maxbigpipes) && 609 (bp->size <= PIPE_SIZE) && (bp->cnt == 0)) { 610 611 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) 612 atomic_inc_uint(&nbigpipe); 613 } 614 615 while (uio->uio_resid) { 616 size_t space; 617 618 space = bp->size - bp->cnt; 619 620 /* Writes of size <= PIPE_BUF must be atomic. */ 621 if ((space < uio->uio_resid) && (uio->uio_resid <= PIPE_BUF)) 622 space = 0; 623 624 if (space > 0) { 625 int size; /* Transfer size */ 626 int segsize; /* first segment to transfer */ 627 628 /* 629 * Transfer size is minimum of uio transfer 630 * and free space in pipe buffer. 631 */ 632 if (space > uio->uio_resid) 633 size = uio->uio_resid; 634 else 635 size = space; 636 /* 637 * First segment to transfer is minimum of 638 * transfer size and contiguous space in 639 * pipe buffer. If first segment to transfer 640 * is less than the transfer size, we've got 641 * a wraparound in the buffer. 642 */ 643 segsize = bp->size - bp->in; 644 if (segsize > size) 645 segsize = size; 646 647 /* Transfer first segment */ 648 mutex_exit(lock); 649 error = uiomove((char *)bp->buffer + bp->in, segsize, 650 uio); 651 652 if (error == 0 && segsize < size) { 653 /* 654 * Transfer remaining part now, to 655 * support atomic writes. Wraparound 656 * happened. 657 */ 658 KASSERT(bp->in + segsize == bp->size); 659 error = uiomove(bp->buffer, 660 size - segsize, uio); 661 } 662 mutex_enter(lock); 663 if (error) 664 break; 665 666 bp->in += size; 667 if (bp->in >= bp->size) { 668 KASSERT(bp->in == size - segsize + bp->size); 669 bp->in = size - segsize; 670 } 671 672 bp->cnt += size; 673 KASSERT(bp->cnt <= bp->size); 674 wakeup_state = 0; 675 } else { 676 /* 677 * If the "read-side" has been blocked, wake it up now. 678 */ 679 cv_broadcast(&wpipe->pipe_rcv); 680 681 /* 682 * Don't block on non-blocking I/O. 683 */ 684 if (fp->f_flag & FNONBLOCK) { 685 error = EAGAIN; 686 break; 687 } 688 689 /* 690 * We have no more space and have something to offer, 691 * wake up select/poll. 692 */ 693 if (bp->cnt) 694 pipeselwakeup(wpipe, wpipe, POLL_IN); 695 696 if (wakeup_state & PIPE_RESTART) { 697 error = ERESTART; 698 break; 699 } 700 701 /* 702 * If read side wants to go away, we just issue a signal 703 * to ourselves. 704 */ 705 if (wpipe->pipe_state & PIPE_EOF) { 706 error = EPIPE; 707 break; 708 } 709 710 pipeunlock(wpipe); 711 error = cv_wait_sig(&wpipe->pipe_wcv, lock); 712 (void)pipelock(wpipe, false); 713 if (error != 0) 714 break; 715 wakeup_state = wpipe->pipe_state; 716 } 717 } 718 719 --wpipe->pipe_busy; 720 if (wpipe->pipe_busy == 0) { 721 wpipe->pipe_state &= ~PIPE_RESTART; 722 cv_broadcast(&wpipe->pipe_draincv); 723 } 724 if (bp->cnt > 0) { 725 cv_broadcast(&wpipe->pipe_rcv); 726 } 727 728 /* 729 * Don't return EPIPE if I/O was successful 730 */ 731 if (error == EPIPE && bp->cnt == 0 && uio->uio_resid == 0) 732 error = 0; 733 734 if (error == 0) 735 getnanotime(&wpipe->pipe_mtime); 736 737 /* 738 * We have something to offer, wake up select/poll. 739 * wmap->cnt is always 0 in this point (direct write 740 * is only done synchronously), so check only wpipe->pipe_buffer.cnt 741 */ 742 if (bp->cnt) 743 pipeselwakeup(wpipe, wpipe, POLL_IN); 744 745 /* 746 * Arrange for next read(2) to do a signal. 747 */ 748 wpipe->pipe_state |= PIPE_SIGNALR; 749 750 pipeunlock(wpipe); 751 mutex_exit(lock); 752 return (error); 753 } 754 755 /* 756 * We implement a very minimal set of ioctls for compatibility with sockets. 757 */ 758 int 759 pipe_ioctl(file_t *fp, u_long cmd, void *data) 760 { 761 struct pipe *pipe = fp->f_pipe; 762 kmutex_t *lock = pipe->pipe_lock; 763 764 switch (cmd) { 765 766 case FIONBIO: 767 return (0); 768 769 case FIOASYNC: 770 mutex_enter(lock); 771 if (*(int *)data) { 772 pipe->pipe_state |= PIPE_ASYNC; 773 } else { 774 pipe->pipe_state &= ~PIPE_ASYNC; 775 } 776 mutex_exit(lock); 777 return (0); 778 779 case FIONREAD: 780 mutex_enter(lock); 781 *(int *)data = pipe->pipe_buffer.cnt; 782 mutex_exit(lock); 783 return (0); 784 785 case FIONWRITE: 786 /* Look at other side */ 787 mutex_enter(lock); 788 pipe = pipe->pipe_peer; 789 if (pipe == NULL) 790 *(int *)data = 0; 791 else 792 *(int *)data = pipe->pipe_buffer.cnt; 793 mutex_exit(lock); 794 return (0); 795 796 case FIONSPACE: 797 /* Look at other side */ 798 mutex_enter(lock); 799 pipe = pipe->pipe_peer; 800 if (pipe == NULL) 801 *(int *)data = 0; 802 else 803 *(int *)data = pipe->pipe_buffer.size - 804 pipe->pipe_buffer.cnt; 805 mutex_exit(lock); 806 return (0); 807 808 case TIOCSPGRP: 809 case FIOSETOWN: 810 return fsetown(&pipe->pipe_pgid, cmd, data); 811 812 case TIOCGPGRP: 813 case FIOGETOWN: 814 return fgetown(pipe->pipe_pgid, cmd, data); 815 816 } 817 return (EPASSTHROUGH); 818 } 819 820 int 821 pipe_poll(file_t *fp, int events) 822 { 823 struct pipe *rpipe = fp->f_pipe; 824 struct pipe *wpipe; 825 int eof = 0; 826 int revents = 0; 827 828 mutex_enter(rpipe->pipe_lock); 829 wpipe = rpipe->pipe_peer; 830 831 if (events & (POLLIN | POLLRDNORM)) 832 if ((rpipe->pipe_buffer.cnt > 0) || 833 (rpipe->pipe_state & PIPE_EOF)) 834 revents |= events & (POLLIN | POLLRDNORM); 835 836 eof |= (rpipe->pipe_state & PIPE_EOF); 837 838 if (wpipe == NULL) 839 revents |= events & (POLLOUT | POLLWRNORM); 840 else { 841 if (events & (POLLOUT | POLLWRNORM)) 842 if ((wpipe->pipe_state & PIPE_EOF) || ( 843 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) 844 revents |= events & (POLLOUT | POLLWRNORM); 845 846 eof |= (wpipe->pipe_state & PIPE_EOF); 847 } 848 849 if (wpipe == NULL || eof) 850 revents |= POLLHUP; 851 852 if (revents == 0) { 853 if (events & (POLLIN | POLLRDNORM)) 854 selrecord(curlwp, &rpipe->pipe_sel); 855 856 if (events & (POLLOUT | POLLWRNORM)) 857 selrecord(curlwp, &wpipe->pipe_sel); 858 } 859 mutex_exit(rpipe->pipe_lock); 860 861 return (revents); 862 } 863 864 static int 865 pipe_stat(file_t *fp, struct stat *ub) 866 { 867 struct pipe *pipe = fp->f_pipe; 868 869 mutex_enter(pipe->pipe_lock); 870 memset(ub, 0, sizeof(*ub)); 871 ub->st_mode = S_IFIFO | S_IRUSR | S_IWUSR; 872 ub->st_blksize = pipe->pipe_buffer.size; 873 if (ub->st_blksize == 0 && pipe->pipe_peer) 874 ub->st_blksize = pipe->pipe_peer->pipe_buffer.size; 875 ub->st_size = pipe->pipe_buffer.cnt; 876 ub->st_blocks = (ub->st_size) ? 1 : 0; 877 ub->st_atimespec = pipe->pipe_atime; 878 ub->st_mtimespec = pipe->pipe_mtime; 879 ub->st_ctimespec = ub->st_birthtimespec = pipe->pipe_btime; 880 ub->st_uid = kauth_cred_geteuid(fp->f_cred); 881 ub->st_gid = kauth_cred_getegid(fp->f_cred); 882 883 /* 884 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 885 * XXX (st_dev, st_ino) should be unique. 886 */ 887 mutex_exit(pipe->pipe_lock); 888 return 0; 889 } 890 891 static int 892 pipe_close(file_t *fp) 893 { 894 struct pipe *pipe = fp->f_pipe; 895 896 fp->f_pipe = NULL; 897 pipeclose(pipe); 898 return (0); 899 } 900 901 static void 902 pipe_restart(file_t *fp) 903 { 904 struct pipe *pipe = fp->f_pipe; 905 906 /* 907 * Unblock blocked reads/writes in order to allow close() to complete. 908 * System calls return ERESTART so that the fd is revalidated. 909 * (Partial writes return the transfer length.) 910 */ 911 mutex_enter(pipe->pipe_lock); 912 pipe->pipe_state |= PIPE_RESTART; 913 /* Wakeup both cvs, maybe we only need one, but maybe there are some 914 * other paths where wakeup is needed, and it saves deciding which! */ 915 cv_broadcast(&pipe->pipe_rcv); 916 cv_broadcast(&pipe->pipe_wcv); 917 mutex_exit(pipe->pipe_lock); 918 } 919 920 static int 921 pipe_fpathconf(struct file *fp, int name, register_t *retval) 922 { 923 924 switch (name) { 925 case _PC_PIPE_BUF: 926 *retval = PIPE_BUF; 927 return 0; 928 default: 929 return EINVAL; 930 } 931 } 932 933 static int 934 pipe_posix_fadvise(struct file *fp, off_t offset, off_t len, int advice) 935 { 936 937 return ESPIPE; 938 } 939 940 static void 941 pipe_free_kmem(struct pipe *pipe) 942 { 943 944 if (pipe->pipe_buffer.buffer != NULL) { 945 if (pipe->pipe_buffer.size > PIPE_SIZE) { 946 atomic_dec_uint(&nbigpipe); 947 } 948 if (pipe->pipe_buffer.buffer != (void *)pipe->pipe_kmem) { 949 uvm_km_free(kernel_map, 950 (vaddr_t)pipe->pipe_buffer.buffer, 951 pipe->pipe_buffer.size, UVM_KMF_PAGEABLE); 952 atomic_add_int(&amountpipekva, 953 -pipe->pipe_buffer.size); 954 } 955 pipe->pipe_buffer.buffer = NULL; 956 } 957 } 958 959 /* 960 * Shutdown the pipe. 961 */ 962 static void 963 pipeclose(struct pipe *pipe) 964 { 965 kmutex_t *lock; 966 struct pipe *ppipe; 967 968 if (pipe == NULL) 969 return; 970 971 KASSERT(cv_is_valid(&pipe->pipe_rcv)); 972 KASSERT(cv_is_valid(&pipe->pipe_wcv)); 973 KASSERT(cv_is_valid(&pipe->pipe_draincv)); 974 KASSERT(cv_is_valid(&pipe->pipe_lkcv)); 975 976 lock = pipe->pipe_lock; 977 if (lock == NULL) 978 /* Must have failed during create */ 979 goto free_resources; 980 981 mutex_enter(lock); 982 pipeselwakeup(pipe, pipe, POLL_HUP); 983 984 /* 985 * If the other side is blocked, wake it up saying that 986 * we want to close it down. 987 */ 988 pipe->pipe_state |= PIPE_EOF; 989 if (pipe->pipe_busy) { 990 while (pipe->pipe_busy) { 991 cv_broadcast(&pipe->pipe_wcv); 992 cv_wait_sig(&pipe->pipe_draincv, lock); 993 } 994 } 995 996 /* 997 * Disconnect from peer. 998 */ 999 if ((ppipe = pipe->pipe_peer) != NULL) { 1000 pipeselwakeup(ppipe, ppipe, POLL_HUP); 1001 ppipe->pipe_state |= PIPE_EOF; 1002 cv_broadcast(&ppipe->pipe_rcv); 1003 ppipe->pipe_peer = NULL; 1004 } 1005 1006 /* 1007 * Any knote objects still left in the list are 1008 * the one attached by peer. Since no one will 1009 * traverse this list, we just clear it. 1010 * 1011 * XXX Exposes select/kqueue internals. 1012 */ 1013 SLIST_INIT(&pipe->pipe_sel.sel_klist); 1014 1015 KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0); 1016 mutex_exit(lock); 1017 mutex_obj_free(lock); 1018 1019 /* 1020 * Free resources. 1021 */ 1022 free_resources: 1023 pipe->pipe_pgid = 0; 1024 pipe->pipe_state = PIPE_SIGNALR; 1025 pipe->pipe_peer = NULL; 1026 pipe->pipe_lock = NULL; 1027 pipe_free_kmem(pipe); 1028 if (pipe->pipe_kmem != 0) { 1029 pool_cache_put(pipe_rd_cache, pipe); 1030 } else { 1031 pool_cache_put(pipe_wr_cache, pipe); 1032 } 1033 } 1034 1035 static void 1036 filt_pipedetach(struct knote *kn) 1037 { 1038 struct pipe *pipe; 1039 kmutex_t *lock; 1040 1041 pipe = ((file_t *)kn->kn_obj)->f_pipe; 1042 lock = pipe->pipe_lock; 1043 1044 mutex_enter(lock); 1045 1046 switch(kn->kn_filter) { 1047 case EVFILT_WRITE: 1048 /* Need the peer structure, not our own. */ 1049 pipe = pipe->pipe_peer; 1050 1051 /* If reader end already closed, just return. */ 1052 if (pipe == NULL) { 1053 mutex_exit(lock); 1054 return; 1055 } 1056 1057 break; 1058 default: 1059 /* Nothing to do. */ 1060 break; 1061 } 1062 1063 KASSERT(kn->kn_hook == pipe); 1064 selremove_knote(&pipe->pipe_sel, kn); 1065 mutex_exit(lock); 1066 } 1067 1068 static int 1069 filt_piperead(struct knote *kn, long hint) 1070 { 1071 struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe; 1072 struct pipe *wpipe; 1073 int rv; 1074 1075 if ((hint & NOTE_SUBMIT) == 0) { 1076 mutex_enter(rpipe->pipe_lock); 1077 } 1078 wpipe = rpipe->pipe_peer; 1079 kn->kn_data = rpipe->pipe_buffer.cnt; 1080 1081 if ((rpipe->pipe_state & PIPE_EOF) || 1082 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 1083 knote_set_eof(kn, 0); 1084 rv = 1; 1085 } else { 1086 rv = kn->kn_data > 0; 1087 } 1088 1089 if ((hint & NOTE_SUBMIT) == 0) { 1090 mutex_exit(rpipe->pipe_lock); 1091 } 1092 return rv; 1093 } 1094 1095 static int 1096 filt_pipewrite(struct knote *kn, long hint) 1097 { 1098 struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe; 1099 struct pipe *wpipe; 1100 int rv; 1101 1102 if ((hint & NOTE_SUBMIT) == 0) { 1103 mutex_enter(rpipe->pipe_lock); 1104 } 1105 wpipe = rpipe->pipe_peer; 1106 1107 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 1108 kn->kn_data = 0; 1109 knote_set_eof(kn, 0); 1110 rv = 1; 1111 } else { 1112 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 1113 rv = kn->kn_data >= PIPE_BUF; 1114 } 1115 1116 if ((hint & NOTE_SUBMIT) == 0) { 1117 mutex_exit(rpipe->pipe_lock); 1118 } 1119 return rv; 1120 } 1121 1122 static const struct filterops pipe_rfiltops = { 1123 .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE, 1124 .f_attach = NULL, 1125 .f_detach = filt_pipedetach, 1126 .f_event = filt_piperead, 1127 }; 1128 1129 static const struct filterops pipe_wfiltops = { 1130 .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE, 1131 .f_attach = NULL, 1132 .f_detach = filt_pipedetach, 1133 .f_event = filt_pipewrite, 1134 }; 1135 1136 static int 1137 pipe_kqfilter(file_t *fp, struct knote *kn) 1138 { 1139 struct pipe *pipe; 1140 kmutex_t *lock; 1141 1142 pipe = ((file_t *)kn->kn_obj)->f_pipe; 1143 lock = pipe->pipe_lock; 1144 1145 mutex_enter(lock); 1146 1147 switch (kn->kn_filter) { 1148 case EVFILT_READ: 1149 kn->kn_fop = &pipe_rfiltops; 1150 break; 1151 case EVFILT_WRITE: 1152 kn->kn_fop = &pipe_wfiltops; 1153 pipe = pipe->pipe_peer; 1154 if (pipe == NULL) { 1155 /* Other end of pipe has been closed. */ 1156 mutex_exit(lock); 1157 return (EBADF); 1158 } 1159 break; 1160 default: 1161 mutex_exit(lock); 1162 return (EINVAL); 1163 } 1164 1165 kn->kn_hook = pipe; 1166 selrecord_knote(&pipe->pipe_sel, kn); 1167 mutex_exit(lock); 1168 1169 return (0); 1170 } 1171 1172 /* 1173 * Handle pipe sysctls. 1174 */ 1175 SYSCTL_SETUP(sysctl_kern_pipe_setup, "sysctl kern.pipe subtree setup") 1176 { 1177 1178 sysctl_createv(clog, 0, NULL, NULL, 1179 CTLFLAG_PERMANENT, 1180 CTLTYPE_NODE, "pipe", 1181 SYSCTL_DESCR("Pipe settings"), 1182 NULL, 0, NULL, 0, 1183 CTL_KERN, KERN_PIPE, CTL_EOL); 1184 1185 sysctl_createv(clog, 0, NULL, NULL, 1186 CTLFLAG_PERMANENT|CTLFLAG_READWRITE, 1187 CTLTYPE_INT, "maxbigpipes", 1188 SYSCTL_DESCR("Maximum number of \"big\" pipes"), 1189 NULL, 0, &maxbigpipes, 0, 1190 CTL_KERN, KERN_PIPE, KERN_PIPE_MAXBIGPIPES, CTL_EOL); 1191 sysctl_createv(clog, 0, NULL, NULL, 1192 CTLFLAG_PERMANENT, 1193 CTLTYPE_INT, "nbigpipes", 1194 SYSCTL_DESCR("Number of \"big\" pipes"), 1195 NULL, 0, &nbigpipe, 0, 1196 CTL_KERN, KERN_PIPE, KERN_PIPE_NBIGPIPES, CTL_EOL); 1197 sysctl_createv(clog, 0, NULL, NULL, 1198 CTLFLAG_PERMANENT, 1199 CTLTYPE_INT, "kvasize", 1200 SYSCTL_DESCR("Amount of kernel memory consumed by pipe " 1201 "buffers"), 1202 NULL, 0, &amountpipekva, 0, 1203 CTL_KERN, KERN_PIPE, KERN_PIPE_KVASIZE, CTL_EOL); 1204 } 1205