1 /*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 * Copyright (c) 2003-2017 The DragonFly Project. All rights reserved.
5 *
6 * This code is derived from software contributed to The DragonFly Project
7 * by Matthew Dillon <dillon@backplane.com>
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice immediately at the beginning of the file, without modification,
14 * this list of conditions, and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution.
18 * 3. Absolutely no warranty of function or purpose is made by the author
19 * John S. Dyson.
20 * 4. Modifications may be freely made to this file if the above conditions
21 * are met.
22 */
23
24 /*
25 * This file contains a high-performance replacement for the socket-based
26 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
27 * all features of sockets, but does do everything that pipes normally
28 * do.
29 */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysmsg.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68
69 #include <machine/cpufunc.h>
70
71 struct pipegdlock {
72 struct mtx mtx;
73 } __cachealign;
74
75 /*
76 * interfaces to the outside world
77 */
78 static int pipe_read (struct file *fp, struct uio *uio,
79 struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio,
81 struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87 struct ucred *cred, struct sysmsg *msg);
88
89 __read_mostly static struct fileops pipeops = {
90 .fo_read = pipe_read,
91 .fo_write = pipe_write,
92 .fo_ioctl = pipe_ioctl,
93 .fo_kqfilter = pipe_kqfilter,
94 .fo_stat = pipe_stat,
95 .fo_close = pipe_close,
96 .fo_shutdown = pipe_shutdown,
97 .fo_seek = badfo_seek
98 };
99
100 static void filt_pipedetach(struct knote *kn);
101 static int filt_piperead(struct knote *kn, long hint);
102 static int filt_pipewrite(struct knote *kn, long hint);
103
104 __read_mostly static struct filterops pipe_rfiltops =
105 { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
106 __read_mostly static struct filterops pipe_wfiltops =
107 { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
108
109 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
110
111 #define PIPEQ_MAX_CACHE 16 /* per-cpu pipe structure cache */
112
113 __read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
114 __read_mostly static struct pipegdlock *pipe_gdlocks;
115
116 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
117 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
118 CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
119
120 /*
121 * The pipe buffer size can be changed at any time. Only new pipe()s
122 * are affected. Note that due to cpu cache effects, you do not want
123 * to make this value too large.
124 */
125 __read_mostly static int pipe_size = 32768;
126 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
127 CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
128
129 /*
130 * Reader/writer delay loop. When the reader exhausts the pipe buffer
131 * or the writer completely fills the pipe buffer and would otherwise sleep,
132 * it first busy-loops for a few microseconds waiting for data or buffer
133 * space. This eliminates IPIs for most high-bandwidth writer/reader pipes
134 * and also helps when the user program uses a large data buffer in its
135 * UIOs.
136 *
137 * This defaults to 4uS.
138 */
139 #ifdef _RDTSC_SUPPORTED_
140 __read_mostly static int pipe_delay = 4000; /* 4uS default */
141 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
142 CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
143 #endif
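/*
 * Illustrative userland sketch (not part of the kernel build): the knobs
 * above are exported as kern.pipe.size and kern.pipe.delay and can be
 * inspected (or, with root privileges, adjusted) through sysctlbyname(3).
 * Only pipes created after a change pick up a new kern.pipe.size.
 *
 *	#include <sys/types.h>
 *	#include <sys/sysctl.h>
 *	#include <stdio.h>
 *
 *	int
 *	main(void)
 *	{
 *		int val;
 *		size_t len;
 *
 *		len = sizeof(val);	// current pipe buffer size, bytes
 *		if (sysctlbyname("kern.pipe.size", &val, &len, NULL, 0) == 0)
 *			printf("kern.pipe.size = %d\n", val);
 *
 *		len = sizeof(val);	// reader/writer spin budget, ns
 *		if (sysctlbyname("kern.pipe.delay", &val, &len, NULL, 0) == 0)
 *			printf("kern.pipe.delay = %d\n", val);
 *		return 0;
 *	}
 */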
144
145 /*
146 * Auto-size pipe cache to reduce kmem allocations and frees.
147 */
148 static
149 void
150 pipeinit(void *dummy)
151 {
152 size_t mbytes = kmem_lim_size();
153 int n;
154
155 if (pipe_maxcache == PIPEQ_MAX_CACHE) {
156 if (mbytes >= 7 * 1024)
157 pipe_maxcache *= 2;
158 if (mbytes >= 15 * 1024)
159 pipe_maxcache *= 2;
160 }
161
162 /*
163 * Detune the pcpu caching a bit on systems with an insane number
164 * of cpu threads to reduce memory waste.
165 */
166 if (ncpus > 64) {
167 pipe_maxcache = pipe_maxcache * 64 / ncpus;
168 if (pipe_maxcache < PIPEQ_MAX_CACHE)
169 pipe_maxcache = PIPEQ_MAX_CACHE;
170 }
171
172 pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
173 M_PIPE, M_WAITOK | M_ZERO);
174 for (n = 0; n < ncpus; ++n)
175 mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
176 }
177 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
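/*
 * Worked example of the sizing heuristic above (illustrative only): with
 * roughly 16GB reported by kmem_lim_size() the base PIPEQ_MAX_CACHE of 16
 * is doubled twice to 64 pipes cached per cpu; on a machine with 128 cpu
 * threads that is then detuned to 64 * 64 / 128 = 32 per cpu, which is
 * still above the floor of 16.
 */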
178
179 static void pipeclose (struct pipe *pipe,
180 struct pipebuf *pbr, struct pipebuf *pbw);
181 static void pipe_free_kmem (struct pipebuf *buf);
182 static int pipe_create (struct pipe **pipep);
183
184 /*
185 * Test and clear the specified flag, wakeup(pb) if it was set.
186 * This function must also act as a memory barrier.
187 */
188 static __inline void
189 pipesignal(struct pipebuf *pb, uint32_t flags)
190 {
191 uint32_t oflags;
192 uint32_t nflags;
193
194 for (;;) {
195 oflags = pb->state;
196 cpu_ccfence();
197 nflags = oflags & ~flags;
198 if (atomic_cmpset_int(&pb->state, oflags, nflags))
199 break;
200 }
201 if (oflags & flags)
202 wakeup(pb);
203 }
204
205 /*
206 * Notify consumers: post SIGIO when async I/O was requested and deliver
207 * a KNOTE to any kqueue watchers on this pipebuf.
208 */
208 static __inline void
209 pipewakeup(struct pipebuf *pb, int dosigio)
210 {
211 if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
212 lwkt_gettoken(&sigio_token);
213 pgsigio(pb->sigio, SIGIO, 0);
214 lwkt_reltoken(&sigio_token);
215 }
216 KNOTE(&pb->kq.ki_note, 0);
217 }
218
219 /*
220 * These routines are called before and after a UIO. The UIO
221 * may block, causing our held tokens to be lost temporarily.
222 *
223 * We use these routines to serialize reads against other reads
224 * and writes against other writes.
225 *
226 * The appropriate token is held on entry so *ipp does not race.
227 */
228 static __inline int
229 pipe_start_uio(int *ipp)
230 {
231 int error;
232
233 while (*ipp) {
234 *ipp = -1;
235 error = tsleep(ipp, PCATCH, "pipexx", 0);
236 if (error)
237 return (error);
238 }
239 *ipp = 1;
240 return (0);
241 }
242
243 static __inline void
244 pipe_end_uio(int *ipp)
245 {
246 if (*ipp < 0) {
247 *ipp = 0;
248 wakeup(ipp);
249 } else {
250 KKASSERT(*ipp > 0);
251 *ipp = 0;
252 }
253 }
254
255 /*
256 * The pipe system call for the DTYPE_PIPE type of pipes
257 *
258 * pipe_args(int dummy)
259 *
260 * MPSAFE
261 */
262 int
263 sys_pipe(struct sysmsg *sysmsg, const struct pipe_args *uap)
264 {
265 return kern_pipe(sysmsg->sysmsg_fds, 0);
266 }
267
268 int
269 sys_pipe2(struct sysmsg *sysmsg, const struct pipe2_args *uap)
270 {
271 return kern_pipe(sysmsg->sysmsg_fds, uap->flags);
272 }
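/*
 * Illustrative userland sketch (not part of the kernel build) of the flags
 * kern_pipe() honors below: O_CLOEXEC marks both descriptors close-on-exec
 * and O_NONBLOCK makes both ends non-blocking.
 *
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *	#include <err.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *
 *		if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) < 0)
 *			err(1, "pipe2");
 *		// fds[0] is the read side, fds[1] the write side
 *		(void)write(fds[1], "x", 1);
 *		close(fds[0]);
 *		close(fds[1]);
 *		return 0;
 *	}
 */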
273
274 int
275 kern_pipe(long *fds, int flags)
276 {
277 struct thread *td = curthread;
278 struct filedesc *fdp = td->td_proc->p_fd;
279 struct file *rf, *wf;
280 struct pipe *pipe;
281 int fd1, fd2, error;
282
283 pipe = NULL;
284 if (pipe_create(&pipe)) {
285 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
286 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
287 return (ENFILE);
288 }
289
290 error = falloc(td->td_lwp, &rf, &fd1);
291 if (error) {
292 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
293 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
294 return (error);
295 }
296 fds[0] = fd1;
297
298 /*
299 * Warning: once we've gotten past allocation of the fd for the
300 * read-side, we can only drop the read side via fdrop() in order
301 * to avoid races against processes which manage to dup() the read
302 * side while we are blocked trying to allocate the write side.
303 */
304 rf->f_type = DTYPE_PIPE;
305 rf->f_flag = FREAD | FWRITE;
306 rf->f_ops = &pipeops;
307 rf->f_data = (void *)((intptr_t)pipe | 0);
308 if (flags & O_NONBLOCK)
309 rf->f_flag |= O_NONBLOCK;
310 if (flags & O_CLOEXEC)
311 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
312
313 error = falloc(td->td_lwp, &wf, &fd2);
314 if (error) {
315 fsetfd(fdp, NULL, fd1);
316 fdrop(rf);
317 /* pipeA has been closed by fdrop() */
318 /* close pipeB here */
319 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
320 return (error);
321 }
322 wf->f_type = DTYPE_PIPE;
323 wf->f_flag = FREAD | FWRITE;
324 wf->f_ops = &pipeops;
325 wf->f_data = (void *)((intptr_t)pipe | 1);
326 if (flags & O_NONBLOCK)
327 wf->f_flag |= O_NONBLOCK;
328 if (flags & O_CLOEXEC)
329 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
330
331 fds[1] = fd2;
332
333 /*
334 * Once activated the peer relationship remains valid until
335 * both sides are closed.
336 */
337 fsetfd(fdp, rf, fd1);
338 fsetfd(fdp, wf, fd2);
339 fdrop(rf);
340 fdrop(wf);
341
342 return (0);
343 }
344
345 /*
346 * [re]allocates KVA for the pipe's circular buffer. The space is
347 * pageable. Called twice to set up full-duplex communications.
348 *
349 * NOTE: Independent vm_object's are used to improve performance.
350 *
351 * Returns 0 on success, ENOMEM on failure.
352 */
353 static int
354 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
355 {
356 struct vm_object *object;
357 caddr_t buffer;
358 vm_pindex_t npages;
359 int error;
360
361 size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
362 if (size < 16384)
363 size = 16384;
364 if (size > 1024*1024)
365 size = 1024*1024;
366
367 npages = round_page(size) / PAGE_SIZE;
368 object = pb->object;
369
370 /*
371 * [re]create the object if necessary and reserve space for it
372 * in the kernel_map. The object and memory are pageable. On
373 * success, free the old resources before assigning the new
374 * ones.
375 */
376 if (object == NULL || object->size != npages) {
377 object = vm_object_allocate(OBJT_DEFAULT, npages);
378 buffer = (caddr_t)vm_map_min(kernel_map);
379
380 error = vm_map_find(kernel_map, object, NULL,
381 0, (vm_offset_t *)&buffer, size,
382 PAGE_SIZE, TRUE,
383 VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
384 VM_PROT_ALL, VM_PROT_ALL, 0);
385
386 if (error != KERN_SUCCESS) {
387 vm_object_deallocate(object);
388 return (ENOMEM);
389 }
390 pipe_free_kmem(pb);
391 pb->object = object;
392 pb->buffer = buffer;
393 pb->size = size;
394 }
395 pb->rindex = 0;
396 pb->windex = 0;
397
398 return (0);
399 }
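/*
 * Worked example of the clamping above (illustrative only, assuming
 * 4096-byte pages): a request of 20000 bytes is rounded up to 20480
 * (5 pages), any request below 16384 is raised to 16384, and anything
 * above 1MB is capped at 1MB before the object and KVA are allocated.
 */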
400
401 /*
402 * Initialize and allocate VM and memory for pipe, pulling the pipe from
403 * our per-cpu cache if possible.
404 *
405 * Returns 0 on success, else an error code (typically ENOMEM). Caller
406 * must still deallocate the pipe on failure.
407 */
408 static int
409 pipe_create(struct pipe **pipep)
410 {
411 globaldata_t gd = mycpu;
412 struct pipe *pipe;
413 int error;
414
415 if ((pipe = gd->gd_pipeq) != NULL) {
416 gd->gd_pipeq = pipe->next;
417 --gd->gd_pipeqcount;
418 pipe->next = NULL;
419 } else {
420 pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
421 pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
422 lwkt_token_init(&pipe->bufferA.rlock, "piper");
423 lwkt_token_init(&pipe->bufferA.wlock, "pipew");
424 lwkt_token_init(&pipe->bufferB.rlock, "piper");
425 lwkt_token_init(&pipe->bufferB.wlock, "pipew");
426 }
427 *pipep = pipe;
428 if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
429 return (error);
430 }
431 if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
432 return (error);
433 }
434 vfs_timestamp(&pipe->ctime);
435 pipe->bufferA.atime = pipe->ctime;
436 pipe->bufferA.mtime = pipe->ctime;
437 pipe->bufferB.atime = pipe->ctime;
438 pipe->bufferB.mtime = pipe->ctime;
439 pipe->open_count = 2;
440
441 return (0);
442 }
443
444 /*
445 * Read data from a pipe
446 */
447 static int
448 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
449 {
450 struct pipebuf *rpb;
451 struct pipebuf *wpb;
452 struct pipe *pipe;
453 size_t nread = 0;
454 size_t size; /* total bytes available */
455 size_t nsize; /* total bytes to read */
456 size_t rindex; /* contiguous bytes available */
457 int notify_writer;
458 int bigread;
459 int bigcount;
460 int error;
461 int nbio;
462
463 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
464 if ((intptr_t)fp->f_data & 1) {
465 rpb = &pipe->bufferB;
466 wpb = &pipe->bufferA;
467 } else {
468 rpb = &pipe->bufferA;
469 wpb = &pipe->bufferB;
470 }
471 atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
472
473 if (uio->uio_resid == 0)
474 return(0);
475
476 /*
477 * Calculate nbio
478 */
479 if (fflags & O_FBLOCKING)
480 nbio = 0;
481 else if (fflags & O_FNONBLOCKING)
482 nbio = 1;
483 else if (fp->f_flag & O_NONBLOCK)
484 nbio = 1;
485 else
486 nbio = 0;
487
488 /*
489 * 'quick' NBIO test before things get expensive.
490 */
491 if (nbio && rpb->rindex == rpb->windex &&
492 (rpb->state & PIPE_REOF) == 0) {
493 return EAGAIN;
494 }
495
496 /*
497 * Reads are serialized. Note however that buffer.buffer and
498 * buffer.size can change out from under us when the number
499 * of bytes in the buffer is zero due to the write-side doing a
500 * pipespace().
501 */
502 lwkt_gettoken(&rpb->rlock);
503 error = pipe_start_uio(&rpb->rip);
504 if (error) {
505 lwkt_reltoken(&rpb->rlock);
506 return (error);
507 }
508 notify_writer = 0;
509
510 bigread = (uio->uio_resid > 10 * 1024 * 1024);
511 bigcount = 10;
512
513 while (uio->uio_resid) {
514 /*
515 * Don't hog the cpu.
516 */
517 if (bigread && --bigcount == 0) {
518 lwkt_user_yield();
519 bigcount = 10;
520 if (CURSIG(curthread->td_lwp)) {
521 error = EINTR;
522 break;
523 }
524 }
525
526 /*
527 * lfence required to avoid read-reordering of buffer
528 * contents prior to validation of size.
529 */
530 size = rpb->windex - rpb->rindex;
531 cpu_lfence();
532 if (size) {
533 rindex = rpb->rindex & (rpb->size - 1);
534 nsize = size;
535 if (nsize > rpb->size - rindex)
536 nsize = rpb->size - rindex;
537 nsize = szmin(nsize, uio->uio_resid);
538
539 /*
540 * Limit how much we move in one go so we have a
541 * chance to kick the writer while data is still
542 * available in the pipe. This avoids getting into
543 * a ping-pong with the writer.
544 */
545 if (nsize > (rpb->size >> 1))
546 nsize = rpb->size >> 1;
547
548 error = uiomove(&rpb->buffer[rindex], nsize, uio);
549 if (error)
550 break;
551 rpb->rindex += nsize;
552 nread += nsize;
553
554 /*
555 * If the FIFO is still over half full just continue
556 * and do not try to notify the writer yet. If
557 * less than half full notify any waiting writer.
558 */
559 if (size - nsize > (rpb->size >> 1)) {
560 notify_writer = 0;
561 } else {
562 notify_writer = 1;
563 pipesignal(rpb, PIPE_WANTW);
564 }
565 continue;
566 }
567
568 /*
569 * If the "write-side" was blocked we wake it up. This code
570 * is reached when the buffer is completely emptied.
571 */
572 pipesignal(rpb, PIPE_WANTW);
573
574 /*
575 * Pick up our copy loop again if the writer sent data to
576 * us while we were messing around.
577 *
578 * On an SMP box poll up to pipe_delay nanoseconds for new
579 * data. Typically a value of 2000 to 4000 is sufficient
580 * to eradicate most IPIs/tsleeps/wakeups when a pipe
581 * is used for synchronous communications with small packets,
582 * and 8000 or so (8uS) will pipeline large buffer xfers
583 * between cpus over a pipe.
584 *
585 * For synchronous communications a hit means doing a
586 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
587 * whereas a miss requiring a tsleep/wakeup sequence
588 * will take 7uS or more.
589 */
590 if (rpb->windex != rpb->rindex)
591 continue;
592
593 #ifdef _RDTSC_SUPPORTED_
594 if (pipe_delay) {
595 int64_t tsc_target;
596 int good = 0;
597
598 tsc_target = tsc_get_target(pipe_delay);
599 while (tsc_test_target(tsc_target) == 0) {
600 cpu_lfence();
601 if (rpb->windex != rpb->rindex) {
602 good = 1;
603 break;
604 }
605 cpu_pause();
606 }
607 if (good)
608 continue;
609 }
610 #endif
611
612 /*
613 * Detect EOF condition, do not set error.
614 */
615 if (rpb->state & PIPE_REOF)
616 break;
617
618 /*
619 * Break if some data was read, or if this was a non-blocking
620 * read.
621 */
622 if (nread > 0)
623 break;
624
625 if (nbio) {
626 error = EAGAIN;
627 break;
628 }
629
630 /*
631 * Last chance, interlock with WANTR
632 */
633 tsleep_interlock(rpb, PCATCH);
634 atomic_set_int(&rpb->state, PIPE_WANTR);
635
636 /*
637 * Retest bytes available after memory barrier above.
638 */
639 size = rpb->windex - rpb->rindex;
640 if (size)
641 continue;
642
643 /*
644 * Retest EOF after memory barrier above.
645 */
646 if (rpb->state & PIPE_REOF)
647 break;
648
649 /*
650 * Wait for more data or state change
651 */
652 error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
653 if (error)
654 break;
655 }
656 pipe_end_uio(&rpb->rip);
657
658 /*
659 * Update last access time
660 */
661 if (error == 0 && nread && rpb->lticks != ticks) {
662 vfs_timestamp(&rpb->atime);
663 rpb->lticks = ticks;
664 }
665
666 /*
667 * If we drained the FIFO more than half way then handle
668 * write blocking hysteresis.
669 *
670 * Note that PIPE_WANTW cannot be set by the writer without
671 * it holding both rlock and wlock, so we can test it
672 * while holding just rlock.
673 */
674 if (notify_writer) {
675 /*
676 * Synchronous blocking is done on the pipe involved
677 */
678 pipesignal(rpb, PIPE_WANTW);
679
680 /*
681 * But we may also have to deal with a kqueue which is
682 * stored on the same pipe as its descriptor, so an
683 * EVFILT_WRITE event waiting for our side to drain will
684 * be on the other side.
685 */
686 pipewakeup(wpb, 0);
687 }
688 /*size = rpb->windex - rpb->rindex;*/
689 lwkt_reltoken(&rpb->rlock);
690
691 return (error);
692 }
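/*
 * Minimal standalone sketch (illustrative only, assuming a power-of-2
 * buffer size) of the index scheme pipe_read() and pipe_write() rely on:
 * rindex and windex only ever grow, (windex - rindex) is the number of
 * buffered bytes, and (index & (size - 1)) converts an index into a
 * buffer offset, so no separate byte count or explicit index wrapping
 * is needed.
 *
 *	#include <stddef.h>
 *
 *	struct ring {
 *		char	buf[16384];	// size must be a power of 2
 *		size_t	rindex;		// grows without bound
 *		size_t	windex;		// grows without bound
 *	};
 *
 *	static size_t
 *	ring_avail(const struct ring *r)	// bytes readable
 *	{
 *		return (r->windex - r->rindex);
 *	}
 *
 *	static size_t
 *	ring_space(const struct ring *r)	// bytes writable
 *	{
 *		return (sizeof(r->buf) - ring_avail(r));
 *	}
 *
 *	static void
 *	ring_put(struct ring *r, char c)	// caller checks ring_space()
 *	{
 *		r->buf[r->windex & (sizeof(r->buf) - 1)] = c;
 *		r->windex++;
 *	}
 *
 *	static char
 *	ring_get(struct ring *r)		// caller checks ring_avail()
 *	{
 *		char c = r->buf[r->rindex & (sizeof(r->buf) - 1)];
 *
 *		r->rindex++;
 *		return (c);
 *	}
 */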
693
694 static int
695 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
696 {
697 struct pipebuf *rpb;
698 struct pipebuf *wpb;
699 struct pipe *pipe;
700 size_t windex;
701 size_t space;
702 size_t wcount;
703 size_t orig_resid;
704 int bigwrite;
705 int bigcount;
706 int error;
707 int nbio;
708
709 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
710 if ((intptr_t)fp->f_data & 1) {
711 rpb = &pipe->bufferB;
712 wpb = &pipe->bufferA;
713 } else {
714 rpb = &pipe->bufferA;
715 wpb = &pipe->bufferB;
716 }
717
718 /*
719 * Calculate nbio
720 */
721 if (fflags & O_FBLOCKING)
722 nbio = 0;
723 else if (fflags & O_FNONBLOCKING)
724 nbio = 1;
725 else if (fp->f_flag & O_NONBLOCK)
726 nbio = 1;
727 else
728 nbio = 0;
729
730 /*
731 * 'quick' NBIO test before things get expensive.
732 */
733 if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
734 uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
735 return EAGAIN;
736 }
737
738 /*
739 * Writes go to the peer. The peer will always exist.
740 */
741 lwkt_gettoken(&wpb->wlock);
742 if (wpb->state & PIPE_WEOF) {
743 lwkt_reltoken(&wpb->wlock);
744 return (EPIPE);
745 }
746
747 /*
748 * Degenerate case (EPIPE takes precedence)
749 */
750 if (uio->uio_resid == 0) {
751 lwkt_reltoken(&wpb->wlock);
752 return(0);
753 }
754
755 /*
756 * Writes are serialized (start_uio must be called with wlock)
757 */
758 error = pipe_start_uio(&wpb->wip);
759 if (error) {
760 lwkt_reltoken(&wpb->wlock);
761 return (error);
762 }
763
764 orig_resid = uio->uio_resid;
765 wcount = 0;
766
767 bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
768 bigcount = 10;
769
770 while (uio->uio_resid) {
771 if (wpb->state & PIPE_WEOF) {
772 error = EPIPE;
773 break;
774 }
775
776 /*
777 * Don't hog the cpu.
778 */
779 if (bigwrite && --bigcount == 0) {
780 lwkt_user_yield();
781 bigcount = 10;
782 if (CURSIG(curthread->td_lwp)) {
783 error = EINTR;
784 break;
785 }
786 }
787
788 windex = wpb->windex & (wpb->size - 1);
789 space = wpb->size - (wpb->windex - wpb->rindex);
790
791 /*
792 * Writes of size <= PIPE_BUF must be atomic.
793 */
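/*
 * Worked example (illustrative only, assuming the traditional PIPE_BUF
 * of 512): a 300 byte write that finds only 100 bytes of space treats
 * the pipe as full and waits until the whole request fits, so it is
 * never split and never interleaves with another writer's data; a 64KB
 * write, by contrast, may be transferred piecemeal as space becomes
 * available.
 */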
794 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
795 space = 0;
796
797 /*
798 * Write to fill, read size handles write hysteresis. Also
799 * additional restrictions can cause select-based non-blocking
800 * writes to spin.
801 */
802 if (space > 0) {
803 size_t segsize;
804
805 /*
806 * We want to notify a potentially waiting reader
807 * before we exhaust the write buffer for SMP
808 * pipelining. Otherwise the write/read will begin
809 * to ping-pong.
810 */
811 space = szmin(space, uio->uio_resid);
812 if (space > (wpb->size >> 1))
813 space = (wpb->size >> 1);
814
815 /*
816 * First segment to transfer is minimum of
817 * transfer size and contiguous space in
818 * pipe buffer. If first segment to transfer
819 * is less than the transfer size, we've got
820 * a wraparound in the buffer.
821 */
822 segsize = wpb->size - windex;
823 if (segsize > space)
824 segsize = space;
825
826 /*
827 * If this is the first loop and the reader is
828 * blocked, do a preemptive wakeup of the reader.
829 *
830 * On SMP the IPI latency plus the wlock interlock
831 * on the reader side is the fastest way to get the
832 * reader going. (The scheduler will hard loop on
833 * lock tokens).
834 */
835 if (wcount == 0)
836 pipesignal(wpb, PIPE_WANTR);
837
838 /*
839 * Transfer segment, which may include a wrap-around.
840 * Update windex to account for both all in one go
841 * so the reader can read() the data atomically.
842 */
843 error = uiomove(&wpb->buffer[windex], segsize, uio);
844 if (error == 0 && segsize < space) {
845 segsize = space - segsize;
846 error = uiomove(&wpb->buffer[0], segsize, uio);
847 }
848 if (error)
849 break;
850
851 /*
852 * Memory fence prior to the windex update (note: not needed
853 * on Intel, where stores are already ordered, so it is a NOP there).
854 */
855 cpu_sfence();
856 wpb->windex += space;
857
858 /*
859 * Signal reader
860 */
861 if (wcount != 0)
862 pipesignal(wpb, PIPE_WANTR);
863 wcount += space;
864 continue;
865 }
866
867 /*
868 * Wakeup any pending reader
869 */
870 pipesignal(wpb, PIPE_WANTR);
871
872 /*
873 * don't block on non-blocking I/O
874 */
875 if (nbio) {
876 error = EAGAIN;
877 break;
878 }
879
880 #ifdef _RDTSC_SUPPORTED_
881 if (pipe_delay) {
882 int64_t tsc_target;
883 int good = 0;
884
885 tsc_target = tsc_get_target(pipe_delay);
886 while (tsc_test_target(tsc_target) == 0) {
887 cpu_lfence();
888 space = wpb->size - (wpb->windex - wpb->rindex);
889 if ((space < uio->uio_resid) &&
890 (orig_resid <= PIPE_BUF)) {
891 space = 0;
892 }
893 if (space) {
894 good = 1;
895 break;
896 }
897 cpu_pause();
898 }
899 if (good)
900 continue;
901 }
902 #endif
903
904 /*
905 * Interlocked test. Atomic op enforces the memory barrier.
906 */
907 tsleep_interlock(wpb, PCATCH);
908 atomic_set_int(&wpb->state, PIPE_WANTW);
909
910 /*
911 * Retest space available after memory barrier above.
912 * Writes of size <= PIPE_BUF must be atomic.
913 */
914 space = wpb->size - (wpb->windex - wpb->rindex);
915 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
916 space = 0;
917
918 /*
919 * Retest EOF after memory barrier above.
920 */
921 if (wpb->state & PIPE_WEOF) {
922 error = EPIPE;
923 break;
924 }
925
926 /*
927 * We have no more space and have something to offer,
928 * wake up select/poll/kq.
929 */
930 if (space == 0) {
931 pipewakeup(wpb, 1);
932 error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
933 }
934
935 /*
936 * Break out if we errored or the read side wants us to go
937 * away.
938 */
939 if (error)
940 break;
941 if (wpb->state & PIPE_WEOF) {
942 error = EPIPE;
943 break;
944 }
945 }
946 pipe_end_uio(&wpb->wip);
947
948 /*
949 * If we have put any characters in the buffer, we wake up
950 * the reader.
951 *
952 * Both rlock and wlock are required to be able to modify pipe_state.
953 */
954 if (wpb->windex != wpb->rindex) {
955 pipesignal(wpb, PIPE_WANTR);
956 pipewakeup(wpb, 1);
957 }
958
959 /*
960 * Don't return EPIPE if I/O was successful
961 */
962 if ((wpb->rindex == wpb->windex) &&
963 (uio->uio_resid == 0) &&
964 (error == EPIPE)) {
965 error = 0;
966 }
967
968 if (error == 0 && wpb->lticks != ticks) {
969 vfs_timestamp(&wpb->mtime);
970 wpb->lticks = ticks;
971 }
972
973 /*
974 * We have something to offer,
975 * wake up select/poll/kq.
976 */
977 /*space = wpb->windex - wpb->rindex;*/
978 lwkt_reltoken(&wpb->wlock);
979
980 return (error);
981 }
982
983 /*
984 * we implement a very minimal set of ioctls for compatibility with sockets.
985 */
986 static int
987 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
988 struct ucred *cred, struct sysmsg *msg)
989 {
990 struct pipebuf *rpb;
991 struct pipe *pipe;
992 int error;
993
994 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
995 if ((intptr_t)fp->f_data & 1) {
996 rpb = &pipe->bufferB;
997 } else {
998 rpb = &pipe->bufferA;
999 }
1000
1001 lwkt_gettoken(&rpb->rlock);
1002 lwkt_gettoken(&rpb->wlock);
1003
1004 switch (cmd) {
1005 case FIOASYNC:
1006 if (*(int *)data) {
1007 atomic_set_int(&rpb->state, PIPE_ASYNC);
1008 } else {
1009 atomic_clear_int(&rpb->state, PIPE_ASYNC);
1010 }
1011 error = 0;
1012 break;
1013 case FIONREAD:
1014 *(int *)data = (int)(rpb->windex - rpb->rindex);
1015 error = 0;
1016 break;
1017 case FIOSETOWN:
1018 error = fsetown(*(int *)data, &rpb->sigio);
1019 break;
1020 case FIOGETOWN:
1021 *(int *)data = fgetown(&rpb->sigio);
1022 error = 0;
1023 break;
1024 case TIOCSPGRP:
1025 /* This is deprecated, FIOSETOWN should be used instead. */
1026 error = fsetown(-(*(int *)data), &rpb->sigio);
1027 break;
1028
1029 case TIOCGPGRP:
1030 /* This is deprecated, FIOGETOWN should be used instead. */
1031 *(int *)data = -fgetown(&rpb->sigio);
1032 error = 0;
1033 break;
1034 default:
1035 error = ENOTTY;
1036 break;
1037 }
1038 lwkt_reltoken(&rpb->wlock);
1039 lwkt_reltoken(&rpb->rlock);
1040
1041 return (error);
1042 }
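/*
 * Illustrative userland sketch (not part of the kernel build): FIONREAD
 * on a pipe descriptor reports the number of bytes currently buffered,
 * i.e. the (windex - rindex) computed above.
 *
 *	#include <sys/ioctl.h>
 *	#include <sys/filio.h>
 *	#include <unistd.h>
 *	#include <stdio.h>
 *	#include <err.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2], nbytes;
 *
 *		if (pipe(fds) < 0)
 *			err(1, "pipe");
 *		(void)write(fds[1], "hello", 5);
 *		if (ioctl(fds[0], FIONREAD, &nbytes) < 0)
 *			err(1, "ioctl");
 *		printf("%d bytes buffered\n", nbytes);	// prints 5
 *		return 0;
 *	}
 */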
1043
1044 /*
1045 * MPSAFE
1046 */
1047 static int
1048 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1049 {
1050 struct pipebuf *rpb;
1051 struct pipe *pipe;
1052
1053 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1054 if ((intptr_t)fp->f_data & 1) {
1055 rpb = &pipe->bufferB;
1056 } else {
1057 rpb = &pipe->bufferA;
1058 }
1059
1060 bzero((caddr_t)ub, sizeof(*ub));
1061 ub->st_mode = S_IFIFO;
1062 ub->st_blksize = rpb->size;
1063 ub->st_size = rpb->windex - rpb->rindex;
1064 ub->st_blocks = howmany(ub->st_size, ub->st_blksize);
1065 ub->st_atimespec = rpb->atime;
1066 ub->st_mtimespec = rpb->mtime;
1067 ub->st_ctimespec = pipe->ctime;
1068 ub->st_uid = fp->f_cred->cr_uid;
1069 ub->st_gid = fp->f_cred->cr_gid;
1070 ub->st_ino = pipe->inum;
1071 /*
1072 * Left as 0: st_dev, st_nlink, st_rdev,
1073 * st_flags, st_gen.
1074 * XXX (st_dev, st_ino) should be unique.
1075 */
1076
1077 return (0);
1078 }
1079
1080 static int
1081 pipe_close(struct file *fp)
1082 {
1083 struct pipebuf *rpb;
1084 struct pipebuf *wpb;
1085 struct pipe *pipe;
1086
1087 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1088 if ((intptr_t)fp->f_data & 1) {
1089 rpb = &pipe->bufferB;
1090 wpb = &pipe->bufferA;
1091 } else {
1092 rpb = &pipe->bufferA;
1093 wpb = &pipe->bufferB;
1094 }
1095
1096 fp->f_ops = &badfileops;
1097 fp->f_data = NULL;
1098 funsetown(&rpb->sigio);
1099 pipeclose(pipe, rpb, wpb);
1100
1101 return (0);
1102 }
1103
1104 /*
1105 * Shutdown one or both directions of a full-duplex pipe.
1106 */
1107 static int
1108 pipe_shutdown(struct file *fp, int how)
1109 {
1110 struct pipebuf *rpb;
1111 struct pipebuf *wpb;
1112 struct pipe *pipe;
1113 int error = EPIPE;
1114
1115 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1116 if ((intptr_t)fp->f_data & 1) {
1117 rpb = &pipe->bufferB;
1118 wpb = &pipe->bufferA;
1119 } else {
1120 rpb = &pipe->bufferA;
1121 wpb = &pipe->bufferB;
1122 }
1123
1124 /*
1125 * We modify pipe_state on both pipes, which means we need
1126 * all four tokens!
1127 */
1128 lwkt_gettoken(&rpb->rlock);
1129 lwkt_gettoken(&rpb->wlock);
1130 lwkt_gettoken(&wpb->rlock);
1131 lwkt_gettoken(&wpb->wlock);
1132
1133 switch(how) {
1134 case SHUT_RDWR:
1135 case SHUT_RD:
1136 /*
1137 * EOF on my reads and peer writes
1138 */
1139 atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1140 if (rpb->state & PIPE_WANTR) {
1141 rpb->state &= ~PIPE_WANTR;
1142 wakeup(rpb);
1143 }
1144 if (rpb->state & PIPE_WANTW) {
1145 rpb->state &= ~PIPE_WANTW;
1146 wakeup(rpb);
1147 }
1148 error = 0;
1149 if (how == SHUT_RD)
1150 break;
1151 /* fall through */
1152 case SHUT_WR:
1153 /*
1154 * EOF on peer reads and my writes
1155 */
1156 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1157 if (wpb->state & PIPE_WANTR) {
1158 wpb->state &= ~PIPE_WANTR;
1159 wakeup(wpb);
1160 }
1161 if (wpb->state & PIPE_WANTW) {
1162 wpb->state &= ~PIPE_WANTW;
1163 wakeup(wpb);
1164 }
1165 error = 0;
1166 break;
1167 }
1168 pipewakeup(rpb, 1);
1169 pipewakeup(wpb, 1);
1170
1171 lwkt_reltoken(&wpb->wlock);
1172 lwkt_reltoken(&wpb->rlock);
1173 lwkt_reltoken(&rpb->wlock);
1174 lwkt_reltoken(&rpb->rlock);
1175
1176 return (error);
1177 }
1178
1179 /*
1180 * Destroy the pipe buffer.
1181 */
1182 static void
1183 pipe_free_kmem(struct pipebuf *pb)
1184 {
1185 if (pb->buffer != NULL) {
1186 kmem_free(kernel_map, (vm_offset_t)pb->buffer, pb->size);
1187 pb->buffer = NULL;
1188 pb->object = NULL;
1189 }
1190 }
1191
1192 /*
1193 * Close one half of the pipe. We are closing the pipe for reading on rpb
1194 * and writing on wpb. This routine must be called twice with the pipebufs
1195 * reversed to close both directions.
1196 */
1197 static void
1198 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1199 {
1200 globaldata_t gd;
1201
1202 if (pipe == NULL)
1203 return;
1204
1205 /*
1206 * We need both the read and write tokens to modify pipe_state.
1207 */
1208 lwkt_gettoken(&rpb->rlock);
1209 lwkt_gettoken(&rpb->wlock);
1210
1211 /*
1212 * Set our state, wakeup anyone waiting in select/poll/kq, and
1213 * wakeup anyone blocked on our pipe. No action if our side
1214 * is already closed.
1215 */
1216 if (rpb->state & PIPE_CLOSED) {
1217 lwkt_reltoken(&rpb->wlock);
1218 lwkt_reltoken(&rpb->rlock);
1219 return;
1220 }
1221
1222 atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1223 pipewakeup(rpb, 1);
1224 if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1225 rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1226 wakeup(rpb);
1227 }
1228 lwkt_reltoken(&rpb->wlock);
1229 lwkt_reltoken(&rpb->rlock);
1230
1231 /*
1232 * Disconnect from peer.
1233 */
1234 lwkt_gettoken(&wpb->rlock);
1235 lwkt_gettoken(&wpb->wlock);
1236
1237 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1238 pipewakeup(wpb, 1);
1239 if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1240 wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1241 wakeup(wpb);
1242 }
1243 if (SLIST_FIRST(&wpb->kq.ki_note))
1244 KNOTE(&wpb->kq.ki_note, 0);
1245 lwkt_reltoken(&wpb->wlock);
1246 lwkt_reltoken(&wpb->rlock);
1247
1248 /*
1249 * Free resources once both sides are closed. We maintain a pcpu
1250 * cache to improve performance, so the actual tear-down case is
1251 * limited to bulk situations.
1252 *
1253 * However, the bulk tear-down case can cause intense contention
1254 * on the kernel_map when, e.g. hundreds to hundreds of thousands
1255 * of processes are killed at the same time. To deal with this we
1256 * use a pcpu mutex to maintain concurrency but also limit the
1257 * number of threads banging on the map and pmap.
1258 *
1259 * We use the mtx mechanism instead of the lockmgr mechanism because
1260 * the mtx mechanism utilizes a queued design which will not break
1261 * down in the face of thousands to hundreds of thousands of
1262 * processes trying to free pipes simultaneously. The lockmgr
1263 * mechanism will wind up waking them all up each time a lock
1264 * cycles.
1265 */
1266 if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1267 gd = mycpu;
1268 if (gd->gd_pipeqcount >= pipe_maxcache) {
1269 mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1270 pipe_free_kmem(rpb);
1271 pipe_free_kmem(wpb);
1272 mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1273 kfree(pipe, M_PIPE);
1274 } else {
1275 rpb->state = 0;
1276 wpb->state = 0;
1277 pipe->next = gd->gd_pipeq;
1278 gd->gd_pipeq = pipe;
1279 ++gd->gd_pipeqcount;
1280 }
1281 }
1282 }
1283
1284 static int
1285 pipe_kqfilter(struct file *fp, struct knote *kn)
1286 {
1287 struct pipebuf *rpb;
1288 struct pipebuf *wpb;
1289 struct pipe *pipe;
1290
1291 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1292 if ((intptr_t)fp->f_data & 1) {
1293 rpb = &pipe->bufferB;
1294 wpb = &pipe->bufferA;
1295 } else {
1296 rpb = &pipe->bufferA;
1297 wpb = &pipe->bufferB;
1298 }
1299
1300 switch (kn->kn_filter) {
1301 case EVFILT_READ:
1302 kn->kn_fop = &pipe_rfiltops;
1303 break;
1304 case EVFILT_WRITE:
1305 kn->kn_fop = &pipe_wfiltops;
1306 break;
1307 default:
1308 return (EOPNOTSUPP);
1309 }
1310
1311 if (rpb == &pipe->bufferA)
1312 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1313 else
1314 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1315
1316 knote_insert(&rpb->kq.ki_note, kn);
1317
1318 return (0);
1319 }
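/*
 * Illustrative userland sketch (not part of the kernel build): register
 * an EVFILT_READ filter on a pipe's read descriptor and wait for data;
 * the returned kevent's data field is the byte count computed by
 * filt_piperead() below.
 *
 *	#include <sys/types.h>
 *	#include <sys/event.h>
 *	#include <sys/time.h>
 *	#include <unistd.h>
 *	#include <err.h>
 *
 *	int
 *	main(void)
 *	{
 *		struct kevent kev;
 *		int fds[2], kq;
 *
 *		if (pipe(fds) < 0 || (kq = kqueue()) < 0)
 *			err(1, "setup");
 *		EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *		if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
 *			err(1, "kevent register");
 *		(void)write(fds[1], "x", 1);
 *		if (kevent(kq, NULL, 0, &kev, 1, NULL) != 1)
 *			err(1, "kevent wait");
 *		// kev.ident == fds[0], kev.data == 1
 *		return 0;
 *	}
 */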
1320
1321 static void
1322 filt_pipedetach(struct knote *kn)
1323 {
1324 struct pipebuf *rpb;
1325 struct pipebuf *wpb;
1326 struct pipe *pipe;
1327
1328 pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1329 if ((intptr_t)kn->kn_hook & 1) {
1330 rpb = &pipe->bufferB;
1331 wpb = &pipe->bufferA;
1332 } else {
1333 rpb = &pipe->bufferA;
1334 wpb = &pipe->bufferB;
1335 }
1336 knote_remove(&rpb->kq.ki_note, kn);
1337 }
1338
1339 /*ARGSUSED*/
1340 static int
1341 filt_piperead(struct knote *kn, long hint)
1342 {
1343 struct pipebuf *rpb;
1344 struct pipebuf *wpb;
1345 struct pipe *pipe;
1346 int ready = 0;
1347
1348 pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1349 if ((intptr_t)kn->kn_fp->f_data & 1) {
1350 rpb = &pipe->bufferB;
1351 wpb = &pipe->bufferA;
1352 } else {
1353 rpb = &pipe->bufferA;
1354 wpb = &pipe->bufferB;
1355 }
1356
1357 /*
1358 * We shouldn't need the pipe locks because the knote itself is
1359 * locked via KN_PROCESSING. If we lose a race against the writer,
1360 * the writer will just issue a KNOTE() after us.
1361 */
1362 #if 0
1363 lwkt_gettoken(&rpb->rlock);
1364 lwkt_gettoken(&rpb->wlock);
1365 #endif
1366
1367 kn->kn_data = rpb->windex - rpb->rindex;
1368 if (kn->kn_data < 0)
1369 kn->kn_data = 0;
1370
1371 if (rpb->state & PIPE_REOF) {
1372 /*
1373 * Only set NODATA if all data has been exhausted
1374 */
1375 if (kn->kn_data == 0)
1376 kn->kn_flags |= EV_NODATA;
1377 kn->kn_flags |= EV_EOF;
1378
1379 /*
1380 * Only set HUP if the pipe is completely closed.
1381 * half-closed does not count (to make the behavior
1382 * the same as linux).
1383 */
1384 if (wpb->state & PIPE_CLOSED) {
1385 kn->kn_flags |= EV_HUP;
1386 ready = 1;
1387 }
1388 }
1389
1390 #if 0
1391 lwkt_reltoken(&rpb->wlock);
1392 lwkt_reltoken(&rpb->rlock);
1393 #endif
1394
1395 if (!ready && (kn->kn_sfflags & NOTE_HUPONLY) == 0)
1396 ready = kn->kn_data > 0;
1397
1398 return (ready);
1399 }
1400
1401 /*ARGSUSED*/
1402 static int
1403 filt_pipewrite(struct knote *kn, long hint)
1404 {
1405 struct pipebuf *rpb;
1406 struct pipebuf *wpb;
1407 struct pipe *pipe;
1408 int ready = 0;
1409
1410 pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1411 if ((intptr_t)kn->kn_fp->f_data & 1) {
1412 rpb = &pipe->bufferB;
1413 wpb = &pipe->bufferA;
1414 } else {
1415 rpb = &pipe->bufferA;
1416 wpb = &pipe->bufferB;
1417 }
1418
1419 kn->kn_data = 0;
1420 if (wpb->state & PIPE_CLOSED) {
1421 kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
1422 return (1);
1423 }
1424
1425 /*
1426 * We shouldn't need the pipe locks because the knote itself is
1427 * locked via KN_PROCESSING. If we lose a race against the reader,
1428 * the reader will just issue a KNOTE() after us.
1429 */
1430 #if 0
1431 lwkt_gettoken(&wpb->rlock);
1432 lwkt_gettoken(&wpb->wlock);
1433 #endif
1434
1435 if (wpb->state & PIPE_WEOF) {
1436 kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
1437 ready = 1;
1438 }
1439
1440 if (!ready) {
1441 kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1442 if (kn->kn_data < 0)
1443 kn->kn_data = 0;
1444 }
1445
1446 #if 0
1447 lwkt_reltoken(&wpb->wlock);
1448 lwkt_reltoken(&wpb->rlock);
1449 #endif
1450
1451 if (!ready)
1452 ready = kn->kn_data >= PIPE_BUF;
1453
1454 return (ready);
1455 }
1456