/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
 * $DragonFly: src/sys/kern/sys_pipe.c,v 1.17 2004/04/01 17:58:02 dillon Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.
 * This could be useful for NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct thread *td);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct thread *td);
static int pipe_close (struct file *fp, struct thread *td);
static int pipe_poll (struct file *fp, int events, struct ucred *cred,
		struct thread *td);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct thread *td);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data, struct thread *td);

static struct fileops pipeops = {
	NULL,	/* port */
	0,	/* autoq */
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
#define PIPEQ_MAX_CACHE 16	/* per-cpu pipe structure cache */

static int pipe_maxbig = LIMITBIGPIPES;
static int pipe_maxcache = PIPEQ_MAX_CACHE;
static int pipe_nbig;
static int pipe_bcache_alloc;
static int pipe_bkmem_alloc;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
	CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
	CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
	CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
#if !defined(NO_PIPE_SYSCTL_STATS)
SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
	CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
	CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
#endif
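
/*
 * Userland example: pipe_maxbig and pipe_maxcache above are exported
 * through the kern.pipe sysctl tree and can be inspected or tuned at
 * run-time, e.g. with sysctlbyname(3).  The new value 64 below is
 * arbitrary:
 *
 *	int newmax = 64, oldmax;
 *	size_t len = sizeof(oldmax);
 *
 *	sysctlbyname("kern.pipe.maxbig", &oldmax, &len, &newmax, sizeof(newmax));
 */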

static void pipeclose (struct pipe *cpipe);
static void pipe_free_kmem (struct pipe *cpipe);
static int pipe_create (struct pipe **cpipep);
static __inline int pipelock (struct pipe *cpipe, int catch);
static __inline void pipeunlock (struct pipe *cpipe);
static __inline void pipeselwakeup (struct pipe *cpipe);
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer (struct pipe *wpipe, struct uio *uio);
static int pipe_direct_write (struct pipe *wpipe, struct uio *uio);
static void pipe_clone_write_buffer (struct pipe *wpipe);
#endif
static int pipespace (struct pipe *cpipe, int size);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 */

/* ARGSUSED */
int
pipe(struct pipe_args *uap)
{
	struct thread *td = curthread;
	struct proc *p = td->td_proc;
	struct filedesc *fdp;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd1, fd2, error;

	KKASSERT(p);
	fdp = p->p_fd;

	rpipe = wpipe = NULL;
	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (ENFILE);
	}

	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd1);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	fhold(rf);
	uap->sysmsg_fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = (caddr_t)rpipe;
	rf->f_ops = &pipeops;
	error = falloc(p, &wf, &fd2);
	if (error) {
		if (fdp->fd_ofiles[fd1] == rf) {
			fdp->fd_ofiles[fd1] = NULL;
			fdrop(rf, td);
		}
		fdrop(rf, td);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = (caddr_t)wpipe;
	wf->f_ops = &pipeops;
	uap->sysmsg_fds[1] = fd2;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	fdrop(rf, td);

	return (0);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if it fails
 * it will retain the old buffer and return ENOMEM.
 */
static int
pipespace(struct pipe *cpipe, int size)
{
	struct vm_object *object;
	caddr_t buffer;
	int npages, error;

	npages = round_page(size) / PAGE_SIZE;
	object = cpipe->pipe_buffer.object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t) vm_map_min(kernel_map);

		error = vm_map_find(kernel_map, object, 0,
			(vm_offset_t *) &buffer, size, 1,
			VM_PROT_ALL, VM_PROT_ALL, 0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			return (ENOMEM);
		}
		pipe_free_kmem(cpipe);
		cpipe->pipe_buffer.object = object;
		cpipe->pipe_buffer.buffer = buffer;
		cpipe->pipe_buffer.size = size;
		++pipe_bkmem_alloc;
	} else {
		++pipe_bcache_alloc;
	}
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.  For now make sure it is sized for the
 * smaller PIPE_SIZE default.
 */
static int
pipe_create(cpipep)
	struct pipe **cpipep;
{
	globaldata_t gd = mycpu;
	struct pipe *cpipe;
	int error;

	if ((cpipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = cpipe->pipe_peer;
		--gd->gd_pipeqcount;
		cpipe->pipe_peer = NULL;
	} else {
		cpipe = malloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
	}
	*cpipep = cpipe;
	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
		return (error);
	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	return (0);
}
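
/*
 * Userland example of the pipe(2) syscall implemented above: fds[0] is
 * the read end and fds[1] is the write end.  Error handling is omitted
 * for brevity:
 *
 *	int fds[2];
 *	char buf[4];
 *
 *	if (pipe(fds) == 0) {
 *		write(fds[1], "data", 4);
 *		read(fds[0], buf, 4);
 *	}
 */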

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		error = tsleep(cpipe, (catch ? PCATCH : 0), "pipelk", 0);
		if (error != 0)
			return (error);
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return (0);
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

/*
 * Notify select/poll, SIGIO, and kqueue consumers that the pipe's state
 * has changed.
 */
static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
	KNOTE(&cpipe->pipe_sel.si_note, 0);
}

/*
 * The read interface.  Data is taken from the pipe's internal FIFO
 * buffer or, during a direct write, straight from the writer's wired
 * user pages.
 */
/* ARGSUSED */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred,
	int flags, struct thread *td)
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	error = pipelock(rpipe, 1);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error)
				break;

			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.  We cannot mess
		 * with the direct-write buffer until PIPE_DIRECTIP is
		 * cleared.  In order to prevent the pipe_write code from
		 * racing itself in direct_write, we set DIRECTIP when we
		 * clear DIRECTW after we have exhausted the buffer.
		 */
		} else if (rpipe->pipe_map.xio_bytes &&
		    (rpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) == PIPE_DIRECTW) {
			error = xio_uio_copy(&rpipe->pipe_map, uio, &size);
			if (error)
				break;
			nread += size;
			if (rpipe->pipe_map.xio_bytes == 0) {
				rpipe->pipe_state |= PIPE_DIRECTIP;
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining
			 * processing.  We will either break out with an
			 * error or we will sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PCATCH|PNORESCHED,
				    "piperd", 0)) == 0) {
					error = pipelock(rpipe, 1);
				}
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		vfs_timestamp(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;
	u_int size;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;
	if (size > XIO_INTERNAL_SIZE)
		size = XIO_INTERNAL_SIZE;

	error = xio_init_ubuf(&wpipe->pipe_map, uio->uio_iov->iov_base,
				size, XIOF_READ);
	if (error)
		return(error);

	/*
	 * and update the uio data
	 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return (0);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;

	size = wpipe->pipe_map.xio_bytes;

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~(PIPE_DIRECTW | PIPE_DIRECTIP);

	xio_copy_xtok(&wpipe->pipe_map, wpipe->pipe_buffer.buffer, size);
	xio_release(&wpipe->pipe_map);
}
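
/*
 * Summary of the direct-write handshake implemented by pipe_direct_write()
 * below and the direct-copy path in pipe_read() above:
 *
 * 1. The writer sets PIPE_DIRECTW|PIPE_DIRECTIP, maps and wires its user
 *    buffer into the pipe's XIO, then clears PIPE_DIRECTIP and sleeps.
 * 2. The reader copies out of the XIO while PIPE_DIRECTW is set.  Once the
 *    XIO is exhausted it sets PIPE_DIRECTIP, clears PIPE_DIRECTW and wakes
 *    the writer.
 * 3. The writer releases the XIO and clears PIPE_DIRECTIP.  If the writer
 *    is instead interrupted by a signal while PIPE_DIRECTW is still set,
 *    pipe_clone_write_buffer() above copies the unread data into the
 *    normal pipe buffer before the source pages are released.
 */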

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PCATCH, "pipdww", 0);
		if (error)
			goto error2;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error2;
		}
	}
	KKASSERT(wpipe->pipe_map.xio_bytes == 0);
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PCATCH, "pipdwc", 0);
		if (error)
			goto error2;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error2;
		}
		goto retry;
	}

	/*
	 * Build our direct-write buffer
	 */
	wpipe->pipe_state |= PIPE_DIRECTW | PIPE_DIRECTIP;
	error = pipe_build_write_buffer(wpipe, uio);
	if (error)
		goto error1;
	wpipe->pipe_state &= ~PIPE_DIRECTIP;

	/*
	 * Wait until the receiver has snarfed the data.  Since we are likely
	 * going to sleep we optimize the case and yield synchronously,
	 * possibly avoiding the tsleep().
	 */
	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			xio_release(&wpipe->pipe_map);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PCATCH|PNORESCHED, "pipdwt", 0);
	}
	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
		KKASSERT((wpipe->pipe_state & PIPE_DIRECTIP) == 0);
	} else {
		KKASSERT(wpipe->pipe_state & PIPE_DIRECTIP);
		xio_release(&wpipe->pipe_map);
		wpipe->pipe_state &= ~PIPE_DIRECTIP;
	}
	pipeunlock(wpipe);
	return (error);

	/*
	 * Direct-write error, clear the direct write flags.
	 */
error1:
	wpipe->pipe_state &= ~(PIPE_DIRECTW | PIPE_DIRECTIP);
	/* fallthrough */

	/*
	 * General error, wakeup the other side if it happens to be sleeping.
	 */
error2:
	wakeup(wpipe);
	return (error);
}
#endif

/*
 * The write interface.  Large blocking writes may use the direct-write
 * mechanism above; everything else is copied into the FIFO buffer.
 */
static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred,
	int flags, struct thread *td)
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (pipe_nbig < pipe_maxbig) &&
	    (wpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) == 0 &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe, 1)) == 0) {
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				pipe_nbig++;
			pipeunlock(wpipe);
		}
	}

	/*
	 * If an early error occurred, unbusy and return, waking up any
	 * pending readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		return(error);
	}

	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 *
		 * The direct write mechanism will detect the reader going
		 * away on us.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0) {
			error = pipe_direct_write(wpipe, uio);
			if (error)
				break;
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincidental with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.  We break out if a signal occurs or the
		 * reader goes away.
		 */
	retrywrite:
		while (wpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe, PCATCH, "pipbww", 0);
			if (wpipe->pipe_state & PIPE_EOF)
				break;
			if (error)
				break;
		}
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Write to fill, read size handles write hysteresis.  Also
		 * additional restrictions can cause select-based non-blocking
		 * writes to spin.
		 */
		if (space > 0) {
			if ((error = pipelock(wpipe, 1)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");
				}
				pipeunlock(wpipe);
			}
			if (error)
				break;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up
			 * now and yield to let it drain synchronously
			 * rather than block.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, PCATCH|PNORESCHED, "pipewr", 0);
			if (error != 0)
				break;
			/*
			 * If read side wants to go away, we just issue a
			 * signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}
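
/*
 * Userland example: FIONREAD, handled by pipe_ioctl() below, reports how
 * many bytes are currently readable on the pipe.  fds[0] is assumed to be
 * the read end returned by pipe(2):
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		printf("%d bytes buffered\n", nbytes);
 */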

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td)
{
	struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW) {
			*(int *)data = mpipe->pipe_map.xio_bytes;
		} else {
			*(int *)data = mpipe->pipe_buffer.cnt;
		}
		return (0);

	case FIOSETOWN:
		return (fsetown(*(int *)data, &mpipe->pipe_sigio));

	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		return (0);

	/* This is deprecated, FIOSETOWN should be used instead. */
	case TIOCSPGRP:
		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));

	/* This is deprecated, FIOGETOWN should be used instead. */
	case TIOCGPGRP:
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		return (0);

	}
	return (ENOTTY);
}

/*
 * poll/select support for pipes.
 */
int
pipe_poll(struct file *fp, int events, struct ucred *cred, struct thread *td)
{
	struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	if (events & (POLLOUT | POLLWRNORM))
		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
			revents |= events & (POLLOUT | POLLWRNORM);

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(td, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}

		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(td, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}

	return (revents);
}

/*
 * Fill in a stat structure for the pipe; fields with no meaningful
 * backing are left zeroed.
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct thread *td)
{
	struct pipe *pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

/*
 * Close one descriptor referencing the pipe and shut the pipe down.
 */
/* ARGSUSED */
static int
pipe_close(struct file *fp, struct thread *td)
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	return (0);
}

/*
 * Release the pipe's buffer space back to the kernel map.
 */
static void
pipe_free_kmem(struct pipe *cpipe)
{
	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			--pipe_nbig;
		kmem_free(kernel_map,
			(vm_offset_t)cpipe->pipe_buffer.buffer,
			cpipe->pipe_buffer.size);
		cpipe->pipe_buffer.buffer = NULL;
		cpipe->pipe_buffer.object = NULL;
	}
#ifndef PIPE_NODIRECT
	KKASSERT(cpipe->pipe_map.xio_bytes == 0 &&
		cpipe->pipe_map.xio_offset == 0 &&
		cpipe->pipe_map.xio_npages == 0);
#endif
}

/*
 * shutdown the pipe
 */
static void
pipeclose(struct pipe *cpipe)
{
	globaldata_t gd;
	struct pipe *ppipe;

	if (cpipe == NULL)
		return;

	pipeselwakeup(cpipe);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	while (cpipe->pipe_busy) {
		wakeup(cpipe);
		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
		tsleep(cpipe, 0, "pipecl", 0);
	}

	/*
	 * Disconnect from peer
	 */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe);

		ppipe->pipe_state |= PIPE_EOF;
		wakeup(ppipe);
		KNOTE(&ppipe->pipe_sel.si_note, 0);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * free or cache resources
	 */
	gd = mycpu;
	if (gd->gd_pipeqcount >= pipe_maxcache ||
	    cpipe->pipe_buffer.size != PIPE_SIZE
	) {
		pipe_free_kmem(cpipe);
		free(cpipe, M_PIPE);
	} else {
		KKASSERT(cpipe->pipe_map.xio_npages == 0 &&
			cpipe->pipe_map.xio_bytes == 0 &&
			cpipe->pipe_map.xio_offset == 0);
		cpipe->pipe_state = 0;
		cpipe->pipe_busy = 0;
		cpipe->pipe_peer = gd->gd_pipeq;
		gd->gd_pipeq = cpipe;
		++gd->gd_pipeqcount;
	}
}

/*
 * Attach a knote to the pipe for EVFILT_READ or EVFILT_WRITE.
 */
/*ARGSUSED*/
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		cpipe = cpipe->pipe_peer;
		if (cpipe == NULL)
			/* other end of pipe has been closed */
			return (EPIPE);
		break;
	default:
		return (1);
	}
	kn->kn_hook = (caddr_t)cpipe;

	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_hook;

	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;
	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
		kn->kn_data = rpipe->pipe_map.xio_bytes;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}
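
/*
 * Userland example: filt_piperead() above and filt_pipewrite() below back
 * EVFILT_READ and EVFILT_WRITE registrations made with kevent(2) on a pipe
 * descriptor.  fds[0] is assumed to be the read end returned by pipe(2):
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);
 */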

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
	if (wpipe->pipe_state & PIPE_DIRECTW)
		kn->kn_data = 0;

	return (kn->kn_data >= PIPE_BUF);
}