/* $OpenBSD: sys_pipe.c,v 1.75 2016/10/08 02:16:43 guenther Exp $ */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
27 */ 28 29 #include <sys/param.h> 30 #include <sys/systm.h> 31 #include <sys/proc.h> 32 #include <sys/file.h> 33 #include <sys/filedesc.h> 34 #include <sys/pool.h> 35 #include <sys/ioctl.h> 36 #include <sys/stat.h> 37 #include <sys/signalvar.h> 38 #include <sys/mount.h> 39 #include <sys/syscallargs.h> 40 #include <sys/event.h> 41 #include <sys/lock.h> 42 #include <sys/poll.h> 43 #ifdef KTRACE 44 #include <sys/ktrace.h> 45 #endif 46 47 #include <uvm/uvm_extern.h> 48 49 #include <sys/pipe.h> 50 51 /* 52 * interfaces to the outside world 53 */ 54 int pipe_read(struct file *, off_t *, struct uio *, struct ucred *); 55 int pipe_write(struct file *, off_t *, struct uio *, struct ucred *); 56 int pipe_close(struct file *, struct proc *); 57 int pipe_poll(struct file *, int events, struct proc *); 58 int pipe_kqfilter(struct file *fp, struct knote *kn); 59 int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *); 60 int pipe_stat(struct file *fp, struct stat *ub, struct proc *p); 61 62 static struct fileops pipeops = { 63 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter, 64 pipe_stat, pipe_close 65 }; 66 67 void filt_pipedetach(struct knote *kn); 68 int filt_piperead(struct knote *kn, long hint); 69 int filt_pipewrite(struct knote *kn, long hint); 70 71 struct filterops pipe_rfiltops = 72 { 1, NULL, filt_pipedetach, filt_piperead }; 73 struct filterops pipe_wfiltops = 74 { 1, NULL, filt_pipedetach, filt_pipewrite }; 75 76 /* 77 * Default pipe buffer size(s), this can be kind-of large now because pipe 78 * space is pageable. The pipe code will try to maintain locality of 79 * reference for performance reasons, so small amounts of outstanding I/O 80 * will not wipe the cache. 
81 */ 82 #define MINPIPESIZE (PIPE_SIZE/3) 83 84 /* 85 * Limit the number of "big" pipes 86 */ 87 #define LIMITBIGPIPES 32 88 int nbigpipe; 89 static int amountpipekva; 90 91 struct pool pipe_pool; 92 93 int dopipe(struct proc *, int *, int); 94 void pipeclose(struct pipe *); 95 void pipe_free_kmem(struct pipe *); 96 int pipe_create(struct pipe *); 97 int pipelock(struct pipe *); 98 void pipeunlock(struct pipe *); 99 void pipeselwakeup(struct pipe *); 100 int pipespace(struct pipe *, u_int); 101 102 /* 103 * The pipe system call for the DTYPE_PIPE type of pipes 104 */ 105 106 int 107 sys_pipe(struct proc *p, void *v, register_t *retval) 108 { 109 struct sys_pipe_args /* { 110 syscallarg(int *) fdp; 111 } */ *uap = v; 112 113 return (dopipe(p, SCARG(uap, fdp), 0)); 114 } 115 116 int 117 sys_pipe2(struct proc *p, void *v, register_t *retval) 118 { 119 struct sys_pipe2_args /* { 120 syscallarg(int *) fdp; 121 syscallarg(int) flags; 122 } */ *uap = v; 123 124 if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK)) 125 return (EINVAL); 126 127 return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags))); 128 } 129 130 int 131 dopipe(struct proc *p, int *ufds, int flags) 132 { 133 struct filedesc *fdp = p->p_fd; 134 struct file *rf, *wf; 135 struct pipe *rpipe, *wpipe = NULL; 136 int fds[2], error; 137 138 rpipe = pool_get(&pipe_pool, PR_WAITOK); 139 error = pipe_create(rpipe); 140 if (error != 0) 141 goto free1; 142 wpipe = pool_get(&pipe_pool, PR_WAITOK); 143 error = pipe_create(wpipe); 144 if (error != 0) 145 goto free1; 146 147 fdplock(fdp); 148 149 error = falloc(p, &rf, &fds[0]); 150 if (error != 0) 151 goto free2; 152 rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK); 153 rf->f_type = DTYPE_PIPE; 154 rf->f_data = rpipe; 155 rf->f_ops = &pipeops; 156 157 error = falloc(p, &wf, &fds[1]); 158 if (error != 0) 159 goto free3; 160 wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK); 161 wf->f_type = DTYPE_PIPE; 162 wf->f_data = wpipe; 163 wf->f_ops = &pipeops; 164 165 if (flags & 
O_CLOEXEC) { 166 fdp->fd_ofileflags[fds[0]] |= UF_EXCLOSE; 167 fdp->fd_ofileflags[fds[1]] |= UF_EXCLOSE; 168 } 169 170 rpipe->pipe_peer = wpipe; 171 wpipe->pipe_peer = rpipe; 172 173 FILE_SET_MATURE(rf, p); 174 FILE_SET_MATURE(wf, p); 175 176 error = copyout(fds, ufds, sizeof(fds)); 177 if (error != 0) { 178 fdrelease(p, fds[0]); 179 fdrelease(p, fds[1]); 180 } 181 #ifdef KTRACE 182 else if (KTRPOINT(p, KTR_STRUCT)) 183 ktrfds(p, fds, 2); 184 #endif 185 fdpunlock(fdp); 186 return (error); 187 188 free3: 189 fdremove(fdp, fds[0]); 190 closef(rf, p); 191 rpipe = NULL; 192 free2: 193 fdpunlock(fdp); 194 free1: 195 pipeclose(wpipe); 196 pipeclose(rpipe); 197 return (error); 198 } 199 200 /* 201 * Allocate kva for pipe circular buffer, the space is pageable. 202 * This routine will 'realloc' the size of a pipe safely, if it fails 203 * it will retain the old buffer. 204 * If it fails it will return ENOMEM. 205 */ 206 int 207 pipespace(struct pipe *cpipe, u_int size) 208 { 209 caddr_t buffer; 210 211 buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok); 212 if (buffer == NULL) { 213 return (ENOMEM); 214 } 215 216 /* free old resources if we are resizing */ 217 pipe_free_kmem(cpipe); 218 cpipe->pipe_buffer.buffer = buffer; 219 cpipe->pipe_buffer.size = size; 220 cpipe->pipe_buffer.in = 0; 221 cpipe->pipe_buffer.out = 0; 222 cpipe->pipe_buffer.cnt = 0; 223 224 amountpipekva += cpipe->pipe_buffer.size; 225 226 return (0); 227 } 228 229 /* 230 * initialize and allocate VM and memory for pipe 231 */ 232 int 233 pipe_create(struct pipe *cpipe) 234 { 235 int error; 236 237 /* so pipe_free_kmem() doesn't follow junk pointer */ 238 cpipe->pipe_buffer.buffer = NULL; 239 /* 240 * protect so pipeclose() doesn't follow a junk pointer 241 * if pipespace() fails. 
242 */ 243 memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel)); 244 cpipe->pipe_state = 0; 245 cpipe->pipe_peer = NULL; 246 cpipe->pipe_busy = 0; 247 248 error = pipespace(cpipe, PIPE_SIZE); 249 if (error != 0) 250 return (error); 251 252 getnanotime(&cpipe->pipe_ctime); 253 cpipe->pipe_atime = cpipe->pipe_ctime; 254 cpipe->pipe_mtime = cpipe->pipe_ctime; 255 cpipe->pipe_pgid = NO_PID; 256 257 return (0); 258 } 259 260 261 /* 262 * lock a pipe for I/O, blocking other access 263 */ 264 int 265 pipelock(struct pipe *cpipe) 266 { 267 int error; 268 while (cpipe->pipe_state & PIPE_LOCK) { 269 cpipe->pipe_state |= PIPE_LWANT; 270 if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0))) 271 return error; 272 } 273 cpipe->pipe_state |= PIPE_LOCK; 274 return 0; 275 } 276 277 /* 278 * unlock a pipe I/O lock 279 */ 280 void 281 pipeunlock(struct pipe *cpipe) 282 { 283 cpipe->pipe_state &= ~PIPE_LOCK; 284 if (cpipe->pipe_state & PIPE_LWANT) { 285 cpipe->pipe_state &= ~PIPE_LWANT; 286 wakeup(cpipe); 287 } 288 } 289 290 void 291 pipeselwakeup(struct pipe *cpipe) 292 { 293 if (cpipe->pipe_state & PIPE_SEL) { 294 cpipe->pipe_state &= ~PIPE_SEL; 295 selwakeup(&cpipe->pipe_sel); 296 } else 297 KNOTE(&cpipe->pipe_sel.si_note, 0); 298 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID) 299 gsignal(cpipe->pipe_pgid, SIGIO); 300 } 301 302 int 303 pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 304 { 305 struct pipe *rpipe = fp->f_data; 306 int error; 307 size_t size, nread = 0; 308 309 error = pipelock(rpipe); 310 if (error) 311 return (error); 312 313 ++rpipe->pipe_busy; 314 315 while (uio->uio_resid) { 316 /* 317 * normal pipe buffer receive 318 */ 319 if (rpipe->pipe_buffer.cnt > 0) { 320 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; 321 if (size > rpipe->pipe_buffer.cnt) 322 size = rpipe->pipe_buffer.cnt; 323 if (size > uio->uio_resid) 324 size = uio->uio_resid; 325 error = 
uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], 326 size, uio); 327 if (error) { 328 break; 329 } 330 rpipe->pipe_buffer.out += size; 331 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size) 332 rpipe->pipe_buffer.out = 0; 333 334 rpipe->pipe_buffer.cnt -= size; 335 /* 336 * If there is no more to read in the pipe, reset 337 * its pointers to the beginning. This improves 338 * cache hit stats. 339 */ 340 if (rpipe->pipe_buffer.cnt == 0) { 341 rpipe->pipe_buffer.in = 0; 342 rpipe->pipe_buffer.out = 0; 343 } 344 nread += size; 345 } else { 346 /* 347 * detect EOF condition 348 * read returns 0 on EOF, no need to set error 349 */ 350 if (rpipe->pipe_state & PIPE_EOF) 351 break; 352 353 /* 354 * If the "write-side" has been blocked, wake it up now. 355 */ 356 if (rpipe->pipe_state & PIPE_WANTW) { 357 rpipe->pipe_state &= ~PIPE_WANTW; 358 wakeup(rpipe); 359 } 360 361 /* 362 * Break if some data was read. 363 */ 364 if (nread > 0) 365 break; 366 367 /* 368 * Unlock the pipe buffer for our remaining processing. 369 * We will either break out with an error or we will 370 * sleep and relock to loop. 371 */ 372 pipeunlock(rpipe); 373 374 /* 375 * Handle non-blocking mode operation or 376 * wait for more data. 377 */ 378 if (fp->f_flag & FNONBLOCK) { 379 error = EAGAIN; 380 } else { 381 rpipe->pipe_state |= PIPE_WANTR; 382 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0) 383 error = pipelock(rpipe); 384 } 385 if (error) 386 goto unlocked_error; 387 } 388 } 389 pipeunlock(rpipe); 390 391 if (error == 0) 392 getnanotime(&rpipe->pipe_atime); 393 unlocked_error: 394 --rpipe->pipe_busy; 395 396 /* 397 * PIPE_WANT processing only makes sense if pipe_busy is 0. 398 */ 399 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) { 400 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW); 401 wakeup(rpipe); 402 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) { 403 /* 404 * Handle write blocking hysteresis. 
405 */ 406 if (rpipe->pipe_state & PIPE_WANTW) { 407 rpipe->pipe_state &= ~PIPE_WANTW; 408 wakeup(rpipe); 409 } 410 } 411 412 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) 413 pipeselwakeup(rpipe); 414 415 return (error); 416 } 417 418 int 419 pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 420 { 421 int error = 0; 422 size_t orig_resid; 423 struct pipe *wpipe, *rpipe; 424 425 rpipe = fp->f_data; 426 wpipe = rpipe->pipe_peer; 427 428 /* 429 * detect loss of pipe read side, issue SIGPIPE if lost. 430 */ 431 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 432 return (EPIPE); 433 } 434 ++wpipe->pipe_busy; 435 436 /* 437 * If it is advantageous to resize the pipe buffer, do 438 * so. 439 */ 440 if ((uio->uio_resid > PIPE_SIZE) && 441 (nbigpipe < LIMITBIGPIPES) && 442 (wpipe->pipe_buffer.size <= PIPE_SIZE) && 443 (wpipe->pipe_buffer.cnt == 0)) { 444 445 if ((error = pipelock(wpipe)) == 0) { 446 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) 447 nbigpipe++; 448 pipeunlock(wpipe); 449 } 450 } 451 452 /* 453 * If an early error occurred unbusy and return, waking up any pending 454 * readers. 455 */ 456 if (error) { 457 --wpipe->pipe_busy; 458 if ((wpipe->pipe_busy == 0) && 459 (wpipe->pipe_state & PIPE_WANT)) { 460 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 461 wakeup(wpipe); 462 } 463 return (error); 464 } 465 466 orig_resid = uio->uio_resid; 467 468 while (uio->uio_resid) { 469 size_t space; 470 471 retrywrite: 472 if (wpipe->pipe_state & PIPE_EOF) { 473 error = EPIPE; 474 break; 475 } 476 477 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 478 479 /* Writes of size <= PIPE_BUF must be atomic. 
*/ 480 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 481 space = 0; 482 483 if (space > 0) { 484 if ((error = pipelock(wpipe)) == 0) { 485 size_t size; /* Transfer size */ 486 size_t segsize; /* first segment to transfer */ 487 488 /* 489 * If a process blocked in uiomove, our 490 * value for space might be bad. 491 * 492 * XXX will we be ok if the reader has gone 493 * away here? 494 */ 495 if (space > wpipe->pipe_buffer.size - 496 wpipe->pipe_buffer.cnt) { 497 pipeunlock(wpipe); 498 goto retrywrite; 499 } 500 501 /* 502 * Transfer size is minimum of uio transfer 503 * and free space in pipe buffer. 504 */ 505 if (space > uio->uio_resid) 506 size = uio->uio_resid; 507 else 508 size = space; 509 /* 510 * First segment to transfer is minimum of 511 * transfer size and contiguous space in 512 * pipe buffer. If first segment to transfer 513 * is less than the transfer size, we've got 514 * a wraparound in the buffer. 515 */ 516 segsize = wpipe->pipe_buffer.size - 517 wpipe->pipe_buffer.in; 518 if (segsize > size) 519 segsize = size; 520 521 /* Transfer first segment */ 522 523 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in], 524 segsize, uio); 525 526 if (error == 0 && segsize < size) { 527 /* 528 * Transfer remaining part now, to 529 * support atomic writes. Wraparound 530 * happened. 
531 */ 532 #ifdef DIAGNOSTIC 533 if (wpipe->pipe_buffer.in + segsize != 534 wpipe->pipe_buffer.size) 535 panic("Expected pipe buffer wraparound disappeared"); 536 #endif 537 538 error = uiomove(&wpipe->pipe_buffer.buffer[0], 539 size - segsize, uio); 540 } 541 if (error == 0) { 542 wpipe->pipe_buffer.in += size; 543 if (wpipe->pipe_buffer.in >= 544 wpipe->pipe_buffer.size) { 545 #ifdef DIAGNOSTIC 546 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size) 547 panic("Expected wraparound bad"); 548 #endif 549 wpipe->pipe_buffer.in = size - segsize; 550 } 551 552 wpipe->pipe_buffer.cnt += size; 553 #ifdef DIAGNOSTIC 554 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size) 555 panic("Pipe buffer overflow"); 556 #endif 557 } 558 pipeunlock(wpipe); 559 } 560 if (error) 561 break; 562 } else { 563 /* 564 * If the "read-side" has been blocked, wake it up now. 565 */ 566 if (wpipe->pipe_state & PIPE_WANTR) { 567 wpipe->pipe_state &= ~PIPE_WANTR; 568 wakeup(wpipe); 569 } 570 571 /* 572 * don't block on non-blocking I/O 573 */ 574 if (fp->f_flag & FNONBLOCK) { 575 error = EAGAIN; 576 break; 577 } 578 579 /* 580 * We have no more space and have something to offer, 581 * wake up select/poll. 582 */ 583 pipeselwakeup(wpipe); 584 585 wpipe->pipe_state |= PIPE_WANTW; 586 error = tsleep(wpipe, (PRIBIO + 1)|PCATCH, 587 "pipewr", 0); 588 if (error) 589 break; 590 /* 591 * If read side wants to go away, we just issue a 592 * signal to ourselves. 593 */ 594 if (wpipe->pipe_state & PIPE_EOF) { 595 error = EPIPE; 596 break; 597 } 598 } 599 } 600 601 --wpipe->pipe_busy; 602 603 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) { 604 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 605 wakeup(wpipe); 606 } else if (wpipe->pipe_buffer.cnt > 0) { 607 /* 608 * If we have put any characters in the buffer, we wake up 609 * the reader. 
610 */ 611 if (wpipe->pipe_state & PIPE_WANTR) { 612 wpipe->pipe_state &= ~PIPE_WANTR; 613 wakeup(wpipe); 614 } 615 } 616 617 /* 618 * Don't return EPIPE if I/O was successful 619 */ 620 if ((wpipe->pipe_buffer.cnt == 0) && 621 (uio->uio_resid == 0) && 622 (error == EPIPE)) { 623 error = 0; 624 } 625 626 if (error == 0) 627 getnanotime(&wpipe->pipe_mtime); 628 /* 629 * We have something to offer, wake up select/poll. 630 */ 631 if (wpipe->pipe_buffer.cnt) 632 pipeselwakeup(wpipe); 633 634 return (error); 635 } 636 637 /* 638 * we implement a very minimal set of ioctls for compatibility with sockets. 639 */ 640 int 641 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p) 642 { 643 struct pipe *mpipe = fp->f_data; 644 645 switch (cmd) { 646 647 case FIONBIO: 648 return (0); 649 650 case FIOASYNC: 651 if (*(int *)data) { 652 mpipe->pipe_state |= PIPE_ASYNC; 653 } else { 654 mpipe->pipe_state &= ~PIPE_ASYNC; 655 } 656 return (0); 657 658 case FIONREAD: 659 *(int *)data = mpipe->pipe_buffer.cnt; 660 return (0); 661 662 case SIOCSPGRP: 663 mpipe->pipe_pgid = *(int *)data; 664 return (0); 665 666 case SIOCGPGRP: 667 *(int *)data = mpipe->pipe_pgid; 668 return (0); 669 670 } 671 return (ENOTTY); 672 } 673 674 int 675 pipe_poll(struct file *fp, int events, struct proc *p) 676 { 677 struct pipe *rpipe = fp->f_data; 678 struct pipe *wpipe; 679 int revents = 0; 680 681 wpipe = rpipe->pipe_peer; 682 if (events & (POLLIN | POLLRDNORM)) { 683 if ((rpipe->pipe_buffer.cnt > 0) || 684 (rpipe->pipe_state & PIPE_EOF)) 685 revents |= events & (POLLIN | POLLRDNORM); 686 } 687 688 /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */ 689 if ((rpipe->pipe_state & PIPE_EOF) || 690 (wpipe == NULL) || 691 (wpipe->pipe_state & PIPE_EOF)) 692 revents |= POLLHUP; 693 else if (events & (POLLOUT | POLLWRNORM)) { 694 if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF) 695 revents |= events & (POLLOUT | POLLWRNORM); 696 } 697 698 if (revents == 0) { 699 
if (events & (POLLIN | POLLRDNORM)) { 700 selrecord(p, &rpipe->pipe_sel); 701 rpipe->pipe_state |= PIPE_SEL; 702 } 703 if (events & (POLLOUT | POLLWRNORM)) { 704 selrecord(p, &wpipe->pipe_sel); 705 wpipe->pipe_state |= PIPE_SEL; 706 } 707 } 708 return (revents); 709 } 710 711 int 712 pipe_stat(struct file *fp, struct stat *ub, struct proc *p) 713 { 714 struct pipe *pipe = fp->f_data; 715 716 memset(ub, 0, sizeof(*ub)); 717 ub->st_mode = S_IFIFO; 718 ub->st_blksize = pipe->pipe_buffer.size; 719 ub->st_size = pipe->pipe_buffer.cnt; 720 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 721 ub->st_atim.tv_sec = pipe->pipe_atime.tv_sec; 722 ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec; 723 ub->st_mtim.tv_sec = pipe->pipe_mtime.tv_sec; 724 ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec; 725 ub->st_ctim.tv_sec = pipe->pipe_ctime.tv_sec; 726 ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec; 727 ub->st_uid = fp->f_cred->cr_uid; 728 ub->st_gid = fp->f_cred->cr_gid; 729 /* 730 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 731 * XXX (st_dev, st_ino) should be unique. 732 */ 733 return (0); 734 } 735 736 int 737 pipe_close(struct file *fp, struct proc *p) 738 { 739 struct pipe *cpipe = fp->f_data; 740 741 fp->f_ops = NULL; 742 fp->f_data = NULL; 743 pipeclose(cpipe); 744 return (0); 745 } 746 747 void 748 pipe_free_kmem(struct pipe *cpipe) 749 { 750 if (cpipe->pipe_buffer.buffer != NULL) { 751 if (cpipe->pipe_buffer.size > PIPE_SIZE) 752 --nbigpipe; 753 amountpipekva -= cpipe->pipe_buffer.size; 754 km_free(cpipe->pipe_buffer.buffer, cpipe->pipe_buffer.size, 755 &kv_any, &kp_pageable); 756 cpipe->pipe_buffer.buffer = NULL; 757 } 758 } 759 760 /* 761 * shutdown the pipe 762 */ 763 void 764 pipeclose(struct pipe *cpipe) 765 { 766 struct pipe *ppipe; 767 if (cpipe) { 768 769 pipeselwakeup(cpipe); 770 771 /* 772 * If the other side is blocked, wake it up saying that 773 * we want to close it down. 
774 */ 775 cpipe->pipe_state |= PIPE_EOF; 776 while (cpipe->pipe_busy) { 777 wakeup(cpipe); 778 cpipe->pipe_state |= PIPE_WANT; 779 tsleep(cpipe, PRIBIO, "pipecl", 0); 780 } 781 782 /* 783 * Disconnect from peer 784 */ 785 if ((ppipe = cpipe->pipe_peer) != NULL) { 786 pipeselwakeup(ppipe); 787 788 ppipe->pipe_state |= PIPE_EOF; 789 wakeup(ppipe); 790 ppipe->pipe_peer = NULL; 791 } 792 793 /* 794 * free resources 795 */ 796 pipe_free_kmem(cpipe); 797 pool_put(&pipe_pool, cpipe); 798 } 799 } 800 801 int 802 pipe_kqfilter(struct file *fp, struct knote *kn) 803 { 804 struct pipe *rpipe = kn->kn_fp->f_data; 805 struct pipe *wpipe = rpipe->pipe_peer; 806 807 switch (kn->kn_filter) { 808 case EVFILT_READ: 809 kn->kn_fop = &pipe_rfiltops; 810 SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext); 811 break; 812 case EVFILT_WRITE: 813 if (wpipe == NULL) { 814 /* other end of pipe has been closed */ 815 return (EPIPE); 816 } 817 kn->kn_fop = &pipe_wfiltops; 818 SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext); 819 break; 820 default: 821 return (EINVAL); 822 } 823 824 return (0); 825 } 826 827 void 828 filt_pipedetach(struct knote *kn) 829 { 830 struct pipe *rpipe = kn->kn_fp->f_data; 831 struct pipe *wpipe = rpipe->pipe_peer; 832 833 switch (kn->kn_filter) { 834 case EVFILT_READ: 835 SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext); 836 break; 837 case EVFILT_WRITE: 838 if (wpipe == NULL) 839 return; 840 SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext); 841 break; 842 } 843 } 844 845 int 846 filt_piperead(struct knote *kn, long hint) 847 { 848 struct pipe *rpipe = kn->kn_fp->f_data; 849 struct pipe *wpipe = rpipe->pipe_peer; 850 851 kn->kn_data = rpipe->pipe_buffer.cnt; 852 853 if ((rpipe->pipe_state & PIPE_EOF) || 854 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 855 kn->kn_flags |= EV_EOF; 856 return (1); 857 } 858 return (kn->kn_data > 0); 859 } 860 861 int 862 filt_pipewrite(struct knote *kn, long hint) 863 { 864 struct 
pipe *rpipe = kn->kn_fp->f_data; 865 struct pipe *wpipe = rpipe->pipe_peer; 866 867 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 868 kn->kn_data = 0; 869 kn->kn_flags |= EV_EOF; 870 return (1); 871 } 872 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 873 874 return (kn->kn_data >= PIPE_BUF); 875 } 876 877 void 878 pipe_init(void) 879 { 880 pool_init(&pipe_pool, sizeof(struct pipe), 0, IPL_NONE, PR_WAITOK, 881 "pipepl", NULL); 882 } 883 884