/* $OpenBSD: sys_pipe.c,v 1.78 2018/04/10 09:17:45 mpi Exp $ */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
27 */ 28 29 #include <sys/param.h> 30 #include <sys/systm.h> 31 #include <sys/proc.h> 32 #include <sys/fcntl.h> 33 #include <sys/file.h> 34 #include <sys/filedesc.h> 35 #include <sys/pool.h> 36 #include <sys/ioctl.h> 37 #include <sys/stat.h> 38 #include <sys/signalvar.h> 39 #include <sys/mount.h> 40 #include <sys/syscallargs.h> 41 #include <sys/event.h> 42 #include <sys/lock.h> 43 #include <sys/poll.h> 44 #ifdef KTRACE 45 #include <sys/ktrace.h> 46 #endif 47 48 #include <uvm/uvm_extern.h> 49 50 #include <sys/pipe.h> 51 52 /* 53 * interfaces to the outside world 54 */ 55 int pipe_read(struct file *, off_t *, struct uio *, struct ucred *); 56 int pipe_write(struct file *, off_t *, struct uio *, struct ucred *); 57 int pipe_close(struct file *, struct proc *); 58 int pipe_poll(struct file *, int events, struct proc *); 59 int pipe_kqfilter(struct file *fp, struct knote *kn); 60 int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *); 61 int pipe_stat(struct file *fp, struct stat *ub, struct proc *p); 62 63 static struct fileops pipeops = { 64 .fo_read = pipe_read, 65 .fo_write = pipe_write, 66 .fo_ioctl = pipe_ioctl, 67 .fo_poll = pipe_poll, 68 .fo_kqfilter = pipe_kqfilter, 69 .fo_stat = pipe_stat, 70 .fo_close = pipe_close 71 }; 72 73 void filt_pipedetach(struct knote *kn); 74 int filt_piperead(struct knote *kn, long hint); 75 int filt_pipewrite(struct knote *kn, long hint); 76 77 struct filterops pipe_rfiltops = 78 { 1, NULL, filt_pipedetach, filt_piperead }; 79 struct filterops pipe_wfiltops = 80 { 1, NULL, filt_pipedetach, filt_pipewrite }; 81 82 /* 83 * Default pipe buffer size(s), this can be kind-of large now because pipe 84 * space is pageable. The pipe code will try to maintain locality of 85 * reference for performance reasons, so small amounts of outstanding I/O 86 * will not wipe the cache. 
87 */ 88 #define MINPIPESIZE (PIPE_SIZE/3) 89 90 /* 91 * Limit the number of "big" pipes 92 */ 93 #define LIMITBIGPIPES 32 94 int nbigpipe; 95 static int amountpipekva; 96 97 struct pool pipe_pool; 98 99 int dopipe(struct proc *, int *, int); 100 void pipeclose(struct pipe *); 101 void pipe_free_kmem(struct pipe *); 102 int pipe_create(struct pipe *); 103 int pipelock(struct pipe *); 104 void pipeunlock(struct pipe *); 105 void pipeselwakeup(struct pipe *); 106 int pipespace(struct pipe *, u_int); 107 108 /* 109 * The pipe system call for the DTYPE_PIPE type of pipes 110 */ 111 112 int 113 sys_pipe(struct proc *p, void *v, register_t *retval) 114 { 115 struct sys_pipe_args /* { 116 syscallarg(int *) fdp; 117 } */ *uap = v; 118 119 return (dopipe(p, SCARG(uap, fdp), 0)); 120 } 121 122 int 123 sys_pipe2(struct proc *p, void *v, register_t *retval) 124 { 125 struct sys_pipe2_args /* { 126 syscallarg(int *) fdp; 127 syscallarg(int) flags; 128 } */ *uap = v; 129 130 if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK)) 131 return (EINVAL); 132 133 return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags))); 134 } 135 136 int 137 dopipe(struct proc *p, int *ufds, int flags) 138 { 139 struct filedesc *fdp = p->p_fd; 140 struct file *rf, *wf; 141 struct pipe *rpipe, *wpipe = NULL; 142 int fds[2], cloexec, error; 143 144 cloexec = (flags & O_CLOEXEC) ? 
UF_EXCLOSE : 0; 145 146 rpipe = pool_get(&pipe_pool, PR_WAITOK); 147 error = pipe_create(rpipe); 148 if (error != 0) 149 goto free1; 150 wpipe = pool_get(&pipe_pool, PR_WAITOK); 151 error = pipe_create(wpipe); 152 if (error != 0) 153 goto free1; 154 155 fdplock(fdp); 156 157 error = falloc(p, cloexec, &rf, &fds[0]); 158 if (error != 0) 159 goto free2; 160 rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK); 161 rf->f_type = DTYPE_PIPE; 162 rf->f_data = rpipe; 163 rf->f_ops = &pipeops; 164 165 error = falloc(p, cloexec, &wf, &fds[1]); 166 if (error != 0) 167 goto free3; 168 wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK); 169 wf->f_type = DTYPE_PIPE; 170 wf->f_data = wpipe; 171 wf->f_ops = &pipeops; 172 173 rpipe->pipe_peer = wpipe; 174 wpipe->pipe_peer = rpipe; 175 176 FILE_SET_MATURE(rf, p); 177 FILE_SET_MATURE(wf, p); 178 179 error = copyout(fds, ufds, sizeof(fds)); 180 if (error != 0) { 181 fdrelease(p, fds[0]); 182 fdrelease(p, fds[1]); 183 } 184 #ifdef KTRACE 185 else if (KTRPOINT(p, KTR_STRUCT)) 186 ktrfds(p, fds, 2); 187 #endif 188 fdpunlock(fdp); 189 return (error); 190 191 free3: 192 fdremove(fdp, fds[0]); 193 closef(rf, p); 194 rpipe = NULL; 195 free2: 196 fdpunlock(fdp); 197 free1: 198 pipeclose(wpipe); 199 pipeclose(rpipe); 200 return (error); 201 } 202 203 /* 204 * Allocate kva for pipe circular buffer, the space is pageable. 205 * This routine will 'realloc' the size of a pipe safely, if it fails 206 * it will retain the old buffer. 207 * If it fails it will return ENOMEM. 
208 */ 209 int 210 pipespace(struct pipe *cpipe, u_int size) 211 { 212 caddr_t buffer; 213 214 buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok); 215 if (buffer == NULL) { 216 return (ENOMEM); 217 } 218 219 /* free old resources if we are resizing */ 220 pipe_free_kmem(cpipe); 221 cpipe->pipe_buffer.buffer = buffer; 222 cpipe->pipe_buffer.size = size; 223 cpipe->pipe_buffer.in = 0; 224 cpipe->pipe_buffer.out = 0; 225 cpipe->pipe_buffer.cnt = 0; 226 227 amountpipekva += cpipe->pipe_buffer.size; 228 229 return (0); 230 } 231 232 /* 233 * initialize and allocate VM and memory for pipe 234 */ 235 int 236 pipe_create(struct pipe *cpipe) 237 { 238 int error; 239 240 /* so pipe_free_kmem() doesn't follow junk pointer */ 241 cpipe->pipe_buffer.buffer = NULL; 242 /* 243 * protect so pipeclose() doesn't follow a junk pointer 244 * if pipespace() fails. 245 */ 246 memset(&cpipe->pipe_sel, 0, sizeof(cpipe->pipe_sel)); 247 cpipe->pipe_state = 0; 248 cpipe->pipe_peer = NULL; 249 cpipe->pipe_busy = 0; 250 251 error = pipespace(cpipe, PIPE_SIZE); 252 if (error != 0) 253 return (error); 254 255 getnanotime(&cpipe->pipe_ctime); 256 cpipe->pipe_atime = cpipe->pipe_ctime; 257 cpipe->pipe_mtime = cpipe->pipe_ctime; 258 cpipe->pipe_pgid = NO_PID; 259 260 return (0); 261 } 262 263 264 /* 265 * lock a pipe for I/O, blocking other access 266 */ 267 int 268 pipelock(struct pipe *cpipe) 269 { 270 int error; 271 while (cpipe->pipe_state & PIPE_LOCK) { 272 cpipe->pipe_state |= PIPE_LWANT; 273 if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0))) 274 return error; 275 } 276 cpipe->pipe_state |= PIPE_LOCK; 277 return 0; 278 } 279 280 /* 281 * unlock a pipe I/O lock 282 */ 283 void 284 pipeunlock(struct pipe *cpipe) 285 { 286 cpipe->pipe_state &= ~PIPE_LOCK; 287 if (cpipe->pipe_state & PIPE_LWANT) { 288 cpipe->pipe_state &= ~PIPE_LWANT; 289 wakeup(cpipe); 290 } 291 } 292 293 void 294 pipeselwakeup(struct pipe *cpipe) 295 { 296 if (cpipe->pipe_state & PIPE_SEL) { 297 cpipe->pipe_state 
&= ~PIPE_SEL; 298 selwakeup(&cpipe->pipe_sel); 299 } else 300 KNOTE(&cpipe->pipe_sel.si_note, 0); 301 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID) 302 gsignal(cpipe->pipe_pgid, SIGIO); 303 } 304 305 int 306 pipe_read(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 307 { 308 struct pipe *rpipe = fp->f_data; 309 int error; 310 size_t size, nread = 0; 311 312 error = pipelock(rpipe); 313 if (error) 314 return (error); 315 316 ++rpipe->pipe_busy; 317 318 while (uio->uio_resid) { 319 /* 320 * normal pipe buffer receive 321 */ 322 if (rpipe->pipe_buffer.cnt > 0) { 323 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; 324 if (size > rpipe->pipe_buffer.cnt) 325 size = rpipe->pipe_buffer.cnt; 326 if (size > uio->uio_resid) 327 size = uio->uio_resid; 328 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], 329 size, uio); 330 if (error) { 331 break; 332 } 333 rpipe->pipe_buffer.out += size; 334 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size) 335 rpipe->pipe_buffer.out = 0; 336 337 rpipe->pipe_buffer.cnt -= size; 338 /* 339 * If there is no more to read in the pipe, reset 340 * its pointers to the beginning. This improves 341 * cache hit stats. 342 */ 343 if (rpipe->pipe_buffer.cnt == 0) { 344 rpipe->pipe_buffer.in = 0; 345 rpipe->pipe_buffer.out = 0; 346 } 347 nread += size; 348 } else { 349 /* 350 * detect EOF condition 351 * read returns 0 on EOF, no need to set error 352 */ 353 if (rpipe->pipe_state & PIPE_EOF) 354 break; 355 356 /* 357 * If the "write-side" has been blocked, wake it up now. 358 */ 359 if (rpipe->pipe_state & PIPE_WANTW) { 360 rpipe->pipe_state &= ~PIPE_WANTW; 361 wakeup(rpipe); 362 } 363 364 /* 365 * Break if some data was read. 366 */ 367 if (nread > 0) 368 break; 369 370 /* 371 * Unlock the pipe buffer for our remaining processing. 372 * We will either break out with an error or we will 373 * sleep and relock to loop. 
374 */ 375 pipeunlock(rpipe); 376 377 /* 378 * Handle non-blocking mode operation or 379 * wait for more data. 380 */ 381 if (fp->f_flag & FNONBLOCK) { 382 error = EAGAIN; 383 } else { 384 rpipe->pipe_state |= PIPE_WANTR; 385 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0) 386 error = pipelock(rpipe); 387 } 388 if (error) 389 goto unlocked_error; 390 } 391 } 392 pipeunlock(rpipe); 393 394 if (error == 0) 395 getnanotime(&rpipe->pipe_atime); 396 unlocked_error: 397 --rpipe->pipe_busy; 398 399 /* 400 * PIPE_WANT processing only makes sense if pipe_busy is 0. 401 */ 402 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) { 403 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW); 404 wakeup(rpipe); 405 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) { 406 /* 407 * Handle write blocking hysteresis. 408 */ 409 if (rpipe->pipe_state & PIPE_WANTW) { 410 rpipe->pipe_state &= ~PIPE_WANTW; 411 wakeup(rpipe); 412 } 413 } 414 415 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) 416 pipeselwakeup(rpipe); 417 418 return (error); 419 } 420 421 int 422 pipe_write(struct file *fp, off_t *poff, struct uio *uio, struct ucred *cred) 423 { 424 int error = 0; 425 size_t orig_resid; 426 struct pipe *wpipe, *rpipe; 427 428 rpipe = fp->f_data; 429 wpipe = rpipe->pipe_peer; 430 431 /* 432 * detect loss of pipe read side, issue SIGPIPE if lost. 433 */ 434 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 435 return (EPIPE); 436 } 437 ++wpipe->pipe_busy; 438 439 /* 440 * If it is advantageous to resize the pipe buffer, do 441 * so. 442 */ 443 if ((uio->uio_resid > PIPE_SIZE) && 444 (nbigpipe < LIMITBIGPIPES) && 445 (wpipe->pipe_buffer.size <= PIPE_SIZE) && 446 (wpipe->pipe_buffer.cnt == 0)) { 447 448 if ((error = pipelock(wpipe)) == 0) { 449 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) 450 nbigpipe++; 451 pipeunlock(wpipe); 452 } 453 } 454 455 /* 456 * If an early error occurred unbusy and return, waking up any pending 457 * readers. 
458 */ 459 if (error) { 460 --wpipe->pipe_busy; 461 if ((wpipe->pipe_busy == 0) && 462 (wpipe->pipe_state & PIPE_WANT)) { 463 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 464 wakeup(wpipe); 465 } 466 return (error); 467 } 468 469 orig_resid = uio->uio_resid; 470 471 while (uio->uio_resid) { 472 size_t space; 473 474 retrywrite: 475 if (wpipe->pipe_state & PIPE_EOF) { 476 error = EPIPE; 477 break; 478 } 479 480 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 481 482 /* Writes of size <= PIPE_BUF must be atomic. */ 483 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 484 space = 0; 485 486 if (space > 0) { 487 if ((error = pipelock(wpipe)) == 0) { 488 size_t size; /* Transfer size */ 489 size_t segsize; /* first segment to transfer */ 490 491 /* 492 * If a process blocked in uiomove, our 493 * value for space might be bad. 494 * 495 * XXX will we be ok if the reader has gone 496 * away here? 497 */ 498 if (space > wpipe->pipe_buffer.size - 499 wpipe->pipe_buffer.cnt) { 500 pipeunlock(wpipe); 501 goto retrywrite; 502 } 503 504 /* 505 * Transfer size is minimum of uio transfer 506 * and free space in pipe buffer. 507 */ 508 if (space > uio->uio_resid) 509 size = uio->uio_resid; 510 else 511 size = space; 512 /* 513 * First segment to transfer is minimum of 514 * transfer size and contiguous space in 515 * pipe buffer. If first segment to transfer 516 * is less than the transfer size, we've got 517 * a wraparound in the buffer. 518 */ 519 segsize = wpipe->pipe_buffer.size - 520 wpipe->pipe_buffer.in; 521 if (segsize > size) 522 segsize = size; 523 524 /* Transfer first segment */ 525 526 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in], 527 segsize, uio); 528 529 if (error == 0 && segsize < size) { 530 /* 531 * Transfer remaining part now, to 532 * support atomic writes. Wraparound 533 * happened. 
534 */ 535 #ifdef DIAGNOSTIC 536 if (wpipe->pipe_buffer.in + segsize != 537 wpipe->pipe_buffer.size) 538 panic("Expected pipe buffer wraparound disappeared"); 539 #endif 540 541 error = uiomove(&wpipe->pipe_buffer.buffer[0], 542 size - segsize, uio); 543 } 544 if (error == 0) { 545 wpipe->pipe_buffer.in += size; 546 if (wpipe->pipe_buffer.in >= 547 wpipe->pipe_buffer.size) { 548 #ifdef DIAGNOSTIC 549 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size) 550 panic("Expected wraparound bad"); 551 #endif 552 wpipe->pipe_buffer.in = size - segsize; 553 } 554 555 wpipe->pipe_buffer.cnt += size; 556 #ifdef DIAGNOSTIC 557 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size) 558 panic("Pipe buffer overflow"); 559 #endif 560 } 561 pipeunlock(wpipe); 562 } 563 if (error) 564 break; 565 } else { 566 /* 567 * If the "read-side" has been blocked, wake it up now. 568 */ 569 if (wpipe->pipe_state & PIPE_WANTR) { 570 wpipe->pipe_state &= ~PIPE_WANTR; 571 wakeup(wpipe); 572 } 573 574 /* 575 * don't block on non-blocking I/O 576 */ 577 if (fp->f_flag & FNONBLOCK) { 578 error = EAGAIN; 579 break; 580 } 581 582 /* 583 * We have no more space and have something to offer, 584 * wake up select/poll. 585 */ 586 pipeselwakeup(wpipe); 587 588 wpipe->pipe_state |= PIPE_WANTW; 589 error = tsleep(wpipe, (PRIBIO + 1)|PCATCH, 590 "pipewr", 0); 591 if (error) 592 break; 593 /* 594 * If read side wants to go away, we just issue a 595 * signal to ourselves. 596 */ 597 if (wpipe->pipe_state & PIPE_EOF) { 598 error = EPIPE; 599 break; 600 } 601 } 602 } 603 604 --wpipe->pipe_busy; 605 606 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) { 607 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR); 608 wakeup(wpipe); 609 } else if (wpipe->pipe_buffer.cnt > 0) { 610 /* 611 * If we have put any characters in the buffer, we wake up 612 * the reader. 
613 */ 614 if (wpipe->pipe_state & PIPE_WANTR) { 615 wpipe->pipe_state &= ~PIPE_WANTR; 616 wakeup(wpipe); 617 } 618 } 619 620 /* 621 * Don't return EPIPE if I/O was successful 622 */ 623 if ((wpipe->pipe_buffer.cnt == 0) && 624 (uio->uio_resid == 0) && 625 (error == EPIPE)) { 626 error = 0; 627 } 628 629 if (error == 0) 630 getnanotime(&wpipe->pipe_mtime); 631 /* 632 * We have something to offer, wake up select/poll. 633 */ 634 if (wpipe->pipe_buffer.cnt) 635 pipeselwakeup(wpipe); 636 637 return (error); 638 } 639 640 /* 641 * we implement a very minimal set of ioctls for compatibility with sockets. 642 */ 643 int 644 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p) 645 { 646 struct pipe *mpipe = fp->f_data; 647 648 switch (cmd) { 649 650 case FIONBIO: 651 return (0); 652 653 case FIOASYNC: 654 if (*(int *)data) { 655 mpipe->pipe_state |= PIPE_ASYNC; 656 } else { 657 mpipe->pipe_state &= ~PIPE_ASYNC; 658 } 659 return (0); 660 661 case FIONREAD: 662 *(int *)data = mpipe->pipe_buffer.cnt; 663 return (0); 664 665 case SIOCSPGRP: 666 mpipe->pipe_pgid = *(int *)data; 667 return (0); 668 669 case SIOCGPGRP: 670 *(int *)data = mpipe->pipe_pgid; 671 return (0); 672 673 } 674 return (ENOTTY); 675 } 676 677 int 678 pipe_poll(struct file *fp, int events, struct proc *p) 679 { 680 struct pipe *rpipe = fp->f_data; 681 struct pipe *wpipe; 682 int revents = 0; 683 684 wpipe = rpipe->pipe_peer; 685 if (events & (POLLIN | POLLRDNORM)) { 686 if ((rpipe->pipe_buffer.cnt > 0) || 687 (rpipe->pipe_state & PIPE_EOF)) 688 revents |= events & (POLLIN | POLLRDNORM); 689 } 690 691 /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */ 692 if ((rpipe->pipe_state & PIPE_EOF) || 693 (wpipe == NULL) || 694 (wpipe->pipe_state & PIPE_EOF)) 695 revents |= POLLHUP; 696 else if (events & (POLLOUT | POLLWRNORM)) { 697 if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF) 698 revents |= events & (POLLOUT | POLLWRNORM); 699 } 700 701 if (revents == 0) { 702 
if (events & (POLLIN | POLLRDNORM)) { 703 selrecord(p, &rpipe->pipe_sel); 704 rpipe->pipe_state |= PIPE_SEL; 705 } 706 if (events & (POLLOUT | POLLWRNORM)) { 707 selrecord(p, &wpipe->pipe_sel); 708 wpipe->pipe_state |= PIPE_SEL; 709 } 710 } 711 return (revents); 712 } 713 714 int 715 pipe_stat(struct file *fp, struct stat *ub, struct proc *p) 716 { 717 struct pipe *pipe = fp->f_data; 718 719 memset(ub, 0, sizeof(*ub)); 720 ub->st_mode = S_IFIFO; 721 ub->st_blksize = pipe->pipe_buffer.size; 722 ub->st_size = pipe->pipe_buffer.cnt; 723 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 724 ub->st_atim.tv_sec = pipe->pipe_atime.tv_sec; 725 ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec; 726 ub->st_mtim.tv_sec = pipe->pipe_mtime.tv_sec; 727 ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec; 728 ub->st_ctim.tv_sec = pipe->pipe_ctime.tv_sec; 729 ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec; 730 ub->st_uid = fp->f_cred->cr_uid; 731 ub->st_gid = fp->f_cred->cr_gid; 732 /* 733 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 734 * XXX (st_dev, st_ino) should be unique. 735 */ 736 return (0); 737 } 738 739 int 740 pipe_close(struct file *fp, struct proc *p) 741 { 742 struct pipe *cpipe = fp->f_data; 743 744 fp->f_ops = NULL; 745 fp->f_data = NULL; 746 pipeclose(cpipe); 747 return (0); 748 } 749 750 void 751 pipe_free_kmem(struct pipe *cpipe) 752 { 753 if (cpipe->pipe_buffer.buffer != NULL) { 754 if (cpipe->pipe_buffer.size > PIPE_SIZE) 755 --nbigpipe; 756 amountpipekva -= cpipe->pipe_buffer.size; 757 km_free(cpipe->pipe_buffer.buffer, cpipe->pipe_buffer.size, 758 &kv_any, &kp_pageable); 759 cpipe->pipe_buffer.buffer = NULL; 760 } 761 } 762 763 /* 764 * shutdown the pipe 765 */ 766 void 767 pipeclose(struct pipe *cpipe) 768 { 769 struct pipe *ppipe; 770 if (cpipe) { 771 772 pipeselwakeup(cpipe); 773 774 /* 775 * If the other side is blocked, wake it up saying that 776 * we want to close it down. 
777 */ 778 cpipe->pipe_state |= PIPE_EOF; 779 while (cpipe->pipe_busy) { 780 wakeup(cpipe); 781 cpipe->pipe_state |= PIPE_WANT; 782 tsleep(cpipe, PRIBIO, "pipecl", 0); 783 } 784 785 /* 786 * Disconnect from peer 787 */ 788 if ((ppipe = cpipe->pipe_peer) != NULL) { 789 pipeselwakeup(ppipe); 790 791 ppipe->pipe_state |= PIPE_EOF; 792 wakeup(ppipe); 793 ppipe->pipe_peer = NULL; 794 } 795 796 /* 797 * free resources 798 */ 799 pipe_free_kmem(cpipe); 800 pool_put(&pipe_pool, cpipe); 801 } 802 } 803 804 int 805 pipe_kqfilter(struct file *fp, struct knote *kn) 806 { 807 struct pipe *rpipe = kn->kn_fp->f_data; 808 struct pipe *wpipe = rpipe->pipe_peer; 809 810 switch (kn->kn_filter) { 811 case EVFILT_READ: 812 kn->kn_fop = &pipe_rfiltops; 813 SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext); 814 break; 815 case EVFILT_WRITE: 816 if (wpipe == NULL) { 817 /* other end of pipe has been closed */ 818 return (EPIPE); 819 } 820 kn->kn_fop = &pipe_wfiltops; 821 SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext); 822 break; 823 default: 824 return (EINVAL); 825 } 826 827 return (0); 828 } 829 830 void 831 filt_pipedetach(struct knote *kn) 832 { 833 struct pipe *rpipe = kn->kn_fp->f_data; 834 struct pipe *wpipe = rpipe->pipe_peer; 835 836 switch (kn->kn_filter) { 837 case EVFILT_READ: 838 SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext); 839 break; 840 case EVFILT_WRITE: 841 if (wpipe == NULL) 842 return; 843 SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext); 844 break; 845 } 846 } 847 848 int 849 filt_piperead(struct knote *kn, long hint) 850 { 851 struct pipe *rpipe = kn->kn_fp->f_data; 852 struct pipe *wpipe = rpipe->pipe_peer; 853 854 kn->kn_data = rpipe->pipe_buffer.cnt; 855 856 if ((rpipe->pipe_state & PIPE_EOF) || 857 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 858 kn->kn_flags |= EV_EOF; 859 return (1); 860 } 861 return (kn->kn_data > 0); 862 } 863 864 int 865 filt_pipewrite(struct knote *kn, long hint) 866 { 867 struct 
pipe *rpipe = kn->kn_fp->f_data; 868 struct pipe *wpipe = rpipe->pipe_peer; 869 870 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 871 kn->kn_data = 0; 872 kn->kn_flags |= EV_EOF; 873 return (1); 874 } 875 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 876 877 return (kn->kn_data >= PIPE_BUF); 878 } 879 880 void 881 pipe_init(void) 882 { 883 pool_init(&pipe_pool, sizeof(struct pipe), 0, IPL_NONE, PR_WAITOK, 884 "pipepl", NULL); 885 } 886 887