1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
21  */
22 
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/kernel.h>
32 #include <sys/proc.h>
33 #include <sys/fcntl.h>
34 #include <sys/file.h>
35 #include <sys/filedesc.h>
36 #include <sys/filio.h>
37 #include <sys/ttycom.h>
38 #include <sys/stat.h>
39 #include <sys/signalvar.h>
40 #include <sys/sysproto.h>
41 #include <sys/pipe.h>
42 #include <sys/vnode.h>
43 #include <sys/uio.h>
44 #include <sys/event.h>
45 #include <sys/globaldata.h>
46 #include <sys/module.h>
47 #include <sys/malloc.h>
48 #include <sys/sysctl.h>
49 #include <sys/socket.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61 
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64 
65 #include <machine/cpufunc.h>
66 
67 /*
68  * interfaces to the outside world
69  */
70 static int pipe_read (struct file *fp, struct uio *uio,
71 		struct ucred *cred, int flags);
72 static int pipe_write (struct file *fp, struct uio *uio,
73 		struct ucred *cred, int flags);
74 static int pipe_close (struct file *fp);
75 static int pipe_shutdown (struct file *fp, int how);
76 static int pipe_kqfilter (struct file *fp, struct knote *kn);
77 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
78 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
79 		struct ucred *cred, struct sysmsg *msg);
80 
81 static struct fileops pipeops = {
82 	.fo_read = pipe_read,
83 	.fo_write = pipe_write,
84 	.fo_ioctl = pipe_ioctl,
85 	.fo_kqfilter = pipe_kqfilter,
86 	.fo_stat = pipe_stat,
87 	.fo_close = pipe_close,
88 	.fo_shutdown = pipe_shutdown
89 };
90 
91 static void	filt_pipedetach(struct knote *kn);
92 static int	filt_piperead(struct knote *kn, long hint);
93 static int	filt_pipewrite(struct knote *kn, long hint);
94 
95 static struct filterops pipe_rfiltops =
96 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
97 static struct filterops pipe_wfiltops =
98 	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
99 
100 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
101 
102 /*
103  * Default pipe buffer size(s), this can be kind-of large now because pipe
104  * space is pageable.  The pipe code will try to maintain locality of
105  * reference for performance reasons, so small amounts of outstanding I/O
106  * will not wipe the cache.
107  */
108 #define MINPIPESIZE (PIPE_SIZE/3)
109 #define MAXPIPESIZE (2*PIPE_SIZE/3)
110 
111 /*
112  * Limit the number of "big" pipes
113  */
114 #define LIMITBIGPIPES	64
115 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
116 
117 static int pipe_maxbig = LIMITBIGPIPES;
118 static int pipe_maxcache = PIPEQ_MAX_CACHE;
119 static int pipe_bigcount;
120 static int pipe_nbig;
121 static int pipe_bcache_alloc;
122 static int pipe_bkmem_alloc;
123 static int pipe_rblocked_count;
124 static int pipe_wblocked_count;
125 
126 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
127 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
128         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
129 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
130         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
131 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
132         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe reads blocked");
133 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
134         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe writes blocked");
135 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
136         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
137 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
138         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
139 #ifdef SMP
140 static int pipe_delay = 5000;	/* 5uS default */
141 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
142         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
143 #endif
144 #if !defined(NO_PIPE_SYSCTL_STATS)
145 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
146         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
147 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
148         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
149 #endif
150 
151 /*
152  * Auto-size pipe cache to reduce kmem allocations and frees.
153  */
154 static
155 void
156 pipeinit(void *dummy)
157 {
158 	size_t mbytes = kmem_lim_size();
159 
160 	if (pipe_maxbig == LIMITBIGPIPES) {
161 		if (mbytes >= 7 * 1024)
162 			pipe_maxbig *= 2;
163 		if (mbytes >= 15 * 1024)
164 			pipe_maxbig *= 2;
165 	}
166 	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
167 		if (mbytes >= 7 * 1024)
168 			pipe_maxcache *= 2;
169 		if (mbytes >= 15 * 1024)
170 			pipe_maxcache *= 2;
171 	}
172 }
173 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL)
174 
175 static void pipeclose (struct pipe *cpipe);
176 static void pipe_free_kmem (struct pipe *cpipe);
177 static int pipe_create (struct pipe **cpipep);
178 static int pipespace (struct pipe *cpipe, int size);
179 
180 static __inline void
181 pipewakeup(struct pipe *cpipe, int dosigio)
182 {
183 	if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
184 		lwkt_gettoken(&proc_token);
185 		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
186 		lwkt_reltoken(&proc_token);
187 	}
188 	KNOTE(&cpipe->pipe_kq.ki_note, 0);
189 }
190 
191 /*
192  * These routines are called before and after a UIO.  The UIO
193  * may block, causing our held tokens to be lost temporarily.
194  *
195  * We use these routines to serialize reads against other reads
196  * and writes against other writes.
197  *
198  * The read token is held on entry so *ipp does not race.
199  */
200 static __inline int
201 pipe_start_uio(struct pipe *cpipe, int *ipp)
202 {
203 	int error;
204 
205 	while (*ipp) {
206 		*ipp = -1;
207 		error = tsleep(ipp, PCATCH, "pipexx", 0);
208 		if (error)
209 			return (error);
210 	}
211 	*ipp = 1;
212 	return (0);
213 }
214 
215 static __inline void
216 pipe_end_uio(struct pipe *cpipe, int *ipp)
217 {
218 	if (*ipp < 0) {
219 		*ipp = 0;
220 		wakeup(ipp);
221 	} else {
222 		KKASSERT(*ipp > 0);
223 		*ipp = 0;
224 	}
225 }
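
/*
 * Illustrative trace (documentation only, not executed): with two
 * concurrent readers A and B sharing the same pipe_rip counter,
 *
 *	A: pipe_start_uio()  sees *ipp == 0, sets *ipp = 1, proceeds
 *	B: pipe_start_uio()  sees *ipp != 0, sets *ipp = -1, tsleep()s
 *	A: pipe_end_uio()    sees *ipp < 0, sets *ipp = 0, wakeup()s B
 *	B: loop retests *ipp == 0, sets *ipp = 1, proceeds
 */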
226 
227 /*
228  * The pipe system call for the DTYPE_PIPE type of pipes
229  *
230  * pipe_args(int dummy)
231  *
232  * MPSAFE
233  */
234 int
235 sys_pipe(struct pipe_args *uap)
236 {
237 	struct thread *td = curthread;
238 	struct filedesc *fdp = td->td_proc->p_fd;
239 	struct file *rf, *wf;
240 	struct pipe *rpipe, *wpipe;
241 	int fd1, fd2, error;
242 
243 	rpipe = wpipe = NULL;
244 	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
245 		pipeclose(rpipe);
246 		pipeclose(wpipe);
247 		return (ENFILE);
248 	}
249 
250 	error = falloc(td->td_lwp, &rf, &fd1);
251 	if (error) {
252 		pipeclose(rpipe);
253 		pipeclose(wpipe);
254 		return (error);
255 	}
256 	uap->sysmsg_fds[0] = fd1;
257 
258 	/*
259 	 * Warning: once we've gotten past allocation of the fd for the
260 	 * read-side, we can only drop the read side via fdrop() in order
261 	 * to avoid races against processes which manage to dup() the read
262 	 * side while we are blocked trying to allocate the write side.
263 	 */
264 	rf->f_type = DTYPE_PIPE;
265 	rf->f_flag = FREAD | FWRITE;
266 	rf->f_ops = &pipeops;
267 	rf->f_data = rpipe;
268 	error = falloc(td->td_lwp, &wf, &fd2);
269 	if (error) {
270 		fsetfd(fdp, NULL, fd1);
271 		fdrop(rf);
272 		/* rpipe has been closed by fdrop(). */
273 		pipeclose(wpipe);
274 		return (error);
275 	}
276 	wf->f_type = DTYPE_PIPE;
277 	wf->f_flag = FREAD | FWRITE;
278 	wf->f_ops = &pipeops;
279 	wf->f_data = wpipe;
280 	uap->sysmsg_fds[1] = fd2;
281 
282 	rpipe->pipe_slock = kmalloc(sizeof(struct lock),
283 				    M_PIPE, M_WAITOK|M_ZERO);
284 	wpipe->pipe_slock = rpipe->pipe_slock;
285 	rpipe->pipe_peer = wpipe;
286 	wpipe->pipe_peer = rpipe;
287 	lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
288 
289 	/*
290 	 * Once activated the peer relationship remains valid until
291 	 * both sides are closed.
292 	 */
293 	fsetfd(fdp, rf, fd1);
294 	fsetfd(fdp, wf, fd2);
295 	fdrop(rf);
296 	fdrop(wf);
297 
298 	return (0);
299 }
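
/*
 * Illustrative userland sketch (hypothetical helper, not part of the
 * kernel build) of the descriptor pair returned above: sysmsg_fds[0]
 * is the read side and sysmsg_fds[1] is the write side.
 *
 *	#include <unistd.h>
 *
 *	int
 *	pipe_example(void)
 *	{
 *		int fds[2];
 *		char buf[5];
 *
 *		if (pipe(fds) < 0)
 *			return (-1);
 *		write(fds[1], "hello", 5);
 *		read(fds[0], buf, 5);
 *		close(fds[0]);
 *		close(fds[1]);
 *		return (0);
 *	}
 */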
300 
301 /*
302  * Allocate kva for the pipe circular buffer; the space is pageable.
303  * This routine will 'realloc' the size of a pipe safely; if it fails
304  * it will retain the old buffer.
305  * On failure it returns ENOMEM.
306  */
307 static int
308 pipespace(struct pipe *cpipe, int size)
309 {
310 	struct vm_object *object;
311 	caddr_t buffer;
312 	int npages, error;
313 
314 	npages = round_page(size) / PAGE_SIZE;
315 	object = cpipe->pipe_buffer.object;
316 
317 	/*
318 	 * [re]create the object if necessary and reserve space for it
319 	 * in the kernel_map.  The object and memory are pageable.  On
320 	 * success, free the old resources before assigning the new
321 	 * ones.
322 	 */
323 	if (object == NULL || object->size != npages) {
324 		object = vm_object_allocate(OBJT_DEFAULT, npages);
325 		buffer = (caddr_t)vm_map_min(&kernel_map);
326 
327 		error = vm_map_find(&kernel_map, object, 0,
328 				    (vm_offset_t *)&buffer,
329 				    size, PAGE_SIZE,
330 				    1, VM_MAPTYPE_NORMAL,
331 				    VM_PROT_ALL, VM_PROT_ALL,
332 				    0);
333 
334 		if (error != KERN_SUCCESS) {
335 			vm_object_deallocate(object);
336 			return (ENOMEM);
337 		}
338 		pipe_free_kmem(cpipe);
339 		cpipe->pipe_buffer.object = object;
340 		cpipe->pipe_buffer.buffer = buffer;
341 		cpipe->pipe_buffer.size = size;
342 		++pipe_bkmem_alloc;
343 	} else {
344 		++pipe_bcache_alloc;
345 	}
346 	cpipe->pipe_buffer.rindex = 0;
347 	cpipe->pipe_buffer.windex = 0;
348 	return (0);
349 }
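
/*
 * Note on the index arithmetic used throughout this file (a sketch,
 * with pb standing in for &cpipe->pipe_buffer): rindex and windex are
 * free-running unsigned counters and the buffer size is a power of 2,
 * so the usual FIFO quantities reduce to plain unsigned arithmetic
 * even across counter wrap-around:
 *
 *	u_int avail = pb->windex - pb->rindex;		bytes in the FIFO
 *	u_int space = pb->size - avail;			bytes still free
 *	u_int roff  = pb->rindex & (pb->size - 1);	offset to read from
 *	u_int woff  = pb->windex & (pb->size - 1);	offset to write to
 */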
350 
351 /*
352  * Initialize and allocate VM and memory for pipe, pulling the pipe from
353  * our per-cpu cache if possible.  For now make sure it is sized for the
354  * smaller PIPE_SIZE default.
355  */
356 static int
357 pipe_create(struct pipe **cpipep)
358 {
359 	globaldata_t gd = mycpu;
360 	struct pipe *cpipe;
361 	int error;
362 
363 	if ((cpipe = gd->gd_pipeq) != NULL) {
364 		gd->gd_pipeq = cpipe->pipe_peer;
365 		--gd->gd_pipeqcount;
366 		cpipe->pipe_peer = NULL;
367 		cpipe->pipe_wantwcnt = 0;
368 	} else {
369 		cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
370 	}
371 	*cpipep = cpipe;
372 	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
373 		return (error);
374 	vfs_timestamp(&cpipe->pipe_ctime);
375 	cpipe->pipe_atime = cpipe->pipe_ctime;
376 	cpipe->pipe_mtime = cpipe->pipe_ctime;
377 	lwkt_token_init(&cpipe->pipe_rlock, "piper");
378 	lwkt_token_init(&cpipe->pipe_wlock, "pipew");
379 	return (0);
380 }
381 
382 static int
383 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
384 {
385 	struct pipe *rpipe;
386 	struct pipe *wpipe;
387 	int error;
388 	size_t nread = 0;
389 	int nbio;
390 	u_int size;	/* total bytes available */
391 	u_int nsize;	/* total bytes to read */
392 	u_int rindex;	/* contiguous bytes available */
393 	int notify_writer;
394 	int bigread;
395 	int bigcount;
396 
397 	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
398 
399 	if (uio->uio_resid == 0)
400 		return(0);
401 
402 	/*
403 	 * Setup locks, calculate nbio
404 	 */
405 	rpipe = (struct pipe *)fp->f_data;
406 	wpipe = rpipe->pipe_peer;
407 	lwkt_gettoken(&rpipe->pipe_rlock);
408 
409 	if (fflags & O_FBLOCKING)
410 		nbio = 0;
411 	else if (fflags & O_FNONBLOCKING)
412 		nbio = 1;
413 	else if (fp->f_flag & O_NONBLOCK)
414 		nbio = 1;
415 	else
416 		nbio = 0;
417 
418 	/*
419 	 * Reads are serialized.  Note however that pipe_buffer.buffer and
420 	 * pipe_buffer.size can change out from under us when the number
421 	 * of bytes in the buffer is zero due to the write-side doing a
422 	 * pipespace().
423 	 */
424 	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
425 	if (error) {
426 		lwkt_reltoken(&rpipe->pipe_rlock);
427 		return (error);
428 	}
429 	notify_writer = 0;
430 
431 	bigread = (uio->uio_resid > 10 * 1024 * 1024);
432 	bigcount = 10;
433 
434 	while (uio->uio_resid) {
435 		/*
436 		 * Don't hog the cpu.
437 		 */
438 		if (bigread && --bigcount == 0) {
439 			lwkt_user_yield();
440 			bigcount = 10;
441 			if (CURSIG(curthread->td_lwp)) {
442 				error = EINTR;
443 				break;
444 			}
445 		}
446 
447 		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
448 		cpu_lfence();
449 		if (size) {
450 			rindex = rpipe->pipe_buffer.rindex &
451 				 (rpipe->pipe_buffer.size - 1);
452 			nsize = size;
453 			if (nsize > rpipe->pipe_buffer.size - rindex)
454 				nsize = rpipe->pipe_buffer.size - rindex;
455 			nsize = szmin(nsize, uio->uio_resid);
456 
457 			error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
458 					nsize, uio);
459 			if (error)
460 				break;
461 			cpu_mfence();
462 			rpipe->pipe_buffer.rindex += nsize;
463 			nread += nsize;
464 
465 			/*
466 			 * If the FIFO is still over half full just continue
467 			 * and do not try to notify the writer yet.
468 			 */
469 			if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
470 				notify_writer = 0;
471 				continue;
472 			}
473 
474 			/*
475 			 * When the FIFO is less than half full notify any
476 			 * waiting writer.  WANTW can be checked while
477 			 * holding just the rlock.
478 			 */
479 			notify_writer = 1;
480 			if ((rpipe->pipe_state & PIPE_WANTW) == 0)
481 				continue;
482 		}
483 
484 		/*
485 		 * If the "write-side" was blocked we wake it up.  This code
486 		 * is reached either when the buffer is completely emptied
487 		 * or if it becomes more than half-empty.
488 		 *
489 		 * Pipe_state can only be modified if both the rlock and
490 		 * wlock are held.
491 		 */
492 		if (rpipe->pipe_state & PIPE_WANTW) {
493 			lwkt_gettoken(&rpipe->pipe_wlock);
494 			if (rpipe->pipe_state & PIPE_WANTW) {
495 				rpipe->pipe_state &= ~PIPE_WANTW;
496 				lwkt_reltoken(&rpipe->pipe_wlock);
497 				wakeup(rpipe);
498 			} else {
499 				lwkt_reltoken(&rpipe->pipe_wlock);
500 			}
501 		}
502 
503 		/*
504 		 * Pick up our copy loop again if the writer sent data to
505 		 * us while we were messing around.
506 		 *
507 		 * On an SMP box poll up to pipe_delay nanoseconds for new
508 		 * data.  Typically a value of 2000 to 4000 is sufficient
509 		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
510 		 * is used for synchronous communications with small packets,
511 		 * and 8000 or so (8uS) will pipeline large buffer xfers
512 		 * between cpus over a pipe.
513 		 *
514 		 * For synchronous communications a hit means doing a
515 		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
516 		 * whereas a miss requiring a tsleep/wakeup sequence
517 		 * will take 7uS or more.
518 		 */
519 		if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
520 			continue;
521 
522 #if defined(SMP) && defined(_RDTSC_SUPPORTED_)
523 		if (pipe_delay) {
524 			int64_t tsc_target;
525 			int good = 0;
526 
527 			tsc_target = tsc_get_target(pipe_delay);
528 			while (tsc_test_target(tsc_target) == 0) {
529 				if (rpipe->pipe_buffer.windex !=
530 				    rpipe->pipe_buffer.rindex) {
531 					good = 1;
532 					break;
533 				}
534 			}
535 			if (good)
536 				continue;
537 		}
538 #endif
539 
540 		/*
541 		 * Detect EOF condition, do not set error.
542 		 */
543 		if (rpipe->pipe_state & PIPE_REOF)
544 			break;
545 
546 		/*
547 		 * Break if some data was read, or if this was a non-blocking
548 		 * read.
549 		 */
550 		if (nread > 0)
551 			break;
552 
553 		if (nbio) {
554 			error = EAGAIN;
555 			break;
556 		}
557 
558 		/*
559 		 * Last chance, interlock with WANTR.
560 		 */
561 		lwkt_gettoken(&rpipe->pipe_wlock);
562 		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
563 		if (size) {
564 			lwkt_reltoken(&rpipe->pipe_wlock);
565 			continue;
566 		}
567 
568 		/*
569 		 * Retest EOF - acquiring a new token can temporarily release
570 		 * tokens already held.
571 		 */
572 		if (rpipe->pipe_state & PIPE_REOF) {
573 			lwkt_reltoken(&rpipe->pipe_wlock);
574 			break;
575 		}
576 
577 		/*
578 		 * If there is no more to read in the pipe, reset its
579 		 * pointers to the beginning.  This improves cache hit
580 		 * stats.
581 		 *
582 		 * We need both locks to modify both pointers, and there
583 		 * must also not be a write in progress or the uiomove()
584 		 * in the write might block and temporarily release
585 		 * its wlock, then reacquire and update windex.  We are
586 		 * only serialized against reads, not writes.
587 		 *
588 		 * XXX should we even bother resetting the indices?  It
589 		 *     might actually be more cache efficient not to.
590 		 */
591 		if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
592 		    rpipe->pipe_wip == 0) {
593 			rpipe->pipe_buffer.rindex = 0;
594 			rpipe->pipe_buffer.windex = 0;
595 		}
596 
597 		/*
598 		 * Wait for more data.
599 		 *
600 		 * Pipe_state can only be set if both the rlock and wlock
601 		 * are held.
602 		 */
603 		rpipe->pipe_state |= PIPE_WANTR;
604 		tsleep_interlock(rpipe, PCATCH);
605 		lwkt_reltoken(&rpipe->pipe_wlock);
606 		error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
607 		++pipe_rblocked_count;
608 		if (error)
609 			break;
610 	}
611 	pipe_end_uio(rpipe, &rpipe->pipe_rip);
612 
613 	/*
614 	 * Update the last access time
615 	 */
616 	if (error == 0 && nread)
617 		vfs_timestamp(&rpipe->pipe_atime);
618 
619 	/*
620 	 * If we drained the FIFO more than half way then handle
621 	 * write blocking hysteresis.
622 	 *
623 	 * Note that PIPE_WANTW cannot be set by the writer without
624 	 * it holding both rlock and wlock, so we can test it
625 	 * while holding just rlock.
626 	 */
627 	if (notify_writer) {
628 		/*
629 		 * Synchronous blocking is done on the pipe involved
630 		 */
631 		if (rpipe->pipe_state & PIPE_WANTW) {
632 			lwkt_gettoken(&rpipe->pipe_wlock);
633 			if (rpipe->pipe_state & PIPE_WANTW) {
634 				rpipe->pipe_state &= ~PIPE_WANTW;
635 				lwkt_reltoken(&rpipe->pipe_wlock);
636 				wakeup(rpipe);
637 			} else {
638 				lwkt_reltoken(&rpipe->pipe_wlock);
639 			}
640 		}
641 
642 		/*
643 		 * But we may also have to deal with a kqueue which is
644 		 * stored on the same pipe as its descriptor, so an
645 		 * EVFILT_WRITE event waiting for our side to drain will
646 		 * be on the other side.
647 		 */
648 		lwkt_gettoken(&wpipe->pipe_wlock);
649 		pipewakeup(wpipe, 0);
650 		lwkt_reltoken(&wpipe->pipe_wlock);
651 	}
652 	/*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
653 	lwkt_reltoken(&rpipe->pipe_rlock);
654 
655 	return (error);
656 }
657 
658 static int
659 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
660 {
661 	int error;
662 	int orig_resid;
663 	int nbio;
664 	struct pipe *wpipe;
665 	struct pipe *rpipe;
666 	u_int windex;
667 	u_int space;
668 	u_int wcount;
669 	int bigwrite;
670 	int bigcount;
671 
672 	/*
673 	 * Writes go to the peer.  The peer will always exist.
674 	 */
675 	rpipe = (struct pipe *) fp->f_data;
676 	wpipe = rpipe->pipe_peer;
677 	lwkt_gettoken(&wpipe->pipe_wlock);
678 	if (wpipe->pipe_state & PIPE_WEOF) {
679 		lwkt_reltoken(&wpipe->pipe_wlock);
680 		return (EPIPE);
681 	}
682 
683 	/*
684 	 * Degenerate case (EPIPE takes prec)
685 	 */
686 	if (uio->uio_resid == 0) {
687 		lwkt_reltoken(&wpipe->pipe_wlock);
688 		return(0);
689 	}
690 
691 	/*
692 	 * Writes are serialized (start_uio must be called with wlock)
693 	 */
694 	error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
695 	if (error) {
696 		lwkt_reltoken(&wpipe->pipe_wlock);
697 		return (error);
698 	}
699 
700 	if (fflags & O_FBLOCKING)
701 		nbio = 0;
702 	else if (fflags & O_FNONBLOCKING)
703 		nbio = 1;
704 	else if (fp->f_flag & O_NONBLOCK)
705 		nbio = 1;
706 	else
707 		nbio = 0;
708 
709 	/*
710 	 * If it is advantageous to resize the pipe buffer, do
711 	 * so.  We are write-serialized so we can block safely.
712 	 */
713 	if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
714 	    (pipe_nbig < pipe_maxbig) &&
715 	    wpipe->pipe_wantwcnt > 4 &&
716 	    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
717 		/*
718 		 * Recheck after lock.
719 		 */
720 		lwkt_gettoken(&wpipe->pipe_rlock);
721 		if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
722 		    (pipe_nbig < pipe_maxbig) &&
723 		    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
724 			atomic_add_int(&pipe_nbig, 1);
725 			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
726 				++pipe_bigcount;
727 			else
728 				atomic_subtract_int(&pipe_nbig, 1);
729 		}
730 		lwkt_reltoken(&wpipe->pipe_rlock);
731 	}
732 
733 	orig_resid = uio->uio_resid;
734 	wcount = 0;
735 
736 	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
737 	bigcount = 10;
738 
739 	while (uio->uio_resid) {
740 		if (wpipe->pipe_state & PIPE_WEOF) {
741 			error = EPIPE;
742 			break;
743 		}
744 
745 		/*
746 		 * Don't hog the cpu.
747 		 */
748 		if (bigwrite && --bigcount == 0) {
749 			lwkt_user_yield();
750 			bigcount = 10;
751 			if (CURSIG(curthread->td_lwp)) {
752 				error = EINTR;
753 				break;
754 			}
755 		}
756 
757 		windex = wpipe->pipe_buffer.windex &
758 			 (wpipe->pipe_buffer.size - 1);
759 		space = wpipe->pipe_buffer.size -
760 			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
761 		cpu_lfence();
762 
763 		/* Writes of size <= PIPE_BUF must be atomic. */
764 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
765 			space = 0;
766 
767 		/*
768 		 * Write to fill, read size handles write hysteresis.  Also
769 		 * additional restrictions can cause select-based non-blocking
770 		 * writes to spin.
771 		 */
772 		if (space > 0) {
773 			u_int segsize;
774 
775 			/*
776 			 * Transfer size is minimum of uio transfer
777 			 * and free space in pipe buffer.
778 			 *
779 			 * Limit each uiocopy to no more than PIPE_SIZE
780 			 * so we can keep the gravy train going on a
781 			 * SMP box.  This doubles the performance for
782 			 * write sizes > 16K.  Otherwise large writes
783 			 * wind up doing an inefficient synchronous
784 			 * ping-pong.
785 			 */
786 			space = szmin(space, uio->uio_resid);
787 			if (space > PIPE_SIZE)
788 				space = PIPE_SIZE;
789 
790 			/*
791 			 * First segment to transfer is minimum of
792 			 * transfer size and contiguous space in
793 			 * pipe buffer.  If first segment to transfer
794 			 * is less than the transfer size, we've got
795 			 * a wraparound in the buffer.
796 			 */
797 			segsize = wpipe->pipe_buffer.size - windex;
798 			if (segsize > space)
799 				segsize = space;
800 
801 #ifdef SMP
802 			/*
803 			 * If this is the first loop and the reader is
804 			 * blocked, do a preemptive wakeup of the reader.
805 			 *
806 			 * On SMP the IPI latency plus the wlock interlock
807 			 * on the reader side is the fastest way to get the
808 			 * reader going.  (The scheduler will hard loop on
809 			 * lock tokens).
810 			 *
811 			 * NOTE: We can't clear WANTR here without acquiring
812 			 * the rlock, which we don't want to do here!
813 			 */
814 			if ((wpipe->pipe_state & PIPE_WANTR))
815 				wakeup(wpipe);
816 #endif
817 
818 			/*
819 			 * Transfer segment, which may include a wrap-around.
820 			 * Update windex to account for both all in one go
821 			 * so the reader can read() the data atomically.
822 			 */
823 			error = uiomove(&wpipe->pipe_buffer.buffer[windex],
824 					segsize, uio);
825 			if (error == 0 && segsize < space) {
826 				segsize = space - segsize;
827 				error = uiomove(&wpipe->pipe_buffer.buffer[0],
828 						segsize, uio);
829 			}
830 			if (error)
831 				break;
832 			cpu_mfence();
833 			wpipe->pipe_buffer.windex += space;
834 			wcount += space;
835 			continue;
836 		}
837 
838 		/*
839 		 * We need both the rlock and the wlock to interlock against
840 		 * the EOF, WANTW, and size checks, and to modify pipe_state.
841 		 *
842 		 * These are token locks so we do not have to worry about
843 		 * deadlocks.
844 		 */
845 		lwkt_gettoken(&wpipe->pipe_rlock);
846 
847 		/*
848 		 * If the "read-side" has been blocked, wake it up now
849 		 * and yield to let it drain synchronously rather
850 		 * then block.
851 		 * than block.
852 		if (wpipe->pipe_state & PIPE_WANTR) {
853 			wpipe->pipe_state &= ~PIPE_WANTR;
854 			wakeup(wpipe);
855 		}
856 
857 		/*
858 		 * don't block on non-blocking I/O
859 		 */
860 		if (nbio) {
861 			lwkt_reltoken(&wpipe->pipe_rlock);
862 			error = EAGAIN;
863 			break;
864 		}
865 
866 		/*
867 		 * re-test whether we have to block in the writer after
868 		 * acquiring both locks, in case the reader opened up
869 		 * some space.
870 		 */
871 		space = wpipe->pipe_buffer.size -
872 			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
873 		cpu_lfence();
874 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
875 			space = 0;
876 
877 		/*
878 		 * Retest EOF - acquiring a new token can temporarily release
879 		 * tokens already held.
880 		 */
881 		if (wpipe->pipe_state & PIPE_WEOF) {
882 			lwkt_reltoken(&wpipe->pipe_rlock);
883 			error = EPIPE;
884 			break;
885 		}
886 
887 		/*
888 		 * We have no more space and have something to offer,
889 		 * wake up select/poll/kq.
890 		 */
891 		if (space == 0) {
892 			wpipe->pipe_state |= PIPE_WANTW;
893 			++wpipe->pipe_wantwcnt;
894 			pipewakeup(wpipe, 1);
895 			if (wpipe->pipe_state & PIPE_WANTW)
896 				error = tsleep(wpipe, PCATCH, "pipewr", 0);
897 			++pipe_wblocked_count;
898 		}
899 		lwkt_reltoken(&wpipe->pipe_rlock);
900 
901 		/*
902 		 * Break out if we errored or the read side wants us to go
903 		 * away.
904 		 */
905 		if (error)
906 			break;
907 		if (wpipe->pipe_state & PIPE_WEOF) {
908 			error = EPIPE;
909 			break;
910 		}
911 	}
912 	pipe_end_uio(wpipe, &wpipe->pipe_wip);
913 
914 	/*
915 	 * If we have put any characters in the buffer, we wake up
916 	 * the reader.
917 	 *
918 	 * Both rlock and wlock are required to be able to modify pipe_state.
919 	 */
920 	if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
921 		if (wpipe->pipe_state & PIPE_WANTR) {
922 			lwkt_gettoken(&wpipe->pipe_rlock);
923 			if (wpipe->pipe_state & PIPE_WANTR) {
924 				wpipe->pipe_state &= ~PIPE_WANTR;
925 				lwkt_reltoken(&wpipe->pipe_rlock);
926 				wakeup(wpipe);
927 			} else {
928 				lwkt_reltoken(&wpipe->pipe_rlock);
929 			}
930 		}
931 		lwkt_gettoken(&wpipe->pipe_rlock);
932 		pipewakeup(wpipe, 1);
933 		lwkt_reltoken(&wpipe->pipe_rlock);
934 	}
935 
936 	/*
937 	 * Don't return EPIPE if I/O was successful
938 	 */
939 	if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
940 	    (uio->uio_resid == 0) &&
941 	    (error == EPIPE)) {
942 		error = 0;
943 	}
944 
945 	if (error == 0)
946 		vfs_timestamp(&wpipe->pipe_mtime);
947 
948 	/*
949 	 * We have something to offer,
950 	 * wake up select/poll/kq.
951 	 */
952 	/*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
953 	lwkt_reltoken(&wpipe->pipe_wlock);
954 	return (error);
955 }
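
/*
 * Illustrative userland sketch (hypothetical helper, not part of the
 * kernel build): because writes of size <= PIPE_BUF are kept atomic
 * above (space is forced to 0 until the whole record fits), records no
 * larger than PIPE_BUF from concurrent writers are never interleaved.
 *
 *	#include <limits.h>
 *	#include <unistd.h>
 *
 *	void
 *	write_record(int wfd, const char *rec, size_t len)
 *	{
 *		if (len <= PIPE_BUF)
 *			(void)write(wfd, rec, len);
 *	}
 */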
956 
957 /*
958  * we implement a very minimal set of ioctls for compatibility with sockets.
959  */
960 int
961 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
962 	   struct ucred *cred, struct sysmsg *msg)
963 {
964 	struct pipe *mpipe;
965 	int error;
966 
967 	mpipe = (struct pipe *)fp->f_data;
968 
969 	lwkt_gettoken(&mpipe->pipe_rlock);
970 	lwkt_gettoken(&mpipe->pipe_wlock);
971 
972 	switch (cmd) {
973 	case FIOASYNC:
974 		if (*(int *)data) {
975 			mpipe->pipe_state |= PIPE_ASYNC;
976 		} else {
977 			mpipe->pipe_state &= ~PIPE_ASYNC;
978 		}
979 		error = 0;
980 		break;
981 	case FIONREAD:
982 		*(int *)data = mpipe->pipe_buffer.windex -
983 				mpipe->pipe_buffer.rindex;
984 		error = 0;
985 		break;
986 	case FIOSETOWN:
987 		error = fsetown(*(int *)data, &mpipe->pipe_sigio);
988 		break;
989 	case FIOGETOWN:
990 		*(int *)data = fgetown(&mpipe->pipe_sigio);
991 		error = 0;
992 		break;
993 	case TIOCSPGRP:
994 		/* This is deprecated, FIOSETOWN should be used instead. */
995 		error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
996 		break;
997 
998 	case TIOCGPGRP:
999 		/* This is deprecated, FIOGETOWN should be used instead. */
1000 		*(int *)data = -fgetown(&mpipe->pipe_sigio);
1001 		error = 0;
1002 		break;
1003 	default:
1004 		error = ENOTTY;
1005 		break;
1006 	}
1007 	lwkt_reltoken(&mpipe->pipe_wlock);
1008 	lwkt_reltoken(&mpipe->pipe_rlock);
1009 
1010 	return (error);
1011 }
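
/*
 * Illustrative userland sketch (hypothetical helper, not part of the
 * kernel build): FIONREAD, as handled above, reports the number of
 * bytes currently buffered in the pipe.
 *
 *	#include <sys/ioctl.h>
 *	#include <sys/filio.h>
 *
 *	int
 *	pipe_bytes_ready(int rfd)
 *	{
 *		int n = 0;
 *
 *		if (ioctl(rfd, FIONREAD, &n) < 0)
 *			return (-1);
 *		return (n);
 *	}
 */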
1012 
1013 /*
1014  * MPSAFE
1015  */
1016 static int
1017 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1018 {
1019 	struct pipe *pipe;
1020 
1021 	pipe = (struct pipe *)fp->f_data;
1022 
1023 	bzero((caddr_t)ub, sizeof(*ub));
1024 	ub->st_mode = S_IFIFO;
1025 	ub->st_blksize = pipe->pipe_buffer.size;
1026 	ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1027 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1028 	ub->st_atimespec = pipe->pipe_atime;
1029 	ub->st_mtimespec = pipe->pipe_mtime;
1030 	ub->st_ctimespec = pipe->pipe_ctime;
1031 	/*
1032 	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1033 	 * st_flags, st_gen.
1034 	 * XXX (st_dev, st_ino) should be unique.
1035 	 */
1036 	return (0);
1037 }
1038 
1039 static int
1040 pipe_close(struct file *fp)
1041 {
1042 	struct pipe *cpipe;
1043 
1044 	cpipe = (struct pipe *)fp->f_data;
1045 	fp->f_ops = &badfileops;
1046 	fp->f_data = NULL;
1047 	funsetown(&cpipe->pipe_sigio);
1048 	pipeclose(cpipe);
1049 	return (0);
1050 }
1051 
1052 /*
1053  * Shutdown one or both directions of a full-duplex pipe.
1054  */
1055 static int
1056 pipe_shutdown(struct file *fp, int how)
1057 {
1058 	struct pipe *rpipe;
1059 	struct pipe *wpipe;
1060 	int error = EPIPE;
1061 
1062 	rpipe = (struct pipe *)fp->f_data;
1063 	wpipe = rpipe->pipe_peer;
1064 
1065 	/*
1066 	 * We modify pipe_state on both pipes, which means we need
1067 	 * all four tokens!
1068 	 */
1069 	lwkt_gettoken(&rpipe->pipe_rlock);
1070 	lwkt_gettoken(&rpipe->pipe_wlock);
1071 	lwkt_gettoken(&wpipe->pipe_rlock);
1072 	lwkt_gettoken(&wpipe->pipe_wlock);
1073 
1074 	switch(how) {
1075 	case SHUT_RDWR:
1076 	case SHUT_RD:
1077 		rpipe->pipe_state |= PIPE_REOF;		/* my reads */
1078 		rpipe->pipe_state |= PIPE_WEOF;		/* peer writes */
1079 		if (rpipe->pipe_state & PIPE_WANTR) {
1080 			rpipe->pipe_state &= ~PIPE_WANTR;
1081 			wakeup(rpipe);
1082 		}
1083 		if (rpipe->pipe_state & PIPE_WANTW) {
1084 			rpipe->pipe_state &= ~PIPE_WANTW;
1085 			wakeup(rpipe);
1086 		}
1087 		error = 0;
1088 		if (how == SHUT_RD)
1089 			break;
1090 		/* fall through */
1091 	case SHUT_WR:
1092 		wpipe->pipe_state |= PIPE_REOF;		/* peer reads */
1093 		wpipe->pipe_state |= PIPE_WEOF;		/* my writes */
1094 		if (wpipe->pipe_state & PIPE_WANTR) {
1095 			wpipe->pipe_state &= ~PIPE_WANTR;
1096 			wakeup(wpipe);
1097 		}
1098 		if (wpipe->pipe_state & PIPE_WANTW) {
1099 			wpipe->pipe_state &= ~PIPE_WANTW;
1100 			wakeup(wpipe);
1101 		}
1102 		error = 0;
1103 		break;
1104 	}
1105 	pipewakeup(rpipe, 1);
1106 	pipewakeup(wpipe, 1);
1107 
1108 	lwkt_reltoken(&wpipe->pipe_wlock);
1109 	lwkt_reltoken(&wpipe->pipe_rlock);
1110 	lwkt_reltoken(&rpipe->pipe_wlock);
1111 	lwkt_reltoken(&rpipe->pipe_rlock);
1112 
1113 	return (error);
1114 }
1115 
1116 static void
1117 pipe_free_kmem(struct pipe *cpipe)
1118 {
1119 	if (cpipe->pipe_buffer.buffer != NULL) {
1120 		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1121 			atomic_subtract_int(&pipe_nbig, 1);
1122 		kmem_free(&kernel_map,
1123 			(vm_offset_t)cpipe->pipe_buffer.buffer,
1124 			cpipe->pipe_buffer.size);
1125 		cpipe->pipe_buffer.buffer = NULL;
1126 		cpipe->pipe_buffer.object = NULL;
1127 	}
1128 }
1129 
1130 /*
1131  * Close the pipe.  The slock must be held to interlock against simultaneous
1132  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1133  */
1134 static void
1135 pipeclose(struct pipe *cpipe)
1136 {
1137 	globaldata_t gd;
1138 	struct pipe *ppipe;
1139 
1140 	if (cpipe == NULL)
1141 		return;
1142 
1143 	/*
1144 	 * The slock may not have been allocated yet (close during
1145 	 * initialization)
1146 	 *
1147 	 * We need both the read and write tokens to modify pipe_state.
1148 	 */
1149 	if (cpipe->pipe_slock)
1150 		lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1151 	lwkt_gettoken(&cpipe->pipe_rlock);
1152 	lwkt_gettoken(&cpipe->pipe_wlock);
1153 
1154 	/*
1155 	 * Set our state, wakeup anyone waiting in select/poll/kq, and
1156 	 * wakeup anyone blocked on our pipe.
1157 	 */
1158 	cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1159 	pipewakeup(cpipe, 1);
1160 	if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1161 		cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1162 		wakeup(cpipe);
1163 	}
1164 
1165 	/*
1166 	 * Disconnect from peer.
1167 	 */
1168 	if ((ppipe = cpipe->pipe_peer) != NULL) {
1169 		lwkt_gettoken(&ppipe->pipe_rlock);
1170 		lwkt_gettoken(&ppipe->pipe_wlock);
1171 		ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1172 		pipewakeup(ppipe, 1);
1173 		if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1174 			ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1175 			wakeup(ppipe);
1176 		}
1177 		if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1178 			KNOTE(&ppipe->pipe_kq.ki_note, 0);
1179 		lwkt_reltoken(&ppipe->pipe_wlock);
1180 		lwkt_reltoken(&ppipe->pipe_rlock);
1181 	}
1182 
1183 	/*
1184 	 * If the peer is also closed we can free resources for both
1185 	 * sides, otherwise we leave our side intact to deal with any
1186 	 * races (since we only have the slock).
1187 	 */
1188 	if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1189 		cpipe->pipe_peer = NULL;
1190 		ppipe->pipe_peer = NULL;
1191 		ppipe->pipe_slock = NULL;	/* we will free the slock */
1192 		pipeclose(ppipe);
1193 		ppipe = NULL;
1194 	}
1195 
1196 	lwkt_reltoken(&cpipe->pipe_wlock);
1197 	lwkt_reltoken(&cpipe->pipe_rlock);
1198 	if (cpipe->pipe_slock)
1199 		lockmgr(cpipe->pipe_slock, LK_RELEASE);
1200 
1201 	/*
1202 	 * If we disassociated from our peer we can free resources
1203 	 */
1204 	if (ppipe == NULL) {
1205 		gd = mycpu;
1206 		if (cpipe->pipe_slock) {
1207 			kfree(cpipe->pipe_slock, M_PIPE);
1208 			cpipe->pipe_slock = NULL;
1209 		}
1210 		if (gd->gd_pipeqcount >= pipe_maxcache ||
1211 		    cpipe->pipe_buffer.size != PIPE_SIZE
1212 		) {
1213 			pipe_free_kmem(cpipe);
1214 			kfree(cpipe, M_PIPE);
1215 		} else {
1216 			cpipe->pipe_state = 0;
1217 			cpipe->pipe_peer = gd->gd_pipeq;
1218 			gd->gd_pipeq = cpipe;
1219 			++gd->gd_pipeqcount;
1220 		}
1221 	}
1222 }
1223 
1224 static int
1225 pipe_kqfilter(struct file *fp, struct knote *kn)
1226 {
1227 	struct pipe *cpipe;
1228 
1229 	cpipe = (struct pipe *)kn->kn_fp->f_data;
1230 
1231 	switch (kn->kn_filter) {
1232 	case EVFILT_READ:
1233 		kn->kn_fop = &pipe_rfiltops;
1234 		break;
1235 	case EVFILT_WRITE:
1236 		kn->kn_fop = &pipe_wfiltops;
1237 		if (cpipe->pipe_peer == NULL) {
1238 			/* other end of pipe has been closed */
1239 			return (EPIPE);
1240 		}
1241 		break;
1242 	default:
1243 		return (EOPNOTSUPP);
1244 	}
1245 	kn->kn_hook = (caddr_t)cpipe;
1246 
1247 	knote_insert(&cpipe->pipe_kq.ki_note, kn);
1248 
1249 	return (0);
1250 }
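
/*
 * Illustrative userland sketch (hypothetical helper, not part of the
 * kernel build): registering an EVFILT_READ knote on a pipe descriptor
 * ends up in pipe_kqfilter() above.
 *
 *	#include <sys/types.h>
 *	#include <sys/event.h>
 *	#include <sys/time.h>
 *
 *	int
 *	wait_readable(int kq, int pipe_rfd)
 *	{
 *		struct kevent kev;
 *
 *		EV_SET(&kev, pipe_rfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
 *		return (kevent(kq, &kev, 1, &kev, 1, NULL));
 *	}
 */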
1251 
1252 static void
1253 filt_pipedetach(struct knote *kn)
1254 {
1255 	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1256 
1257 	knote_remove(&cpipe->pipe_kq.ki_note, kn);
1258 }
1259 
1260 /*ARGSUSED*/
1261 static int
1262 filt_piperead(struct knote *kn, long hint)
1263 {
1264 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1265 	int ready = 0;
1266 
1267 	lwkt_gettoken(&rpipe->pipe_rlock);
1268 	lwkt_gettoken(&rpipe->pipe_wlock);
1269 
1270 	kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1271 
1272 	if (rpipe->pipe_state & PIPE_REOF) {
1273 		/*
1274 		 * Only set NODATA if all data has been exhausted
1275 		 */
1276 		if (kn->kn_data == 0)
1277 			kn->kn_flags |= EV_NODATA;
1278 		kn->kn_flags |= EV_EOF;
1279 		ready = 1;
1280 	}
1281 
1282 	lwkt_reltoken(&rpipe->pipe_wlock);
1283 	lwkt_reltoken(&rpipe->pipe_rlock);
1284 
1285 	if (!ready)
1286 		ready = kn->kn_data > 0;
1287 
1288 	return (ready);
1289 }
1290 
1291 /*ARGSUSED*/
1292 static int
1293 filt_pipewrite(struct knote *kn, long hint)
1294 {
1295 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1296 	struct pipe *wpipe = rpipe->pipe_peer;
1297 	int ready = 0;
1298 
1299 	kn->kn_data = 0;
1300 	if (wpipe == NULL) {
1301 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1302 		return (1);
1303 	}
1304 
1305 	lwkt_gettoken(&wpipe->pipe_rlock);
1306 	lwkt_gettoken(&wpipe->pipe_wlock);
1307 
1308 	if (wpipe->pipe_state & PIPE_WEOF) {
1309 		kn->kn_flags |= (EV_EOF | EV_NODATA);
1310 		ready = 1;
1311 	}
1312 
1313 	if (!ready)
1314 		kn->kn_data = wpipe->pipe_buffer.size -
1315 			      (wpipe->pipe_buffer.windex -
1316 			       wpipe->pipe_buffer.rindex);
1317 
1318 	lwkt_reltoken(&wpipe->pipe_wlock);
1319 	lwkt_reltoken(&wpipe->pipe_rlock);
1320 
1321 	if (!ready)
1322 		ready = kn->kn_data >= PIPE_BUF;
1323 
1324 	return (ready);
1325 }
1326