/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
 * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>
#include <sys/mplock2.h>

#include <machine/cpufunc.h>

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
		struct ucred *cred, struct sysmsg *msg);

static struct fileops pipeops = {
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_kqfilter = pipe_kqfilter,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_shutdown = pipe_shutdown
};

static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	64
#define PIPEQ_MAX_CACHE 16	/* per-cpu pipe structure cache */

static int pipe_maxbig = LIMITBIGPIPES;
static int pipe_maxcache = PIPEQ_MAX_CACHE;
static int pipe_bigcount;
static int pipe_nbig;
static int pipe_bcache_alloc;
static int pipe_bkmem_alloc;
static int pipe_rblocked_count;
static int pipe_wblocked_count;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
	CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
	CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
	CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times a reader blocked");
SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
	CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times a writer blocked");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
	CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
	CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
#ifdef SMP
static int pipe_delay = 5000;	/* 5uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
	CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
static int pipe_mpsafe = 1;
SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe,
	CTLFLAG_RW, &pipe_mpsafe, 0, "");
#endif
#if !defined(NO_PIPE_SYSCTL_STATS)
SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
	CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
	CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
#endif
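/*
 * The counters and tunables above are exported under kern.pipe.*.  As an
 * illustrative sketch only (not part of this file), they can be inspected
 * and tuned from userland with sysctl(8), e.g.:
 *
 *	sysctl kern.pipe.nbig kern.pipe.bigcount
 *	sysctl kern.pipe.maxbig=128
 *
 * The exact node names follow the SYSCTL_INT() declarations above.
 */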

static void pipeclose (struct pipe *cpipe);
static void pipe_free_kmem (struct pipe *cpipe);
static int pipe_create (struct pipe **cpipep);
static __inline void pipeselwakeup (struct pipe *cpipe);
static int pipespace (struct pipe *cpipe, int size);

static __inline int
pipeseltest(struct pipe *cpipe)
{
	return ((cpipe->pipe_state & PIPE_SEL) ||
		((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) ||
		SLIST_FIRST(&cpipe->pipe_sel.si_note));
}

static __inline void
pipeselwakeup(struct pipe *cpipe)
{
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
		get_mplock();
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
		rel_mplock();
	}
	if (SLIST_FIRST(&cpipe->pipe_sel.si_note)) {
		get_mplock();
		KNOTE(&cpipe->pipe_sel.si_note, 0);
		rel_mplock();
	}
}

/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The read token is held on entry so *ipp does not race.
 */
static __inline int
pipe_start_uio(struct pipe *cpipe, int *ipp)
{
	int error;

	while (*ipp) {
		*ipp = -1;
		error = tsleep(ipp, PCATCH, "pipexx", 0);
		if (error)
			return (error);
	}
	*ipp = 1;
	return (0);
}

static __inline void
pipe_end_uio(struct pipe *cpipe, int *ipp)
{
	if (*ipp < 0) {
		*ipp = 0;
		wakeup(ipp);
	} else {
		KKASSERT(*ipp > 0);
		*ipp = 0;
	}
}

static __inline void
pipe_get_mplock(int *save)
{
#ifdef SMP
	if (pipe_mpsafe == 0) {
		get_mplock();
		*save = 1;
	} else
#endif
	{
		*save = 0;
	}
}

static __inline void
pipe_rel_mplock(int *save)
{
#ifdef SMP
	if (*save)
		rel_mplock();
#endif
}


/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 *
 * MPSAFE
 */
int
sys_pipe(struct pipe_args *uap)
{
	struct thread *td = curthread;
	struct filedesc *fdp = td->td_proc->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd1, fd2, error;

	rpipe = wpipe = NULL;
	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (ENFILE);
	}

	error = falloc(td->td_lwp, &rf, &fd1);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	uap->sysmsg_fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_type = DTYPE_PIPE;
	rf->f_flag = FREAD | FWRITE;
	rf->f_ops = &pipeops;
	rf->f_data = rpipe;
	error = falloc(td->td_lwp, &wf, &fd2);
	if (error) {
		fsetfd(fdp, NULL, fd1);
		fdrop(rf);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	wf->f_type = DTYPE_PIPE;
	wf->f_flag = FREAD | FWRITE;
	wf->f_ops = &pipeops;
	wf->f_data = wpipe;
	uap->sysmsg_fds[1] = fd2;

	rpipe->pipe_slock = kmalloc(sizeof(struct lock),
				    M_PIPE, M_WAITOK|M_ZERO);
	wpipe->pipe_slock = rpipe->pipe_slock;
	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	lockinit(rpipe->pipe_slock, "pipecl", 0, 0);

	/*
	 * Once activated the peer relationship remains valid until
	 * both sides are closed.
	 */
	fsetfd(fdp, rf, fd1);
	fsetfd(fdp, wf, fd2);
	fdrop(rf);
	fdrop(wf);

	return (0);
}
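/*
 * From userland the code above is reached through pipe(2).  A minimal
 * illustrative sketch only (not part of this file, error handling omitted):
 *
 *	int fds[2];
 *
 *	pipe(fds);			// fds[0] = read end, fds[1] = write end
 *	write(fds[1], "x", 1);		// data written to the write side ...
 *	read(fds[0], buf, 1);		// ... comes back out of the read side
 */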

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely; if that fails
 * it will retain the old buffer and return ENOMEM.
 */
static int
pipespace(struct pipe *cpipe, int size)
{
	struct vm_object *object;
	caddr_t buffer;
	int npages, error;

	npages = round_page(size) / PAGE_SIZE;
	object = cpipe->pipe_buffer.object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		get_mplock();
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t)vm_map_min(&kernel_map);

		error = vm_map_find(&kernel_map, object, 0,
				    (vm_offset_t *)&buffer,
				    size, PAGE_SIZE,
				    1, VM_MAPTYPE_NORMAL,
				    VM_PROT_ALL, VM_PROT_ALL,
				    0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			rel_mplock();
			return (ENOMEM);
		}
		pipe_free_kmem(cpipe);
		rel_mplock();
		cpipe->pipe_buffer.object = object;
		cpipe->pipe_buffer.buffer = buffer;
		cpipe->pipe_buffer.size = size;
		++pipe_bkmem_alloc;
	} else {
		++pipe_bcache_alloc;
	}
	cpipe->pipe_buffer.rindex = 0;
	cpipe->pipe_buffer.windex = 0;
	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.  For now make sure it is sized for the
 * smaller PIPE_SIZE default.
 */
static int
pipe_create(struct pipe **cpipep)
{
	globaldata_t gd = mycpu;
	struct pipe *cpipe;
	int error;

	if ((cpipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = cpipe->pipe_peer;
		--gd->gd_pipeqcount;
		cpipe->pipe_peer = NULL;
		cpipe->pipe_wantwcnt = 0;
	} else {
		cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
	}
	*cpipep = cpipe;
	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
		return (error);
	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	lwkt_token_init(&cpipe->pipe_rlock, 1);
	lwkt_token_init(&cpipe->pipe_wlock, 1);
	return (0);
}
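/*
 * The FIFO used by the read and write paths below is indexed by two
 * free-running unsigned counters: windex only advances in the writer and
 * rindex only advances in the reader, so (windex - rindex) is the byte
 * count in the pipe even after the counters wrap, and (index & (size - 1))
 * is the buffer offset because the sizes used here (PIPE_SIZE,
 * BIG_PIPE_SIZE) are powers of 2.  Illustrative sketch only:
 *
 *	bytes  = windex - rindex;			// valid across wrap
 *	offset = rindex & (pipe_buffer.size - 1);	// where to copy from
 */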

/*
 * MPALMOSTSAFE (acquires mplock)
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipe *rpipe;
	int error;
	size_t nread = 0;
	int nbio;
	u_int size;	/* total bytes available */
	u_int nsize;	/* total bytes to read */
	u_int rindex;	/* contiguous bytes available */
	int notify_writer;
	int mpsave;
	int bigread;
	int bigcount;

	if (uio->uio_resid == 0)
		return(0);

	/*
	 * Setup locks, calculate nbio
	 */
	pipe_get_mplock(&mpsave);
	rpipe = (struct pipe *)fp->f_data;
	lwkt_gettoken(&rpipe->pipe_rlock);

	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * Reads are serialized.  Note however that pipe_buffer.buffer and
	 * pipe_buffer.size can change out from under us when the number
	 * of bytes in the buffer are zero due to the write-side doing a
	 * pipespace().
	 */
	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
	if (error) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&rpipe->pipe_rlock);
		return (error);
	}
	notify_writer = 0;

	bigread = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		/*
		 * Don't hog the cpu.
		 */
		if (bigread && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
		cpu_lfence();
		if (size) {
			rindex = rpipe->pipe_buffer.rindex &
				 (rpipe->pipe_buffer.size - 1);
			nsize = size;
			if (nsize > rpipe->pipe_buffer.size - rindex)
				nsize = rpipe->pipe_buffer.size - rindex;
			nsize = szmin(nsize, uio->uio_resid);

			error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
					nsize, uio);
			if (error)
				break;
			cpu_mfence();
			rpipe->pipe_buffer.rindex += nsize;
			nread += nsize;

			/*
			 * If the FIFO is still over half full just continue
			 * and do not try to notify the writer yet.
			 */
			if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
				notify_writer = 0;
				continue;
			}

			/*
			 * When the FIFO is less than half full notify any
			 * waiting writer.  WANTW can be checked while
			 * holding just the rlock.
			 */
			notify_writer = 1;
			if ((rpipe->pipe_state & PIPE_WANTW) == 0)
				continue;
		}

		/*
		 * If the "write-side" was blocked we wake it up.  This code
		 * is reached either when the buffer is completely emptied
		 * or if it becomes more than half-empty.
		 *
		 * Pipe_state can only be modified if both the rlock and
		 * wlock are held.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			lwkt_gettoken(&rpipe->pipe_wlock);
			if (rpipe->pipe_state & PIPE_WANTW) {
				notify_writer = 0;
				rpipe->pipe_state &= ~PIPE_WANTW;
				lwkt_reltoken(&rpipe->pipe_wlock);
				wakeup(rpipe);
			} else {
				lwkt_reltoken(&rpipe->pipe_wlock);
			}
		}

		/*
		 * Pick up our copy loop again if the writer sent data to
		 * us while we were messing around.
		 *
		 * On an SMP box poll up to pipe_delay nanoseconds for new
		 * data.  Typically a value of 2000 to 4000 is sufficient
		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
		 * is used for synchronous communications with small packets,
		 * and 8000 or so (8uS) will pipeline large buffer xfers
		 * between cpus over a pipe.
		 *
		 * For synchronous communications a hit means doing a
		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
		 * whereas a miss requiring a tsleep/wakeup sequence
		 * will take 7uS or more.
		 */
		if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
			continue;

#if defined(SMP) && defined(_RDTSC_SUPPORTED_)
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				if (rpipe->pipe_buffer.windex !=
				    rpipe->pipe_buffer.rindex) {
					good = 1;
					break;
				}
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Detect EOF condition, do not set error.
		 */
		if (rpipe->pipe_state & PIPE_REOF)
			break;

		/*
		 * Break if some data was read, or if this was a non-blocking
		 * read.
		 */
		if (nread > 0)
			break;

		if (nbio) {
			error = EAGAIN;
			break;
		}

		/*
		 * Last chance, interlock with WANTR.
		 */
		lwkt_gettoken(&rpipe->pipe_wlock);
		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
		if (size) {
			lwkt_reltoken(&rpipe->pipe_wlock);
			continue;
		}

		/*
		 * Retest EOF - acquiring a new token can temporarily release
		 * tokens already held.
		 */
		if (rpipe->pipe_state & PIPE_REOF) {
			lwkt_reltoken(&rpipe->pipe_wlock);
			break;
		}

		/*
		 * If there is no more to read in the pipe, reset its
		 * pointers to the beginning.  This improves cache hit
		 * stats.
		 *
		 * We need both locks to modify both pointers, and there
		 * must also not be a write in progress or the uiomove()
		 * in the write might block and temporarily release
		 * its wlock, then reacquire and update windex.  We are
		 * only serialized against reads, not writes.
		 *
		 * XXX should we even bother resetting the indices?  It
		 * might actually be more cache efficient not to.
		 */
		if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
		    rpipe->pipe_wip == 0) {
			rpipe->pipe_buffer.rindex = 0;
			rpipe->pipe_buffer.windex = 0;
		}

		/*
		 * Wait for more data.
		 *
		 * Pipe_state can only be set if both the rlock and wlock
		 * are held.
		 */
		rpipe->pipe_state |= PIPE_WANTR;
		tsleep_interlock(rpipe, PCATCH);
		lwkt_reltoken(&rpipe->pipe_wlock);
		error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
		++pipe_rblocked_count;
		if (error)
			break;
	}
	pipe_end_uio(rpipe, &rpipe->pipe_rip);

	/*
	 * Update last access time
	 */
	if (error == 0 && nread)
		vfs_timestamp(&rpipe->pipe_atime);

	/*
	 * If we drained the FIFO more than half way then handle
	 * write blocking hysteresis.
	 *
	 * Note that PIPE_WANTW cannot be set by the writer without
	 * it holding both rlock and wlock, so we can test it
	 * while holding just rlock.
	 */
	if (notify_writer) {
		if (rpipe->pipe_state & PIPE_WANTW) {
			lwkt_gettoken(&rpipe->pipe_wlock);
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				lwkt_reltoken(&rpipe->pipe_wlock);
				wakeup(rpipe);
			} else {
				lwkt_reltoken(&rpipe->pipe_wlock);
			}
		}
		if (pipeseltest(rpipe)) {
			lwkt_gettoken(&rpipe->pipe_wlock);
			pipeselwakeup(rpipe);
			lwkt_reltoken(&rpipe->pipe_wlock);
		}
	}
	/*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
	lwkt_reltoken(&rpipe->pipe_rlock);

	pipe_rel_mplock(&mpsave);
	return (error);
}
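/*
 * As seen from userland, the EOF and non-blocking cases above follow the
 * usual pipe semantics: once the write side is gone, read(2) returns 0,
 * while an empty pipe opened non-blocking fails with EAGAIN.  Illustrative
 * sketch only (not part of this file):
 *
 *	n = read(fds[0], buf, sizeof(buf));
 *	if (n == 0)				// PIPE_REOF: writer closed
 *		...
 *	else if (n < 0 && errno == EAGAIN)	// O_NONBLOCK and no data
 *		...
 */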

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	int error;
	int orig_resid;
	int nbio;
	struct pipe *wpipe, *rpipe;
	u_int windex;
	u_int space;
	u_int wcount;
	int mpsave;
	int bigwrite;
	int bigcount;

	pipe_get_mplock(&mpsave);

	/*
	 * Writes go to the peer.  The peer will always exist.
	 */
	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;
	lwkt_gettoken(&wpipe->pipe_wlock);
	if (wpipe->pipe_state & PIPE_WEOF) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wpipe->pipe_wlock);
		return (EPIPE);
	}

	/*
	 * Degenerate case (EPIPE takes precedence)
	 */
	if (uio->uio_resid == 0) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wpipe->pipe_wlock);
		return(0);
	}

	/*
	 * Writes are serialized (start_uio must be called with wlock)
	 */
	error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
	if (error) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wpipe->pipe_wlock);
		return (error);
	}

	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.  We are write-serialized so we can block safely.
	 */
	if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (pipe_nbig < pipe_maxbig) &&
	    wpipe->pipe_wantwcnt > 4 &&
	    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
		/*
		 * Recheck after lock.
		 */
		lwkt_gettoken(&wpipe->pipe_rlock);
		if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		    (pipe_nbig < pipe_maxbig) &&
		    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
			atomic_add_int(&pipe_nbig, 1);
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				++pipe_bigcount;
			else
				atomic_subtract_int(&pipe_nbig, 1);
		}
		lwkt_reltoken(&wpipe->pipe_rlock);
	}

	orig_resid = uio->uio_resid;
	wcount = 0;

	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		if (wpipe->pipe_state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * Don't hog the cpu.
		 */
		if (bigwrite && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		windex = wpipe->pipe_buffer.windex &
			 (wpipe->pipe_buffer.size - 1);
		space = wpipe->pipe_buffer.size -
			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
		cpu_lfence();

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Write to fill, read size handles write hysteresis.  Also
		 * additional restrictions can cause select-based non-blocking
		 * writes to spin.
		 */
		if (space > 0) {
			u_int segsize;

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 *
			 * Limit each uiocopy to no more than PIPE_SIZE
			 * so we can keep the gravy train going on a
			 * SMP box.  This doubles the performance for
			 * write sizes > 16K.  Otherwise large writes
			 * wind up doing an inefficient synchronous
			 * ping-pong.
			 */
			space = szmin(space, uio->uio_resid);
			if (space > PIPE_SIZE)
				space = PIPE_SIZE;

			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpipe->pipe_buffer.size - windex;
			if (segsize > space)
				segsize = space;

#ifdef SMP
			/*
			 * If this is the first loop and the reader is
			 * blocked, do a preemptive wakeup of the reader.
			 *
			 * On SMP the IPI latency plus the wlock interlock
			 * on the reader side is the fastest way to get the
			 * reader going.  (The scheduler will hard loop on
			 * lock tokens).
			 *
			 * NOTE: We can't clear WANTR here without acquiring
			 * the rlock, which we don't want to do here!
			 */
			if ((wpipe->pipe_state & PIPE_WANTR) && pipe_mpsafe > 1)
				wakeup(wpipe);
#endif

			/*
			 * Transfer segment, which may include a wrap-around.
			 * Update windex to account for both all in one go
			 * so the reader can read() the data atomically.
			 */
			error = uiomove(&wpipe->pipe_buffer.buffer[windex],
					segsize, uio);
			if (error == 0 && segsize < space) {
				segsize = space - segsize;
				error = uiomove(&wpipe->pipe_buffer.buffer[0],
						segsize, uio);
			}
			if (error)
				break;
			cpu_mfence();
			wpipe->pipe_buffer.windex += space;
			wcount += space;
			continue;
		}

		/*
		 * We need both the rlock and the wlock to interlock against
		 * the EOF, WANTW, and size checks, and to modify pipe_state.
		 *
		 * These are token locks so we do not have to worry about
		 * deadlocks.
		 */
		lwkt_gettoken(&wpipe->pipe_rlock);

		/*
		 * If the "read-side" has been blocked, wake it up now
		 * and yield to let it drain synchronously rather
		 * than block.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		/*
		 * don't block on non-blocking I/O
		 */
		if (nbio) {
			lwkt_reltoken(&wpipe->pipe_rlock);
			error = EAGAIN;
			break;
		}

		/*
		 * re-test whether we have to block in the writer after
		 * acquiring both locks, in case the reader opened up
		 * some space.
		 */
		space = wpipe->pipe_buffer.size -
			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
		cpu_lfence();
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Retest EOF - acquiring a new token can temporarily release
		 * tokens already held.
		 */
		if (wpipe->pipe_state & PIPE_WEOF) {
			lwkt_reltoken(&wpipe->pipe_rlock);
			error = EPIPE;
			break;
		}

		/*
		 * We have no more space and have something to offer,
		 * wake up select/poll.
		 */
		if (space == 0) {
			wpipe->pipe_state |= PIPE_WANTW;
			++wpipe->pipe_wantwcnt;
			pipeselwakeup(wpipe);
			if (wpipe->pipe_state & PIPE_WANTW)
				error = tsleep(wpipe, PCATCH, "pipewr", 0);
			++pipe_wblocked_count;
		}
		lwkt_reltoken(&wpipe->pipe_rlock);

		/*
		 * Break out if we errored or the read side wants us to go
		 * away.
		 */
		if (error)
			break;
		if (wpipe->pipe_state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}
	}
	pipe_end_uio(wpipe, &wpipe->pipe_wip);

	/*
	 * If we have put any characters in the buffer, we wake up
	 * the reader.
	 *
	 * Both rlock and wlock are required to be able to modify pipe_state.
	 */
	if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			lwkt_gettoken(&wpipe->pipe_rlock);
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				lwkt_reltoken(&wpipe->pipe_rlock);
				wakeup(wpipe);
			} else {
				lwkt_reltoken(&wpipe->pipe_rlock);
			}
		}
		if (pipeseltest(wpipe)) {
			lwkt_gettoken(&wpipe->pipe_rlock);
			pipeselwakeup(wpipe);
			lwkt_reltoken(&wpipe->pipe_rlock);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	/*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
	lwkt_reltoken(&wpipe->pipe_wlock);
	pipe_rel_mplock(&mpsave);
	return (error);
}

/*
 * MPALMOSTSAFE - acquires mplock
 *
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
	   struct ucred *cred, struct sysmsg *msg)
{
	struct pipe *mpipe;
	int error;
	int mpsave;

	pipe_get_mplock(&mpsave);
	mpipe = (struct pipe *)fp->f_data;

	lwkt_gettoken(&mpipe->pipe_rlock);
	lwkt_gettoken(&mpipe->pipe_wlock);

	switch (cmd) {
	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		error = 0;
		break;
	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.windex -
				mpipe->pipe_buffer.rindex;
		error = 0;
		break;
	case FIOSETOWN:
		get_mplock();
		error = fsetown(*(int *)data, &mpipe->pipe_sigio);
		rel_mplock();
		break;
	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		error = 0;
		break;
	case TIOCSPGRP:
		/* This is deprecated, FIOSETOWN should be used instead. */
		get_mplock();
		error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
		rel_mplock();
		break;

	case TIOCGPGRP:
		/* This is deprecated, FIOGETOWN should be used instead. */
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		error = 0;
		break;
	default:
		error = ENOTTY;
		break;
	}
	lwkt_reltoken(&mpipe->pipe_wlock);
	lwkt_reltoken(&mpipe->pipe_rlock);
	pipe_rel_mplock(&mpsave);

	return (error);
}
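/*
 * The FIONREAD case above is what lets ioctl(2) report the number of
 * unread bytes in the pipe.  Illustrative userland sketch only (not part
 * of this file):
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		printf("%d bytes queued\n", nbytes);
 */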

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
	struct pipe *pipe;
	int mpsave;

	pipe_get_mplock(&mpsave);
	pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	pipe_rel_mplock(&mpsave);
	return (0);
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_close(struct file *fp)
{
	struct pipe *cpipe;

	get_mplock();
	cpipe = (struct pipe *)fp->f_data;
	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	rel_mplock();
	return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 *
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_shutdown(struct file *fp, int how)
{
	struct pipe *rpipe;
	struct pipe *wpipe;
	int error = EPIPE;
	int mpsave;

	pipe_get_mplock(&mpsave);
	rpipe = (struct pipe *)fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * We modify pipe_state on both pipes, which means we need
	 * all four tokens!
	 */
	lwkt_gettoken(&rpipe->pipe_rlock);
	lwkt_gettoken(&rpipe->pipe_wlock);
	lwkt_gettoken(&wpipe->pipe_rlock);
	lwkt_gettoken(&wpipe->pipe_wlock);

	switch(how) {
	case SHUT_RDWR:
	case SHUT_RD:
		rpipe->pipe_state |= PIPE_REOF;		/* my reads */
		rpipe->pipe_state |= PIPE_WEOF;		/* peer writes */
		if (rpipe->pipe_state & PIPE_WANTR) {
			rpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(rpipe);
		}
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
		error = 0;
		if (how == SHUT_RD)
			break;
		/* fall through */
	case SHUT_WR:
		wpipe->pipe_state |= PIPE_REOF;		/* peer reads */
		wpipe->pipe_state |= PIPE_WEOF;		/* my writes */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		if (wpipe->pipe_state & PIPE_WANTW) {
			wpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(wpipe);
		}
		error = 0;
		break;
	}
	pipeselwakeup(rpipe);
	pipeselwakeup(wpipe);

	lwkt_reltoken(&wpipe->pipe_wlock);
	lwkt_reltoken(&wpipe->pipe_rlock);
	lwkt_reltoken(&rpipe->pipe_wlock);
	lwkt_reltoken(&rpipe->pipe_rlock);

	pipe_rel_mplock(&mpsave);
	return (error);
}

static void
pipe_free_kmem(struct pipe *cpipe)
{
	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			atomic_subtract_int(&pipe_nbig, 1);
		kmem_free(&kernel_map,
			(vm_offset_t)cpipe->pipe_buffer.buffer,
			cpipe->pipe_buffer.size);
		cpipe->pipe_buffer.buffer = NULL;
		cpipe->pipe_buffer.object = NULL;
	}
}

/*
 * Close the pipe.  The slock must be held to interlock against simultaneous
 * closes.  The rlock and wlock must be held to adjust the pipe_state.
 */
static void
pipeclose(struct pipe *cpipe)
{
	globaldata_t gd;
	struct pipe *ppipe;

	if (cpipe == NULL)
		return;

	/*
	 * The slock may not have been allocated yet (close during
	 * initialization)
	 *
	 * We need both the read and write tokens to modify pipe_state.
	 */
	if (cpipe->pipe_slock)
		lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
	lwkt_gettoken(&cpipe->pipe_rlock);
	lwkt_gettoken(&cpipe->pipe_wlock);

	/*
	 * Set our state, wakeup anyone waiting in select, and
	 * wakeup anyone blocked on our pipe.
	 */
	cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
	pipeselwakeup(cpipe);
	if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
		cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(cpipe);
	}

	/*
	 * Disconnect from peer.
	 */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		lwkt_gettoken(&ppipe->pipe_rlock);
		lwkt_gettoken(&ppipe->pipe_wlock);
		ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
		pipeselwakeup(ppipe);
		if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
			ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
			wakeup(ppipe);
		}
		if (SLIST_FIRST(&ppipe->pipe_sel.si_note)) {
			get_mplock();
			KNOTE(&ppipe->pipe_sel.si_note, 0);
			rel_mplock();
		}
		lwkt_reltoken(&ppipe->pipe_wlock);
		lwkt_reltoken(&ppipe->pipe_rlock);
	}

	/*
	 * If the peer is also closed we can free resources for both
	 * sides, otherwise we leave our side intact to deal with any
	 * races (since we only have the slock).
	 */
	if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
		cpipe->pipe_peer = NULL;
		ppipe->pipe_peer = NULL;
		ppipe->pipe_slock = NULL;	/* we will free the slock */
		pipeclose(ppipe);
		ppipe = NULL;
	}

	lwkt_reltoken(&cpipe->pipe_wlock);
	lwkt_reltoken(&cpipe->pipe_rlock);
	if (cpipe->pipe_slock)
		lockmgr(cpipe->pipe_slock, LK_RELEASE);

	/*
	 * If we disassociated from our peer we can free resources
	 */
	if (ppipe == NULL) {
		gd = mycpu;
		if (cpipe->pipe_slock) {
			kfree(cpipe->pipe_slock, M_PIPE);
			cpipe->pipe_slock = NULL;
		}
		if (gd->gd_pipeqcount >= pipe_maxcache ||
		    cpipe->pipe_buffer.size != PIPE_SIZE) {
			pipe_free_kmem(cpipe);
			kfree(cpipe, M_PIPE);
		} else {
			cpipe->pipe_state = 0;
			cpipe->pipe_peer = gd->gd_pipeq;
			gd->gd_pipeq = cpipe;
			++gd->gd_pipeqcount;
		}
	}
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *cpipe;

	get_mplock();
	cpipe = (struct pipe *)kn->kn_fp->f_data;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		cpipe = cpipe->pipe_peer;
		if (cpipe == NULL) {
			/* other end of pipe has been closed */
			rel_mplock();
			return (EPIPE);
		}
		break;
	default:
		rel_mplock();
		return (EOPNOTSUPP);
	}
	kn->kn_hook = (caddr_t)cpipe;

	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
	rel_mplock();
	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_hook;

	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;

	/* XXX RACE */
	if (rpipe->pipe_state & PIPE_REOF) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}
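/*
 * The filters above and below back kqueue(2) monitoring of pipes.
 * Illustrative userland sketch only (not part of this file, error
 * handling omitted):
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);	// register the read end
 *	kevent(kq, NULL, 0, &kev, 1, NULL);	// wait; kev.data is the
 *						// byte count from kn_data
 */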

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;
	u_int32_t space;

	/* XXX RACE */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_WEOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	space = wpipe->pipe_buffer.windex -
		wpipe->pipe_buffer.rindex;
	space = wpipe->pipe_buffer.size - space;
	kn->kn_data = space;
	return (kn->kn_data >= PIPE_BUF);
}