/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
 * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>

#include <machine/cpufunc.h>

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
                struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
                struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_poll (struct file *fp, int events, struct ucred *cred);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data, struct ucred *cred);

static struct fileops pipeops = {
        .fo_read = pipe_read,
        .fo_write = pipe_write,
        .fo_ioctl = pipe_ioctl,
        .fo_poll = pipe_poll,
        .fo_kqfilter = pipe_kqfilter,
        .fo_stat = pipe_stat,
        .fo_close = pipe_close,
        .fo_shutdown = pipe_shutdown
};

static void     filt_pipedetach(struct knote *kn);
static int      filt_piperead(struct knote *kn, long hint);
static int      filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
        { 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
        { 1, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
structures"); 104 105 /* 106 * Default pipe buffer size(s), this can be kind-of large now because pipe 107 * space is pageable. The pipe code will try to maintain locality of 108 * reference for performance reasons, so small amounts of outstanding I/O 109 * will not wipe the cache. 110 */ 111 #define MINPIPESIZE (PIPE_SIZE/3) 112 #define MAXPIPESIZE (2*PIPE_SIZE/3) 113 114 /* 115 * Limit the number of "big" pipes 116 */ 117 #define LIMITBIGPIPES 64 118 #define PIPEQ_MAX_CACHE 16 /* per-cpu pipe structure cache */ 119 120 static int pipe_maxbig = LIMITBIGPIPES; 121 static int pipe_maxcache = PIPEQ_MAX_CACHE; 122 static int pipe_bigcount; 123 static int pipe_nbig; 124 static int pipe_bcache_alloc; 125 static int pipe_bkmem_alloc; 126 static int pipe_rblocked_count; 127 static int pipe_wblocked_count; 128 129 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation"); 130 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig, 131 CTLFLAG_RD, &pipe_nbig, 0, "numer of big pipes allocated"); 132 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount, 133 CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded"); 134 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked, 135 CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe expanded"); 136 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked, 137 CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe expanded"); 138 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache, 139 CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu"); 140 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig, 141 CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes"); 142 #ifdef SMP 143 static int pipe_delay = 5000; /* 5uS default */ 144 SYSCTL_INT(_kern_pipe, OID_AUTO, delay, 145 CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns"); 146 static int pipe_mpsafe = 1; 147 SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe, 148 CTLFLAG_RW, &pipe_mpsafe, 0, ""); 149 #endif 150 #if !defined(NO_PIPE_SYSCTL_STATS) 151 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc, 152 CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache"); 153 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc, 154 CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem"); 155 #endif 156 157 static void pipeclose (struct pipe *cpipe); 158 static void pipe_free_kmem (struct pipe *cpipe); 159 static int pipe_create (struct pipe **cpipep); 160 static __inline void pipeselwakeup (struct pipe *cpipe); 161 static int pipespace (struct pipe *cpipe, int size); 162 163 static __inline void 164 pipeselwakeup(struct pipe *cpipe) 165 { 166 if (cpipe->pipe_state & PIPE_SEL) { 167 get_mplock(); 168 cpipe->pipe_state &= ~PIPE_SEL; 169 selwakeup(&cpipe->pipe_sel); 170 rel_mplock(); 171 } 172 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) { 173 get_mplock(); 174 pgsigio(cpipe->pipe_sigio, SIGIO, 0); 175 rel_mplock(); 176 } 177 if (SLIST_FIRST(&cpipe->pipe_sel.si_note)) { 178 get_mplock(); 179 KNOTE(&cpipe->pipe_sel.si_note, 0); 180 rel_mplock(); 181 } 182 } 183 184 /* 185 * These routines are called before and after a UIO. The UIO 186 * may block, causing our held tokens to be lost temporarily. 187 * 188 * We use these routines to serialize reads against other reads 189 * and writes against other writes. 190 * 191 * The read token is held on entry so *ipp does not race. 
 */
static __inline int
pipe_start_uio(struct pipe *cpipe, int *ipp)
{
        int error;

        while (*ipp) {
                *ipp = -1;
                error = tsleep(ipp, PCATCH, "pipexx", 0);
                if (error)
                        return (error);
        }
        *ipp = 1;
        return (0);
}

static __inline void
pipe_end_uio(struct pipe *cpipe, int *ipp)
{
        if (*ipp < 0) {
                *ipp = 0;
                wakeup(ipp);
        } else {
                KKASSERT(*ipp > 0);
                *ipp = 0;
        }
}

static __inline void
pipe_get_mplock(int *save)
{
#ifdef SMP
        if (pipe_mpsafe == 0) {
                get_mplock();
                *save = 1;
        } else
#endif
        {
                *save = 0;
        }
}

static __inline void
pipe_rel_mplock(int *save)
{
#ifdef SMP
        if (*save)
                rel_mplock();
#endif
}


/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 */

/* ARGSUSED */
int
sys_pipe(struct pipe_args *uap)
{
        struct thread *td = curthread;
        struct proc *p = td->td_proc;
        struct file *rf, *wf;
        struct pipe *rpipe, *wpipe;
        int fd1, fd2, error;

        KKASSERT(p);

        rpipe = wpipe = NULL;
        if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
                pipeclose(rpipe);
                pipeclose(wpipe);
                return (ENFILE);
        }

        error = falloc(p, &rf, &fd1);
        if (error) {
                pipeclose(rpipe);
                pipeclose(wpipe);
                return (error);
        }
        uap->sysmsg_fds[0] = fd1;

        /*
         * Warning: once we've gotten past allocation of the fd for the
         * read-side, we can only drop the read side via fdrop() in order
         * to avoid races against processes which manage to dup() the read
         * side while we are blocked trying to allocate the write side.
         */
        rf->f_type = DTYPE_PIPE;
        rf->f_flag = FREAD | FWRITE;
        rf->f_ops = &pipeops;
        rf->f_data = rpipe;
        error = falloc(p, &wf, &fd2);
        if (error) {
                fsetfd(p, NULL, fd1);
                fdrop(rf);
                /* rpipe has been closed by fdrop(). */
                pipeclose(wpipe);
                return (error);
        }
        wf->f_type = DTYPE_PIPE;
        wf->f_flag = FREAD | FWRITE;
        wf->f_ops = &pipeops;
        wf->f_data = wpipe;
        uap->sysmsg_fds[1] = fd2;

        rpipe->pipe_slock = kmalloc(sizeof(struct lock),
                                    M_PIPE, M_WAITOK|M_ZERO);
        wpipe->pipe_slock = rpipe->pipe_slock;
        rpipe->pipe_peer = wpipe;
        wpipe->pipe_peer = rpipe;
        lockinit(rpipe->pipe_slock, "pipecl", 0, 0);

        /*
         * Once activated the peer relationship remains valid until
         * both sides are closed.
         */
        fsetfd(p, rf, fd1);
        fsetfd(p, wf, fd2);
        fdrop(rf);
        fdrop(wf);

        return (0);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely.  If it fails
 * it will retain the old buffer and return ENOMEM.
 */
static int
pipespace(struct pipe *cpipe, int size)
{
        struct vm_object *object;
        caddr_t buffer;
        int npages, error;

        npages = round_page(size) / PAGE_SIZE;
        object = cpipe->pipe_buffer.object;

        /*
         * [re]create the object if necessary and reserve space for it
         * in the kernel_map.  The object and memory are pageable.  On
         * success, free the old resources before assigning the new
         * ones.
         */
        if (object == NULL || object->size != npages) {
                get_mplock();
                object = vm_object_allocate(OBJT_DEFAULT, npages);
                buffer = (caddr_t)vm_map_min(&kernel_map);

                error = vm_map_find(&kernel_map, object, 0,
                                    (vm_offset_t *)&buffer, size,
                                    1,
                                    VM_MAPTYPE_NORMAL,
                                    VM_PROT_ALL, VM_PROT_ALL,
                                    0);

                if (error != KERN_SUCCESS) {
                        vm_object_deallocate(object);
                        rel_mplock();
                        return (ENOMEM);
                }
                pipe_free_kmem(cpipe);
                rel_mplock();
                cpipe->pipe_buffer.object = object;
                cpipe->pipe_buffer.buffer = buffer;
                cpipe->pipe_buffer.size = size;
                ++pipe_bkmem_alloc;
        } else {
                ++pipe_bcache_alloc;
        }
        cpipe->pipe_buffer.rindex = 0;
        cpipe->pipe_buffer.windex = 0;
        return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.  For now make sure it is sized for the
 * smaller PIPE_SIZE default.
 */
static int
pipe_create(struct pipe **cpipep)
{
        globaldata_t gd = mycpu;
        struct pipe *cpipe;
        int error;

        if ((cpipe = gd->gd_pipeq) != NULL) {
                gd->gd_pipeq = cpipe->pipe_peer;
                --gd->gd_pipeqcount;
                cpipe->pipe_peer = NULL;
                cpipe->pipe_wantwcnt = 0;
        } else {
                cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
        }
        *cpipep = cpipe;
        if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
                return (error);
        vfs_timestamp(&cpipe->pipe_ctime);
        cpipe->pipe_atime = cpipe->pipe_ctime;
        cpipe->pipe_mtime = cpipe->pipe_ctime;
        lwkt_token_init(&cpipe->pipe_rlock);
        lwkt_token_init(&cpipe->pipe_wlock);
        return (0);
}

/*
 * MPALMOSTSAFE (acquires mplock)
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
        struct pipe *rpipe;
        int error;
        size_t nread = 0;
        int nbio;
        u_int size;     /* total bytes available */
        u_int nsize;    /* total bytes to read */
        u_int rindex;   /* contiguous bytes available */
        int notify_writer;
        lwkt_tokref rlock;
        lwkt_tokref wlock;
        int mpsave;
        int bigread;
        int bigcount;

        if (uio->uio_resid == 0)
                return(0);

        /*
         * Setup locks, calculate nbio
         */
        pipe_get_mplock(&mpsave);
        rpipe = (struct pipe *)fp->f_data;
        lwkt_gettoken(&rlock, &rpipe->pipe_rlock);

        if (fflags & O_FBLOCKING)
                nbio = 0;
        else if (fflags & O_FNONBLOCKING)
                nbio = 1;
        else if (fp->f_flag & O_NONBLOCK)
                nbio = 1;
        else
                nbio = 0;

        /*
         * Reads are serialized.  Note however that pipe_buffer.buffer and
         * pipe_buffer.size can change out from under us when the number
         * of bytes in the buffer are zero due to the write-side doing a
         * pipespace().
         */
        error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
        if (error) {
                pipe_rel_mplock(&mpsave);
                lwkt_reltoken(&rlock);
                return (error);
        }
        notify_writer = 0;

        bigread = (uio->uio_resid > 10 * 1024 * 1024);
        bigcount = 10;

        while (uio->uio_resid) {
                /*
                 * Don't hog the cpu.
                 */
                if (bigread && --bigcount == 0) {
                        lwkt_user_yield();
                        bigcount = 10;
                        if (CURSIG(curthread->td_lwp)) {
                                error = EINTR;
                                break;
                        }
                }

                size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
                cpu_lfence();
                if (size) {
                        rindex = rpipe->pipe_buffer.rindex &
                                 (rpipe->pipe_buffer.size - 1);
                        nsize = size;
                        if (nsize > rpipe->pipe_buffer.size - rindex)
                                nsize = rpipe->pipe_buffer.size - rindex;
                        nsize = szmin(nsize, uio->uio_resid);

                        error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
                                        nsize, uio);
                        if (error)
                                break;
                        cpu_mfence();
                        rpipe->pipe_buffer.rindex += nsize;
                        nread += nsize;

                        /*
                         * If the FIFO is still over half full just continue
                         * and do not try to notify the writer yet.
                         */
                        if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
                                notify_writer = 0;
                                continue;
                        }

                        /*
                         * When the FIFO is less than half full notify any
                         * waiting writer.  WANTW can be checked while
                         * holding just the rlock.
                         */
                        notify_writer = 1;
                        if ((rpipe->pipe_state & PIPE_WANTW) == 0)
                                continue;
                }

                /*
                 * If the "write-side" was blocked we wake it up.  This code
                 * is reached either when the buffer is completely emptied
                 * or if it becomes more than half-empty.
                 *
                 * Pipe_state can only be modified if both the rlock and
                 * wlock are held.
                 */
                if (rpipe->pipe_state & PIPE_WANTW) {
                        lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
                        if (rpipe->pipe_state & PIPE_WANTW) {
                                notify_writer = 0;
                                rpipe->pipe_state &= ~PIPE_WANTW;
                                lwkt_reltoken(&wlock);
                                wakeup(rpipe);
                        } else {
                                lwkt_reltoken(&wlock);
                        }
                }

                /*
                 * Pick up our copy loop again if the writer sent data to
                 * us while we were messing around.
                 *
                 * On an SMP box poll up to pipe_delay nanoseconds for new
                 * data.  Typically a value of 2000 to 4000 is sufficient
                 * to eradicate most IPIs/tsleeps/wakeups when a pipe
                 * is used for synchronous communications with small packets,
                 * and 8000 or so (8uS) will pipeline large buffer xfers
                 * between cpus over a pipe.
                 *
                 * For synchronous communications a hit means doing a
                 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
                 * whereas a miss requiring a tsleep/wakeup sequence
                 * will take 7uS or more.
                 */
                if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
                        continue;

#if defined(SMP) && defined(_RDTSC_SUPPORTED_)
                if (pipe_delay) {
                        int64_t tsc_target;
                        int good = 0;

                        tsc_target = tsc_get_target(pipe_delay);
                        while (tsc_test_target(tsc_target) == 0) {
                                if (rpipe->pipe_buffer.windex !=
                                    rpipe->pipe_buffer.rindex) {
                                        good = 1;
                                        break;
                                }
                        }
                        if (good)
                                continue;
                }
#endif

                /*
                 * Detect EOF condition, do not set error.
                 */
                if (rpipe->pipe_state & PIPE_REOF)
                        break;

                /*
                 * Break if some data was read, or if this was a non-blocking
                 * read.
                 */
                if (nread > 0)
                        break;

                if (nbio) {
                        error = EAGAIN;
                        break;
                }

                /*
                 * Last chance, interlock with WANTR.
                 */
                lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
                size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
                if (size) {
                        lwkt_reltoken(&wlock);
                        continue;
                }

                /*
                 * If there is no more to read in the pipe, reset its
                 * pointers to the beginning.  This improves cache hit
                 * stats.
                 *
                 * We need both locks to modify both pointers, and there
                 * must also not be a write in progress or the uiomove()
                 * in the write might block and temporarily release
                 * its wlock, then reacquire and update windex.  We are
                 * only serialized against reads, not writes.
                 *
                 * XXX should we even bother resetting the indices?  It
                 * might actually be more cache efficient not to.
                 */
                if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
                    rpipe->pipe_wip == 0) {
                        rpipe->pipe_buffer.rindex = 0;
                        rpipe->pipe_buffer.windex = 0;
                }

                /*
                 * Wait for more data.
                 *
                 * Pipe_state can only be set if both the rlock and wlock
                 * are held.
                 */
                rpipe->pipe_state |= PIPE_WANTR;
                tsleep_interlock(rpipe, PCATCH);
                lwkt_reltoken(&wlock);
                error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
                ++pipe_rblocked_count;
                if (error)
                        break;
        }
        pipe_end_uio(rpipe, &rpipe->pipe_rip);

        /*
         * Update last access time
         */
        if (error == 0 && nread)
                vfs_timestamp(&rpipe->pipe_atime);

        /*
         * If we drained the FIFO more than half way then handle
         * write blocking hysteresis.
         *
         * Note that PIPE_WANTW cannot be set by the writer without
         * it holding both rlock and wlock, so we can test it
         * while holding just rlock.
         */
        if (notify_writer) {
                if (rpipe->pipe_state & PIPE_WANTW) {
                        lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
                        if (rpipe->pipe_state & PIPE_WANTW) {
                                rpipe->pipe_state &= ~PIPE_WANTW;
                                lwkt_reltoken(&wlock);
                                wakeup(rpipe);
                        } else {
                                lwkt_reltoken(&wlock);
                        }
                }
        }
        size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
        lwkt_reltoken(&rlock);

        /*
         * If enough space is available in the buffer, wake up any
         * select/poll writers.
         */
        if ((rpipe->pipe_buffer.size - size) >= PIPE_BUF)
                pipeselwakeup(rpipe);
        pipe_rel_mplock(&mpsave);
        return (error);
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
        int error;
        int orig_resid;
        int nbio;
        struct pipe *wpipe, *rpipe;
        lwkt_tokref rlock;
        lwkt_tokref wlock;
        u_int windex;
        u_int space;
        u_int wcount;
        int mpsave;
        int bigwrite;
        int bigcount;

        pipe_get_mplock(&mpsave);

        /*
         * Writes go to the peer.  The peer will always exist.
         */
        rpipe = (struct pipe *) fp->f_data;
        wpipe = rpipe->pipe_peer;
        lwkt_gettoken(&wlock, &wpipe->pipe_wlock);
        if (wpipe->pipe_state & PIPE_WEOF) {
                pipe_rel_mplock(&mpsave);
                lwkt_reltoken(&wlock);
                return (EPIPE);
        }

        /*
         * Degenerate case (EPIPE takes prec)
         */
        if (uio->uio_resid == 0) {
                pipe_rel_mplock(&mpsave);
                lwkt_reltoken(&wlock);
                return(0);
        }

        /*
         * Writes are serialized (start_uio must be called with wlock)
         */
        error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
        if (error) {
                pipe_rel_mplock(&mpsave);
                lwkt_reltoken(&wlock);
                return (error);
        }

        if (fflags & O_FBLOCKING)
                nbio = 0;
        else if (fflags & O_FNONBLOCKING)
                nbio = 1;
        else if (fp->f_flag & O_NONBLOCK)
                nbio = 1;
        else
                nbio = 0;

        /*
         * If it is advantageous to resize the pipe buffer, do
         * so.  We are write-serialized so we can block safely.
         */
        if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
            (pipe_nbig < pipe_maxbig) &&
            wpipe->pipe_wantwcnt > 4 &&
            (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
                /*
                 * Recheck after lock.
                 */
                lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
                if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
                    (pipe_nbig < pipe_maxbig) &&
                    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
                        atomic_add_int(&pipe_nbig, 1);
                        if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
                                ++pipe_bigcount;
                        else
                                atomic_subtract_int(&pipe_nbig, 1);
                }
                lwkt_reltoken(&rlock);
        }

        orig_resid = uio->uio_resid;
        wcount = 0;

        bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
        bigcount = 10;

        while (uio->uio_resid) {
                if (wpipe->pipe_state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }

                /*
                 * Don't hog the cpu.
                 */
                if (bigwrite && --bigcount == 0) {
                        lwkt_user_yield();
                        bigcount = 10;
                        if (CURSIG(curthread->td_lwp)) {
                                error = EINTR;
                                break;
                        }
                }

                windex = wpipe->pipe_buffer.windex &
                         (wpipe->pipe_buffer.size - 1);
                space = wpipe->pipe_buffer.size -
                        (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
                cpu_lfence();

                /* Writes of size <= PIPE_BUF must be atomic. */
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;

                /*
                 * Write to fill, read size handles write hysteresis.  Also
                 * additional restrictions can cause select-based non-blocking
                 * writes to spin.
                 */
                if (space > 0) {
                        u_int segsize;

                        /*
                         * Transfer size is minimum of uio transfer
                         * and free space in pipe buffer.
                         *
                         * Limit each uiocopy to no more than PIPE_SIZE
                         * so we can keep the gravy train going on an
                         * SMP box.  This doubles the performance for
                         * write sizes > 16K.  Otherwise large writes
                         * wind up doing an inefficient synchronous
                         * ping-pong.
                         */
                        space = szmin(space, uio->uio_resid);
                        if (space > PIPE_SIZE)
                                space = PIPE_SIZE;

                        /*
                         * First segment to transfer is minimum of
                         * transfer size and contiguous space in
                         * pipe buffer.  If first segment to transfer
                         * is less than the transfer size, we've got
                         * a wraparound in the buffer.
                         */
                        segsize = wpipe->pipe_buffer.size - windex;
                        if (segsize > space)
                                segsize = space;

#ifdef SMP
                        /*
                         * If this is the first loop and the reader is
                         * blocked, do a preemptive wakeup of the reader.
                         *
                         * On SMP the IPI latency plus the wlock interlock
                         * on the reader side is the fastest way to get the
                         * reader going.  (The scheduler will hard loop on
                         * lock tokens).
                         *
                         * NOTE: We can't clear WANTR here without acquiring
                         * the rlock, which we don't want to do here!
                         */
                        if ((wpipe->pipe_state & PIPE_WANTR) && pipe_mpsafe > 1)
                                wakeup(wpipe);
#endif

                        /*
                         * Transfer segment, which may include a wrap-around.
                         * Update windex to account for both all in one go
                         * so the reader can read() the data atomically.
                         */
                        error = uiomove(&wpipe->pipe_buffer.buffer[windex],
                                        segsize, uio);
                        if (error == 0 && segsize < space) {
                                segsize = space - segsize;
                                error = uiomove(&wpipe->pipe_buffer.buffer[0],
                                                segsize, uio);
                        }
                        if (error)
                                break;
                        cpu_mfence();
                        wpipe->pipe_buffer.windex += space;
                        wcount += space;
                        continue;
                }

                /*
                 * We need both the rlock and the wlock to interlock against
                 * the EOF, WANTW, and size checks, and to modify pipe_state.
                 *
                 * These are token locks so we do not have to worry about
                 * deadlocks.
                 */
                lwkt_gettoken(&rlock, &wpipe->pipe_rlock);

                /*
                 * If the "read-side" has been blocked, wake it up now
                 * and yield to let it drain synchronously rather
                 * than block.
                 */
                if (wpipe->pipe_state & PIPE_WANTR) {
                        wpipe->pipe_state &= ~PIPE_WANTR;
                        wakeup(wpipe);
                }

                /*
                 * don't block on non-blocking I/O
                 */
                if (nbio) {
                        lwkt_reltoken(&rlock);
                        error = EAGAIN;
                        break;
                }

                /*
                 * re-test whether we have to block in the writer after
                 * acquiring both locks, in case the reader opened up
                 * some space.
                 */
                space = wpipe->pipe_buffer.size -
                        (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
                cpu_lfence();
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;

                /*
                 * We have no more space and have something to offer,
                 * wake up select/poll.
                 */
                if (space == 0) {
                        wpipe->pipe_state |= PIPE_WANTW;
                        ++wpipe->pipe_wantwcnt;
                        pipeselwakeup(wpipe);
                        if (wpipe->pipe_state & PIPE_WANTW)
                                error = tsleep(wpipe, PCATCH, "pipewr", 0);
                        ++pipe_wblocked_count;
                }
                lwkt_reltoken(&rlock);

                /*
                 * Break out if we errored or the read side wants us to go
                 * away.
                 */
                if (error)
                        break;
                if (wpipe->pipe_state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }
        }
        pipe_end_uio(wpipe, &wpipe->pipe_wip);

        /*
         * If we have put any characters in the buffer, we wake up
         * the reader.
         *
         * Both rlock and wlock are required to be able to modify pipe_state.
         */
        if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
                if (wpipe->pipe_state & PIPE_WANTR) {
                        lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
                        if (wpipe->pipe_state & PIPE_WANTR) {
                                wpipe->pipe_state &= ~PIPE_WANTR;
                                lwkt_reltoken(&rlock);
                                wakeup(wpipe);
                        } else {
                                lwkt_reltoken(&rlock);
                        }
                }
        }

        /*
         * Don't return EPIPE if I/O was successful
         */
        if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
            (uio->uio_resid == 0) &&
            (error == EPIPE)) {
                error = 0;
        }

        if (error == 0)
                vfs_timestamp(&wpipe->pipe_mtime);

        /*
         * We have something to offer,
         * wake up select/poll.
         */
        space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;
        lwkt_reltoken(&wlock);
        if (space)
                pipeselwakeup(wpipe);
        pipe_rel_mplock(&mpsave);
        return (error);
}

/*
 * MPALMOSTSAFE - acquires mplock
 *
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
{
        struct pipe *mpipe;
        lwkt_tokref rlock;
        lwkt_tokref wlock;
        int error;
        int mpsave;

        pipe_get_mplock(&mpsave);
        mpipe = (struct pipe *)fp->f_data;

        lwkt_gettoken(&rlock, &mpipe->pipe_rlock);
        lwkt_gettoken(&wlock, &mpipe->pipe_wlock);

        switch (cmd) {
        case FIOASYNC:
                if (*(int *)data) {
                        mpipe->pipe_state |= PIPE_ASYNC;
                } else {
                        mpipe->pipe_state &= ~PIPE_ASYNC;
                }
                error = 0;
                break;
        case FIONREAD:
                *(int *)data = mpipe->pipe_buffer.windex -
                                mpipe->pipe_buffer.rindex;
                error = 0;
                break;
        case FIOSETOWN:
                get_mplock();
                error = fsetown(*(int *)data, &mpipe->pipe_sigio);
                rel_mplock();
                break;
        case FIOGETOWN:
                *(int *)data = fgetown(mpipe->pipe_sigio);
                error = 0;
                break;
        case TIOCSPGRP:
                /* This is deprecated, FIOSETOWN should be used instead. */
                get_mplock();
                error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
                rel_mplock();
                break;

        case TIOCGPGRP:
                /* This is deprecated, FIOGETOWN should be used instead. */
                *(int *)data = -fgetown(mpipe->pipe_sigio);
                error = 0;
                break;
        default:
                error = ENOTTY;
                break;
        }
        lwkt_reltoken(&rlock);
        lwkt_reltoken(&wlock);
        pipe_rel_mplock(&mpsave);

        return (error);
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
int
pipe_poll(struct file *fp, int events, struct ucred *cred)
{
        struct pipe *rpipe;
        struct pipe *wpipe;
        int revents = 0;
        u_int space;
        int mpsave;

        pipe_get_mplock(&mpsave);
        rpipe = (struct pipe *)fp->f_data;
        wpipe = rpipe->pipe_peer;
        if (events & (POLLIN | POLLRDNORM)) {
                if ((rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex) ||
                    (rpipe->pipe_state & PIPE_REOF)) {
                        revents |= events & (POLLIN | POLLRDNORM);
                }
        }

        if (events & (POLLOUT | POLLWRNORM)) {
                if (wpipe == NULL || (wpipe->pipe_state & PIPE_WEOF)) {
                        revents |= events & (POLLOUT | POLLWRNORM);
                } else {
                        space = wpipe->pipe_buffer.windex -
                                wpipe->pipe_buffer.rindex;
                        space = wpipe->pipe_buffer.size - space;
                        if (space >= PIPE_BUF)
                                revents |= events & (POLLOUT | POLLWRNORM);
                }
        }

        if ((rpipe->pipe_state & PIPE_REOF) ||
            (wpipe == NULL) ||
            (wpipe->pipe_state & PIPE_WEOF))
                revents |= POLLHUP;

        if (revents == 0) {
                if (events & (POLLIN | POLLRDNORM)) {
                        selrecord(curthread, &rpipe->pipe_sel);
                        rpipe->pipe_state |= PIPE_SEL;
                }

                if (events & (POLLOUT | POLLWRNORM)) {
                        selrecord(curthread, &wpipe->pipe_sel);
                        wpipe->pipe_state |= PIPE_SEL;
                }
        }
        pipe_rel_mplock(&mpsave);
        return (revents);
}

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
        struct pipe *pipe;
        int mpsave;

        pipe_get_mplock(&mpsave);
        pipe = (struct pipe *)fp->f_data;

        bzero((caddr_t)ub, sizeof(*ub));
        ub->st_mode = S_IFIFO;
        ub->st_blksize = pipe->pipe_buffer.size;
        ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
        ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
        ub->st_atimespec = pipe->pipe_atime;
        ub->st_mtimespec = pipe->pipe_mtime;
        ub->st_ctimespec = pipe->pipe_ctime;
        /*
         * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
         * st_flags, st_gen.
         * XXX (st_dev, st_ino) should be unique.
         */
        pipe_rel_mplock(&mpsave);
        return (0);
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_close(struct file *fp)
{
        struct pipe *cpipe;

        get_mplock();
        cpipe = (struct pipe *)fp->f_data;
        fp->f_ops = &badfileops;
        fp->f_data = NULL;
        funsetown(cpipe->pipe_sigio);
        pipeclose(cpipe);
        rel_mplock();
        return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 *
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_shutdown(struct file *fp, int how)
{
        struct pipe *rpipe;
        struct pipe *wpipe;
        int error = EPIPE;
        lwkt_tokref rpipe_rlock;
        lwkt_tokref rpipe_wlock;
        lwkt_tokref wpipe_rlock;
        lwkt_tokref wpipe_wlock;
        int mpsave;

        pipe_get_mplock(&mpsave);
        rpipe = (struct pipe *)fp->f_data;
        wpipe = rpipe->pipe_peer;

        /*
         * We modify pipe_state on both pipes, which means we need
         * all four tokens!
         */
        lwkt_gettoken(&rpipe_rlock, &rpipe->pipe_rlock);
        lwkt_gettoken(&rpipe_wlock, &rpipe->pipe_wlock);
        lwkt_gettoken(&wpipe_rlock, &wpipe->pipe_rlock);
        lwkt_gettoken(&wpipe_wlock, &wpipe->pipe_wlock);

        switch(how) {
        case SHUT_RDWR:
        case SHUT_RD:
                rpipe->pipe_state |= PIPE_REOF;         /* my reads */
                rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
                if (rpipe->pipe_state & PIPE_WANTR) {
                        rpipe->pipe_state &= ~PIPE_WANTR;
                        wakeup(rpipe);
                }
                if (rpipe->pipe_state & PIPE_WANTW) {
                        rpipe->pipe_state &= ~PIPE_WANTW;
                        wakeup(rpipe);
                }
                error = 0;
                if (how == SHUT_RD)
                        break;
                /* fall through */
        case SHUT_WR:
                wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
                wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
                if (wpipe->pipe_state & PIPE_WANTR) {
                        wpipe->pipe_state &= ~PIPE_WANTR;
                        wakeup(wpipe);
                }
                if (wpipe->pipe_state & PIPE_WANTW) {
                        wpipe->pipe_state &= ~PIPE_WANTW;
                        wakeup(wpipe);
                }
                error = 0;
                break;
        }
        pipeselwakeup(rpipe);
        pipeselwakeup(wpipe);

        lwkt_reltoken(&rpipe_rlock);
        lwkt_reltoken(&rpipe_wlock);
        lwkt_reltoken(&wpipe_rlock);
        lwkt_reltoken(&wpipe_wlock);

        pipe_rel_mplock(&mpsave);
        return (error);
}

static void
pipe_free_kmem(struct pipe *cpipe)
{
        if (cpipe->pipe_buffer.buffer != NULL) {
                if (cpipe->pipe_buffer.size > PIPE_SIZE)
                        atomic_subtract_int(&pipe_nbig, 1);
                kmem_free(&kernel_map,
                        (vm_offset_t)cpipe->pipe_buffer.buffer,
                        cpipe->pipe_buffer.size);
                cpipe->pipe_buffer.buffer = NULL;
                cpipe->pipe_buffer.object = NULL;
        }
}

/*
 * Close the pipe.  The slock must be held to interlock against simultaneous
 * closes.  The rlock and wlock must be held to adjust the pipe_state.
 */
static void
pipeclose(struct pipe *cpipe)
{
        globaldata_t gd;
        struct pipe *ppipe;
        lwkt_tokref cpipe_rlock;
        lwkt_tokref cpipe_wlock;
        lwkt_tokref ppipe_rlock;
        lwkt_tokref ppipe_wlock;

        if (cpipe == NULL)
                return;

        /*
         * The slock may not have been allocated yet (close during
         * initialization)
         *
         * We need both the read and write tokens to modify pipe_state.
         */
        if (cpipe->pipe_slock)
                lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
        lwkt_gettoken(&cpipe_rlock, &cpipe->pipe_rlock);
        lwkt_gettoken(&cpipe_wlock, &cpipe->pipe_wlock);

        /*
         * Set our state, wakeup anyone waiting in select, and
         * wakeup anyone blocked on our pipe.
         */
        cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
        pipeselwakeup(cpipe);
        if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
                cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
                wakeup(cpipe);
        }

        /*
         * Disconnect from peer.
         */
        if ((ppipe = cpipe->pipe_peer) != NULL) {
                lwkt_gettoken(&ppipe_rlock, &ppipe->pipe_rlock);
                lwkt_gettoken(&ppipe_wlock, &ppipe->pipe_wlock);
                ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
                pipeselwakeup(ppipe);
                if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
                        ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
                        wakeup(ppipe);
                }
                if (SLIST_FIRST(&ppipe->pipe_sel.si_note)) {
                        get_mplock();
                        KNOTE(&ppipe->pipe_sel.si_note, 0);
                        rel_mplock();
                }
                lwkt_reltoken(&ppipe_rlock);
                lwkt_reltoken(&ppipe_wlock);
        }

        /*
         * If the peer is also closed we can free resources for both
         * sides, otherwise we leave our side intact to deal with any
         * races (since we only have the slock).
         */
        if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
                cpipe->pipe_peer = NULL;
                ppipe->pipe_peer = NULL;
                ppipe->pipe_slock = NULL;       /* we will free the slock */
                pipeclose(ppipe);
                ppipe = NULL;
        }

        lwkt_reltoken(&cpipe_rlock);
        lwkt_reltoken(&cpipe_wlock);
        if (cpipe->pipe_slock)
                lockmgr(cpipe->pipe_slock, LK_RELEASE);

        /*
         * If we disassociated from our peer we can free resources
         */
        if (ppipe == NULL) {
                gd = mycpu;
                if (cpipe->pipe_slock) {
                        kfree(cpipe->pipe_slock, M_PIPE);
                        cpipe->pipe_slock = NULL;
                }
                if (gd->gd_pipeqcount >= pipe_maxcache ||
                    cpipe->pipe_buffer.size != PIPE_SIZE
                ) {
                        pipe_free_kmem(cpipe);
                        kfree(cpipe, M_PIPE);
                } else {
                        cpipe->pipe_state = 0;
                        cpipe->pipe_peer = gd->gd_pipeq;
                        gd->gd_pipeq = cpipe;
                        ++gd->gd_pipeqcount;
                }
        }
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
        struct pipe *cpipe;

        get_mplock();
        cpipe = (struct pipe *)kn->kn_fp->f_data;

        switch (kn->kn_filter) {
        case EVFILT_READ:
                kn->kn_fop = &pipe_rfiltops;
                break;
        case EVFILT_WRITE:
                kn->kn_fop = &pipe_wfiltops;
                cpipe = cpipe->pipe_peer;
                if (cpipe == NULL) {
                        /* other end of pipe has been closed */
                        rel_mplock();
                        return (EPIPE);
                }
                break;
        default:
                /* release the mplock acquired above before bailing out */
                rel_mplock();
                return (1);
        }
        kn->kn_hook = (caddr_t)cpipe;

        SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
        rel_mplock();
        return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
        struct pipe *cpipe = (struct pipe *)kn->kn_hook;

        SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
        struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

        kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;

        /* XXX RACE */
        if (rpipe->pipe_state & PIPE_REOF) {
                kn->kn_flags |= EV_EOF;
                return (1);
        }
        return (kn->kn_data > 0);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
        struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
        struct pipe *wpipe = rpipe->pipe_peer;
        u_int32_t space;

        /* XXX RACE */
        if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_WEOF)) {
                kn->kn_data = 0;
                kn->kn_flags |= EV_EOF;
                return (1);
        }
        space = wpipe->pipe_buffer.windex -
                wpipe->pipe_buffer.rindex;
        space = wpipe->pipe_buffer.size - space;
        kn->kn_data = space;
        return (kn->kn_data >= PIPE_BUF);
}
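
/*
 * Illustrative userland sketch (not part of the kernel build): a minimal
 * example of the interface implemented above, assuming a standard libc
 * environment; the names main/fds/buf are illustrative only.  pipe(2)
 * creates the two DTYPE_PIPE descriptors in sys_pipe() (read side in
 * fds[0], write side in fds[1]), writes of at most PIPE_BUF bytes are
 * atomic, and because pipeops provides fo_shutdown, shutdown(2) on a
 * pipe descriptor reaches pipe_shutdown() above and signals EOF to the
 * peer without closing the descriptor.
 *
 *	#include <sys/socket.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		char buf[64];
 *		ssize_t n;
 *
 *		if (pipe(fds) < 0)
 *			return (1);
 *		write(fds[1], "hello", 5);	   atomic, 5 <= PIPE_BUF
 *		shutdown(fds[1], SHUT_WR);	   reader will see EOF
 *		n = read(fds[0], buf, sizeof(buf));   n == 5, drains buffer
 *		n = read(fds[0], buf, sizeof(buf));   n == 0, EOF
 *		close(fds[0]);
 *		close(fds[1]);
 *		return (0);
 *	}
 */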