/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
 * $DragonFly: src/sys/kern/sys_pipe.c,v 1.14 2004/02/20 17:11:07 dillon Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process, to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
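/*
 * Illustrative userland sketch (not part of the original file): a write
 * smaller than PIPE_MINDIRECT always takes the buffered path, while a
 * single large write may be mapped directly into the reader.  The exact
 * PIPE_MINDIRECT value lives in <sys/pipe.h>; the sizes below only assume
 * 64 bytes is "small" and 256KB is "large".
 *
 *	#include <unistd.h>
 *
 *	static char small[64], big[256 * 1024];
 *	int fds[2];
 *
 *	pipe(fds);
 *	if (fork() == 0) {
 *		char buf[8192];
 *		close(fds[1]);			// reader: drain until EOF
 *		while (read(fds[0], buf, sizeof(buf)) > 0)
 *			;
 *		_exit(0);
 *	}
 *	close(fds[0]);
 *	write(fds[1], small, sizeof(small));	// small: kernel-buffered path
 *	write(fds[1], big, sizeof(big));	// large: eligible for direct path
 *	close(fds[1]);
 */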

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio, 
		struct ucred *cred, int flags, struct thread *td);
static int pipe_write (struct file *fp, struct uio *uio, 
		struct ucred *cred, int flags, struct thread *td);
static int pipe_close (struct file *fp, struct thread *td);
static int pipe_poll (struct file *fp, int events, struct ucred *cred,
		struct thread *td);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct thread *td);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data, struct thread *td);

static struct fileops pipeops = {
	NULL,	/* port */
	0,	/* autoq */
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers, we cannot, of course limit
 * the amount of kva for pipes in general though.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
#define PIPEQ_MAX_CACHE 16	/* per-cpu pipe structure cache */

static int pipe_maxbig = LIMITBIGPIPES;
static int pipe_maxcache = PIPEQ_MAX_CACHE;
static int pipe_nbig;
static int pipe_kva;
static int pipe_bcache_alloc;
static int pipe_bkmem_alloc;
static int pipe_dcache_alloc;
static int pipe_dkmem_alloc;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
	CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
SYSCTL_INT(_kern_pipe, OID_AUTO, kva,
	CTLFLAG_RD, &pipe_kva, 0, "kva reserved by pipes");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
	CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
	CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
#if !defined(NO_PIPE_SYSCTL_STATS)
SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
	CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
SYSCTL_INT(_kern_pipe, OID_AUTO, dcache_alloc,
	CTLFLAG_RW, &pipe_dcache_alloc, 0, "pipe direct buf from pcpu cache");
SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
	CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
SYSCTL_INT(_kern_pipe, OID_AUTO, dkmem_alloc,
	CTLFLAG_RW, &pipe_dkmem_alloc, 0, "pipe direct buf from kmem");
#endif

static void pipeclose (struct pipe *cpipe);
static void pipe_free_kmem (struct pipe *cpipe);
static int pipe_create (struct pipe **cpipep);
static __inline int pipelock (struct pipe *cpipe, int catch);
static __inline void pipeunlock (struct pipe *cpipe);
static __inline void pipeselwakeup (struct pipe *cpipe);
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer (struct pipe *wpipe, struct uio *uio);
static void pipe_destroy_write_buffer (struct pipe *wpipe);
static int pipe_direct_write (struct pipe *wpipe, struct uio *uio);
static void pipe_clone_write_buffer (struct pipe *wpipe);
#endif
static int pipespace (struct pipe *cpipe, int size);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 */

/* ARGSUSED */
int
pipe(struct pipe_args *uap)
{
	struct thread *td = curthread;
	struct proc *p = td->td_proc;
	struct filedesc *fdp;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd1, fd2, error;

	KKASSERT(p);
	fdp = p->p_fd;

	rpipe = wpipe = NULL;
	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (ENFILE);
	}

	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd1);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	fhold(rf);
	uap->sysmsg_fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = (caddr_t)rpipe;
	rf->f_ops = &pipeops;
	error = falloc(p, &wf, &fd2);
	if (error) {
		if (fdp->fd_ofiles[fd1] == rf) {
			fdp->fd_ofiles[fd1] = NULL;
			fdrop(rf, td);
		}
		fdrop(rf, td);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = (caddr_t)wpipe;
	wf->f_ops = &pipeops;
	uap->sysmsg_fds[1] = fd2;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	fdrop(rf, td);

	return (0);
}

/*
 * Allocate kva for pipe circular buffer, the space is pageable
 * This routine will 'realloc' the size of a pipe safely, if it fails
 * it will retain the old buffer.
 * If it fails it will return ENOMEM.
 */
static int
pipespace(struct pipe *cpipe, int size)
{
	struct vm_object *object;
	caddr_t buffer;
	int npages, error;

	npages = round_page(size) / PAGE_SIZE;
	object = cpipe->pipe_buffer.object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t) vm_map_min(kernel_map);

		error = vm_map_find(kernel_map, object, 0,
			(vm_offset_t *) &buffer, size, 1,
			VM_PROT_ALL, VM_PROT_ALL, 0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			return (ENOMEM);
		}
		pipe_kva += size;
		pipe_free_kmem(cpipe);
		cpipe->pipe_buffer.object = object;
		cpipe->pipe_buffer.buffer = buffer;
		cpipe->pipe_buffer.size = size;
		++pipe_bkmem_alloc;
	} else {
		++pipe_bcache_alloc;
		if (cpipe->pipe_map.kva)
			++pipe_dcache_alloc;
	}
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.  For now make sure it is sized for the
 * smaller PIPE_SIZE default.
 */
static int
pipe_create(cpipep)
	struct pipe **cpipep;
{
	globaldata_t gd = mycpu;
	struct pipe *cpipe;
	int error;

	if ((cpipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = cpipe->pipe_peer;
		--gd->gd_pipeqcount;
		cpipe->pipe_peer = NULL;
	} else {
		cpipe = malloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
	}
	*cpipep = cpipe;
	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
		return (error);
	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	return (0);
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
PCATCH : 0), "pipelk", 0); 365 if (error != 0) 366 return (error); 367 } 368 cpipe->pipe_state |= PIPE_LOCK; 369 return (0); 370 } 371 372 /* 373 * unlock a pipe I/O lock 374 */ 375 static __inline void 376 pipeunlock(cpipe) 377 struct pipe *cpipe; 378 { 379 380 cpipe->pipe_state &= ~PIPE_LOCK; 381 if (cpipe->pipe_state & PIPE_LWANT) { 382 cpipe->pipe_state &= ~PIPE_LWANT; 383 wakeup(cpipe); 384 } 385 } 386 387 static __inline void 388 pipeselwakeup(cpipe) 389 struct pipe *cpipe; 390 { 391 392 if (cpipe->pipe_state & PIPE_SEL) { 393 cpipe->pipe_state &= ~PIPE_SEL; 394 selwakeup(&cpipe->pipe_sel); 395 } 396 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) 397 pgsigio(cpipe->pipe_sigio, SIGIO, 0); 398 KNOTE(&cpipe->pipe_sel.si_note, 0); 399 } 400 401 /* ARGSUSED */ 402 static int 403 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, 404 int flags, struct thread *td) 405 { 406 struct pipe *rpipe = (struct pipe *) fp->f_data; 407 int error; 408 int nread = 0; 409 u_int size; 410 411 ++rpipe->pipe_busy; 412 error = pipelock(rpipe, 1); 413 if (error) 414 goto unlocked_error; 415 416 while (uio->uio_resid) { 417 /* 418 * normal pipe buffer receive 419 */ 420 if (rpipe->pipe_buffer.cnt > 0) { 421 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; 422 if (size > rpipe->pipe_buffer.cnt) 423 size = rpipe->pipe_buffer.cnt; 424 if (size > (u_int) uio->uio_resid) 425 size = (u_int) uio->uio_resid; 426 427 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], 428 size, uio); 429 if (error) 430 break; 431 432 rpipe->pipe_buffer.out += size; 433 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size) 434 rpipe->pipe_buffer.out = 0; 435 436 rpipe->pipe_buffer.cnt -= size; 437 438 /* 439 * If there is no more to read in the pipe, reset 440 * its pointers to the beginning. This improves 441 * cache hit stats. 442 */ 443 if (rpipe->pipe_buffer.cnt == 0) { 444 rpipe->pipe_buffer.in = 0; 445 rpipe->pipe_buffer.out = 0; 446 } 447 nread += size; 448 #ifndef PIPE_NODIRECT 449 /* 450 * Direct copy, bypassing a kernel buffer. 451 */ 452 } else if ((size = rpipe->pipe_map.cnt) && 453 (rpipe->pipe_state & PIPE_DIRECTW)) { 454 caddr_t va; 455 if (size > (u_int) uio->uio_resid) 456 size = (u_int) uio->uio_resid; 457 458 va = (caddr_t) rpipe->pipe_map.kva + 459 rpipe->pipe_map.pos; 460 error = uiomove(va, size, uio); 461 if (error) 462 break; 463 nread += size; 464 rpipe->pipe_map.pos += size; 465 rpipe->pipe_map.cnt -= size; 466 if (rpipe->pipe_map.cnt == 0) { 467 rpipe->pipe_state &= ~PIPE_DIRECTW; 468 wakeup(rpipe); 469 } 470 #endif 471 } else { 472 /* 473 * detect EOF condition 474 * read returns 0 on EOF, no need to set error 475 */ 476 if (rpipe->pipe_state & PIPE_EOF) 477 break; 478 479 /* 480 * If the "write-side" has been blocked, wake it up now. 481 */ 482 if (rpipe->pipe_state & PIPE_WANTW) { 483 rpipe->pipe_state &= ~PIPE_WANTW; 484 wakeup(rpipe); 485 } 486 487 /* 488 * Break if some data was read. 489 */ 490 if (nread > 0) 491 break; 492 493 /* 494 * Unlock the pipe buffer for our remaining processing. We 495 * will either break out with an error or we will sleep and 496 * relock to loop. 497 */ 498 pipeunlock(rpipe); 499 500 /* 501 * Handle non-blocking mode operation or 502 * wait for more data. 
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PCATCH,
				    "piperd", 0)) == 0) {
					error = pipelock(rpipe, 1);
				}
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		vfs_timestamp(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}
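/*
 * Illustrative userland sketch (not part of the original file): with
 * FNONBLOCK set on the descriptor, pipe_read() above returns EAGAIN
 * instead of sleeping in "piperd" when nothing is buffered and no direct
 * write is pending.  fds[0] is assumed to be the read side of a pipe(2).
 *
 *	#include <errno.h>
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *
 *	char c;
 *	ssize_t n;
 *
 *	fcntl(fds[0], F_SETFL, O_NONBLOCK);
 *	n = read(fds[0], &c, 1);
 *	if (n < 0 && errno == EAGAIN)
 *		;	// pipe is empty, try again later
 */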

#ifndef PIPE_NODIRECT
/*
 * Map the sending processes' buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr;
	vm_paddr_t paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
		vm_page_t m;

		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
		    (paddr = pmap_kextract(addr)) == 0) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unhold(wpipe->pipe_map.ms[j]);
			return (EFAULT);
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_hold(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos =
	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		pipe_kva += wpipe->pipe_buffer.size + PAGE_SIZE;
		++pipe_dkmem_alloc;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return (0);
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (pipe_kva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			pipe_kva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unhold(wpipe->pipe_map.ms[i]);
	wpipe->pipe_map.npages = 0;
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
	    (caddr_t) wpipe->pipe_buffer.buffer, size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe,0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return (error);

error1:
	wakeup(wpipe);
	return (error);
}
#endif

static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred,
	int flags, struct thread *td)
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (pipe_nbig < pipe_maxbig) &&
	    (wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe,1)) == 0) {
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				pipe_nbig++;
			pipeunlock(wpipe);
		}
	}

	/*
	 * If an early error occurred unbusy and return, waking up any pending
	 * readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		return(error);
	}

	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 *
		 * The direct write mechanism will detect the reader going
		 * away on us.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (pipe_kva < LIMITPIPEKVA)) &&
		    (uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write( wpipe, uio);
			if (error)
				break;
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincidental with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.  We break out if a signal occurs or the
		 * reader goes away.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe, PCATCH, "pipbww", 0);
			if (wpipe->pipe_state & PIPE_EOF)
				break;
			if (error)
				break;
		}
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			if ((error = pipelock(wpipe,1)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");

				}
				pipeunlock(wpipe);
			}
			if (error)
				break;

		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, PCATCH, "pipewr", 0);
			if (error != 0)
				break;
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}
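/*
 * Illustrative userland sketch (not part of the original file): because
 * pipe_write() above zeroes "space" until a request of orig_resid <=
 * PIPE_BUF fits entirely, small records from concurrent writers are never
 * interleaved.  PIPE_BUF comes from <limits.h>; "logfd" is an assumed
 * descriptor for a write side shared by several writers.
 *
 *	#include <limits.h>
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	char rec[128];				// 128 <= PIPE_BUF, so atomic
 *	memset(rec, 'A', sizeof(rec));
 *	write(logfd, rec, sizeof(rec));		// lands as one contiguous record
 */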

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td)
{
	struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case FIOSETOWN:
		return (fsetown(*(int *)data, &mpipe->pipe_sigio));

	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		return (0);

	/* This is deprecated, FIOSETOWN should be used instead. */
	case TIOCSPGRP:
		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));

	/* This is deprecated, FIOGETOWN should be used instead. */
	case TIOCGPGRP:
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		return (0);

	}
	return (ENOTTY);
}

int
pipe_poll(struct file *fp, int events, struct ucred *cred, struct thread *td)
{
	struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	if (events & (POLLOUT | POLLWRNORM))
		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
			revents |= events & (POLLOUT | POLLWRNORM);

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(td, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}

		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(td, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}

	return (revents);
}

static int
pipe_stat(struct file *fp, struct stat *ub, struct thread *td)
{
	struct pipe *pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

/* ARGSUSED */
static int
pipe_close(struct file *fp, struct thread *td)
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	return (0);
}

static void
pipe_free_kmem(struct pipe *cpipe)
{
	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			--pipe_nbig;
		pipe_kva -= cpipe->pipe_buffer.size;
		kmem_free(kernel_map,
			(vm_offset_t)cpipe->pipe_buffer.buffer,
			cpipe->pipe_buffer.size);
		cpipe->pipe_buffer.buffer = NULL;
		cpipe->pipe_buffer.object = NULL;
	}
#ifndef PIPE_NODIRECT
	if (cpipe->pipe_map.kva != NULL) {
		pipe_kva -= cpipe->pipe_buffer.size + PAGE_SIZE;
		kmem_free(kernel_map,
			cpipe->pipe_map.kva,
			cpipe->pipe_buffer.size + PAGE_SIZE);
		cpipe->pipe_map.cnt = 0;
		cpipe->pipe_map.kva = 0;
		cpipe->pipe_map.pos = 0;
		cpipe->pipe_map.npages = 0;
	}
#endif
}

/*
 * shutdown the pipe
 */
static void
pipeclose(struct pipe *cpipe)
{
	globaldata_t gd;
	struct pipe *ppipe;

	if (cpipe == NULL)
		return;

	pipeselwakeup(cpipe);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	while (cpipe->pipe_busy) {
		wakeup(cpipe);
		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
		tsleep(cpipe, 0, "pipecl", 0);
	}

	/*
	 * Disconnect from peer
	 */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe);

		ppipe->pipe_state |= PIPE_EOF;
		wakeup(ppipe);
		KNOTE(&ppipe->pipe_sel.si_note, 0);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * free or cache resources
	 */
	gd = mycpu;
	if (gd->gd_pipeqcount >= pipe_maxcache ||
	    cpipe->pipe_buffer.size != PIPE_SIZE
	) {
		pipe_free_kmem(cpipe);
		free(cpipe, M_PIPE);
	} else {
		KKASSERT(cpipe->pipe_map.npages == 0);

		cpipe->pipe_state = 0;
		cpipe->pipe_busy = 0;
		cpipe->pipe_map.cnt = 0;
		cpipe->pipe_map.pos = 0;
		cpipe->pipe_peer = gd->gd_pipeq;
		gd->gd_pipeq = cpipe;
		++gd->gd_pipeqcount;
	}
}

/*ARGSUSED*/
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		cpipe = cpipe->pipe_peer;
		if (cpipe == NULL)
			/* other end of pipe has been closed */
			return (EPIPE);
		break;
	default:
		return (1);
	}
	kn->kn_hook = (caddr_t)cpipe;

	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_hook;

	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;
	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
		kn->kn_data = rpipe->pipe_map.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
	if (wpipe->pipe_state & PIPE_DIRECTW)
		kn->kn_data = 0;

	return (kn->kn_data >= PIPE_BUF);
}