/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/sysmsg.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <sys/kern_syscall.h>
#include <sys/lock.h>
#include <sys/mutex.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>
#include <sys/mutex2.h>

#include <machine/cpufunc.h>

struct pipegdlock {
        struct mtx mtx;
} __cachealign;

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
                struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
                struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
                struct ucred *cred, struct sysmsg *msg);

__read_mostly static struct fileops pipeops = {
        .fo_read = pipe_read,
        .fo_write = pipe_write,
        .fo_ioctl = pipe_ioctl,
        .fo_kqfilter = pipe_kqfilter,
        .fo_stat = pipe_stat,
        .fo_close = pipe_close,
        .fo_shutdown = pipe_shutdown,
        .fo_seek = badfo_seek
};
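
/*
 * Illustrative note (not from the original source): a pipe fp does not
 * point at a struct pipe directly.  The low bit of fp->f_data selects
 * the direction -- bufferA for the descriptor created with (pipe | 0),
 * bufferB for the one created with (pipe | 1) -- which is why every
 * fileop below masks f_data with ~1 and then tests the low bit.
 */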

static void filt_pipedetach(struct knote *kn);
static int filt_piperead(struct knote *kn, long hint);
static int filt_pipewrite(struct knote *kn, long hint);

__read_mostly static struct filterops pipe_rfiltops =
        { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
__read_mostly static struct filterops pipe_wfiltops =
        { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

#define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */

__read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
__read_mostly static struct pipegdlock *pipe_gdlocks;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
        CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");

/*
 * The pipe buffer size can be changed at any time.  Only new pipe()s
 * are affected.  Note that due to cpu cache effects, you do not want
 * to make this value too large.
 */
__read_mostly static int pipe_size = 32768;
SYSCTL_INT(_kern_pipe, OID_AUTO, size,
        CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");

/*
 * Reader/writer delay loop.  When the reader exhausts the pipe buffer
 * or the writer completely fills the pipe buffer and would otherwise sleep,
 * it first busy-loops for a few microseconds waiting for data or buffer
 * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
 * and also helps when the user program uses a large data buffer in its
 * UIOs.
 *
 * This defaults to 4uS.
 */
#ifdef _RDTSC_SUPPORTED_
__read_mostly static int pipe_delay = 4000;     /* 4uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
        CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
#endif

/*
 * Auto-size pipe cache to reduce kmem allocations and frees.
 */
static
void
pipeinit(void *dummy)
{
        size_t mbytes = kmem_lim_size();
        int n;

        if (pipe_maxcache == PIPEQ_MAX_CACHE) {
                if (mbytes >= 7 * 1024)
                        pipe_maxcache *= 2;
                if (mbytes >= 15 * 1024)
                        pipe_maxcache *= 2;
        }

        /*
         * Detune the pcpu caching a bit on systems with an insane number
         * of cpu threads to reduce memory waste.
         */
        if (ncpus > 64) {
                pipe_maxcache = pipe_maxcache * 64 / ncpus;
                if (pipe_maxcache < PIPEQ_MAX_CACHE)
                        pipe_maxcache = PIPEQ_MAX_CACHE;
        }

        pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
                               M_PIPE, M_WAITOK | M_ZERO);
        for (n = 0; n < ncpus; ++n)
                mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
}
SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);

static void pipeclose (struct pipe *pipe,
                struct pipebuf *pbr, struct pipebuf *pbw);
static void pipe_free_kmem (struct pipebuf *buf);
static int pipe_create (struct pipe **pipep);

/*
 * Test and clear the specified flag, wakeup(pb) if it was set.
 * This function must also act as a memory barrier.
 */
static __inline void
pipesignal(struct pipebuf *pb, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = pb->state;
                cpu_ccfence();
                nflags = oflags & ~flags;
                if (atomic_cmpset_int(&pb->state, oflags, nflags))
                        break;
        }
        if (oflags & flags)
                wakeup(pb);
}

/*
 * Post a SIGIO (if armed via FIOASYNC/FIOSETOWN) and a kqueue event
 * for the buffer.
 */
static __inline void
pipewakeup(struct pipebuf *pb, int dosigio)
{
        if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
                lwkt_gettoken(&sigio_token);
                pgsigio(pb->sigio, SIGIO, 0);
                lwkt_reltoken(&sigio_token);
        }
        KNOTE(&pb->kq.ki_note, 0);
}
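
/*
 * Illustrative note (not from the original source): pipesignal() pairs
 * with the tsleep_interlock() pattern used in pipe_read()/pipe_write().
 * A sleeper does, e.g.:
 *
 *      tsleep_interlock(rpb, PCATCH);
 *      atomic_set_int(&rpb->state, PIPE_WANTR);
 *      ... retest data/EOF under the interlock ...
 *      tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
 *
 * and the peer calls pipesignal(rpb, PIPE_WANTR), which atomically
 * clears the flag and issues the wakeup() only if the flag was set.
 */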

/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The appropriate token is held on entry so *ipp does not race.
 */
static __inline int
pipe_start_uio(int *ipp)
{
        int error;

        while (*ipp) {
                *ipp = -1;
                error = tsleep(ipp, PCATCH, "pipexx", 0);
                if (error)
                        return (error);
        }
        *ipp = 1;
        return (0);
}

static __inline void
pipe_end_uio(int *ipp)
{
        if (*ipp < 0) {
                *ipp = 0;
                wakeup(ipp);
        } else {
                KKASSERT(*ipp > 0);
                *ipp = 0;
        }
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 *
 * MPSAFE
 */
int
sys_pipe(struct sysmsg *sysmsg, const struct pipe_args *uap)
{
        return kern_pipe(sysmsg->sysmsg_fds, 0);
}

int
sys_pipe2(struct sysmsg *sysmsg, const struct pipe2_args *uap)
{
        return kern_pipe(sysmsg->sysmsg_fds, uap->flags);
}

int
kern_pipe(long *fds, int flags)
{
        struct thread *td = curthread;
        struct filedesc *fdp = td->td_proc->p_fd;
        struct file *rf, *wf;
        struct pipe *pipe;
        int fd1, fd2, error;

        pipe = NULL;
        if (pipe_create(&pipe)) {
                pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
                pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (ENFILE);
        }

        error = falloc(td->td_lwp, &rf, &fd1);
        if (error) {
                pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
                pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (error);
        }
        fds[0] = fd1;

        /*
         * Warning: once we've gotten past allocation of the fd for the
         * read-side, we can only drop the read side via fdrop() in order
         * to avoid races against processes which manage to dup() the read
         * side while we are blocked trying to allocate the write side.
         */
        rf->f_type = DTYPE_PIPE;
        rf->f_flag = FREAD | FWRITE;
        rf->f_ops = &pipeops;
        rf->f_data = (void *)((intptr_t)pipe | 0);
        if (flags & O_NONBLOCK)
                rf->f_flag |= O_NONBLOCK;
        if (flags & O_CLOEXEC)
                fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;

        error = falloc(td->td_lwp, &wf, &fd2);
        if (error) {
                fsetfd(fdp, NULL, fd1);
                fdrop(rf);
                /* pipeA has been closed by fdrop() */
                /* close pipeB here */
                pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (error);
        }
        wf->f_type = DTYPE_PIPE;
        wf->f_flag = FREAD | FWRITE;
        wf->f_ops = &pipeops;
        wf->f_data = (void *)((intptr_t)pipe | 1);
        if (flags & O_NONBLOCK)
                wf->f_flag |= O_NONBLOCK;
        if (flags & O_CLOEXEC)
                fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;

        fds[1] = fd2;

        /*
         * Once activated the peer relationship remains valid until
         * both sides are closed.
         */
        fsetfd(fdp, rf, fd1);
        fsetfd(fdp, wf, fd2);
        fdrop(rf);
        fdrop(wf);

        return (0);
}
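
/*
 * Illustrative note (not from the original source): this path is reached
 * from userland via pipe(2)/pipe2(2), e.g.:
 *
 *      int fds[2];
 *      if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == 0) {
 *              // fds[0] is the read side, fds[1] the write side
 *      }
 *
 * kern_pipe() applies O_NONBLOCK to both struct file's and records
 * O_CLOEXEC as UF_EXCLOSE on both descriptors.
 */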

/*
 * [re]allocates KVA for the pipe's circular buffer.  The space is
 * pageable.  Called twice to setup full-duplex communications.
 *
 * NOTE: Independent vm_object's are used to improve performance.
 *
 * Returns 0 on success, ENOMEM on failure.
 */
static int
pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
{
        struct vm_object *object;
        caddr_t buffer;
        vm_pindex_t npages;
        int error;

        size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
        if (size < 16384)
                size = 16384;
        if (size > 1024*1024)
                size = 1024*1024;

        npages = round_page(size) / PAGE_SIZE;
        object = pb->object;

        /*
         * [re]create the object if necessary and reserve space for it
         * in the kernel_map.  The object and memory are pageable.  On
         * success, free the old resources before assigning the new
         * ones.
         */
        if (object == NULL || object->size != npages) {
                object = vm_object_allocate(OBJT_DEFAULT, npages);
                buffer = (caddr_t)vm_map_min(kernel_map);

                error = vm_map_find(kernel_map, object, NULL,
                                    0, (vm_offset_t *)&buffer, size,
                                    PAGE_SIZE, TRUE,
                                    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
                                    VM_PROT_ALL, VM_PROT_ALL, 0);

                if (error != KERN_SUCCESS) {
                        vm_object_deallocate(object);
                        return (ENOMEM);
                }
                pipe_free_kmem(pb);
                pb->object = object;
                pb->buffer = buffer;
                pb->size = size;
        }
        pb->rindex = 0;
        pb->windex = 0;

        return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.
 *
 * Returns 0 on success, else an error code (typically ENOMEM).  Caller
 * must still deallocate the pipe on failure.
 */
static int
pipe_create(struct pipe **pipep)
{
        globaldata_t gd = mycpu;
        struct pipe *pipe;
        int error;

        if ((pipe = gd->gd_pipeq) != NULL) {
                gd->gd_pipeq = pipe->next;
                --gd->gd_pipeqcount;
                pipe->next = NULL;
        } else {
                pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
                pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
                lwkt_token_init(&pipe->bufferA.rlock, "piper");
                lwkt_token_init(&pipe->bufferA.wlock, "pipew");
                lwkt_token_init(&pipe->bufferB.rlock, "piper");
                lwkt_token_init(&pipe->bufferB.wlock, "pipew");
        }
        *pipep = pipe;
        if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
                return (error);
        }
        if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
                return (error);
        }
        vfs_timestamp(&pipe->ctime);
        pipe->bufferA.atime = pipe->ctime;
        pipe->bufferA.mtime = pipe->ctime;
        pipe->bufferB.atime = pipe->ctime;
        pipe->bufferB.mtime = pipe->ctime;
        pipe->open_count = 2;

        return (0);
}
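
/*
 * Illustrative note (not from the original source): rindex and windex
 * are free-running unsigned counters, so the byte count in a buffer is
 * always (windex - rindex) even across wraparound, and the physical
 * offset is obtained by masking, e.g. with the default size of 32768:
 *
 *      bytes available = windex - rindex
 *      read offset     = rindex & (size - 1)
 *
 * The masking assumes size is a power of two, which holds for the
 * default and for power-of-two values configured via the kern.pipe.size
 * sysctl.
 */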

/*
 * Read data from a pipe
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        size_t nread = 0;
        size_t size;    /* total bytes available */
        size_t nsize;   /* total bytes to read */
        size_t rindex;  /* contiguous bytes available */
        int notify_writer;
        int bigread;
        int bigcount;
        int error;
        int nbio;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }
        atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);

        if (uio->uio_resid == 0)
                return(0);

        /*
         * Calculate nbio
         */
        if (fflags & O_FBLOCKING)
                nbio = 0;
        else if (fflags & O_FNONBLOCKING)
                nbio = 1;
        else if (fp->f_flag & O_NONBLOCK)
                nbio = 1;
        else
                nbio = 0;

        /*
         * 'quick' NBIO test before things get expensive.
         */
        if (nbio && rpb->rindex == rpb->windex &&
            (rpb->state & PIPE_REOF) == 0) {
                return EAGAIN;
        }

        /*
         * Reads are serialized.  Note however that buffer.buffer and
         * buffer.size can change out from under us when the number
         * of bytes in the buffer are zero due to the write-side doing a
         * pipespace().
         */
        lwkt_gettoken(&rpb->rlock);
        error = pipe_start_uio(&rpb->rip);
        if (error) {
                lwkt_reltoken(&rpb->rlock);
                return (error);
        }
        notify_writer = 0;

        bigread = (uio->uio_resid > 10 * 1024 * 1024);
        bigcount = 10;

        while (uio->uio_resid) {
                /*
                 * Don't hog the cpu.
                 */
                if (bigread && --bigcount == 0) {
                        lwkt_user_yield();
                        bigcount = 10;
                        if (CURSIG(curthread->td_lwp)) {
                                error = EINTR;
                                break;
                        }
                }

                /*
                 * lfence required to avoid read-reordering of buffer
                 * contents prior to validation of size.
                 */
                size = rpb->windex - rpb->rindex;
                cpu_lfence();
                if (size) {
                        rindex = rpb->rindex & (rpb->size - 1);
                        nsize = size;
                        if (nsize > rpb->size - rindex)
                                nsize = rpb->size - rindex;
                        nsize = szmin(nsize, uio->uio_resid);

                        /*
                         * Limit how much we move in one go so we have a
                         * chance to kick the writer while data is still
                         * available in the pipe.  This avoids getting into
                         * a ping-pong with the writer.
                         */
                        if (nsize > (rpb->size >> 1))
                                nsize = rpb->size >> 1;

                        error = uiomove(&rpb->buffer[rindex], nsize, uio);
                        if (error)
                                break;
                        rpb->rindex += nsize;
                        nread += nsize;

                        /*
                         * If the FIFO is still over half full just continue
                         * and do not try to notify the writer yet.  If
                         * less than half full notify any waiting writer.
                         */
                        if (size - nsize > (rpb->size >> 1)) {
                                notify_writer = 0;
                        } else {
                                notify_writer = 1;
                                pipesignal(rpb, PIPE_WANTW);
                        }
                        continue;
                }
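
                /*
                 * Illustrative note (not from the original source): with
                 * the default 32768-byte buffer the single-pass cap above
                 * is 16384 bytes, and a waiting writer is signalled once
                 * no more than half the buffer remains filled.  The two
                 * halves of this hysteresis keep a fast writer streaming
                 * instead of ping-ponging with the reader.
                 */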

                /*
                 * If the "write-side" was blocked we wake it up.  This code
                 * is reached when the buffer is completely emptied.
                 */
                pipesignal(rpb, PIPE_WANTW);

                /*
                 * Pick up our copy loop again if the writer sent data to
                 * us while we were messing around.
                 *
                 * On an SMP box poll up to pipe_delay nanoseconds for new
                 * data.  Typically a value of 2000 to 4000 is sufficient
                 * to eradicate most IPIs/tsleeps/wakeups when a pipe
                 * is used for synchronous communications with small packets,
                 * and 8000 or so (8uS) will pipeline large buffer xfers
                 * between cpus over a pipe.
                 *
                 * For synchronous communications a hit means doing a
                 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
                 * whereas a miss requiring a tsleep/wakeup sequence
                 * will take 7uS or more.
                 */
                if (rpb->windex != rpb->rindex)
                        continue;

#ifdef _RDTSC_SUPPORTED_
                if (pipe_delay) {
                        int64_t tsc_target;
                        int good = 0;

                        tsc_target = tsc_get_target(pipe_delay);
                        while (tsc_test_target(tsc_target) == 0) {
                                cpu_lfence();
                                if (rpb->windex != rpb->rindex) {
                                        good = 1;
                                        break;
                                }
                                cpu_pause();
                        }
                        if (good)
                                continue;
                }
#endif

                /*
                 * Detect EOF condition, do not set error.
                 */
                if (rpb->state & PIPE_REOF)
                        break;

                /*
                 * Break if some data was read, or if this was a non-blocking
                 * read.
                 */
                if (nread > 0)
                        break;

                if (nbio) {
                        error = EAGAIN;
                        break;
                }

                /*
                 * Last chance, interlock with WANTR
                 */
                tsleep_interlock(rpb, PCATCH);
                atomic_set_int(&rpb->state, PIPE_WANTR);

                /*
                 * Retest bytes available after memory barrier above.
                 */
                size = rpb->windex - rpb->rindex;
                if (size)
                        continue;

                /*
                 * Retest EOF after memory barrier above.
                 */
                if (rpb->state & PIPE_REOF)
                        break;

                /*
                 * Wait for more data or state change
                 */
                error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
                if (error)
                        break;
        }
        pipe_end_uio(&rpb->rip);

        /*
         * Update last access time
         */
        if (error == 0 && nread && rpb->lticks != ticks) {
                vfs_timestamp(&rpb->atime);
                rpb->lticks = ticks;
        }

        /*
         * If we drained the FIFO more than half way then handle
         * write blocking hysteresis.
         *
         * Note that PIPE_WANTW cannot be set by the writer without
         * it holding both rlock and wlock, so we can test it
         * while holding just rlock.
         */
        if (notify_writer) {
                /*
                 * Synchronous blocking is done on the pipe involved
                 */
                pipesignal(rpb, PIPE_WANTW);

                /*
                 * But we may also have to deal with a kqueue which is
                 * stored on the same pipe as its descriptor, so an
                 * EVFILT_WRITE event waiting for our side to drain will
                 * be on the other side.
                 */
                pipewakeup(wpb, 0);
        }
        /*size = rpb->windex - rpb->rindex;*/
        lwkt_reltoken(&rpb->rlock);

        return (error);
}
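
/*
 * Illustrative note (not from the original source): pipe_write() below
 * honors the POSIX rule that writes of size <= PIPE_BUF are atomic.
 * Whenever a write that started out <= PIPE_BUF no longer fits entirely
 * in the available space, the code forces space to 0 so the writer
 * blocks (or returns EAGAIN) rather than interleave a partial write
 * with data from other writers.
 */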

static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        size_t windex;
        size_t space;
        size_t wcount;
        size_t orig_resid;
        int bigwrite;
        int bigcount;
        int error;
        int nbio;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        /*
         * Calculate nbio
         */
        if (fflags & O_FBLOCKING)
                nbio = 0;
        else if (fflags & O_FNONBLOCKING)
                nbio = 1;
        else if (fp->f_flag & O_NONBLOCK)
                nbio = 1;
        else
                nbio = 0;

        /*
         * 'quick' NBIO test before things get expensive.
         */
        if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
            uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
                return EAGAIN;
        }

        /*
         * Writes go to the peer.  The peer will always exist.
         */
        lwkt_gettoken(&wpb->wlock);
        if (wpb->state & PIPE_WEOF) {
                lwkt_reltoken(&wpb->wlock);
                return (EPIPE);
        }

        /*
         * Degenerate case (EPIPE takes prec)
         */
        if (uio->uio_resid == 0) {
                lwkt_reltoken(&wpb->wlock);
                return(0);
        }

        /*
         * Writes are serialized (start_uio must be called with wlock)
         */
        error = pipe_start_uio(&wpb->wip);
        if (error) {
                lwkt_reltoken(&wpb->wlock);
                return (error);
        }

        orig_resid = uio->uio_resid;
        wcount = 0;

        bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
        bigcount = 10;

        while (uio->uio_resid) {
                if (wpb->state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }

                /*
                 * Don't hog the cpu.
                 */
                if (bigwrite && --bigcount == 0) {
                        lwkt_user_yield();
                        bigcount = 10;
                        if (CURSIG(curthread->td_lwp)) {
                                error = EINTR;
                                break;
                        }
                }

                windex = wpb->windex & (wpb->size - 1);
                space = wpb->size - (wpb->windex - wpb->rindex);

                /*
                 * Writes of size <= PIPE_BUF must be atomic.
                 */
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;

                /*
                 * Write to fill, read size handles write hysteresis.  Also
                 * additional restrictions can cause select-based non-blocking
                 * writes to spin.
                 */
                if (space > 0) {
                        size_t segsize;

                        /*
                         * We want to notify a potentially waiting reader
                         * before we exhaust the write buffer for SMP
                         * pipelining.  Otherwise the write/read will begin
                         * to ping-pong.
                         */
                        space = szmin(space, uio->uio_resid);
                        if (space > (wpb->size >> 1))
                                space = (wpb->size >> 1);

                        /*
                         * First segment to transfer is minimum of
                         * transfer size and contiguous space in
                         * pipe buffer.  If first segment to transfer
                         * is less than the transfer size, we've got
                         * a wraparound in the buffer.
                         */
                        segsize = wpb->size - windex;
                        if (segsize > space)
                                segsize = space;

                        /*
                         * If this is the first loop and the reader is
                         * blocked, do a preemptive wakeup of the reader.
                         *
                         * On SMP the IPI latency plus the wlock interlock
                         * on the reader side is the fastest way to get the
                         * reader going.  (The scheduler will hard loop on
                         * lock tokens).
                         */
                        if (wcount == 0)
                                pipesignal(wpb, PIPE_WANTR);

                        /*
                         * Transfer segment, which may include a wrap-around.
                         * Update windex for both segments in one go so the
                         * reader can read() the data atomically.
                         */
                        error = uiomove(&wpb->buffer[windex], segsize, uio);
                        if (error == 0 && segsize < space) {
                                segsize = space - segsize;
                                error = uiomove(&wpb->buffer[0], segsize, uio);
                        }
                        if (error)
                                break;

                        /*
                         * Memory fence prior to windex updating (note: not
                         * needed so this is a NOP on Intel).
                         */
                        cpu_sfence();
                        wpb->windex += space;

                        /*
                         * Signal reader
                         */
                        if (wcount != 0)
                                pipesignal(wpb, PIPE_WANTR);
                        wcount += space;
                        continue;
                }

                /*
                 * Wakeup any pending reader
                 */
                pipesignal(wpb, PIPE_WANTR);

                /*
                 * don't block on non-blocking I/O
                 */
                if (nbio) {
                        error = EAGAIN;
                        break;
                }

#ifdef _RDTSC_SUPPORTED_
                if (pipe_delay) {
                        int64_t tsc_target;
                        int good = 0;

                        tsc_target = tsc_get_target(pipe_delay);
                        while (tsc_test_target(tsc_target) == 0) {
                                cpu_lfence();
                                space = wpb->size - (wpb->windex - wpb->rindex);
                                if ((space < uio->uio_resid) &&
                                    (orig_resid <= PIPE_BUF)) {
                                        space = 0;
                                }
                                if (space) {
                                        good = 1;
                                        break;
                                }
                                cpu_pause();
                        }
                        if (good)
                                continue;
                }
#endif

                /*
                 * Interlocked test.  Atomic op enforces the memory barrier.
                 */
                tsleep_interlock(wpb, PCATCH);
                atomic_set_int(&wpb->state, PIPE_WANTW);

                /*
                 * Retest space available after memory barrier above.
                 * Writes of size <= PIPE_BUF must be atomic.
                 */
                space = wpb->size - (wpb->windex - wpb->rindex);
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;

                /*
                 * Retest EOF after memory barrier above.
                 */
                if (wpb->state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }

                /*
                 * We have no more space and have something to offer,
                 * wake up select/poll/kq.
                 */
                if (space == 0) {
                        pipewakeup(wpb, 1);
                        error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
                }

                /*
                 * Break out if we errored or the read side wants us to go
                 * away.
                 */
                if (error)
                        break;
                if (wpb->state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }
        }
        pipe_end_uio(&wpb->wip);

        /*
         * If we have put any characters in the buffer, we wake up
         * the reader.
         *
         * Both rlock and wlock are required to be able to modify pipe_state.
         */
        if (wpb->windex != wpb->rindex) {
                pipesignal(wpb, PIPE_WANTR);
                pipewakeup(wpb, 1);
        }

        /*
         * Don't return EPIPE if I/O was successful
         */
        if ((wpb->rindex == wpb->windex) &&
            (uio->uio_resid == 0) &&
            (error == EPIPE)) {
                error = 0;
        }

        if (error == 0 && wpb->lticks != ticks) {
                vfs_timestamp(&wpb->mtime);
                wpb->lticks = ticks;
        }

        /*
         * We have something to offer,
         * wake up select/poll/kq.
         */
        /*space = wpb->windex - wpb->rindex;*/
        lwkt_reltoken(&wpb->wlock);

        return (error);
}
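
/*
 * Illustrative note (not from the original source): the ioctl set below
 * mirrors what sockets provide, e.g. a user can poll pending bytes with:
 *
 *      int n;
 *      ioctl(fd, FIONREAD, &n);        /- n = windex - rindex for this side -/
 *
 * FIOASYNC/FIOSETOWN arm the SIGIO delivery performed by pipewakeup().
 */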

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
           struct ucred *cred, struct sysmsg *msg)
{
        struct pipebuf *rpb;
        struct pipe *pipe;
        int error;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
        } else {
                rpb = &pipe->bufferA;
        }

        lwkt_gettoken(&rpb->rlock);
        lwkt_gettoken(&rpb->wlock);

        switch (cmd) {
        case FIOASYNC:
                if (*(int *)data) {
                        atomic_set_int(&rpb->state, PIPE_ASYNC);
                } else {
                        atomic_clear_int(&rpb->state, PIPE_ASYNC);
                }
                error = 0;
                break;
        case FIONREAD:
                *(int *)data = (int)(rpb->windex - rpb->rindex);
                error = 0;
                break;
        case FIOSETOWN:
                error = fsetown(*(int *)data, &rpb->sigio);
                break;
        case FIOGETOWN:
                *(int *)data = fgetown(&rpb->sigio);
                error = 0;
                break;
        case TIOCSPGRP:
                /* This is deprecated, FIOSETOWN should be used instead. */
                error = fsetown(-(*(int *)data), &rpb->sigio);
                break;

        case TIOCGPGRP:
                /* This is deprecated, FIOGETOWN should be used instead. */
                *(int *)data = -fgetown(&rpb->sigio);
                error = 0;
                break;
        default:
                error = ENOTTY;
                break;
        }
        lwkt_reltoken(&rpb->wlock);
        lwkt_reltoken(&rpb->rlock);

        return (error);
}

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
        struct pipebuf *rpb;
        struct pipe *pipe;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
        } else {
                rpb = &pipe->bufferA;
        }

        bzero((caddr_t)ub, sizeof(*ub));
        ub->st_mode = S_IFIFO;
        ub->st_blksize = rpb->size;
        ub->st_size = rpb->windex - rpb->rindex;
        ub->st_blocks = howmany(ub->st_size, ub->st_blksize);
        ub->st_atimespec = rpb->atime;
        ub->st_mtimespec = rpb->mtime;
        ub->st_ctimespec = pipe->ctime;
        ub->st_uid = fp->f_cred->cr_uid;
        ub->st_gid = fp->f_cred->cr_gid;
        ub->st_ino = pipe->inum;
        /*
         * Left as 0: st_dev, st_nlink, st_rdev,
         * st_flags, st_gen.
         * XXX (st_dev, st_ino) should be unique.
         */

        return (0);
}

static int
pipe_close(struct file *fp)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        fp->f_ops = &badfileops;
        fp->f_data = NULL;
        funsetown(&rpb->sigio);
        pipeclose(pipe, rpb, wpb);

        return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 */
static int
pipe_shutdown(struct file *fp, int how)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        int error = EPIPE;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        /*
         * We modify pipe_state on both pipes, which means we need
         * all four tokens!
         */
        lwkt_gettoken(&rpb->rlock);
        lwkt_gettoken(&rpb->wlock);
        lwkt_gettoken(&wpb->rlock);
        lwkt_gettoken(&wpb->wlock);

        switch(how) {
        case SHUT_RDWR:
        case SHUT_RD:
                /*
                 * EOF on my reads and peer writes
                 */
                atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
                if (rpb->state & PIPE_WANTR) {
                        rpb->state &= ~PIPE_WANTR;
                        wakeup(rpb);
                }
                if (rpb->state & PIPE_WANTW) {
                        rpb->state &= ~PIPE_WANTW;
                        wakeup(rpb);
                }
                error = 0;
                if (how == SHUT_RD)
                        break;
                /* fall through */
        case SHUT_WR:
                /*
                 * EOF on peer reads and my writes
                 */
                atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
                if (wpb->state & PIPE_WANTR) {
                        wpb->state &= ~PIPE_WANTR;
                        wakeup(wpb);
                }
                if (wpb->state & PIPE_WANTW) {
                        wpb->state &= ~PIPE_WANTW;
                        wakeup(wpb);
                }
                error = 0;
                break;
        }
        pipewakeup(rpb, 1);
        pipewakeup(wpb, 1);

        lwkt_reltoken(&wpb->wlock);
        lwkt_reltoken(&wpb->rlock);
        lwkt_reltoken(&rpb->wlock);
        lwkt_reltoken(&rpb->rlock);

        return (error);
}

/*
 * Destroy the pipe buffer.
 */
static void
pipe_free_kmem(struct pipebuf *pb)
{
        if (pb->buffer != NULL) {
                kmem_free(kernel_map, (vm_offset_t)pb->buffer, pb->size);
                pb->buffer = NULL;
                pb->object = NULL;
        }
}
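
/*
 * Illustrative note (not from the original source): pipe lifecycle in
 * this file -- pipe_create() starts open_count at 2 (one per side);
 * each descriptor's final fdrop() runs pipe_close() -> pipeclose(),
 * and the side that drops open_count to 0 either caches the pipe on
 * the pcpu queue or frees its KVA and the structure itself.
 */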

/*
 * Close one half of the pipe.  We are closing the pipe for reading on rpb
 * and writing on wpb.  This routine must be called twice with the pipebufs
 * reversed to close both directions.
 */
static void
pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
{
        globaldata_t gd;

        if (pipe == NULL)
                return;

        /*
         * We need both the read and write tokens to modify pipe_state.
         */
        lwkt_gettoken(&rpb->rlock);
        lwkt_gettoken(&rpb->wlock);

        /*
         * Set our state, wakeup anyone waiting in select/poll/kq, and
         * wakeup anyone blocked on our pipe.  No action if our side
         * is already closed.
         */
        if (rpb->state & PIPE_CLOSED) {
                lwkt_reltoken(&rpb->wlock);
                lwkt_reltoken(&rpb->rlock);
                return;
        }

        atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
        pipewakeup(rpb, 1);
        if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
                rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
                wakeup(rpb);
        }
        lwkt_reltoken(&rpb->wlock);
        lwkt_reltoken(&rpb->rlock);

        /*
         * Disconnect from peer.
         */
        lwkt_gettoken(&wpb->rlock);
        lwkt_gettoken(&wpb->wlock);

        atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
        pipewakeup(wpb, 1);
        if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
                wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
                wakeup(wpb);
        }
        if (SLIST_FIRST(&wpb->kq.ki_note))
                KNOTE(&wpb->kq.ki_note, 0);
        lwkt_reltoken(&wpb->wlock);
        lwkt_reltoken(&wpb->rlock);

        /*
         * Free resources once both sides are closed.  We maintain a pcpu
         * cache to improve performance, so the actual tear-down case is
         * limited to bulk situations.
         *
         * However, the bulk tear-down case can cause intense contention
         * on the kernel_map when, e.g. hundreds to hundreds of thousands
         * of processes are killed at the same time.  To deal with this we
         * use a pcpu mutex to maintain concurrency but also limit the
         * number of threads banging on the map and pmap.
         *
         * We use the mtx mechanism instead of the lockmgr mechanism because
         * the mtx mechanism utilizes a queued design which will not break
         * down in the face of thousands to hundreds of thousands of
         * processes trying to free pipes simultaneously.  The lockmgr
         * mechanism will wind up waking them all up each time a lock
         * cycles.
         */
        if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
                gd = mycpu;
                if (gd->gd_pipeqcount >= pipe_maxcache) {
                        mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
                        pipe_free_kmem(rpb);
                        pipe_free_kmem(wpb);
                        mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
                        kfree(pipe, M_PIPE);
                } else {
                        rpb->state = 0;
                        wpb->state = 0;
                        pipe->next = gd->gd_pipeq;
                        gd->gd_pipeq = pipe;
                        ++gd->gd_pipeqcount;
                }
        }
}

static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        switch (kn->kn_filter) {
        case EVFILT_READ:
                kn->kn_fop = &pipe_rfiltops;
                break;
        case EVFILT_WRITE:
                kn->kn_fop = &pipe_wfiltops;
                break;
        default:
                return (EOPNOTSUPP);
        }

        if (rpb == &pipe->bufferA)
                kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
        else
                kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);

        knote_insert(&rpb->kq.ki_note, kn);

        return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;

        pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
        if ((intptr_t)kn->kn_hook & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }
        knote_remove(&rpb->kq.ki_note, kn);
}
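
/*
 * Illustrative note (not from the original source): both EVFILT_READ
 * and EVFILT_WRITE knotes are registered on the descriptor's own
 * pipebuf (rpb) rather than on the peer.  This is why pipe_read()
 * calls pipewakeup(wpb, 0) after draining: an EVFILT_WRITE knote
 * waiting for our side to drain lives on the other side's ki_note list.
 */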

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        int ready = 0;

        pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
        if ((intptr_t)kn->kn_fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        /*
         * We shouldn't need the pipe locks because the knote itself is
         * locked via KN_PROCESSING.  If we lose a race against the writer,
         * the writer will just issue a KNOTE() after us.
         */
#if 0
        lwkt_gettoken(&rpb->rlock);
        lwkt_gettoken(&rpb->wlock);
#endif

        kn->kn_data = rpb->windex - rpb->rindex;
        if (kn->kn_data < 0)
                kn->kn_data = 0;

        if (rpb->state & PIPE_REOF) {
                /*
                 * Only set NODATA if all data has been exhausted
                 */
                if (kn->kn_data == 0)
                        kn->kn_flags |= EV_NODATA;
                kn->kn_flags |= EV_EOF;

                /*
                 * Only set HUP if the pipe is completely closed.
                 * Half-closed does not count (to make the behavior
                 * the same as linux).
                 */
                if (wpb->state & PIPE_CLOSED) {
                        kn->kn_flags |= EV_HUP;
                        ready = 1;
                }
        }

#if 0
        lwkt_reltoken(&rpb->wlock);
        lwkt_reltoken(&rpb->rlock);
#endif

        if (!ready && (kn->kn_sfflags & NOTE_HUPONLY) == 0)
                ready = kn->kn_data > 0;

        return (ready);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        int ready = 0;

        pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
        if ((intptr_t)kn->kn_fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        kn->kn_data = 0;
        if (wpb->state & PIPE_CLOSED) {
                kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
                return (1);
        }

        /*
         * We shouldn't need the pipe locks because the knote itself is
         * locked via KN_PROCESSING.  If we lose a race against the reader,
         * the reader will just issue a KNOTE() after us.
         */
#if 0
        lwkt_gettoken(&wpb->rlock);
        lwkt_gettoken(&wpb->wlock);
#endif

        if (wpb->state & PIPE_WEOF) {
                kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
                ready = 1;
        }

        if (!ready) {
                kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
                if (kn->kn_data < 0)
                        kn->kn_data = 0;
        }

#if 0
        lwkt_reltoken(&wpb->wlock);
        lwkt_reltoken(&wpb->rlock);
#endif

        if (!ready)
                ready = kn->kn_data >= PIPE_BUF;

        return (ready);
}