/*	$NetBSD: sys_pipe.c,v 1.152 2021/01/25 19:21:11 dholland Exp $	*/

/*-
 * Copyright (c) 2003, 2007, 2008, 2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Paul Kranenburg, and by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used.  It does not support all features of
 * sockets, but does do everything that pipes normally do.
 *
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is mapped read-only into the kernel address
 * space using the UVM page loan facility, allowing the receiving process
 * to copy the data directly from the pages of the sending process.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
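
/*
 * An illustrative sketch (not part of the kernel build): the userland
 * view of what this file implements, using only standard POSIX calls.
 * fds[0] is the read end and fds[1] the write end.
 *
 *	int fds[2];
 *	char buf[64];
 *
 *	if (pipe(fds) == -1)
 *		err(EXIT_FAILURE, "pipe");
 *	(void)write(fds[1], "hello", 5);	small write: buffered in-kernel
 *	(void)read(fds[0], buf, sizeof(buf));	copied back out of the buffer
 *	close(fds[0]);
 *	close(fds[1]);
 */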
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.152 2021/01/25 19:21:11 dholland Exp $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/kernel.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/signalvar.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/select.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/sysctl.h>
#include <sys/kauth.h>
#include <sys/atomic.h>
#include <sys/pipe.h>

static int	pipe_read(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_write(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_close(file_t *);
static int	pipe_poll(file_t *, int);
static int	pipe_kqfilter(file_t *, struct knote *);
static int	pipe_stat(file_t *, struct stat *);
static int	pipe_ioctl(file_t *, u_long, void *);
static void	pipe_restart(file_t *);

static const struct fileops pipeops = {
	.fo_name = "pipe",
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_fcntl = fnullop_fcntl,
	.fo_poll = pipe_poll,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_kqfilter = pipe_kqfilter,
	.fo_restart = pipe_restart,
};

/*
 * Default pipe buffer size(s); this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define	MINPIPESIZE	(PIPE_SIZE / 3)
#define	MAXPIPESIZE	(2 * PIPE_SIZE / 3)

/*
 * Limit the number of "big" pipes.
 */
#define	LIMITBIGPIPES	32
static u_int	maxbigpipes = LIMITBIGPIPES;
static u_int	nbigpipe = 0;

/*
 * Amount of KVA consumed by pipe buffers.
 */
static u_int	amountpipekva = 0;

static void	pipeclose(struct pipe *);
static void	pipe_free_kmem(struct pipe *);
static int	pipe_create(struct pipe **, pool_cache_t);
static int	pipelock(struct pipe *, bool);
static inline void pipeunlock(struct pipe *);
static void	pipeselwakeup(struct pipe *, struct pipe *, int);
static int	pipespace(struct pipe *, int);
static int	pipe_ctor(void *, void *, int);
static void	pipe_dtor(void *, void *);

static pool_cache_t	pipe_wr_cache;
static pool_cache_t	pipe_rd_cache;

void
pipe_init(void)
{

	/* Writer side is not automatically allocated KVA. */
	pipe_wr_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "pipewr",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, NULL);
	KASSERT(pipe_wr_cache != NULL);

	/* Reader side gets preallocated KVA. */
	pipe_rd_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "piperd",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, (void *)1);
	KASSERT(pipe_rd_cache != NULL);
}

static int
pipe_ctor(void *arg, void *obj, int flags)
{
	struct pipe *pipe;
	vaddr_t va;

	pipe = obj;

	memset(pipe, 0, sizeof(struct pipe));
	if (arg != NULL) {
		/* Preallocate space. */
		va = uvm_km_alloc(kernel_map, PIPE_SIZE, 0,
		    UVM_KMF_PAGEABLE | UVM_KMF_WAITVA);
		KASSERT(va != 0);
		pipe->pipe_kmem = va;
		atomic_add_int(&amountpipekva, PIPE_SIZE);
	}
	cv_init(&pipe->pipe_rcv, "pipe_rd");
	cv_init(&pipe->pipe_wcv, "pipe_wr");
	cv_init(&pipe->pipe_draincv, "pipe_drn");
	cv_init(&pipe->pipe_lkcv, "pipe_lk");
	selinit(&pipe->pipe_sel);
	pipe->pipe_state = PIPE_SIGNALR;

	return 0;
}

static void
pipe_dtor(void *arg, void *obj)
{
	struct pipe *pipe;

	pipe = obj;

	cv_destroy(&pipe->pipe_rcv);
	cv_destroy(&pipe->pipe_wcv);
	cv_destroy(&pipe->pipe_draincv);
	cv_destroy(&pipe->pipe_lkcv);
	seldestroy(&pipe->pipe_sel);
	if (pipe->pipe_kmem != 0) {
		uvm_km_free(kernel_map, pipe->pipe_kmem, PIPE_SIZE,
		    UVM_KMF_PAGEABLE);
		atomic_add_int(&amountpipekva, -PIPE_SIZE);
	}
}
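
/*
 * A sketch of the userland-visible flag handling done by pipe1() below,
 * via pipe2(2) (illustrative only; assumes standard NetBSD libc):
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
 *		err(EXIT_FAILURE, "pipe2");
 *
 * Both descriptors then carry O_CLOEXEC and O_NONBLOCK, matching the
 * O_CLOEXEC/O_NONBLOCK/O_NOSIGPIPE mask that pipe1() accepts.
 */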
/*
 * The pipe system call for the DTYPE_PIPE type of pipes.
 */
int
pipe1(struct lwp *l, int *fildes, int flags)
{
	struct pipe *rpipe, *wpipe;
	file_t *rf, *wf;
	int fd, error;
	proc_t *p;

	if (flags & ~(O_CLOEXEC|O_NONBLOCK|O_NOSIGPIPE))
		return EINVAL;
	p = curproc;
	rpipe = wpipe = NULL;
	if ((error = pipe_create(&rpipe, pipe_rd_cache)) ||
	    (error = pipe_create(&wpipe, pipe_wr_cache))) {
		goto free2;
	}
	rpipe->pipe_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
	wpipe->pipe_lock = rpipe->pipe_lock;
	mutex_obj_hold(wpipe->pipe_lock);

	error = fd_allocfile(&rf, &fd);
	if (error)
		goto free2;
	fildes[0] = fd;

	error = fd_allocfile(&wf, &fd);
	if (error)
		goto free3;
	fildes[1] = fd;

	rf->f_flag = FREAD | flags;
	rf->f_type = DTYPE_PIPE;
	rf->f_pipe = rpipe;
	rf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[0], (flags & O_CLOEXEC) != 0);

	wf->f_flag = FWRITE | flags;
	wf->f_type = DTYPE_PIPE;
	wf->f_pipe = wpipe;
	wf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[1], (flags & O_CLOEXEC) != 0);

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fd_affix(p, rf, fildes[0]);
	fd_affix(p, wf, fildes[1]);
	return (0);
free3:
	fd_abort(p, rf, fildes[0]);
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);

	return (error);
}

/*
 * Allocate KVA for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if resizing
 * fails, it retains the old buffer and returns ENOMEM.
 */
static int
pipespace(struct pipe *pipe, int size)
{
	void *buffer;

	/*
	 * Allocate pageable virtual address space.  Physical memory is
	 * allocated on demand.
	 */
	if (size == PIPE_SIZE && pipe->pipe_kmem != 0) {
		buffer = (void *)pipe->pipe_kmem;
	} else {
		buffer = (void *)uvm_km_alloc(kernel_map, round_page(size),
		    0, UVM_KMF_PAGEABLE);
		if (buffer == NULL)
			return (ENOMEM);
		atomic_add_int(&amountpipekva, size);
	}

	/* Free old resources if we're resizing. */
	pipe_free_kmem(pipe);
	pipe->pipe_buffer.buffer = buffer;
	pipe->pipe_buffer.size = size;
	pipe->pipe_buffer.in = 0;
	pipe->pipe_buffer.out = 0;
	pipe->pipe_buffer.cnt = 0;
	return (0);
}
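
/*
 * A worked example of the circular-buffer bookkeeping initialized above
 * (illustrative numbers): with size = 16384, writing 100 bytes advances
 * 'in' from 0 to 100 and 'cnt' to 100; reading 40 bytes advances 'out'
 * to 40 and drops 'cnt' to 60; once 'cnt' reaches 0, both indices are
 * reset to 0 so the next transfer starts at the beginning of the buffer
 * and stays cache-friendly.
 */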
/*
 * Initialize and allocate VM and memory for pipe.
 */
static int
pipe_create(struct pipe **pipep, pool_cache_t cache)
{
	struct pipe *pipe;
	int error;

	pipe = pool_cache_get(cache, PR_WAITOK);
	KASSERT(pipe != NULL);
	*pipep = pipe;
	error = 0;
	getnanotime(&pipe->pipe_btime);
	pipe->pipe_atime = pipe->pipe_mtime = pipe->pipe_btime;
	pipe->pipe_lock = NULL;
	if (cache == pipe_rd_cache) {
		error = pipespace(pipe, PIPE_SIZE);
	} else {
		pipe->pipe_buffer.buffer = NULL;
		pipe->pipe_buffer.size = 0;
		pipe->pipe_buffer.in = 0;
		pipe->pipe_buffer.out = 0;
		pipe->pipe_buffer.cnt = 0;
	}
	return error;
}

/*
 * Lock a pipe for I/O, blocking other access.
 * Called with pipe spin lock held.
 */
static int
pipelock(struct pipe *pipe, bool catch_p)
{
	int error;

	KASSERT(mutex_owned(pipe->pipe_lock));

	while (pipe->pipe_state & PIPE_LOCKFL) {
		pipe->pipe_waiters++;
		KASSERT(pipe->pipe_waiters != 0); /* just in case */
		if (catch_p) {
			error = cv_wait_sig(&pipe->pipe_lkcv, pipe->pipe_lock);
			if (error != 0) {
				KASSERT(pipe->pipe_waiters > 0);
				pipe->pipe_waiters--;
				return error;
			}
		} else
			cv_wait(&pipe->pipe_lkcv, pipe->pipe_lock);
		KASSERT(pipe->pipe_waiters > 0);
		pipe->pipe_waiters--;
	}

	pipe->pipe_state |= PIPE_LOCKFL;

	return 0;
}

/*
 * Unlock a pipe I/O lock.
 */
static inline void
pipeunlock(struct pipe *pipe)
{

	KASSERT(pipe->pipe_state & PIPE_LOCKFL);

	pipe->pipe_state &= ~PIPE_LOCKFL;
	if (pipe->pipe_waiters > 0) {
		cv_signal(&pipe->pipe_lkcv);
	}
}
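
/*
 * The pattern in which the long-term lock is used by pipe_read() and
 * pipe_write() below, sketched with error handling elided:
 *
 *	mutex_enter(pipe->pipe_lock);
 *	error = pipelock(pipe, true);		may sleep; catches signals
 *	if (error == 0) {
 *		...				pipe_lock is dropped around
 *						uiomove() while PIPE_LOCKFL
 *						keeps other I/O out
 *		pipeunlock(pipe);
 *	}
 *	mutex_exit(pipe->pipe_lock);
 */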
/*
 * Select/poll wakeup.  This also sends SIGIO to peer connected to
 * 'sigpipe' side of pipe.
 */
static void
pipeselwakeup(struct pipe *selp, struct pipe *sigp, int code)
{
	int band;

	switch (code) {
	case POLL_IN:
		band = POLLIN|POLLRDNORM;
		break;
	case POLL_OUT:
		band = POLLOUT|POLLWRNORM;
		break;
	case POLL_HUP:
		band = POLLHUP;
		break;
	case POLL_ERR:
		band = POLLERR;
		break;
	default:
		band = 0;
#ifdef DIAGNOSTIC
		printf("bad siginfo code %d in pipe notification.\n", code);
#endif
		break;
	}

	selnotify(&selp->pipe_sel, band, NOTE_SUBMIT);

	if (sigp == NULL || (sigp->pipe_state & PIPE_ASYNC) == 0)
		return;

	fownsignal(sigp->pipe_pgid, SIGIO, code, band, selp);
}

static int
pipe_read(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipebuf *bp = &rpipe->pipe_buffer;
	kmutex_t *lock = rpipe->pipe_lock;
	int error;
	size_t nread = 0;
	size_t size;
	size_t ocnt;
	unsigned int wakeup_state = 0;

	mutex_enter(lock);
	++rpipe->pipe_busy;
	ocnt = bp->cnt;

again:
	error = pipelock(rpipe, true);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * Normal pipe buffer receive.
		 */
		if (bp->cnt > 0) {
			size = bp->size - bp->out;
			if (size > bp->cnt)
				size = bp->cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;

			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->out, size,
			    uio);
			mutex_enter(lock);
			if (error)
				break;

			bp->out += size;
			if (bp->out >= bp->size)
				bp->out = 0;

			bp->cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (bp->cnt == 0) {
				bp->in = 0;
				bp->out = 0;
			}
			nread += size;
			continue;
		}

		/*
		 * Break if some data was read.
		 */
		if (nread > 0)
			break;

		/*
		 * Detect EOF condition.
		 * Read returns 0 on EOF, no need to set error.
		 */
		if (rpipe->pipe_state & PIPE_EOF)
			break;

		/*
		 * Don't block on non-blocking I/O.
		 */
		if (fp->f_flag & FNONBLOCK) {
			error = EAGAIN;
			break;
		}

		/*
		 * Unlock the pipe buffer for our remaining processing.
		 * We will either break out with an error or we will
		 * sleep and relock to loop.
		 */
		pipeunlock(rpipe);

#if 1	/* XXX (dsl) I'm sure these aren't needed here ... */
		/*
		 * We want to read more, wake up select/poll.
		 */
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);

		/*
		 * If the "write-side" is blocked, wake it up now.
		 */
		cv_broadcast(&rpipe->pipe_wcv);
#endif

		if (wakeup_state & PIPE_RESTART) {
			error = ERESTART;
			goto unlocked_error;
		}

		/* Now wait until the pipe is filled. */
		error = cv_wait_sig(&rpipe->pipe_rcv, lock);
		if (error != 0)
			goto unlocked_error;
		wakeup_state = rpipe->pipe_state;
		goto again;
	}

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
	pipeunlock(rpipe);

unlocked_error:
	--rpipe->pipe_busy;
	if (rpipe->pipe_busy == 0) {
		rpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&rpipe->pipe_draincv);
	}
	if (bp->cnt < MINPIPESIZE) {
		cv_broadcast(&rpipe->pipe_wcv);
	}

	/*
	 * If anything was read off the buffer, signal to the writer that
	 * it's possible to write more data.  Also send a signal if we are
	 * here for the first time after the last write.
	 */
	if ((bp->size - bp->cnt) >= PIPE_BUF
	    && (ocnt != bp->cnt || (rpipe->pipe_state & PIPE_SIGNALR))) {
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);
		rpipe->pipe_state &= ~PIPE_SIGNALR;
	}

	mutex_exit(lock);
	return (error);
}
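
/*
 * Reader-side semantics implemented above, as seen from userland
 * (an illustrative summary, not normative):
 *
 *	n = read(fds[0], buf, sizeof(buf));
 *
 *	n > 0                       data copied out of the kernel buffer
 *	n == 0                      write side closed (PIPE_EOF): EOF
 *	n == -1, errno == EAGAIN    O_NONBLOCK set and the pipe is empty
 *	n == -1, errno == EINTR     cv_wait_sig() interrupted by a signal
 */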
static int
pipe_write(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *wpipe, *rpipe;
	struct pipebuf *bp;
	kmutex_t *lock;
	int error;
	unsigned int wakeup_state = 0;

	/* We want to write to our peer. */
	rpipe = fp->f_pipe;
	lock = rpipe->pipe_lock;
	error = 0;

	mutex_enter(lock);
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) != 0) {
		mutex_exit(lock);
		return EPIPE;
	}
	++wpipe->pipe_busy;

	/* Acquire the long-term pipe lock. */
	if ((error = pipelock(wpipe, true)) != 0) {
		--wpipe->pipe_busy;
		if (wpipe->pipe_busy == 0) {
			wpipe->pipe_state &= ~PIPE_RESTART;
			cv_broadcast(&wpipe->pipe_draincv);
		}
		mutex_exit(lock);
		return (error);
	}

	bp = &wpipe->pipe_buffer;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < maxbigpipes) &&
	    (bp->size <= PIPE_SIZE) && (bp->cnt == 0)) {

		if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
			atomic_inc_uint(&nbigpipe);
	}

	while (uio->uio_resid) {
		size_t space;

		space = bp->size - bp->cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (uio->uio_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size;	/* Transfer size */
			int segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = bp->size - bp->in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */
			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->in, segsize,
			    uio);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
				KASSERT(bp->in + segsize == bp->size);
				error = uiomove(bp->buffer,
				    size - segsize, uio);
			}
			mutex_enter(lock);
			if (error)
				break;

			bp->in += size;
			if (bp->in >= bp->size) {
				KASSERT(bp->in == size - segsize + bp->size);
				bp->in = size - segsize;
			}

			bp->cnt += size;
			KASSERT(bp->cnt <= bp->size);
			wakeup_state = 0;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			cv_broadcast(&wpipe->pipe_rcv);

			/*
			 * Don't block on non-blocking I/O.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			if (bp->cnt)
				pipeselwakeup(wpipe, wpipe, POLL_IN);

			if (wakeup_state & PIPE_RESTART) {
				error = ERESTART;
				break;
			}

			pipeunlock(wpipe);
			error = cv_wait_sig(&wpipe->pipe_wcv, lock);
			(void)pipelock(wpipe, false);
			if (error != 0)
				break;
			/*
			 * If the read side wants to go away, we just
			 * issue a signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
			wakeup_state = wpipe->pipe_state;
		}
	}

	--wpipe->pipe_busy;
	if (wpipe->pipe_busy == 0) {
		wpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&wpipe->pipe_draincv);
	}
	if (bp->cnt > 0) {
		cv_broadcast(&wpipe->pipe_rcv);
	}

	/*
	 * Don't return EPIPE if I/O was successful.
	 */
	if (error == EPIPE && bp->cnt == 0 && uio->uio_resid == 0)
		error = 0;

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer, wake up select/poll.
	 * wmap->cnt (the direct-write map count) is always 0 at this point
	 * (direct writes are only done synchronously), so check only
	 * wpipe->pipe_buffer.cnt.
	 */
	if (bp->cnt)
		pipeselwakeup(wpipe, wpipe, POLL_IN);

	/*
	 * Arrange for the next read(2) to do a signal.
	 */
	wpipe->pipe_state |= PIPE_SIGNALR;

	pipeunlock(wpipe);
	mutex_exit(lock);
	return (error);
}
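
/*
 * The PIPE_BUF guarantee enforced above, sketched: if two writers each do
 *
 *	write(fds[1], record, n);	with n <= PIPE_BUF
 *
 * their records are never interleaved, because a write of at most
 * PIPE_BUF bytes sleeps until the buffer has room for all n bytes
 * rather than storing a partial record.  Writes larger than PIPE_BUF
 * may be split and interleaved with data from other writers.
 */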
/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(file_t *fp, u_long cmd, void *data)
{
	struct pipe *pipe = fp->f_pipe;
	kmutex_t *lock = pipe->pipe_lock;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		mutex_enter(lock);
		if (*(int *)data) {
			pipe->pipe_state |= PIPE_ASYNC;
		} else {
			pipe->pipe_state &= ~PIPE_ASYNC;
		}
		mutex_exit(lock);
		return (0);

	case FIONREAD:
		mutex_enter(lock);
		*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONWRITE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONSPACE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.size -
			    pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case TIOCSPGRP:
	case FIOSETOWN:
		return fsetown(&pipe->pipe_pgid, cmd, data);

	case TIOCGPGRP:
	case FIOGETOWN:
		return fgetown(pipe->pipe_pgid, cmd, data);

	}
	return (EPASSTHROUGH);
}

int
pipe_poll(file_t *fp, int events)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipe *wpipe;
	int eof = 0;
	int revents = 0;

	mutex_enter(rpipe->pipe_lock);
	wpipe = rpipe->pipe_peer;

	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	eof |= (rpipe->pipe_state & PIPE_EOF);

	if (wpipe == NULL)
		revents |= events & (POLLOUT | POLLWRNORM);
	else {
		if (events & (POLLOUT | POLLWRNORM))
			if ((wpipe->pipe_state & PIPE_EOF) ||
			    ((wpipe->pipe_buffer.size -
			      wpipe->pipe_buffer.cnt) >= PIPE_BUF))
				revents |= events & (POLLOUT | POLLWRNORM);

		eof |= (wpipe->pipe_state & PIPE_EOF);
	}

	if (wpipe == NULL || eof)
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM))
			selrecord(curlwp, &rpipe->pipe_sel);

		if (events & (POLLOUT | POLLWRNORM))
			selrecord(curlwp, &wpipe->pipe_sel);
	}
	mutex_exit(rpipe->pipe_lock);

	return (revents);
}
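
/*
 * Example use of the ioctls above (illustrative): querying how much
 * data is buffered and how much space remains.
 *
 *	int nread, nspace;
 *
 *	ioctl(fds[0], FIONREAD, &nread);	bytes available to read
 *	ioctl(fds[1], FIONSPACE, &nspace);	buffer space left to write
 *
 * FIONWRITE and FIONSPACE consult the peer, so they are most useful on
 * the write-side descriptor, whose peer holds the buffer.
 */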
static int
pipe_stat(file_t *fp, struct stat *ub)
{
	struct pipe *pipe = fp->f_pipe;

	mutex_enter(pipe->pipe_lock);
	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO | S_IRUSR | S_IWUSR;
	ub->st_blksize = pipe->pipe_buffer.size;
	if (ub->st_blksize == 0 && pipe->pipe_peer)
		ub->st_blksize = pipe->pipe_peer->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size) ? 1 : 0;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = ub->st_birthtimespec = pipe->pipe_btime;
	ub->st_uid = kauth_cred_geteuid(fp->f_cred);
	ub->st_gid = kauth_cred_getegid(fp->f_cred);

	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	mutex_exit(pipe->pipe_lock);
	return 0;
}

static int
pipe_close(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	fp->f_pipe = NULL;
	pipeclose(pipe);
	return (0);
}

static void
pipe_restart(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	/*
	 * Unblock blocked reads/writes in order to allow close() to complete.
	 * System calls return ERESTART so that the fd is revalidated.
	 * (Partial writes return the transfer length.)
	 */
	mutex_enter(pipe->pipe_lock);
	pipe->pipe_state |= PIPE_RESTART;
	/*
	 * Wake up both cvs.  Maybe we only need one, but there may be
	 * other paths where a wakeup is needed, and this saves deciding
	 * which!
	 */
	cv_broadcast(&pipe->pipe_rcv);
	cv_broadcast(&pipe->pipe_wcv);
	mutex_exit(pipe->pipe_lock);
}

static void
pipe_free_kmem(struct pipe *pipe)
{

	if (pipe->pipe_buffer.buffer != NULL) {
		if (pipe->pipe_buffer.size > PIPE_SIZE) {
			atomic_dec_uint(&nbigpipe);
		}
		if (pipe->pipe_buffer.buffer != (void *)pipe->pipe_kmem) {
			uvm_km_free(kernel_map,
			    (vaddr_t)pipe->pipe_buffer.buffer,
			    pipe->pipe_buffer.size, UVM_KMF_PAGEABLE);
			atomic_add_int(&amountpipekva,
			    -pipe->pipe_buffer.size);
		}
		pipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * Shut down the pipe.
 */
static void
pipeclose(struct pipe *pipe)
{
	kmutex_t *lock;
	struct pipe *ppipe;

	if (pipe == NULL)
		return;

	KASSERT(cv_is_valid(&pipe->pipe_rcv));
	KASSERT(cv_is_valid(&pipe->pipe_wcv));
	KASSERT(cv_is_valid(&pipe->pipe_draincv));
	KASSERT(cv_is_valid(&pipe->pipe_lkcv));

	lock = pipe->pipe_lock;
	if (lock == NULL)
		/* Must have failed during create */
		goto free_resources;

	mutex_enter(lock);
	pipeselwakeup(pipe, pipe, POLL_HUP);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	pipe->pipe_state |= PIPE_EOF;
	if (pipe->pipe_busy) {
		while (pipe->pipe_busy) {
			cv_broadcast(&pipe->pipe_wcv);
			cv_wait_sig(&pipe->pipe_draincv, lock);
		}
	}

	/*
	 * Disconnect from peer.
	 */
	if ((ppipe = pipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe, ppipe, POLL_HUP);
		ppipe->pipe_state |= PIPE_EOF;
		cv_broadcast(&ppipe->pipe_rcv);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * Any knote objects still left in the list are
	 * the ones attached by the peer.  Since no one will
	 * traverse this list, we just clear it.
	 *
	 * XXX Exposes select/kqueue internals.
	 */
	SLIST_INIT(&pipe->pipe_sel.sel_klist);

	KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0);
	mutex_exit(lock);
	mutex_obj_free(lock);

	/*
	 * Free resources.
	 */
free_resources:
	pipe->pipe_pgid = 0;
	pipe->pipe_state = PIPE_SIGNALR;
	pipe->pipe_peer = NULL;
	pipe->pipe_lock = NULL;
	pipe_free_kmem(pipe);
	if (pipe->pipe_kmem != 0) {
		pool_cache_put(pipe_rd_cache, pipe);
	} else {
		pool_cache_put(pipe_wr_cache, pipe);
	}
}
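
/*
 * Peer-shutdown behaviour implemented by pipeclose() above, from the
 * userland side (an illustrative summary):
 *
 *	close(fds[0]);			reader goes away
 *	write(fds[1], "x", 1);		fails with EPIPE, raises SIGPIPE
 *					(unless O_NOSIGPIPE was set)
 *
 * Symmetrically, once the write side is closed, a read() on fds[0]
 * drains any buffered data and then returns 0 (EOF).
 */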
static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_WRITE:
		/* Need the peer structure, not our own. */
		pipe = pipe->pipe_peer;

		/* If reader end already closed, just return. */
		if (pipe == NULL) {
			mutex_exit(lock);
			return;
		}

		break;
	default:
		/* Nothing to do. */
		break;
	}

	KASSERT(kn->kn_hook == pipe);
	selremove_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);
}

static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;
	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		if ((hint & NOTE_SUBMIT) == 0) {
			mutex_exit(rpipe->pipe_lock);
		}
		return (1);
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return (kn->kn_data > 0);
}

static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		if ((hint & NOTE_SUBMIT) == 0) {
			mutex_exit(rpipe->pipe_lock);
		}
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return (kn->kn_data >= PIPE_BUF);
}

static const struct filterops pipe_rfiltops = {
	.f_isfd = 1,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_piperead,
};

static const struct filterops pipe_wfiltops = {
	.f_isfd = 1,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_pipewrite,
};

static int
pipe_kqfilter(file_t *fp, struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		pipe = pipe->pipe_peer;
		if (pipe == NULL) {
			/* Other end of pipe has been closed. */
			mutex_exit(lock);
			return (EBADF);
		}
		break;
	default:
		mutex_exit(lock);
		return (EINVAL);
	}

	kn->kn_hook = pipe;
	selrecord_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);

	return (0);
}
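
/*
 * Example registration against the filters above (illustrative,
 * standard kevent(2) usage):
 *
 *	struct kevent ev;
 *	int kq = kqueue();
 *
 *	EV_SET(&ev, fds[0], EVFILT_READ, EV_ADD, 0, 0, 0);
 *	if (kevent(kq, &ev, 1, NULL, 0, NULL) == -1)
 *		err(EXIT_FAILURE, "kevent");
 *
 * For EVFILT_READ, kn_data reports the bytes available to read and
 * EV_EOF is set once the write side closes; for EVFILT_WRITE, kn_data
 * reports the free buffer space and the knote fires at >= PIPE_BUF.
 */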
/*
 * Handle pipe sysctls.
 */
SYSCTL_SETUP(sysctl_kern_pipe_setup, "sysctl kern.pipe subtree setup")
{

	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_NODE, "pipe",
	    SYSCTL_DESCR("Pipe settings"),
	    NULL, 0, NULL, 0,
	    CTL_KERN, KERN_PIPE, CTL_EOL);

	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
	    CTLTYPE_INT, "maxbigpipes",
	    SYSCTL_DESCR("Maximum number of \"big\" pipes"),
	    NULL, 0, &maxbigpipes, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_MAXBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_INT, "nbigpipes",
	    SYSCTL_DESCR("Number of \"big\" pipes"),
	    NULL, 0, &nbigpipe, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_NBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_INT, "kvasize",
	    SYSCTL_DESCR("Amount of kernel memory consumed by pipe "
		"buffers"),
	    NULL, 0, &amountpipekva, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_KVASIZE, CTL_EOL);
}
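
/*
 * The knobs created above can be inspected and tuned from userland,
 * for example (illustrative):
 *
 *	$ sysctl kern.pipe
 *	$ sysctl -w kern.pipe.maxbigpipes=64
 */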