xref: /plan9-contrib/sys/src/9/port/qio.c (revision b8a11165c6411897fdf740a1fb2f87ac0998b113)
1 #include	"u.h"
2 #include	"../port/lib.h"
3 #include	"mem.h"
4 #include	"dat.h"
5 #include	"fns.h"
6 #include	"../port/error.h"
7 
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14 static ulong qcopycnt;
15 
16 static int debugging;
17 
18 #define QDEBUG	if(0)
19 
20 /*
21  *  IO queues
22  */
23 typedef struct Queue	Queue;
24 
25 struct Queue
26 {
27 	Lock;
28 
29 	Block*	bfirst;		/* buffer */
30 	Block*	blast;
31 
32 	int	len;		/* bytes allocated to queue */
33 	int	dlen;		/* data bytes in queue */
34 	int	limit;		/* max bytes in queue */
35 	int	inilim;		/* initial limit */
36 	int	state;
37 	int	noblock;	/* true if writes return immediately when q full */
38 	int	eof;		/* number of eofs read by user */
39 
40 	void	(*kick)(void*);	/* restart output */
41 	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
42 	void*	arg;		/* argument to kick */
43 
44 	QLock	rlock;		/* mutex for reading processes */
45 	Rendez	rr;		/* process waiting to read */
46 	QLock	wlock;		/* mutex for writing processes */
47 	Rendez	wr;		/* process waiting to write */
48 
49 	char	err[ERRMAX];
50 };
51 
52 enum
53 {
54 	Maxatomic	= 64*1024,
55 };
56 
57 uint	qiomaxatomic = Maxatomic;
58 
59 void
ixsummary(void)60 ixsummary(void)
61 {
62 	debugging ^= 1;
63 	iallocsummary();
64 	print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
65 		padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
66 	print("consume %lud, produce %lud, qcopy %lud\n",
67 		consumecnt, producecnt, qcopycnt);
68 }
69 
70 /*
71  *  free a list of blocks
72  */
73 void
freeblist(Block * b)74 freeblist(Block *b)
75 {
76 	Block *next;
77 
78 	for(; b != 0; b = next){
79 		next = b->next;
80 		if(b->ref == 1)
81 			b->next = nil;
82 		freeb(b);
83 	}
84 }
85 
86 /*
87  *  pad a block to the front (or the back if size is negative)
88  */
89 Block*
padblock(Block * bp,int size)90 padblock(Block *bp, int size)
91 {
92 	int n;
93 	Block *nbp;
94 
95 	QDEBUG checkb(bp, "padblock 1");
96 	if(size >= 0){
97 		if(bp->rp - bp->base >= size){
98 			bp->rp -= size;
99 			return bp;
100 		}
101 
102 		if(bp->next)
103 			panic("padblock %#p", getcallerpc(&bp));
104 		n = BLEN(bp);
105 		padblockcnt++;
106 		nbp = allocb(size+n);
107 		nbp->rp += size;
108 		nbp->wp = nbp->rp;
109 		memmove(nbp->wp, bp->rp, n);
110 		nbp->wp += n;
111 		freeb(bp);
112 		nbp->rp -= size;
113 	} else {
114 		size = -size;
115 
116 		if(bp->next)
117 			panic("padblock %#p", getcallerpc(&bp));
118 
119 		if(bp->lim - bp->wp >= size)
120 			return bp;
121 
122 		n = BLEN(bp);
123 		padblockcnt++;
124 		nbp = allocb(size+n);
125 		memmove(nbp->wp, bp->rp, n);
126 		nbp->wp += n;
127 		freeb(bp);
128 	}
129 	QDEBUG checkb(nbp, "padblock 1");
130 	return nbp;
131 }
132 
133 /*
134  *  return count of bytes in a string of blocks
135  */
136 int
blocklen(Block * bp)137 blocklen(Block *bp)
138 {
139 	int len;
140 
141 	len = 0;
142 	while(bp) {
143 		len += BLEN(bp);
144 		bp = bp->next;
145 	}
146 	return len;
147 }
148 
149 /*
150  * return count of space in blocks
151  */
152 int
blockalloclen(Block * bp)153 blockalloclen(Block *bp)
154 {
155 	int len;
156 
157 	len = 0;
158 	while(bp) {
159 		len += BALLOC(bp);
160 		bp = bp->next;
161 	}
162 	return len;
163 }
164 
165 /*
166  *  copy the  string of blocks into
167  *  a single block and free the string
168  */
169 Block*
concatblock(Block * bp)170 concatblock(Block *bp)
171 {
172 	int len;
173 	Block *nb, *f;
174 
175 	if(bp->next == 0)
176 		return bp;
177 
178 	nb = allocb(blocklen(bp));
179 	for(f = bp; f; f = f->next) {
180 		len = BLEN(f);
181 		memmove(nb->wp, f->rp, len);
182 		nb->wp += len;
183 	}
184 	concatblockcnt += BLEN(nb);
185 	freeblist(bp);
186 	QDEBUG checkb(nb, "concatblock 1");
187 	return nb;
188 }
189 
190 /*
191  *  make sure the first block has at least n bytes
192  */
193 Block*
pullupblock(Block * bp,int n)194 pullupblock(Block *bp, int n)
195 {
196 	int i;
197 	Block *nbp;
198 
199 	/*
200 	 *  this should almost always be true, it's
201 	 *  just to avoid every caller checking.
202 	 */
203 	if(BLEN(bp) >= n)
204 		return bp;
205 
206 	/*
207 	 *  if not enough room in the first block,
208 	 *  add another to the front of the list.
209 	 */
210 	if(bp->lim - bp->rp < n){
211 		nbp = allocb(n);
212 		nbp->next = bp;
213 		bp = nbp;
214 	}
215 
216 	/*
217 	 *  copy bytes from the trailing blocks into the first
218 	 */
219 	n -= BLEN(bp);
220 	while(nbp = bp->next){
221 		i = BLEN(nbp);
222 		if(i > n) {
223 			memmove(bp->wp, nbp->rp, n);
224 			pullupblockcnt++;
225 			bp->wp += n;
226 			nbp->rp += n;
227 			QDEBUG checkb(bp, "pullupblock 1");
228 			return bp;
229 		} else {
230 			/* shouldn't happen but why crash if it does */
231 			if(i < 0){
232 				print("pullup negative length packet, called from %#p\n",
233 					getcallerpc(&bp));
234 				i = 0;
235 			}
236 			memmove(bp->wp, nbp->rp, i);
237 			pullupblockcnt++;
238 			bp->wp += i;
239 			bp->next = nbp->next;
240 			nbp->next = 0;
241 			freeb(nbp);
242 			n -= i;
243 			if(n == 0){
244 				QDEBUG checkb(bp, "pullupblock 2");
245 				return bp;
246 			}
247 		}
248 	}
249 	freeb(bp);
250 	return 0;
251 }
252 
253 /*
254  *  make sure the first block has at least n bytes
255  */
256 Block*
pullupqueue(Queue * q,int n)257 pullupqueue(Queue *q, int n)
258 {
259 	Block *b;
260 
261 	if(BLEN(q->bfirst) >= n)
262 		return q->bfirst;
263 	q->bfirst = pullupblock(q->bfirst, n);
264 	for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
265 		;
266 	q->blast = b;
267 	return q->bfirst;
268 }
269 
270 /*
271  *  trim to len bytes starting at offset
272  */
273 Block *
trimblock(Block * bp,int offset,int len)274 trimblock(Block *bp, int offset, int len)
275 {
276 	ulong l;
277 	Block *nb, *startb;
278 
279 	QDEBUG checkb(bp, "trimblock 1");
280 	if(blocklen(bp) < offset+len) {
281 		freeblist(bp);
282 		return nil;
283 	}
284 
285 	while((l = BLEN(bp)) < offset) {
286 		offset -= l;
287 		nb = bp->next;
288 		bp->next = nil;
289 		freeb(bp);
290 		bp = nb;
291 	}
292 
293 	startb = bp;
294 	bp->rp += offset;
295 
296 	while((l = BLEN(bp)) < len) {
297 		len -= l;
298 		bp = bp->next;
299 	}
300 
301 	bp->wp -= (BLEN(bp) - len);
302 
303 	if(bp->next) {
304 		freeblist(bp->next);
305 		bp->next = nil;
306 	}
307 
308 	return startb;
309 }
310 
311 /*
312  *  copy 'count' bytes into a new block
313  */
314 Block*
copyblock(Block * bp,int count)315 copyblock(Block *bp, int count)
316 {
317 	int l;
318 	Block *nbp;
319 
320 	QDEBUG checkb(bp, "copyblock 0");
321 	nbp = allocb(count);
322 	for(; count > 0 && bp != 0; bp = bp->next){
323 		l = BLEN(bp);
324 		if(l > count)
325 			l = count;
326 		memmove(nbp->wp, bp->rp, l);
327 		nbp->wp += l;
328 		count -= l;
329 	}
330 	if(count > 0){
331 		memset(nbp->wp, 0, count);
332 		nbp->wp += count;
333 	}
334 	copyblockcnt++;
335 	QDEBUG checkb(nbp, "copyblock 1");
336 
337 	return nbp;
338 }
339 
340 Block*
adjustblock(Block * bp,int len)341 adjustblock(Block* bp, int len)
342 {
343 	int n;
344 	Block *nbp;
345 
346 	if(len < 0){
347 		freeb(bp);
348 		return nil;
349 	}
350 
351 	if(bp->rp+len > bp->lim){
352 		nbp = copyblock(bp, len);
353 		freeblist(bp);
354 		QDEBUG checkb(nbp, "adjustblock 1");
355 
356 		return nbp;
357 	}
358 
359 	n = BLEN(bp);
360 	if(len > n)
361 		memset(bp->wp, 0, len-n);
362 	bp->wp = bp->rp+len;
363 	QDEBUG checkb(bp, "adjustblock 2");
364 
365 	return bp;
366 }
367 
368 
369 /*
370  *  throw away up to count bytes from a
371  *  list of blocks.  Return count of bytes
372  *  thrown away.
373  */
374 int
pullblock(Block ** bph,int count)375 pullblock(Block **bph, int count)
376 {
377 	Block *bp;
378 	int n, bytes;
379 
380 	bytes = 0;
381 	if(bph == nil)
382 		return 0;
383 
384 	while(*bph != nil && count != 0) {
385 		bp = *bph;
386 		n = BLEN(bp);
387 		if(count < n)
388 			n = count;
389 		bytes += n;
390 		count -= n;
391 		bp->rp += n;
392 		QDEBUG checkb(bp, "pullblock ");
393 		if(BLEN(bp) == 0) {
394 			*bph = bp->next;
395 			bp->next = nil;
396 			freeb(bp);
397 		}
398 	}
399 	return bytes;
400 }
401 
402 /*
403  *  get next block from a queue, return null if nothing there
404  */
405 Block*
qget(Queue * q)406 qget(Queue *q)
407 {
408 	int dowakeup;
409 	Block *b;
410 
411 	/* sync with qwrite */
412 	ilock(q);
413 
414 	b = q->bfirst;
415 	if(b == nil){
416 		q->state |= Qstarve;
417 		iunlock(q);
418 		return nil;
419 	}
420 	q->bfirst = b->next;
421 	b->next = 0;
422 	q->len -= BALLOC(b);
423 	q->dlen -= BLEN(b);
424 	QDEBUG checkb(b, "qget");
425 
426 	/* if writer flow controlled, restart */
427 	if((q->state & Qflow) && q->len < q->limit/2){
428 		q->state &= ~Qflow;
429 		dowakeup = 1;
430 	} else
431 		dowakeup = 0;
432 
433 	iunlock(q);
434 
435 	if(dowakeup)
436 		wakeup(&q->wr);
437 
438 	return b;
439 }
440 
441 /*
442  *  throw away the next 'len' bytes in the queue
443  */
444 int
qdiscard(Queue * q,int len)445 qdiscard(Queue *q, int len)
446 {
447 	Block *b;
448 	int dowakeup, n, sofar;
449 
450 	ilock(q);
451 	for(sofar = 0; sofar < len; sofar += n){
452 		b = q->bfirst;
453 		if(b == nil)
454 			break;
455 		QDEBUG checkb(b, "qdiscard");
456 		n = BLEN(b);
457 		if(n <= len - sofar){
458 			q->bfirst = b->next;
459 			b->next = 0;
460 			q->len -= BALLOC(b);
461 			q->dlen -= BLEN(b);
462 			freeb(b);
463 		} else {
464 			n = len - sofar;
465 			b->rp += n;
466 			q->dlen -= n;
467 		}
468 	}
469 
470 	/*
471 	 *  if writer flow controlled, restart
472 	 *
473 	 *  This used to be
474 	 *	q->len < q->limit/2
475 	 *  but it slows down tcp too much for certain write sizes.
476 	 *  I really don't understand it completely.  It may be
477 	 *  due to the queue draining so fast that the transmission
478 	 *  stalls waiting for the app to produce more data.  - presotto
479 	 */
480 	if((q->state & Qflow) && q->len < q->limit){
481 		q->state &= ~Qflow;
482 		dowakeup = 1;
483 	} else
484 		dowakeup = 0;
485 
486 	iunlock(q);
487 
488 	if(dowakeup)
489 		wakeup(&q->wr);
490 
491 	return sofar;
492 }
493 
494 /*
495  *  Interrupt level copy out of a queue, return # bytes copied.
496  */
497 int
qconsume(Queue * q,void * vp,int len)498 qconsume(Queue *q, void *vp, int len)
499 {
500 	Block *b;
501 	int n, dowakeup;
502 	uchar *p = vp;
503 	Block *tofree = nil;
504 
505 	/* sync with qwrite */
506 	ilock(q);
507 
508 	for(;;) {
509 		b = q->bfirst;
510 		if(b == 0){
511 			q->state |= Qstarve;
512 			iunlock(q);
513 			return -1;
514 		}
515 		QDEBUG checkb(b, "qconsume 1");
516 
517 		n = BLEN(b);
518 		if(n > 0)
519 			break;
520 		q->bfirst = b->next;
521 		q->len -= BALLOC(b);
522 
523 		/* remember to free this */
524 		b->next = tofree;
525 		tofree = b;
526 	};
527 
528 	if(n < len)
529 		len = n;
530 	memmove(p, b->rp, len);
531 	consumecnt += n;
532 	b->rp += len;
533 	q->dlen -= len;
534 
535 	/* discard the block if we're done with it */
536 	if((q->state & Qmsg) || len == n){
537 		q->bfirst = b->next;
538 		b->next = 0;
539 		q->len -= BALLOC(b);
540 		q->dlen -= BLEN(b);
541 
542 		/* remember to free this */
543 		b->next = tofree;
544 		tofree = b;
545 	}
546 
547 	/* if writer flow controlled, restart */
548 	if((q->state & Qflow) && q->len < q->limit/2){
549 		q->state &= ~Qflow;
550 		dowakeup = 1;
551 	} else
552 		dowakeup = 0;
553 
554 	iunlock(q);
555 
556 	if(dowakeup)
557 		wakeup(&q->wr);
558 
559 	if(tofree != nil)
560 		freeblist(tofree);
561 
562 	return len;
563 }
564 
565 int
qpass(Queue * q,Block * b)566 qpass(Queue *q, Block *b)
567 {
568 	int dlen, len, dowakeup;
569 
570 	/* sync with qread */
571 	dowakeup = 0;
572 	ilock(q);
573 	if(q->len >= q->limit){
574 		freeblist(b);
575 		iunlock(q);
576 		return -1;
577 	}
578 	if(q->state & Qclosed){
579 		len = BALLOC(b);
580 		freeblist(b);
581 		iunlock(q);
582 		return len;
583 	}
584 
585 	/* add buffer to queue */
586 	if(q->bfirst)
587 		q->blast->next = b;
588 	else
589 		q->bfirst = b;
590 	len = BALLOC(b);
591 	dlen = BLEN(b);
592 	QDEBUG checkb(b, "qpass");
593 	while(b->next){
594 		b = b->next;
595 		QDEBUG checkb(b, "qpass");
596 		len += BALLOC(b);
597 		dlen += BLEN(b);
598 	}
599 	q->blast = b;
600 	q->len += len;
601 	q->dlen += dlen;
602 
603 	if(q->len >= q->limit/2)
604 		q->state |= Qflow;
605 
606 	if(q->state & Qstarve){
607 		q->state &= ~Qstarve;
608 		dowakeup = 1;
609 	}
610 	iunlock(q);
611 
612 	if(dowakeup)
613 		wakeup(&q->rr);
614 
615 	return len;
616 }
617 
618 int
qpassnolim(Queue * q,Block * b)619 qpassnolim(Queue *q, Block *b)
620 {
621 	int dlen, len, dowakeup;
622 
623 	/* sync with qread */
624 	dowakeup = 0;
625 	ilock(q);
626 
627 	if(q->state & Qclosed){
628 		freeblist(b);
629 		iunlock(q);
630 		return BALLOC(b);
631 	}
632 
633 	/* add buffer to queue */
634 	if(q->bfirst)
635 		q->blast->next = b;
636 	else
637 		q->bfirst = b;
638 	len = BALLOC(b);
639 	dlen = BLEN(b);
640 	QDEBUG checkb(b, "qpass");
641 	while(b->next){
642 		b = b->next;
643 		QDEBUG checkb(b, "qpass");
644 		len += BALLOC(b);
645 		dlen += BLEN(b);
646 	}
647 	q->blast = b;
648 	q->len += len;
649 	q->dlen += dlen;
650 
651 	if(q->len >= q->limit/2)
652 		q->state |= Qflow;
653 
654 	if(q->state & Qstarve){
655 		q->state &= ~Qstarve;
656 		dowakeup = 1;
657 	}
658 	iunlock(q);
659 
660 	if(dowakeup)
661 		wakeup(&q->rr);
662 
663 	return len;
664 }
665 
666 /*
667  *  if the allocated space is way out of line with the used
668  *  space, reallocate to a smaller block
669  */
670 Block*
packblock(Block * bp)671 packblock(Block *bp)
672 {
673 	Block **l, *nbp;
674 	int n;
675 
676 	for(l = &bp; *l; l = &(*l)->next){
677 		nbp = *l;
678 		n = BLEN(nbp);
679 		if((n<<2) < BALLOC(nbp)){
680 			*l = allocb(n);
681 			memmove((*l)->wp, nbp->rp, n);
682 			(*l)->wp += n;
683 			(*l)->next = nbp->next;
684 			freeb(nbp);
685 		}
686 	}
687 
688 	return bp;
689 }
690 
691 int
qproduce(Queue * q,void * vp,int len)692 qproduce(Queue *q, void *vp, int len)
693 {
694 	Block *b;
695 	int dowakeup;
696 	uchar *p = vp;
697 
698 	/* sync with qread */
699 	dowakeup = 0;
700 	ilock(q);
701 
702 	/* no waiting receivers, room in buffer? */
703 	if(q->len >= q->limit){
704 		q->state |= Qflow;
705 		iunlock(q);
706 		return -1;
707 	}
708 
709 	/* save in buffer */
710 	b = iallocb(len);
711 	if(b == 0){
712 		iunlock(q);
713 		return 0;
714 	}
715 	memmove(b->wp, p, len);
716 	producecnt += len;
717 	b->wp += len;
718 	if(q->bfirst)
719 		q->blast->next = b;
720 	else
721 		q->bfirst = b;
722 	q->blast = b;
723 	/* b->next = 0; done by iallocb() */
724 	q->len += BALLOC(b);
725 	q->dlen += BLEN(b);
726 	QDEBUG checkb(b, "qproduce");
727 
728 	if(q->state & Qstarve){
729 		q->state &= ~Qstarve;
730 		dowakeup = 1;
731 	}
732 
733 	if(q->len >= q->limit)
734 		q->state |= Qflow;
735 	iunlock(q);
736 
737 	if(dowakeup)
738 		wakeup(&q->rr);
739 
740 	return len;
741 }
742 
743 /*
744  *  copy from offset in the queue
745  */
746 Block*
qcopy(Queue * q,int len,ulong offset)747 qcopy(Queue *q, int len, ulong offset)
748 {
749 	int sofar;
750 	int n;
751 	Block *b, *nb;
752 	uchar *p;
753 
754 	nb = allocb(len);
755 
756 	ilock(q);
757 
758 	/* go to offset */
759 	b = q->bfirst;
760 	for(sofar = 0; ; sofar += n){
761 		if(b == nil){
762 			iunlock(q);
763 			return nb;
764 		}
765 		n = BLEN(b);
766 		if(sofar + n > offset){
767 			p = b->rp + offset - sofar;
768 			n -= offset - sofar;
769 			break;
770 		}
771 		QDEBUG checkb(b, "qcopy");
772 		b = b->next;
773 	}
774 
775 	/* copy bytes from there */
776 	for(sofar = 0; sofar < len;){
777 		if(n > len - sofar)
778 			n = len - sofar;
779 		memmove(nb->wp, p, n);
780 		qcopycnt += n;
781 		sofar += n;
782 		nb->wp += n;
783 		b = b->next;
784 		if(b == nil)
785 			break;
786 		n = BLEN(b);
787 		p = b->rp;
788 	}
789 	iunlock(q);
790 
791 	return nb;
792 }
793 
794 /*
795  *  called by non-interrupt code
796  */
797 Queue*
qopen(int limit,int msg,void (* kick)(void *),void * arg)798 qopen(int limit, int msg, void (*kick)(void*), void *arg)
799 {
800 	Queue *q;
801 
802 	q = malloc(sizeof(Queue));
803 	if(q == 0)
804 		return 0;
805 
806 	q->limit = q->inilim = limit;
807 	q->kick = kick;
808 	q->arg = arg;
809 	q->state = msg;
810 
811 	q->state |= Qstarve;
812 	q->eof = 0;
813 	q->noblock = 0;
814 
815 	return q;
816 }
817 
818 /* open a queue to be bypassed */
819 Queue*
qbypass(void (* bypass)(void *,Block *),void * arg)820 qbypass(void (*bypass)(void*, Block*), void *arg)
821 {
822 	Queue *q;
823 
824 	q = malloc(sizeof(Queue));
825 	if(q == 0)
826 		return 0;
827 
828 	q->limit = 0;
829 	q->arg = arg;
830 	q->bypass = bypass;
831 	q->state = 0;
832 
833 	return q;
834 }
835 
836 static int
notempty(void * a)837 notempty(void *a)
838 {
839 	Queue *q = a;
840 
841 	return (q->state & Qclosed) || q->bfirst != 0;
842 }
843 
844 /*
845  *  wait for the queue to be non-empty or closed.
846  *  called with q ilocked.
847  */
848 static int
qwait(Queue * q)849 qwait(Queue *q)
850 {
851 	/* wait for data */
852 	for(;;){
853 		if(q->bfirst != nil)
854 			break;
855 
856 		if(q->state & Qclosed){
857 			if(++q->eof > 3)
858 				return -1;
859 			if(*q->err && strcmp(q->err, Ehungup) != 0)
860 				return -1;
861 			return 0;
862 		}
863 
864 		q->state |= Qstarve;	/* flag requesting producer to wake me */
865 		iunlock(q);
866 		sleep(&q->rr, notempty, q);
867 		ilock(q);
868 	}
869 	return 1;
870 }
871 
872 /*
873  * add a block list to a queue
874  */
875 void
qaddlist(Queue * q,Block * b)876 qaddlist(Queue *q, Block *b)
877 {
878 	/* queue the block */
879 	if(q->bfirst)
880 		q->blast->next = b;
881 	else
882 		q->bfirst = b;
883 	q->len += blockalloclen(b);
884 	q->dlen += blocklen(b);
885 	while(b->next)
886 		b = b->next;
887 	q->blast = b;
888 }
889 
890 /*
891  *  called with q ilocked
892  */
893 Block*
qremove(Queue * q)894 qremove(Queue *q)
895 {
896 	Block *b;
897 
898 	b = q->bfirst;
899 	if(b == nil)
900 		return nil;
901 	q->bfirst = b->next;
902 	b->next = nil;
903 	q->dlen -= BLEN(b);
904 	q->len -= BALLOC(b);
905 	QDEBUG checkb(b, "qremove");
906 	return b;
907 }
908 
909 /*
910  *  copy the contents of a string of blocks into
911  *  memory.  emptied blocks are freed.  return
912  *  pointer to first unconsumed block.
913  */
914 Block*
bl2mem(uchar * p,Block * b,int n)915 bl2mem(uchar *p, Block *b, int n)
916 {
917 	int i;
918 	Block *next;
919 
920 	for(; b != nil; b = next){
921 		i = BLEN(b);
922 		if(i > n){
923 			memmove(p, b->rp, n);
924 			b->rp += n;
925 			return b;
926 		}
927 		memmove(p, b->rp, i);
928 		n -= i;
929 		p += i;
930 		b->rp += i;
931 		next = b->next;
932 		freeb(b);
933 	}
934 	return nil;
935 }
936 
937 /*
938  *  copy the contents of memory into a string of blocks.
939  *  return nil on error.
940  */
941 Block*
mem2bl(uchar * p,int len)942 mem2bl(uchar *p, int len)
943 {
944 	int n;
945 	Block *b, *first, **l;
946 
947 	first = nil;
948 	l = &first;
949 	if(waserror()){
950 		freeblist(first);
951 		nexterror();
952 	}
953 	do {
954 		n = len;
955 		if(n > Maxatomic)
956 			n = Maxatomic;
957 
958 		*l = b = allocb(n);
959 		setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
960 		memmove(b->wp, p, n);
961 		b->wp += n;
962 		p += n;
963 		len -= n;
964 		l = &b->next;
965 	} while(len > 0);
966 	poperror();
967 
968 	return first;
969 }
970 
971 /*
972  *  put a block back to the front of the queue
973  *  called with q ilocked
974  */
975 void
qputback(Queue * q,Block * b)976 qputback(Queue *q, Block *b)
977 {
978 	b->next = q->bfirst;
979 	if(q->bfirst == nil)
980 		q->blast = b;
981 	q->bfirst = b;
982 	q->len += BALLOC(b);
983 	q->dlen += BLEN(b);
984 }
985 
986 /*
987  *  flow control, get producer going again
988  *  called with q ilocked
989  */
990 static void
qwakeup_iunlock(Queue * q)991 qwakeup_iunlock(Queue *q)
992 {
993 	int dowakeup = 0;
994 
995 	/* if writer flow controlled, restart */
996 	if((q->state & Qflow) && q->len < q->limit/2){
997 		q->state &= ~Qflow;
998 		dowakeup = 1;
999 	}
1000 
1001 	iunlock(q);
1002 
1003 	/* wakeup flow controlled writers */
1004 	if(dowakeup){
1005 		if(q->kick)
1006 			q->kick(q->arg);
1007 		wakeup(&q->wr);
1008 	}
1009 }
1010 
1011 /*
1012  *  get next block from a queue (up to a limit)
1013  */
1014 Block*
qbread(Queue * q,int len)1015 qbread(Queue *q, int len)
1016 {
1017 	Block *b, *nb;
1018 	int n;
1019 
1020 	qlock(&q->rlock);
1021 	if(waserror()){
1022 		qunlock(&q->rlock);
1023 		nexterror();
1024 	}
1025 
1026 	ilock(q);
1027 	switch(qwait(q)){
1028 	case 0:
1029 		/* queue closed */
1030 		iunlock(q);
1031 		qunlock(&q->rlock);
1032 		poperror();
1033 		return nil;
1034 	case -1:
1035 		/* multiple reads on a closed queue */
1036 		iunlock(q);
1037 		error(q->err);
1038 	}
1039 
1040 	/* if we get here, there's at least one block in the queue */
1041 	b = qremove(q);
1042 	n = BLEN(b);
1043 
1044 	/* split block if it's too big and this is not a message queue */
1045 	nb = b;
1046 	if(n > len){
1047 		if((q->state&Qmsg) == 0){
1048 			n -= len;
1049 			b = allocb(n);
1050 			memmove(b->wp, nb->rp+len, n);
1051 			b->wp += n;
1052 			qputback(q, b);
1053 		}
1054 		nb->wp = nb->rp + len;
1055 	}
1056 
1057 	/* restart producer */
1058 	qwakeup_iunlock(q);
1059 
1060 	poperror();
1061 	qunlock(&q->rlock);
1062 	return nb;
1063 }
1064 
1065 /*
1066  *  read a queue.  if no data is queued, post a Block
1067  *  and wait on its Rendez.
1068  */
1069 long
qread(Queue * q,void * vp,int len)1070 qread(Queue *q, void *vp, int len)
1071 {
1072 	Block *b, *first, **l;
1073 	int m, n;
1074 
1075 	qlock(&q->rlock);
1076 	if(waserror()){
1077 		qunlock(&q->rlock);
1078 		nexterror();
1079 	}
1080 
1081 	ilock(q);
1082 again:
1083 	switch(qwait(q)){
1084 	case 0:
1085 		/* queue closed */
1086 		iunlock(q);
1087 		qunlock(&q->rlock);
1088 		poperror();
1089 		return 0;
1090 	case -1:
1091 		/* multiple reads on a closed queue */
1092 		iunlock(q);
1093 		error(q->err);
1094 	}
1095 
1096 	/* if we get here, there's at least one block in the queue */
1097 	if(q->state & Qcoalesce){
1098 		/* when coalescing, 0 length blocks just go away */
1099 		b = q->bfirst;
1100 		if(BLEN(b) <= 0){
1101 			freeb(qremove(q));
1102 			goto again;
1103 		}
1104 
1105 		/*  grab the first block plus as many
1106 		 *  following blocks as will completely
1107 		 *  fit in the read.
1108 		 */
1109 		n = 0;
1110 		l = &first;
1111 		m = BLEN(b);
1112 		for(;;) {
1113 			*l = qremove(q);
1114 			l = &b->next;
1115 			n += m;
1116 
1117 			b = q->bfirst;
1118 			if(b == nil)
1119 				break;
1120 			m = BLEN(b);
1121 			if(n+m > len)
1122 				break;
1123 		}
1124 	} else {
1125 		first = qremove(q);
1126 		n = BLEN(first);
1127 	}
1128 
1129 	/* copy to user space outside of the ilock */
1130 	iunlock(q);
1131 	b = bl2mem(vp, first, len);
1132 	ilock(q);
1133 
1134 	/* take care of any left over partial block */
1135 	if(b != nil){
1136 		n -= BLEN(b);
1137 		if(q->state & Qmsg)
1138 			freeb(b);
1139 		else
1140 			qputback(q, b);
1141 	}
1142 
1143 	/* restart producer */
1144 	qwakeup_iunlock(q);
1145 
1146 	poperror();
1147 	qunlock(&q->rlock);
1148 	return n;
1149 }
1150 
1151 static int
qnotfull(void * a)1152 qnotfull(void *a)
1153 {
1154 	Queue *q = a;
1155 
1156 	return q->len < q->limit || (q->state & Qclosed);
1157 }
1158 
1159 ulong noblockcnt;
1160 
1161 /*
1162  *  add a block to a queue obeying flow control
1163  */
1164 long
qbwrite(Queue * q,Block * b)1165 qbwrite(Queue *q, Block *b)
1166 {
1167 	int n, dowakeup;
1168 	Proc *p;
1169 
1170 	n = BLEN(b);
1171 
1172 	if(q->bypass){
1173 		(*q->bypass)(q->arg, b);
1174 		return n;
1175 	}
1176 
1177 	dowakeup = 0;
1178 	qlock(&q->wlock);
1179 	if(waserror()){
1180 		if(b != nil)
1181 			freeb(b);
1182 		qunlock(&q->wlock);
1183 		nexterror();
1184 	}
1185 
1186 	ilock(q);
1187 
1188 	/* give up if the queue is closed */
1189 	if(q->state & Qclosed){
1190 		iunlock(q);
1191 		error(q->err);
1192 	}
1193 
1194 	/* if nonblocking, don't queue over the limit */
1195 	if(q->len >= q->limit){
1196 		if(q->noblock){
1197 			iunlock(q);
1198 			freeb(b);
1199 			noblockcnt += n;
1200 			qunlock(&q->wlock);
1201 			poperror();
1202 			return n;
1203 		}
1204 	}
1205 
1206 	/* queue the block */
1207 	if(q->bfirst)
1208 		q->blast->next = b;
1209 	else
1210 		q->bfirst = b;
1211 	q->blast = b;
1212 	b->next = 0;
1213 	q->len += BALLOC(b);
1214 	q->dlen += n;
1215 	QDEBUG checkb(b, "qbwrite");
1216 	b = nil;
1217 
1218 	/* make sure other end gets awakened */
1219 	if(q->state & Qstarve){
1220 		q->state &= ~Qstarve;
1221 		dowakeup = 1;
1222 	}
1223 	iunlock(q);
1224 
1225 	/*  get output going again */
1226 	if(q->kick && (dowakeup || (q->state&Qkick)))
1227 		q->kick(q->arg);
1228 
1229 	/* wakeup anyone consuming at the other end */
1230 	if(dowakeup){
1231 		p = wakeup(&q->rr);
1232 
1233 		/* if we just wokeup a higher priority process, let it run */
1234 		if(p != nil && p->priority > up->priority)
1235 			sched();
1236 	}
1237 
1238 	/*
1239 	 *  flow control, wait for queue to get below the limit
1240 	 *  before allowing the process to continue and queue
1241 	 *  more.  We do this here so that postnote can only
1242 	 *  interrupt us after the data has been queued.  This
1243 	 *  means that things like 9p flushes and ssl messages
1244 	 *  will not be disrupted by software interrupts.
1245 	 *
1246 	 *  Note - this is moderately dangerous since a process
1247 	 *  that keeps getting interrupted and rewriting will
1248 	 *  queue infinite crud.
1249 	 */
1250 	for(;;){
1251 		if(q->noblock || qnotfull(q))
1252 			break;
1253 
1254 		ilock(q);
1255 		q->state |= Qflow;
1256 		iunlock(q);
1257 		sleep(&q->wr, qnotfull, q);
1258 	}
1259 	USED(b);
1260 
1261 	qunlock(&q->wlock);
1262 	poperror();
1263 	return n;
1264 }
1265 
1266 /*
1267  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1268  */
1269 int
qwrite(Queue * q,void * vp,int len)1270 qwrite(Queue *q, void *vp, int len)
1271 {
1272 	int n, sofar;
1273 	Block *b;
1274 	uchar *p = vp;
1275 
1276 	QDEBUG if(!islo())
1277 		print("qwrite hi %#p\n", getcallerpc(&q));
1278 
1279 	sofar = 0;
1280 	do {
1281 		n = len-sofar;
1282 		if(n > Maxatomic)
1283 			n = Maxatomic;
1284 
1285 		b = allocb(n);
1286 		setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
1287 		if(waserror()){
1288 			freeb(b);
1289 			nexterror();
1290 		}
1291 		memmove(b->wp, p+sofar, n);
1292 		poperror();
1293 		b->wp += n;
1294 
1295 		qbwrite(q, b);
1296 
1297 		sofar += n;
1298 	} while(sofar < len && (q->state & Qmsg) == 0);
1299 
1300 	return len;
1301 }
1302 
1303 /*
1304  *  used by print() to write to a queue.  Since we may be splhi or not in
1305  *  a process, don't qlock.
1306  *
1307  *  this routine merges adjacent blocks if block n+1 will fit into
1308  *  the free space of block n.
1309  */
1310 int
qiwrite(Queue * q,void * vp,int len)1311 qiwrite(Queue *q, void *vp, int len)
1312 {
1313 	int n, sofar, dowakeup;
1314 	Block *b;
1315 	uchar *p = vp;
1316 
1317 	dowakeup = 0;
1318 
1319 	sofar = 0;
1320 	do {
1321 		n = len-sofar;
1322 		if(n > Maxatomic)
1323 			n = Maxatomic;
1324 
1325 		b = iallocb(n);
1326 		if(b == nil)
1327 			break;
1328 		memmove(b->wp, p+sofar, n);
1329 		b->wp += n;
1330 
1331 		ilock(q);
1332 
1333 		/* we use an artificially high limit for kernel prints since anything
1334 		 * over the limit gets dropped
1335 		 */
1336 		if(q->dlen >= 16*1024){
1337 			iunlock(q);
1338 			freeb(b);
1339 			break;
1340 		}
1341 
1342 		QDEBUG checkb(b, "qiwrite");
1343 		if(q->bfirst)
1344 			q->blast->next = b;
1345 		else
1346 			q->bfirst = b;
1347 		q->blast = b;
1348 		q->len += BALLOC(b);
1349 		q->dlen += n;
1350 
1351 		if(q->state & Qstarve){
1352 			q->state &= ~Qstarve;
1353 			dowakeup = 1;
1354 		}
1355 
1356 		iunlock(q);
1357 
1358 		if(dowakeup){
1359 			if(q->kick)
1360 				q->kick(q->arg);
1361 			wakeup(&q->rr);
1362 		}
1363 
1364 		sofar += n;
1365 	} while(sofar < len && (q->state & Qmsg) == 0);
1366 
1367 	return sofar;
1368 }
1369 
1370 /*
1371  *  be extremely careful when calling this,
1372  *  as there is no reference accounting
1373  */
1374 void
qfree(Queue * q)1375 qfree(Queue *q)
1376 {
1377 	qclose(q);
1378 	free(q);
1379 }
1380 
1381 /*
1382  *  Mark a queue as closed.  No further IO is permitted.
1383  *  All blocks are released.
1384  */
1385 void
qclose(Queue * q)1386 qclose(Queue *q)
1387 {
1388 	Block *bfirst;
1389 
1390 	if(q == nil)
1391 		return;
1392 
1393 	/* mark it */
1394 	ilock(q);
1395 	q->state |= Qclosed;
1396 	q->state &= ~(Qflow|Qstarve);
1397 	strcpy(q->err, Ehungup);
1398 	bfirst = q->bfirst;
1399 	q->bfirst = 0;
1400 	q->len = 0;
1401 	q->dlen = 0;
1402 	q->noblock = 0;
1403 	iunlock(q);
1404 
1405 	/* free queued blocks */
1406 	freeblist(bfirst);
1407 
1408 	/* wake up readers/writers */
1409 	wakeup(&q->rr);
1410 	wakeup(&q->wr);
1411 }
1412 
1413 /*
1414  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1415  *  blocks.
1416  */
1417 void
qhangup(Queue * q,char * msg)1418 qhangup(Queue *q, char *msg)
1419 {
1420 	/* mark it */
1421 	ilock(q);
1422 	q->state |= Qclosed;
1423 	if(msg == 0 || *msg == 0)
1424 		strcpy(q->err, Ehungup);
1425 	else
1426 		strncpy(q->err, msg, ERRMAX-1);
1427 	iunlock(q);
1428 
1429 	/* wake up readers/writers */
1430 	wakeup(&q->rr);
1431 	wakeup(&q->wr);
1432 }
1433 
1434 /*
1435  *  return non-zero if the q is hungup
1436  */
1437 int
qisclosed(Queue * q)1438 qisclosed(Queue *q)
1439 {
1440 	return q->state & Qclosed;
1441 }
1442 
1443 /*
1444  *  mark a queue as no longer hung up
1445  */
1446 void
qreopen(Queue * q)1447 qreopen(Queue *q)
1448 {
1449 	ilock(q);
1450 	q->state &= ~Qclosed;
1451 	q->state |= Qstarve;
1452 	q->eof = 0;
1453 	q->limit = q->inilim;
1454 	iunlock(q);
1455 }
1456 
1457 /*
1458  *  return bytes queued
1459  */
1460 int
qlen(Queue * q)1461 qlen(Queue *q)
1462 {
1463 	return q->dlen;
1464 }
1465 
1466 /*
1467  * return space remaining before flow control
1468  */
1469 int
qwindow(Queue * q)1470 qwindow(Queue *q)
1471 {
1472 	int l;
1473 
1474 	l = q->limit - q->len;
1475 	if(l < 0)
1476 		l = 0;
1477 	return l;
1478 }
1479 
1480 /*
1481  *  return true if we can read without blocking
1482  */
1483 int
qcanread(Queue * q)1484 qcanread(Queue *q)
1485 {
1486 	return q->bfirst!=0;
1487 }
1488 
1489 /*
1490  *  change queue limit
1491  */
1492 void
qsetlimit(Queue * q,int limit)1493 qsetlimit(Queue *q, int limit)
1494 {
1495 	q->limit = limit;
1496 }
1497 
1498 /*
1499  *  set blocking/nonblocking
1500  */
1501 void
qnoblock(Queue * q,int onoff)1502 qnoblock(Queue *q, int onoff)
1503 {
1504 	q->noblock = onoff;
1505 }
1506 
1507 /*
1508  *  flush the output queue
1509  */
1510 void
qflush(Queue * q)1511 qflush(Queue *q)
1512 {
1513 	Block *bfirst;
1514 
1515 	/* mark it */
1516 	ilock(q);
1517 	bfirst = q->bfirst;
1518 	q->bfirst = 0;
1519 	q->len = 0;
1520 	q->dlen = 0;
1521 	iunlock(q);
1522 
1523 	/* free queued blocks */
1524 	freeblist(bfirst);
1525 
1526 	/* wake up readers/writers */
1527 	wakeup(&q->wr);
1528 }
1529 
1530 int
qfull(Queue * q)1531 qfull(Queue *q)
1532 {
1533 	return q->state & Qflow;
1534 }
1535 
1536 int
qstate(Queue * q)1537 qstate(Queue *q)
1538 {
1539 	return q->state;
1540 }
1541