xref: /dflybsd-src/sys/kern/sys_pipe.c (revision c5541aee854b0d32586182b733a9ea4d4c92168b)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.17 2004/04/01 17:58:02 dillon Exp $
21  */
22 
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 
30 /*
31  * This code has two modes of operation, a small write mode and a large
32  * write mode.  The small write mode acts like conventional pipes with
33  * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
34  * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
35  * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
36  * the receiving process can copy it directly from the pages in the sending
37  * process.
38  *
39  * If the sending process receives a signal, it is possible that it will
40  * go away, and certainly its address space can change, because control
41  * is returned back to the user-mode side.  In that case, the pipe code
42  * arranges to copy the buffer supplied by the user process to a pageable
43  * kernel buffer, and the receiving process will grab the data from the
44  * pageable kernel buffer.  Since signals don't happen all that often,
45  * the copy operation is normally eliminated.
46  *
47  * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
48  * happen for small transfers so that the system will not spend all of
49  * its time context switching.  PIPE_SIZE is constrained by the
50  * amount of kernel virtual memory.
51  */
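/*
 * Editorial sketch (not part of the original source): from userland the
 * mode selection is invisible.  Assuming the usual PIPE_MINDIRECT of 8KB,
 * a small write is staged through the kernel pipe buffer, while a large
 * enough blocking write becomes eligible for the direct page-mapping path:
 *
 *	int fds[2];
 *	char big[64 * 1024];
 *
 *	pipe(fds);
 *	write(fds[1], "hi", 2);			-- buffered through pipe kva
 *	write(fds[1], big, sizeof(big));	-- may use the direct path
 */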
52 
53 #include <sys/param.h>
54 #include <sys/systm.h>
55 #include <sys/kernel.h>
56 #include <sys/proc.h>
57 #include <sys/fcntl.h>
58 #include <sys/file.h>
59 #include <sys/filedesc.h>
60 #include <sys/filio.h>
61 #include <sys/ttycom.h>
62 #include <sys/stat.h>
63 #include <sys/poll.h>
64 #include <sys/select.h>
65 #include <sys/signalvar.h>
66 #include <sys/sysproto.h>
67 #include <sys/pipe.h>
68 #include <sys/vnode.h>
69 #include <sys/uio.h>
70 #include <sys/event.h>
71 #include <sys/globaldata.h>
72 #include <sys/module.h>
73 #include <sys/malloc.h>
74 #include <sys/sysctl.h>
75 
76 #include <vm/vm.h>
77 #include <vm/vm_param.h>
78 #include <sys/lock.h>
79 #include <vm/vm_object.h>
80 #include <vm/vm_kern.h>
81 #include <vm/vm_extern.h>
82 #include <vm/pmap.h>
83 #include <vm/vm_map.h>
84 #include <vm/vm_page.h>
85 #include <vm/vm_zone.h>
86 
87 #include <sys/file2.h>
88 
89 /*
90  * Use this define if you want to disable *fancy* VM things.  Expect an
91  * approx 30% decrease in transfer rate.  This could be useful for
92  * NetBSD or OpenBSD.
93  */
94 /* #define PIPE_NODIRECT */
95 
96 /*
97  * interfaces to the outside world
98  */
99 static int pipe_read (struct file *fp, struct uio *uio,
100 		struct ucred *cred, int flags, struct thread *td);
101 static int pipe_write (struct file *fp, struct uio *uio,
102 		struct ucred *cred, int flags, struct thread *td);
103 static int pipe_close (struct file *fp, struct thread *td);
104 static int pipe_poll (struct file *fp, int events, struct ucred *cred,
105 		struct thread *td);
106 static int pipe_kqfilter (struct file *fp, struct knote *kn);
107 static int pipe_stat (struct file *fp, struct stat *sb, struct thread *td);
108 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data, struct thread *td);
109 
110 static struct fileops pipeops = {
111 	NULL,	/* port */
112 	0,	/* autoq */
113 	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
114 	pipe_stat, pipe_close
115 };
116 
117 static void	filt_pipedetach(struct knote *kn);
118 static int	filt_piperead(struct knote *kn, long hint);
119 static int	filt_pipewrite(struct knote *kn, long hint);
120 
121 static struct filterops pipe_rfiltops =
122 	{ 1, NULL, filt_pipedetach, filt_piperead };
123 static struct filterops pipe_wfiltops =
124 	{ 1, NULL, filt_pipedetach, filt_pipewrite };
125 
126 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
127 
128 /*
129  * Default pipe buffer size(s); this can be kind of large now because pipe
130  * space is pageable.  The pipe code will try to maintain locality of
131  * reference for performance reasons, so small amounts of outstanding I/O
132  * will not wipe the cache.
133  */
134 #define MINPIPESIZE (PIPE_SIZE/3)
135 #define MAXPIPESIZE (2*PIPE_SIZE/3)
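/*
 * Editorial note (not part of the original source): assuming the usual
 * 16KB PIPE_SIZE, MINPIPESIZE and MAXPIPESIZE work out to roughly 5461
 * and 10922 bytes.  For example, after copying data out, pipe_read()
 * defers waking a writer blocked on PIPE_WANTW until the buffer has
 * drained below MINPIPESIZE, which gives the write side some hysteresis.
 */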
136 
137 /*
138  * Maximum amount of kva for pipes -- this is kind of a soft limit, but
139  * is there so that on large systems we don't exhaust it.
140  */
141 #define MAXPIPEKVA (8*1024*1024)
142 
143 /*
144  * Limit for direct transfers; we cannot, of course, limit
145  * the amount of kva for pipes in general though.
146  */
147 #define LIMITPIPEKVA (16*1024*1024)
148 
149 /*
150  * Limit the number of "big" pipes
151  */
152 #define LIMITBIGPIPES	32
153 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
154 
155 static int pipe_maxbig = LIMITBIGPIPES;
156 static int pipe_maxcache = PIPEQ_MAX_CACHE;
157 static int pipe_nbig;
158 static int pipe_bcache_alloc;
159 static int pipe_bkmem_alloc;
160 
161 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
162 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
163         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
164 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
165         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
166 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
167         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
168 #if !defined(NO_PIPE_SYSCTL_STATS)
169 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
170         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
171 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
172         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
173 #endif
174 
175 static void pipeclose (struct pipe *cpipe);
176 static void pipe_free_kmem (struct pipe *cpipe);
177 static int pipe_create (struct pipe **cpipep);
178 static __inline int pipelock (struct pipe *cpipe, int catch);
179 static __inline void pipeunlock (struct pipe *cpipe);
180 static __inline void pipeselwakeup (struct pipe *cpipe);
181 #ifndef PIPE_NODIRECT
182 static int pipe_build_write_buffer (struct pipe *wpipe, struct uio *uio);
183 static int pipe_direct_write (struct pipe *wpipe, struct uio *uio);
184 static void pipe_clone_write_buffer (struct pipe *wpipe);
185 #endif
186 static int pipespace (struct pipe *cpipe, int size);
187 
188 /*
189  * The pipe system call for the DTYPE_PIPE type of pipes
190  *
191  * pipe_args(int dummy)
192  */
193 
194 /* ARGSUSED */
195 int
196 pipe(struct pipe_args *uap)
197 {
198 	struct thread *td = curthread;
199 	struct proc *p = td->td_proc;
200 	struct filedesc *fdp;
201 	struct file *rf, *wf;
202 	struct pipe *rpipe, *wpipe;
203 	int fd1, fd2, error;
204 
205 	KKASSERT(p);
206 	fdp = p->p_fd;
207 
208 	rpipe = wpipe = NULL;
209 	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
210 		pipeclose(rpipe);
211 		pipeclose(wpipe);
212 		return (ENFILE);
213 	}
214 
215 	rpipe->pipe_state |= PIPE_DIRECTOK;
216 	wpipe->pipe_state |= PIPE_DIRECTOK;
217 
218 	error = falloc(p, &rf, &fd1);
219 	if (error) {
220 		pipeclose(rpipe);
221 		pipeclose(wpipe);
222 		return (error);
223 	}
224 	fhold(rf);
225 	uap->sysmsg_fds[0] = fd1;
226 
227 	/*
228 	 * Warning: once we've gotten past allocation of the fd for the
229 	 * read-side, we can only drop the read side via fdrop() in order
230 	 * to avoid races against processes which manage to dup() the read
231 	 * side while we are blocked trying to allocate the write side.
232 	 */
233 	rf->f_flag = FREAD | FWRITE;
234 	rf->f_type = DTYPE_PIPE;
235 	rf->f_data = (caddr_t)rpipe;
236 	rf->f_ops = &pipeops;
237 	error = falloc(p, &wf, &fd2);
238 	if (error) {
239 		if (fdp->fd_ofiles[fd1] == rf) {
240 			fdp->fd_ofiles[fd1] = NULL;
241 			fdrop(rf, td);
242 		}
243 		fdrop(rf, td);
244 		/* rpipe has been closed by fdrop(). */
245 		pipeclose(wpipe);
246 		return (error);
247 	}
248 	wf->f_flag = FREAD | FWRITE;
249 	wf->f_type = DTYPE_PIPE;
250 	wf->f_data = (caddr_t)wpipe;
251 	wf->f_ops = &pipeops;
252 	uap->sysmsg_fds[1] = fd2;
253 
254 	rpipe->pipe_peer = wpipe;
255 	wpipe->pipe_peer = rpipe;
256 	fdrop(rf, td);
257 
258 	return (0);
259 }
260 
261 /*
262  * Allocate kva for the pipe circular buffer; the space is pageable.
263  * This routine will 'realloc' the size of a pipe safely; if it fails
264  * it will retain the old buffer
265  * and return ENOMEM.
266  */
267 static int
268 pipespace(struct pipe *cpipe, int size)
269 {
270 	struct vm_object *object;
271 	caddr_t buffer;
272 	int npages, error;
273 
274 	npages = round_page(size) / PAGE_SIZE;
275 	object = cpipe->pipe_buffer.object;
276 
277 	/*
278 	 * [re]create the object if necessary and reserve space for it
279 	 * in the kernel_map.  The object and memory are pageable.  On
280 	 * success, free the old resources before assigning the new
281 	 * ones.
282 	 */
283 	if (object == NULL || object->size != npages) {
284 		object = vm_object_allocate(OBJT_DEFAULT, npages);
285 		buffer = (caddr_t) vm_map_min(kernel_map);
286 
287 		error = vm_map_find(kernel_map, object, 0,
288 			(vm_offset_t *) &buffer, size, 1,
289 			VM_PROT_ALL, VM_PROT_ALL, 0);
290 
291 		if (error != KERN_SUCCESS) {
292 			vm_object_deallocate(object);
293 			return (ENOMEM);
294 		}
295 		pipe_free_kmem(cpipe);
296 		cpipe->pipe_buffer.object = object;
297 		cpipe->pipe_buffer.buffer = buffer;
298 		cpipe->pipe_buffer.size = size;
299 		++pipe_bkmem_alloc;
300 	} else {
301 		++pipe_bcache_alloc;
302 	}
303 	cpipe->pipe_buffer.in = 0;
304 	cpipe->pipe_buffer.out = 0;
305 	cpipe->pipe_buffer.cnt = 0;
306 	return (0);
307 }
308 
309 /*
310  * Initialize and allocate VM and memory for pipe, pulling the pipe from
311  * our per-cpu cache if possible.  For now make sure it is sized for the
312  * smaller PIPE_SIZE default.
313  */
314 static int
315 pipe_create(cpipep)
316 	struct pipe **cpipep;
317 {
318 	globaldata_t gd = mycpu;
319 	struct pipe *cpipe;
320 	int error;
321 
322 	if ((cpipe = gd->gd_pipeq) != NULL) {
323 		gd->gd_pipeq = cpipe->pipe_peer;
324 		--gd->gd_pipeqcount;
325 		cpipe->pipe_peer = NULL;
326 	} else {
327 		cpipe = malloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
328 	}
329 	*cpipep = cpipe;
330 	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
331 		return (error);
332 	vfs_timestamp(&cpipe->pipe_ctime);
333 	cpipe->pipe_atime = cpipe->pipe_ctime;
334 	cpipe->pipe_mtime = cpipe->pipe_ctime;
335 	return (0);
336 }
337 
338 
339 /*
340  * lock a pipe for I/O, blocking other access
341  */
342 static __inline int
343 pipelock(cpipe, catch)
344 	struct pipe *cpipe;
345 	int catch;
346 {
347 	int error;
348 
349 	while (cpipe->pipe_state & PIPE_LOCK) {
350 		cpipe->pipe_state |= PIPE_LWANT;
351 		error = tsleep(cpipe, (catch ? PCATCH : 0), "pipelk", 0);
352 		if (error != 0)
353 			return (error);
354 	}
355 	cpipe->pipe_state |= PIPE_LOCK;
356 	return (0);
357 }
358 
359 /*
360  * unlock a pipe I/O lock
361  */
362 static __inline void
363 pipeunlock(cpipe)
364 	struct pipe *cpipe;
365 {
366 
367 	cpipe->pipe_state &= ~PIPE_LOCK;
368 	if (cpipe->pipe_state & PIPE_LWANT) {
369 		cpipe->pipe_state &= ~PIPE_LWANT;
370 		wakeup(cpipe);
371 	}
372 }
373 
374 static __inline void
375 pipeselwakeup(cpipe)
376 	struct pipe *cpipe;
377 {
378 
379 	if (cpipe->pipe_state & PIPE_SEL) {
380 		cpipe->pipe_state &= ~PIPE_SEL;
381 		selwakeup(&cpipe->pipe_sel);
382 	}
383 	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
384 		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
385 	KNOTE(&cpipe->pipe_sel.si_note, 0);
386 }
387 
388 /* ARGSUSED */
389 static int
390 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred,
391 	int flags, struct thread *td)
392 {
393 	struct pipe *rpipe = (struct pipe *) fp->f_data;
394 	int error;
395 	int nread = 0;
396 	u_int size;
397 
398 	++rpipe->pipe_busy;
399 	error = pipelock(rpipe, 1);
400 	if (error)
401 		goto unlocked_error;
402 
403 	while (uio->uio_resid) {
404 		/*
405 		 * normal pipe buffer receive
406 		 */
407 		if (rpipe->pipe_buffer.cnt > 0) {
408 			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
409 			if (size > rpipe->pipe_buffer.cnt)
410 				size = rpipe->pipe_buffer.cnt;
411 			if (size > (u_int) uio->uio_resid)
412 				size = (u_int) uio->uio_resid;
413 
414 			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
415 					size, uio);
416 			if (error)
417 				break;
418 
419 			rpipe->pipe_buffer.out += size;
420 			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
421 				rpipe->pipe_buffer.out = 0;
422 
423 			rpipe->pipe_buffer.cnt -= size;
424 
425 			/*
426 			 * If there is no more to read in the pipe, reset
427 			 * its pointers to the beginning.  This improves
428 			 * cache hit stats.
429 			 */
430 			if (rpipe->pipe_buffer.cnt == 0) {
431 				rpipe->pipe_buffer.in = 0;
432 				rpipe->pipe_buffer.out = 0;
433 			}
434 			nread += size;
435 #ifndef PIPE_NODIRECT
436 		/*
437 		 * Direct copy, bypassing a kernel buffer.  We cannot mess
438 		 * with the direct-write buffer until PIPE_DIRECTIP is
439 		 * cleared.  In order to prevent the pipe_write code from
440 		 * racing itself in direct_write, we set DIRECTIP when we
441 		 * clear DIRECTW after we have exhausted the buffer.
442 		 */
443 		} else if (rpipe->pipe_map.xio_bytes &&
444 			   (rpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) == PIPE_DIRECTW) {
445 			error = xio_uio_copy(&rpipe->pipe_map, uio, &size);
446 			if (error)
447 				break;
448 			nread += size;
449 			if (rpipe->pipe_map.xio_bytes == 0) {
450 				rpipe->pipe_state |= PIPE_DIRECTIP;
451 				rpipe->pipe_state &= ~PIPE_DIRECTW;
452 				wakeup(rpipe);
453 			}
454 #endif
455 		} else {
456 			/*
457 			 * detect EOF condition
458 			 * read returns 0 on EOF, no need to set error
459 			 */
460 			if (rpipe->pipe_state & PIPE_EOF)
461 				break;
462 
463 			/*
464 			 * If the "write-side" has been blocked, wake it up now.
465 			 */
466 			if (rpipe->pipe_state & PIPE_WANTW) {
467 				rpipe->pipe_state &= ~PIPE_WANTW;
468 				wakeup(rpipe);
469 			}
470 
471 			/*
472 			 * Break if some data was read.
473 			 */
474 			if (nread > 0)
475 				break;
476 
477 			/*
478 			 * Unlock the pipe buffer for our remaining processing.  We
479 			 * will either break out with an error or we will sleep and
480 			 * relock to loop.
481 			 */
482 			pipeunlock(rpipe);
483 
484 			/*
485 			 * Handle non-blocking mode operation or
486 			 * wait for more data.
487 			 */
488 			if (fp->f_flag & FNONBLOCK) {
489 				error = EAGAIN;
490 			} else {
491 				rpipe->pipe_state |= PIPE_WANTR;
492 				if ((error = tsleep(rpipe, PCATCH|PNORESCHED,
493 				    "piperd", 0)) == 0) {
494 					error = pipelock(rpipe, 1);
495 				}
496 			}
497 			if (error)
498 				goto unlocked_error;
499 		}
500 	}
501 	pipeunlock(rpipe);
502 
503 	if (error == 0)
504 		vfs_timestamp(&rpipe->pipe_atime);
505 unlocked_error:
506 	--rpipe->pipe_busy;
507 
508 	/*
509 	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
510 	 */
511 	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
512 		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
513 		wakeup(rpipe);
514 	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
515 		/*
516 		 * Handle write blocking hysteresis.
517 		 */
518 		if (rpipe->pipe_state & PIPE_WANTW) {
519 			rpipe->pipe_state &= ~PIPE_WANTW;
520 			wakeup(rpipe);
521 		}
522 	}
523 
524 	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
525 		pipeselwakeup(rpipe);
526 
527 	return (error);
528 }
529 
530 #ifndef PIPE_NODIRECT
531 /*
532  * Map the sending process's buffer into kernel space and wire it.
533  * This is similar to a physical write operation.
534  */
535 static int
536 pipe_build_write_buffer(wpipe, uio)
537 	struct pipe *wpipe;
538 	struct uio *uio;
539 {
540 	int error;
541 	u_int size;
542 
543 	size = (u_int) uio->uio_iov->iov_len;
544 	if (size > wpipe->pipe_buffer.size)
545 		size = wpipe->pipe_buffer.size;
546 	if (size > XIO_INTERNAL_SIZE)
547 		size = XIO_INTERNAL_SIZE;
548 
549 	error = xio_init_ubuf(&wpipe->pipe_map, uio->uio_iov->iov_base,
550 				size, XIOF_READ);
551 	if (error)
552 		return(error);
553 
554 	/*
555 	 * and update the uio data
556 	 */
557 	uio->uio_iov->iov_len -= size;
558 	uio->uio_iov->iov_base += size;
559 	if (uio->uio_iov->iov_len == 0)
560 		uio->uio_iov++;
561 	uio->uio_resid -= size;
562 	uio->uio_offset += size;
563 	return (0);
564 }
565 
566 /*
567  * In the case of a signal, the writing process might go away.  This
568  * code copies the data into the circular buffer so that the source
569  * pages can be freed without loss of data.
570  */
571 static void
572 pipe_clone_write_buffer(wpipe)
573 	struct pipe *wpipe;
574 {
575 	int size;
576 
577 	size = wpipe->pipe_map.xio_bytes;
578 
579 	wpipe->pipe_buffer.in = size;
580 	wpipe->pipe_buffer.out = 0;
581 	wpipe->pipe_buffer.cnt = size;
582 	wpipe->pipe_state &= ~(PIPE_DIRECTW | PIPE_DIRECTIP);
583 
584 	xio_copy_xtok(&wpipe->pipe_map, wpipe->pipe_buffer.buffer, size);
585 	xio_release(&wpipe->pipe_map);
586 }
587 
588 /*
589  * This implements the pipe buffer write mechanism.  Note that only
590  * a direct write OR a normal pipe write can be pending at any given time.
591  * If there are any characters in the pipe buffer, the direct write will
592  * be deferred until the receiving process grabs all of the bytes from
593  * the pipe buffer.  Then the direct mapping write is set up.
594  */
595 static int
596 pipe_direct_write(wpipe, uio)
597 	struct pipe *wpipe;
598 	struct uio *uio;
599 {
600 	int error;
601 
602 retry:
603 	while (wpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) {
604 		if (wpipe->pipe_state & PIPE_WANTR) {
605 			wpipe->pipe_state &= ~PIPE_WANTR;
606 			wakeup(wpipe);
607 		}
608 		wpipe->pipe_state |= PIPE_WANTW;
609 		error = tsleep(wpipe, PCATCH, "pipdww", 0);
610 		if (error)
611 			goto error2;
612 		if (wpipe->pipe_state & PIPE_EOF) {
613 			error = EPIPE;
614 			goto error2;
615 		}
616 	}
617 	KKASSERT(wpipe->pipe_map.xio_bytes == 0);
618 	if (wpipe->pipe_buffer.cnt > 0) {
619 		if (wpipe->pipe_state & PIPE_WANTR) {
620 			wpipe->pipe_state &= ~PIPE_WANTR;
621 			wakeup(wpipe);
622 		}
623 
624 		wpipe->pipe_state |= PIPE_WANTW;
625 		error = tsleep(wpipe, PCATCH, "pipdwc", 0);
626 		if (error)
627 			goto error2;
628 		if (wpipe->pipe_state & PIPE_EOF) {
629 			error = EPIPE;
630 			goto error2;
631 		}
632 		goto retry;
633 	}
634 
635 	/*
636 	 * Build our direct-write buffer
637 	 */
638 	wpipe->pipe_state |= PIPE_DIRECTW | PIPE_DIRECTIP;
639 	error = pipe_build_write_buffer(wpipe, uio);
640 	if (error)
641 		goto error1;
642 	wpipe->pipe_state &= ~PIPE_DIRECTIP;
643 
644 	/*
645 	 * Wait until the receiver has snarfed the data.  Since we are likely
646 	 * going to sleep we optimize the case and yield synchronously,
647 	 * possibly avoiding the tsleep().
648 	 */
649 	error = 0;
650 	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
651 		if (wpipe->pipe_state & PIPE_EOF) {
652 			pipelock(wpipe, 0);
653 			xio_release(&wpipe->pipe_map);
654 			pipeunlock(wpipe);
655 			pipeselwakeup(wpipe);
656 			error = EPIPE;
657 			goto error1;
658 		}
659 		if (wpipe->pipe_state & PIPE_WANTR) {
660 			wpipe->pipe_state &= ~PIPE_WANTR;
661 			wakeup(wpipe);
662 		}
663 		pipeselwakeup(wpipe);
664 		error = tsleep(wpipe, PCATCH|PNORESCHED, "pipdwt", 0);
665 	}
666 	pipelock(wpipe,0);
667 	if (wpipe->pipe_state & PIPE_DIRECTW) {
668 		/*
669 		 * this bit of trickery substitutes a kernel buffer for
670 		 * the process that might be going away.
671 		 */
672 		pipe_clone_write_buffer(wpipe);
673 		KKASSERT((wpipe->pipe_state & PIPE_DIRECTIP) == 0);
674 	} else {
675 		KKASSERT(wpipe->pipe_state & PIPE_DIRECTIP);
676 		xio_release(&wpipe->pipe_map);
677 		wpipe->pipe_state &= ~PIPE_DIRECTIP;
678 	}
679 	pipeunlock(wpipe);
680 	return (error);
681 
682 	/*
683 	 * Direct-write error, clear the direct write flags.
684 	 */
685 error1:
686 	wpipe->pipe_state &= ~(PIPE_DIRECTW | PIPE_DIRECTIP);
687 	/* fallthrough */
688 
689 	/*
690 	 * General error, wakeup the other side if it happens to be sleeping.
691 	 */
692 error2:
693 	wakeup(wpipe);
694 	return (error);
695 }
696 #endif
697 
698 static int
699 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred,
700 	int flags, struct thread *td)
701 {
702 	int error = 0;
703 	int orig_resid;
704 	struct pipe *wpipe, *rpipe;
705 
706 	rpipe = (struct pipe *) fp->f_data;
707 	wpipe = rpipe->pipe_peer;
708 
709 	/*
710 	 * detect loss of pipe read side, issue SIGPIPE if lost.
711 	 */
712 	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
713 		return (EPIPE);
714 	}
715 	++wpipe->pipe_busy;
716 
717 	/*
718 	 * If it is advantageous to resize the pipe buffer, do
719 	 * so.
720 	 */
721 	if ((uio->uio_resid > PIPE_SIZE) &&
722 		(pipe_nbig < pipe_maxbig) &&
723 		(wpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) == 0 &&
724 		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
725 		(wpipe->pipe_buffer.cnt == 0)) {
726 
727 		if ((error = pipelock(wpipe,1)) == 0) {
728 			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
729 				pipe_nbig++;
730 			pipeunlock(wpipe);
731 		}
732 	}
733 
734 	/*
735 	 * If an early error occurred, unbusy the pipe and return, waking up
736 	 * any pending readers.
737 	 */
738 	if (error) {
739 		--wpipe->pipe_busy;
740 		if ((wpipe->pipe_busy == 0) &&
741 		    (wpipe->pipe_state & PIPE_WANT)) {
742 			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
743 			wakeup(wpipe);
744 		}
745 		return(error);
746 	}
747 
748 	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
749 
750 	orig_resid = uio->uio_resid;
751 
752 	while (uio->uio_resid) {
753 		int space;
754 
755 #ifndef PIPE_NODIRECT
756 		/*
757 		 * If the transfer is large, we can gain performance if
758 		 * we do process-to-process copies directly.
759 		 * If the write is non-blocking, we don't use the
760 		 * direct write mechanism.
761 		 *
762 		 * The direct write mechanism will detect the reader going
763 		 * away on us.
764 		 */
765 		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
766 		    (fp->f_flag & FNONBLOCK) == 0) {
767 			error = pipe_direct_write( wpipe, uio);
768 			if (error)
769 				break;
770 			continue;
771 		}
772 #endif
773 
774 		/*
775 		 * Pipe buffered writes cannot coincide with
776 		 * direct writes.  We wait until the currently executing
777 		 * direct write is completed before we start filling the
778 		 * pipe buffer.  We break out if a signal occurs or the
779 		 * reader goes away.
780 		 */
781 	retrywrite:
782 		while (wpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) {
783 			if (wpipe->pipe_state & PIPE_WANTR) {
784 				wpipe->pipe_state &= ~PIPE_WANTR;
785 				wakeup(wpipe);
786 			}
787 			error = tsleep(wpipe, PCATCH, "pipbww", 0);
788 			if (wpipe->pipe_state & PIPE_EOF)
789 				break;
790 			if (error)
791 				break;
792 		}
793 		if (wpipe->pipe_state & PIPE_EOF) {
794 			error = EPIPE;
795 			break;
796 		}
797 
798 		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
799 
800 		/* Writes of size <= PIPE_BUF must be atomic. */
801 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
802 			space = 0;
803 
804 		/*
805 		 * Write to fill; the read side handles write hysteresis.  Note
806 		 * that additional restrictions can cause select-based non-blocking
807 		 * writes to spin.
808 		 */
809 		if (space > 0) {
810 			if ((error = pipelock(wpipe,1)) == 0) {
811 				int size;	/* Transfer size */
812 				int segsize;	/* first segment to transfer */
813 
814 				/*
815 				 * It is possible for a direct write to
816 				 * slip in on us... handle it here...
817 				 */
818 				if (wpipe->pipe_state & (PIPE_DIRECTW|PIPE_DIRECTIP)) {
819 					pipeunlock(wpipe);
820 					goto retrywrite;
821 				}
822 				/*
823 				 * If a process blocked in uiomove, our
824 				 * value for space might be bad.
825 				 *
826 				 * XXX will we be ok if the reader has gone
827 				 * away here?
828 				 */
829 				if (space > wpipe->pipe_buffer.size -
830 				    wpipe->pipe_buffer.cnt) {
831 					pipeunlock(wpipe);
832 					goto retrywrite;
833 				}
834 
835 				/*
836 				 * Transfer size is minimum of uio transfer
837 				 * and free space in pipe buffer.
838 				 */
839 				if (space > uio->uio_resid)
840 					size = uio->uio_resid;
841 				else
842 					size = space;
843 				/*
844 				 * First segment to transfer is minimum of
845 				 * transfer size and contiguous space in
846 				 * pipe buffer.  If first segment to transfer
847 				 * is less than the transfer size, we've got
848 				 * a wraparound in the buffer.
849 				 */
850 				segsize = wpipe->pipe_buffer.size -
851 					wpipe->pipe_buffer.in;
852 				if (segsize > size)
853 					segsize = size;
854 
855 				/* Transfer first segment */
856 
857 				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
858 						segsize, uio);
859 
860 				if (error == 0 && segsize < size) {
861 					/*
862 					 * Transfer remaining part now, to
863 					 * support atomic writes.  Wraparound
864 					 * happened.
865 					 */
866 					if (wpipe->pipe_buffer.in + segsize !=
867 					    wpipe->pipe_buffer.size)
868 						panic("Expected pipe buffer wraparound disappeared");
869 
870 					error = uiomove(&wpipe->pipe_buffer.buffer[0],
871 							size - segsize, uio);
872 				}
873 				if (error == 0) {
874 					wpipe->pipe_buffer.in += size;
875 					if (wpipe->pipe_buffer.in >=
876 					    wpipe->pipe_buffer.size) {
877 						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
878 							panic("Expected wraparound bad");
879 						wpipe->pipe_buffer.in = size - segsize;
880 					}
881 
882 					wpipe->pipe_buffer.cnt += size;
883 					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
884 						panic("Pipe buffer overflow");
885 
886 				}
887 				pipeunlock(wpipe);
888 			}
889 			if (error)
890 				break;
891 
892 		} else {
893 			/*
894 			 * If the "read-side" has been blocked, wake it up now
895 			 * and yield to let it drain synchronously rather
896 			 * than block.
897 			 */
898 			if (wpipe->pipe_state & PIPE_WANTR) {
899 				wpipe->pipe_state &= ~PIPE_WANTR;
900 				wakeup(wpipe);
901 			}
902 
903 			/*
904 			 * don't block on non-blocking I/O
905 			 */
906 			if (fp->f_flag & FNONBLOCK) {
907 				error = EAGAIN;
908 				break;
909 			}
910 
911 			/*
912 			 * We have no more space and have something to offer,
913 			 * wake up select/poll.
914 			 */
915 			pipeselwakeup(wpipe);
916 
917 			wpipe->pipe_state |= PIPE_WANTW;
918 			error = tsleep(wpipe, PCATCH|PNORESCHED, "pipewr", 0);
919 			if (error != 0)
920 				break;
921 			/*
922 			 * If the read side wants to go away, return EPIPE;
923 			 * the caller will turn that into a SIGPIPE.
924 			 */
925 			if (wpipe->pipe_state & PIPE_EOF) {
926 				error = EPIPE;
927 				break;
928 			}
929 		}
930 	}
931 
932 	--wpipe->pipe_busy;
933 
934 	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
935 		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
936 		wakeup(wpipe);
937 	} else if (wpipe->pipe_buffer.cnt > 0) {
938 		/*
939 		 * If we have put any characters in the buffer, we wake up
940 		 * the reader.
941 		 */
942 		if (wpipe->pipe_state & PIPE_WANTR) {
943 			wpipe->pipe_state &= ~PIPE_WANTR;
944 			wakeup(wpipe);
945 		}
946 	}
947 
948 	/*
949 	 * Don't return EPIPE if I/O was successful
950 	 */
951 	if ((wpipe->pipe_buffer.cnt == 0) &&
952 	    (uio->uio_resid == 0) &&
953 	    (error == EPIPE)) {
954 		error = 0;
955 	}
956 
957 	if (error == 0)
958 		vfs_timestamp(&wpipe->pipe_mtime);
959 
960 	/*
961 	 * We have something to offer,
962 	 * wake up select/poll.
963 	 */
964 	if (wpipe->pipe_buffer.cnt)
965 		pipeselwakeup(wpipe);
966 
967 	return (error);
968 }
969 
970 /*
971  * we implement a very minimal set of ioctls for compatibility with sockets.
972  */
973 int
974 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td)
975 {
976 	struct pipe *mpipe = (struct pipe *)fp->f_data;
977 
978 	switch (cmd) {
979 
980 	case FIONBIO:
981 		return (0);
982 
983 	case FIOASYNC:
984 		if (*(int *)data) {
985 			mpipe->pipe_state |= PIPE_ASYNC;
986 		} else {
987 			mpipe->pipe_state &= ~PIPE_ASYNC;
988 		}
989 		return (0);
990 
991 	case FIONREAD:
992 		if (mpipe->pipe_state & PIPE_DIRECTW) {
993 			*(int *)data = mpipe->pipe_map.xio_bytes;
994 		} else {
995 			*(int *)data = mpipe->pipe_buffer.cnt;
996 		}
997 		return (0);
998 
999 	case FIOSETOWN:
1000 		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1001 
1002 	case FIOGETOWN:
1003 		*(int *)data = fgetown(mpipe->pipe_sigio);
1004 		return (0);
1005 
1006 	/* This is deprecated, FIOSETOWN should be used instead. */
1007 	case TIOCSPGRP:
1008 		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1009 
1010 	/* This is deprecated, FIOGETOWN should be used instead. */
1011 	case TIOCGPGRP:
1012 		*(int *)data = -fgetown(mpipe->pipe_sigio);
1013 		return (0);
1014 
1015 	}
1016 	return (ENOTTY);
1017 }
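/*
 * Editorial sketch (not part of the original source): these ioctls mirror
 * the socket ones so that existing code keeps working on pipes, e.g.
 * checking how much data is pending on the read descriptor (fds[0] in the
 * earlier sketch) returned by pipe(2):
 *
 *	int pending;
 *
 *	if (ioctl(fds[0], FIONREAD, &pending) == 0)
 *		printf("%d bytes buffered\n", pending);
 */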
1018 
1019 int
1020 pipe_poll(struct file *fp, int events, struct ucred *cred, struct thread *td)
1021 {
1022 	struct pipe *rpipe = (struct pipe *)fp->f_data;
1023 	struct pipe *wpipe;
1024 	int revents = 0;
1025 
1026 	wpipe = rpipe->pipe_peer;
1027 	if (events & (POLLIN | POLLRDNORM))
1028 		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1029 		    (rpipe->pipe_buffer.cnt > 0) ||
1030 		    (rpipe->pipe_state & PIPE_EOF))
1031 			revents |= events & (POLLIN | POLLRDNORM);
1032 
1033 	if (events & (POLLOUT | POLLWRNORM))
1034 		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1035 		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1036 		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1037 			revents |= events & (POLLOUT | POLLWRNORM);
1038 
1039 	if ((rpipe->pipe_state & PIPE_EOF) ||
1040 	    (wpipe == NULL) ||
1041 	    (wpipe->pipe_state & PIPE_EOF))
1042 		revents |= POLLHUP;
1043 
1044 	if (revents == 0) {
1045 		if (events & (POLLIN | POLLRDNORM)) {
1046 			selrecord(td, &rpipe->pipe_sel);
1047 			rpipe->pipe_state |= PIPE_SEL;
1048 		}
1049 
1050 		if (events & (POLLOUT | POLLWRNORM)) {
1051 			selrecord(td, &wpipe->pipe_sel);
1052 			wpipe->pipe_state |= PIPE_SEL;
1053 		}
1054 	}
1055 
1056 	return (revents);
1057 }
1058 
1059 static int
1060 pipe_stat(struct file *fp, struct stat *ub, struct thread *td)
1061 {
1062 	struct pipe *pipe = (struct pipe *)fp->f_data;
1063 
1064 	bzero((caddr_t)ub, sizeof(*ub));
1065 	ub->st_mode = S_IFIFO;
1066 	ub->st_blksize = pipe->pipe_buffer.size;
1067 	ub->st_size = pipe->pipe_buffer.cnt;
1068 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1069 	ub->st_atimespec = pipe->pipe_atime;
1070 	ub->st_mtimespec = pipe->pipe_mtime;
1071 	ub->st_ctimespec = pipe->pipe_ctime;
1072 	/*
1073 	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1074 	 * st_flags, st_gen.
1075 	 * XXX (st_dev, st_ino) should be unique.
1076 	 */
1077 	return (0);
1078 }
1079 
1080 /* ARGSUSED */
1081 static int
1082 pipe_close(struct file *fp, struct thread *td)
1083 {
1084 	struct pipe *cpipe = (struct pipe *)fp->f_data;
1085 
1086 	fp->f_ops = &badfileops;
1087 	fp->f_data = NULL;
1088 	funsetown(cpipe->pipe_sigio);
1089 	pipeclose(cpipe);
1090 	return (0);
1091 }
1092 
1093 static void
1094 pipe_free_kmem(struct pipe *cpipe)
1095 {
1096 	if (cpipe->pipe_buffer.buffer != NULL) {
1097 		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1098 			--pipe_nbig;
1099 		kmem_free(kernel_map,
1100 			(vm_offset_t)cpipe->pipe_buffer.buffer,
1101 			cpipe->pipe_buffer.size);
1102 		cpipe->pipe_buffer.buffer = NULL;
1103 		cpipe->pipe_buffer.object = NULL;
1104 	}
1105 #ifndef PIPE_NODIRECT
1106 	KKASSERT(cpipe->pipe_map.xio_bytes == 0 &&
1107 		cpipe->pipe_map.xio_offset == 0 &&
1108 		cpipe->pipe_map.xio_npages == 0);
1109 #endif
1110 }
1111 
1112 /*
1113  * shutdown the pipe
1114  */
1115 static void
1116 pipeclose(struct pipe *cpipe)
1117 {
1118 	globaldata_t gd;
1119 	struct pipe *ppipe;
1120 
1121 	if (cpipe == NULL)
1122 		return;
1123 
1124 	pipeselwakeup(cpipe);
1125 
1126 	/*
1127 	 * If the other side is blocked, wake it up saying that
1128 	 * we want to close it down.
1129 	 */
1130 	while (cpipe->pipe_busy) {
1131 		wakeup(cpipe);
1132 		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1133 		tsleep(cpipe, 0, "pipecl", 0);
1134 	}
1135 
1136 	/*
1137 	 * Disconnect from peer
1138 	 */
1139 	if ((ppipe = cpipe->pipe_peer) != NULL) {
1140 		pipeselwakeup(ppipe);
1141 
1142 		ppipe->pipe_state |= PIPE_EOF;
1143 		wakeup(ppipe);
1144 		KNOTE(&ppipe->pipe_sel.si_note, 0);
1145 		ppipe->pipe_peer = NULL;
1146 	}
1147 
1148 	/*
1149 	 * free or cache resources
1150 	 */
1151 	gd = mycpu;
1152 	if (gd->gd_pipeqcount >= pipe_maxcache ||
1153 	    cpipe->pipe_buffer.size != PIPE_SIZE
1154 	) {
1155 		pipe_free_kmem(cpipe);
1156 		free(cpipe, M_PIPE);
1157 	} else {
1158 		KKASSERT(cpipe->pipe_map.xio_npages == 0 &&
1159 			cpipe->pipe_map.xio_bytes == 0 &&
1160 			cpipe->pipe_map.xio_offset == 0);
1161 		cpipe->pipe_state = 0;
1162 		cpipe->pipe_busy = 0;
1163 		cpipe->pipe_peer = gd->gd_pipeq;
1164 		gd->gd_pipeq = cpipe;
1165 		++gd->gd_pipeqcount;
1166 	}
1167 }
1168 
1169 /*ARGSUSED*/
1170 static int
1171 pipe_kqfilter(struct file *fp, struct knote *kn)
1172 {
1173 	struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data;
1174 
1175 	switch (kn->kn_filter) {
1176 	case EVFILT_READ:
1177 		kn->kn_fop = &pipe_rfiltops;
1178 		break;
1179 	case EVFILT_WRITE:
1180 		kn->kn_fop = &pipe_wfiltops;
1181 		cpipe = cpipe->pipe_peer;
1182 		if (cpipe == NULL)
1183 			/* other end of pipe has been closed */
1184 			return (EPIPE);
1185 		break;
1186 	default:
1187 		return (1);
1188 	}
1189 	kn->kn_hook = (caddr_t)cpipe;
1190 
1191 	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1192 	return (0);
1193 }
1194 
1195 static void
1196 filt_pipedetach(struct knote *kn)
1197 {
1198 	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1199 
1200 	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1201 }
1202 
1203 /*ARGSUSED*/
1204 static int
1205 filt_piperead(struct knote *kn, long hint)
1206 {
1207 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1208 	struct pipe *wpipe = rpipe->pipe_peer;
1209 
1210 	kn->kn_data = rpipe->pipe_buffer.cnt;
1211 	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1212 		kn->kn_data = rpipe->pipe_map.xio_bytes;
1213 
1214 	if ((rpipe->pipe_state & PIPE_EOF) ||
1215 	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1216 		kn->kn_flags |= EV_EOF;
1217 		return (1);
1218 	}
1219 	return (kn->kn_data > 0);
1220 }
1221 
1222 /*ARGSUSED*/
1223 static int
1224 filt_pipewrite(struct knote *kn, long hint)
1225 {
1226 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1227 	struct pipe *wpipe = rpipe->pipe_peer;
1228 
1229 	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1230 		kn->kn_data = 0;
1231 		kn->kn_flags |= EV_EOF;
1232 		return (1);
1233 	}
1234 	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1235 	if (wpipe->pipe_state & PIPE_DIRECTW)
1236 		kn->kn_data = 0;
1237 
1238 	return (kn->kn_data >= PIPE_BUF);
1239 }
1240