xref: /plan9-contrib/sys/src/9k/port/qio.c (revision bccccb4a699372f1cf0a2a0674bef2defa0f4684)
1 #include	"u.h"
2 #include	"../port/lib.h"
3 #include	"mem.h"
4 #include	"dat.h"
5 #include	"fns.h"
6 #include	"../port/error.h"
7 
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14 static ulong qcopycnt;
15 
16 static int debugging;
17 
18 #define QDEBUG	if(0)
19 
20 /*
21  *  IO queues
22  */
23 typedef struct Queue	Queue;
24 
25 struct Queue
26 {
27 	Lock;
28 
29 	Block*	bfirst;		/* buffer */
30 	Block*	blast;
31 
32 	int	len;		/* bytes allocated to queue */
33 	int	dlen;		/* data bytes in queue */
34 	int	limit;		/* max bytes in queue */
35 	int	inilim;		/* initial limit */
36 	int	state;
37 	int	noblock;	/* true if writes return immediately when q full */
38 	int	eof;		/* number of eofs read by user */
39 
40 	void	(*kick)(void*);	/* restart output */
41 	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
42 	void*	arg;		/* argument to kick */
43 
44 	QLock	rlock;		/* mutex for reading processes */
45 	Rendez	rr;		/* process waiting to read */
46 	QLock	wlock;		/* mutex for writing processes */
47 	Rendez	wr;		/* process waiting to write */
48 
49 	char	err[ERRMAX];
50 };
51 
52 enum
53 {
54 	Maxatomic	= 64*1024,
55 };
56 
57 uint	qiomaxatomic = Maxatomic;
58 
59 void
ixsummary(void)60 ixsummary(void)
61 {
62 	debugging ^= 1;
63 	iallocsummary();
64 	print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
65 		padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
66 	print("consume %lud, produce %lud, qcopy %lud\n",
67 		consumecnt, producecnt, qcopycnt);
68 }
69 
70 /*
71  *  free a list of blocks
72  */
73 void
freeblist(Block * b)74 freeblist(Block *b)
75 {
76 	Block *next;
77 
78 	for(; b != 0; b = next){
79 		next = b->next;
80 		b->next = 0;
81 		freeb(b);
82 	}
83 }
84 
85 /*
86  *  pad a block to the front (or the back if size is negative)
87  */
88 Block*
padblock(Block * bp,int size)89 padblock(Block *bp, int size)
90 {
91 	int n;
92 	Block *nbp;
93 
94 	QDEBUG checkb(bp, "padblock 1");
95 	if(size >= 0){
96 		if(bp->rp - bp->base >= size){
97 			bp->rp -= size;
98 			return bp;
99 		}
100 
101 		if(bp->next)
102 			panic("padblock %#p", getcallerpc(&bp));
103 		n = BLEN(bp);
104 		padblockcnt++;
105 		nbp = allocb(size+n);
106 		nbp->rp += size;
107 		nbp->wp = nbp->rp;
108 		memmove(nbp->wp, bp->rp, n);
109 		nbp->wp += n;
110 		freeb(bp);
111 		nbp->rp -= size;
112 	} else {
113 		size = -size;
114 
115 		if(bp->next)
116 			panic("padblock %#p", getcallerpc(&bp));
117 
118 		if(bp->lim - bp->wp >= size)
119 			return bp;
120 
121 		n = BLEN(bp);
122 		padblockcnt++;
123 		nbp = allocb(size+n);
124 		memmove(nbp->wp, bp->rp, n);
125 		nbp->wp += n;
126 		freeb(bp);
127 	}
128 	QDEBUG checkb(nbp, "padblock 1");
129 	return nbp;
130 }
131 
132 /*
133  *  return count of bytes in a string of blocks
134  */
135 int
blocklen(Block * bp)136 blocklen(Block *bp)
137 {
138 	int len;
139 
140 	len = 0;
141 	while(bp) {
142 		len += BLEN(bp);
143 		bp = bp->next;
144 	}
145 	return len;
146 }
147 
148 /*
149  * return count of space in blocks
150  */
151 int
blockalloclen(Block * bp)152 blockalloclen(Block *bp)
153 {
154 	int len;
155 
156 	len = 0;
157 	while(bp) {
158 		len += BALLOC(bp);
159 		bp = bp->next;
160 	}
161 	return len;
162 }
163 
164 /*
165  *  copy the  string of blocks into
166  *  a single block and free the string
167  */
168 Block*
concatblock(Block * bp)169 concatblock(Block *bp)
170 {
171 	int len;
172 	Block *nb, *f;
173 
174 	if(bp->next == 0)
175 		return bp;
176 
177 	nb = allocb(blocklen(bp));
178 	for(f = bp; f; f = f->next) {
179 		len = BLEN(f);
180 		memmove(nb->wp, f->rp, len);
181 		nb->wp += len;
182 	}
183 	concatblockcnt += BLEN(nb);
184 	freeblist(bp);
185 	QDEBUG checkb(nb, "concatblock 1");
186 	return nb;
187 }
188 
189 /*
190  *  make sure the first block has at least n bytes
191  */
192 Block*
pullupblock(Block * bp,int n)193 pullupblock(Block *bp, int n)
194 {
195 	int i;
196 	Block *nbp;
197 
198 	/*
199 	 *  this should almost always be true, it's
200 	 *  just to avoid every caller checking.
201 	 */
202 	if(BLEN(bp) >= n)
203 		return bp;
204 
205 	/*
206 	 *  if not enough room in the first block,
207 	 *  add another to the front of the list.
208 	 */
209 	if(bp->lim - bp->rp < n){
210 		nbp = allocb(n);
211 		nbp->next = bp;
212 		bp = nbp;
213 	}
214 
215 	/*
216 	 *  copy bytes from the trailing blocks into the first
217 	 */
218 	n -= BLEN(bp);
219 	while(nbp = bp->next){
220 		i = BLEN(nbp);
221 		if(i > n) {
222 			memmove(bp->wp, nbp->rp, n);
223 			pullupblockcnt++;
224 			bp->wp += n;
225 			nbp->rp += n;
226 			QDEBUG checkb(bp, "pullupblock 1");
227 			return bp;
228 		} else {
229 			/* shouldn't happen but why crash if it does */
230 			if(i < 0){
231 				print("pullupblock -ve length, from %#p\n",
232 					getcallerpc(&bp));
233 				i = 0;
234 			}
235 			memmove(bp->wp, nbp->rp, i);
236 			pullupblockcnt++;
237 			bp->wp += i;
238 			bp->next = nbp->next;
239 			nbp->next = 0;
240 			freeb(nbp);
241 			n -= i;
242 			if(n == 0){
243 				QDEBUG checkb(bp, "pullupblock 2");
244 				return bp;
245 			}
246 		}
247 	}
248 	freeb(bp);
249 	return 0;
250 }
251 
252 /*
253  *  make sure the first block has at least n bytes
254  */
255 Block*
pullupqueue(Queue * q,int n)256 pullupqueue(Queue *q, int n)
257 {
258 	Block *b;
259 
260 	if(BLEN(q->bfirst) >= n)
261 		return q->bfirst;
262 	q->bfirst = pullupblock(q->bfirst, n);
263 	for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
264 		;
265 	q->blast = b;
266 	return q->bfirst;
267 }
268 
269 /*
270  *  trim to len bytes starting at offset
271  */
272 Block *
trimblock(Block * bp,int offset,int len)273 trimblock(Block *bp, int offset, int len)
274 {
275 	long l;
276 	Block *nb, *startb;
277 
278 	QDEBUG checkb(bp, "trimblock 1");
279 	if(blocklen(bp) < offset+len) {
280 		freeblist(bp);
281 		return nil;
282 	}
283 
284 	while((l = BLEN(bp)) < offset) {
285 		offset -= l;
286 		nb = bp->next;
287 		bp->next = nil;
288 		freeb(bp);
289 		bp = nb;
290 	}
291 
292 	startb = bp;
293 	bp->rp += offset;
294 
295 	while((l = BLEN(bp)) < len) {
296 		len -= l;
297 		bp = bp->next;
298 	}
299 
300 	bp->wp -= (BLEN(bp) - len);
301 
302 	if(bp->next) {
303 		freeblist(bp->next);
304 		bp->next = nil;
305 	}
306 
307 	return startb;
308 }
309 
310 /*
311  *  copy 'count' bytes into a new block
312  */
313 Block*
copyblock(Block * bp,int count)314 copyblock(Block *bp, int count)
315 {
316 	int l;
317 	Block *nbp;
318 
319 	QDEBUG checkb(bp, "copyblock 0");
320 	if(bp->flag & BINTR){
321 		nbp = iallocb(count);
322 		if(nbp == nil)
323 			return nil;
324 	}else
325 		nbp = allocb(count);
326 	for(; count > 0 && bp != 0; bp = bp->next){
327 		l = BLEN(bp);
328 		if(l > count)
329 			l = count;
330 		memmove(nbp->wp, bp->rp, l);
331 		nbp->wp += l;
332 		count -= l;
333 	}
334 	if(count > 0){
335 		memset(nbp->wp, 0, count);
336 		nbp->wp += count;
337 	}
338 	copyblockcnt++;
339 	QDEBUG checkb(nbp, "copyblock 1");
340 
341 	return nbp;
342 }
343 
344 Block*
adjustblock(Block * bp,int len)345 adjustblock(Block* bp, int len)
346 {
347 	int n;
348 	Block *nbp;
349 
350 	if(len < 0){
351 		freeb(bp);
352 		return nil;
353 	}
354 
355 	if(bp->rp+len > bp->lim){
356 		nbp = copyblock(bp, len);
357 		freeblist(bp);
358 		QDEBUG checkb(nbp, "adjustblock 1");
359 
360 		return nbp;
361 	}
362 
363 	n = BLEN(bp);
364 	if(len > n)
365 		memset(bp->wp, 0, len-n);
366 	bp->wp = bp->rp+len;
367 	QDEBUG checkb(bp, "adjustblock 2");
368 
369 	return bp;
370 }
371 
372 
373 /*
374  *  throw away up to count bytes from a
375  *  list of blocks.  Return count of bytes
376  *  thrown away.
377  */
378 int
pullblock(Block ** bph,int count)379 pullblock(Block **bph, int count)
380 {
381 	Block *bp;
382 	int n, bytes;
383 
384 	bytes = 0;
385 	if(bph == nil)
386 		return 0;
387 
388 	while(*bph != nil && count != 0) {
389 		bp = *bph;
390 		n = BLEN(bp);
391 		if(count < n)
392 			n = count;
393 		bytes += n;
394 		count -= n;
395 		bp->rp += n;
396 		QDEBUG checkb(bp, "pullblock ");
397 		if(BLEN(bp) == 0) {
398 			*bph = bp->next;
399 			bp->next = nil;
400 			freeb(bp);
401 		}
402 	}
403 	return bytes;
404 }
405 
406 /*
407  *  get next block from a queue, return null if nothing there
408  */
409 Block*
qget(Queue * q)410 qget(Queue *q)
411 {
412 	int dowakeup;
413 	Block *b;
414 
415 	/* sync with qwrite */
416 	ilock(q);
417 
418 	b = q->bfirst;
419 	if(b == nil){
420 		q->state |= Qstarve;
421 		iunlock(q);
422 		return nil;
423 	}
424 	q->bfirst = b->next;
425 	b->next = 0;
426 	q->len -= BALLOC(b);
427 	q->dlen -= BLEN(b);
428 	QDEBUG checkb(b, "qget");
429 
430 	/* if writer flow controlled, restart */
431 	if((q->state & Qflow) && q->len < q->limit/2){
432 		q->state &= ~Qflow;
433 		dowakeup = 1;
434 	} else
435 		dowakeup = 0;
436 
437 	iunlock(q);
438 
439 	if(dowakeup)
440 		wakeup(&q->wr);
441 
442 	return b;
443 }
444 
445 /*
446  *  throw away the next 'len' bytes in the queue
447  */
448 int
qdiscard(Queue * q,int len)449 qdiscard(Queue *q, int len)
450 {
451 	Block *b;
452 	int dowakeup, n, sofar;
453 
454 	ilock(q);
455 	for(sofar = 0; sofar < len; sofar += n){
456 		b = q->bfirst;
457 		if(b == nil)
458 			break;
459 		QDEBUG checkb(b, "qdiscard");
460 		n = BLEN(b);
461 		if(n <= len - sofar){
462 			q->bfirst = b->next;
463 			b->next = 0;
464 			q->len -= BALLOC(b);
465 			q->dlen -= BLEN(b);
466 			freeb(b);
467 		} else {
468 			n = len - sofar;
469 			b->rp += n;
470 			q->dlen -= n;
471 		}
472 	}
473 
474 	/*
475 	 *  if writer flow controlled, restart
476 	 *
477 	 *  This used to be
478 	 *	q->len < q->limit/2
479 	 *  but it slows down tcp too much for certain write sizes.
480 	 *  I really don't understand it completely.  It may be
481 	 *  due to the queue draining so fast that the transmission
482 	 *  stalls waiting for the app to produce more data.  - presotto
483 	 *
484 	 *  changed back from q->len < q->limit for reno tcp. - jmk
485 	 */
486 	if((q->state & Qflow) && q->len < q->limit/2){
487 		q->state &= ~Qflow;
488 		dowakeup = 1;
489 	} else
490 		dowakeup = 0;
491 
492 	iunlock(q);
493 
494 	if(dowakeup)
495 		wakeup(&q->wr);
496 
497 	return sofar;
498 }
499 
500 /*
501  *  Interrupt level copy out of a queue, return # bytes copied.
502  */
503 int
qconsume(Queue * q,void * vp,int len)504 qconsume(Queue *q, void *vp, int len)
505 {
506 	Block *b;
507 	int n, dowakeup;
508 	uchar *p = vp;
509 	Block *tofree = nil;
510 
511 	/* sync with qwrite */
512 	ilock(q);
513 
514 	for(;;) {
515 		b = q->bfirst;
516 		if(b == 0){
517 			q->state |= Qstarve;
518 			iunlock(q);
519 			return -1;
520 		}
521 		QDEBUG checkb(b, "qconsume 1");
522 
523 		n = BLEN(b);
524 		if(n > 0)
525 			break;
526 		q->bfirst = b->next;
527 		q->len -= BALLOC(b);
528 
529 		/* remember to free this */
530 		b->next = tofree;
531 		tofree = b;
532 	};
533 
534 	if(n < len)
535 		len = n;
536 	memmove(p, b->rp, len);
537 	consumecnt += n;
538 	b->rp += len;
539 	q->dlen -= len;
540 
541 	/* discard the block if we're done with it */
542 	if((q->state & Qmsg) || len == n){
543 		q->bfirst = b->next;
544 		b->next = 0;
545 		q->len -= BALLOC(b);
546 		q->dlen -= BLEN(b);
547 
548 		/* remember to free this */
549 		b->next = tofree;
550 		tofree = b;
551 	}
552 
553 	/* if writer flow controlled, restart */
554 	if((q->state & Qflow) && q->len < q->limit/2){
555 		q->state &= ~Qflow;
556 		dowakeup = 1;
557 	} else
558 		dowakeup = 0;
559 
560 	iunlock(q);
561 
562 	if(dowakeup)
563 		wakeup(&q->wr);
564 
565 	if(tofree != nil)
566 		freeblist(tofree);
567 
568 	return len;
569 }
570 
571 int
qpass(Queue * q,Block * b)572 qpass(Queue *q, Block *b)
573 {
574 	int dlen, len, dowakeup;
575 
576 	/* sync with qread */
577 	dowakeup = 0;
578 	ilock(q);
579 	if(q->len >= q->limit){
580 		iunlock(q);
581 		freeblist(b);
582 		return -1;
583 	}
584 	if(q->state & Qclosed){
585 		len = BALLOC(b);
586 		iunlock(q);
587 		freeblist(b);
588 		return len;
589 	}
590 
591 	/* add buffer to queue */
592 	if(q->bfirst)
593 		q->blast->next = b;
594 	else
595 		q->bfirst = b;
596 	len = BALLOC(b);
597 	dlen = BLEN(b);
598 	QDEBUG checkb(b, "qpass");
599 	while(b->next){
600 		b = b->next;
601 		QDEBUG checkb(b, "qpass");
602 		len += BALLOC(b);
603 		dlen += BLEN(b);
604 	}
605 	q->blast = b;
606 	q->len += len;
607 	q->dlen += dlen;
608 
609 	if(q->len >= q->limit/2)
610 		q->state |= Qflow;
611 
612 	if(q->state & Qstarve){
613 		q->state &= ~Qstarve;
614 		dowakeup = 1;
615 	}
616 	iunlock(q);
617 
618 	if(dowakeup)
619 		wakeup(&q->rr);
620 
621 	return len;
622 }
623 
624 int
qpassnolim(Queue * q,Block * b)625 qpassnolim(Queue *q, Block *b)
626 {
627 	int dlen, len, dowakeup;
628 
629 	/* sync with qread */
630 	dowakeup = 0;
631 	ilock(q);
632 
633 	if(q->state & Qclosed){
634 		len = BALLOC(b);
635 		iunlock(q);
636 		freeblist(b);
637 		return len;
638 	}
639 
640 	/* add buffer to queue */
641 	if(q->bfirst)
642 		q->blast->next = b;
643 	else
644 		q->bfirst = b;
645 	len = BALLOC(b);
646 	dlen = BLEN(b);
647 	QDEBUG checkb(b, "qpass");
648 	while(b->next){
649 		b = b->next;
650 		QDEBUG checkb(b, "qpass");
651 		len += BALLOC(b);
652 		dlen += BLEN(b);
653 	}
654 	q->blast = b;
655 	q->len += len;
656 	q->dlen += dlen;
657 
658 	if(q->len >= q->limit/2)
659 		q->state |= Qflow;
660 
661 	if(q->state & Qstarve){
662 		q->state &= ~Qstarve;
663 		dowakeup = 1;
664 	}
665 	iunlock(q);
666 
667 	if(dowakeup)
668 		wakeup(&q->rr);
669 
670 	return len;
671 }
672 
673 /*
674  *  if the allocated space is way out of line with the used
675  *  space, reallocate to a smaller block
676  */
677 Block*
packblock(Block * bp)678 packblock(Block *bp)
679 {
680 	Block **l, *nbp;
681 	int n;
682 
683 	for(l = &bp; *l; l = &(*l)->next){
684 		nbp = *l;
685 		n = BLEN(nbp);
686 		if((n<<2) < BALLOC(nbp)){
687 			*l = allocb(n);
688 			memmove((*l)->wp, nbp->rp, n);
689 			(*l)->wp += n;
690 			(*l)->next = nbp->next;
691 			freeb(nbp);
692 		}
693 	}
694 
695 	return bp;
696 }
697 
698 int
qproduce(Queue * q,void * vp,int len)699 qproduce(Queue *q, void *vp, int len)
700 {
701 	Block *b;
702 	int dowakeup;
703 	uchar *p = vp;
704 
705 	/* sync with qread */
706 	dowakeup = 0;
707 	ilock(q);
708 
709 	/* no waiting receivers, room in buffer? */
710 	if(q->len >= q->limit){
711 		q->state |= Qflow;
712 		iunlock(q);
713 		return -1;
714 	}
715 
716 	/* save in buffer */
717 	b = iallocb(len);
718 	if(b == 0){
719 		iunlock(q);
720 		return 0;
721 	}
722 	memmove(b->wp, p, len);
723 	producecnt += len;
724 	b->wp += len;
725 	if(q->bfirst)
726 		q->blast->next = b;
727 	else
728 		q->bfirst = b;
729 	q->blast = b;
730 	/* b->next = 0; done by iallocb() */
731 	q->len += BALLOC(b);
732 	q->dlen += BLEN(b);
733 	QDEBUG checkb(b, "qproduce");
734 
735 	if(q->state & Qstarve){
736 		q->state &= ~Qstarve;
737 		dowakeup = 1;
738 	}
739 
740 	if(q->len >= q->limit)
741 		q->state |= Qflow;
742 	iunlock(q);
743 
744 	if(dowakeup)
745 		wakeup(&q->rr);
746 
747 	return len;
748 }
749 
750 /*
751  *  copy from offset in the queue
752  */
753 Block*
qcopy(Queue * q,int len,ulong offset)754 qcopy(Queue *q, int len, ulong offset)
755 {
756 	int sofar;
757 	int n;
758 	Block *b, *nb;
759 	uchar *p;
760 
761 	nb = allocb(len);
762 
763 	ilock(q);
764 
765 	/* go to offset */
766 	b = q->bfirst;
767 	for(sofar = 0; ; sofar += n){
768 		if(b == nil){
769 			iunlock(q);
770 			return nb;
771 		}
772 		n = BLEN(b);
773 		if(sofar + n > offset){
774 			p = b->rp + offset - sofar;
775 			n -= offset - sofar;
776 			break;
777 		}
778 		QDEBUG checkb(b, "qcopy");
779 		b = b->next;
780 	}
781 
782 	/* copy bytes from there */
783 	for(sofar = 0; sofar < len;){
784 		if(n > len - sofar)
785 			n = len - sofar;
786 		memmove(nb->wp, p, n);
787 		qcopycnt += n;
788 		sofar += n;
789 		nb->wp += n;
790 		b = b->next;
791 		if(b == nil)
792 			break;
793 		n = BLEN(b);
794 		p = b->rp;
795 	}
796 	iunlock(q);
797 
798 	return nb;
799 }
800 
801 /*
802  *  called by non-interrupt code
803  */
804 Queue*
qopen(int limit,int msg,void (* kick)(void *),void * arg)805 qopen(int limit, int msg, void (*kick)(void*), void *arg)
806 {
807 	Queue *q;
808 
809 	q = malloc(sizeof(Queue));
810 	if(q == 0)
811 		return 0;
812 
813 	q->limit = q->inilim = limit;
814 	q->kick = kick;
815 	q->arg = arg;
816 	q->state = msg;
817 
818 	q->state |= Qstarve;
819 	q->eof = 0;
820 	q->noblock = 0;
821 
822 	return q;
823 }
824 
825 /* open a queue to be bypassed */
826 Queue*
qbypass(void (* bypass)(void *,Block *),void * arg)827 qbypass(void (*bypass)(void*, Block*), void *arg)
828 {
829 	Queue *q;
830 
831 	q = malloc(sizeof(Queue));
832 	if(q == 0)
833 		return 0;
834 
835 	q->limit = 0;
836 	q->arg = arg;
837 	q->bypass = bypass;
838 	q->state = 0;
839 
840 	return q;
841 }
842 
843 static int
notempty(void * a)844 notempty(void *a)
845 {
846 	Queue *q = a;
847 
848 	return (q->state & Qclosed) || q->bfirst != 0;
849 }
850 
851 /*
852  *  wait for the queue to be non-empty or closed.
853  *  called with q ilocked.
854  */
855 static int
qwait(Queue * q)856 qwait(Queue *q)
857 {
858 	/* wait for data */
859 	for(;;){
860 		if(q->bfirst != nil)
861 			break;
862 
863 		if(q->state & Qclosed){
864 			if(++q->eof > 3)
865 				return -1;
866 			if(*q->err && strcmp(q->err, Ehungup) != 0)
867 				return -1;
868 			return 0;
869 		}
870 
871 		q->state |= Qstarve;	/* flag requesting producer to wake me */
872 		iunlock(q);
873 		sleep(&q->rr, notempty, q);
874 		ilock(q);
875 	}
876 	return 1;
877 }
878 
879 /*
880  * add a block list to a queue
881  */
882 void
qaddlist(Queue * q,Block * b)883 qaddlist(Queue *q, Block *b)
884 {
885 	/* queue the block */
886 	if(q->bfirst)
887 		q->blast->next = b;
888 	else
889 		q->bfirst = b;
890 	q->len += blockalloclen(b);
891 	q->dlen += blocklen(b);
892 	while(b->next)
893 		b = b->next;
894 	q->blast = b;
895 }
896 
897 /*
898  *  called with q ilocked
899  */
900 Block*
qremove(Queue * q)901 qremove(Queue *q)
902 {
903 	Block *b;
904 
905 	b = q->bfirst;
906 	if(b == nil)
907 		return nil;
908 	q->bfirst = b->next;
909 	b->next = nil;
910 	q->dlen -= BLEN(b);
911 	q->len -= BALLOC(b);
912 	QDEBUG checkb(b, "qremove");
913 	return b;
914 }
915 
916 /*
917  *  copy the contents of a string of blocks into
918  *  memory.  emptied blocks are freed.  return
919  *  pointer to first unconsumed block.
920  */
921 Block*
bl2mem(uchar * p,Block * b,int n)922 bl2mem(uchar *p, Block *b, int n)
923 {
924 	int i;
925 	Block *next;
926 
927 	for(; b != nil; b = next){
928 		i = BLEN(b);
929 		if(i > n){
930 			memmove(p, b->rp, n);
931 			b->rp += n;
932 			return b;
933 		}
934 		memmove(p, b->rp, i);
935 		n -= i;
936 		p += i;
937 		b->rp += i;
938 		next = b->next;
939 		freeb(b);
940 	}
941 	return nil;
942 }
943 
944 /*
945  *  copy the contents of memory into a string of blocks.
946  *  return nil on error.
947  */
948 Block*
mem2bl(uchar * p,int len)949 mem2bl(uchar *p, int len)
950 {
951 	int n;
952 	Block *b, *first, **l;
953 
954 	first = nil;
955 	l = &first;
956 	if(waserror()){
957 		freeblist(first);
958 		nexterror();
959 	}
960 	do {
961 		n = len;
962 		if(n > Maxatomic)
963 			n = Maxatomic;
964 
965 		*l = b = allocb(n);
966 		setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
967 		memmove(b->wp, p, n);
968 		b->wp += n;
969 		p += n;
970 		len -= n;
971 		l = &b->next;
972 	} while(len > 0);
973 	poperror();
974 
975 	return first;
976 }
977 
978 /*
979  *  put a block back to the front of the queue
980  *  called with q ilocked
981  */
982 void
qputback(Queue * q,Block * b)983 qputback(Queue *q, Block *b)
984 {
985 	b->next = q->bfirst;
986 	if(q->bfirst == nil)
987 		q->blast = b;
988 	q->bfirst = b;
989 	q->len += BALLOC(b);
990 	q->dlen += BLEN(b);
991 }
992 
993 /*
994  *  flow control, get producer going again
995  *  called with q ilocked
996  */
997 static void
qwakeup_iunlock(Queue * q)998 qwakeup_iunlock(Queue *q)
999 {
1000 	int dowakeup;
1001 
1002 	/* if writer flow controlled, restart */
1003 	if((q->state & Qflow) && q->len < q->limit/2){
1004 		q->state &= ~Qflow;
1005 		dowakeup = 1;
1006 	}
1007 	else
1008 		dowakeup = 0;
1009 
1010 	iunlock(q);
1011 
1012 	/* wakeup flow controlled writers */
1013 	if(dowakeup){
1014 		if(q->kick)
1015 			q->kick(q->arg);
1016 		wakeup(&q->wr);
1017 	}
1018 }
1019 
1020 /*
1021  *  get next block from a queue (up to a limit)
1022  */
1023 Block*
qbread(Queue * q,int len)1024 qbread(Queue *q, int len)
1025 {
1026 	Block *b, *nb;
1027 	int n;
1028 
1029 	qlock(&q->rlock);
1030 	if(waserror()){
1031 		qunlock(&q->rlock);
1032 		nexterror();
1033 	}
1034 
1035 	ilock(q);
1036 	switch(qwait(q)){
1037 	case 0:
1038 		/* queue closed */
1039 		iunlock(q);
1040 		qunlock(&q->rlock);
1041 		poperror();
1042 		return nil;
1043 	case -1:
1044 		/* multiple reads on a closed queue */
1045 		iunlock(q);
1046 		error(q->err);
1047 	}
1048 
1049 	/* if we get here, there's at least one block in the queue */
1050 	b = qremove(q);
1051 	n = BLEN(b);
1052 
1053 	/* split block if it's too big and this is not a message queue */
1054 	nb = b;
1055 	if(n > len){
1056 		if((q->state&Qmsg) == 0){
1057 			n -= len;
1058 			b = allocb(n);
1059 			memmove(b->wp, nb->rp+len, n);
1060 			b->wp += n;
1061 			qputback(q, b);
1062 		}
1063 		nb->wp = nb->rp + len;
1064 	}
1065 
1066 	/* restart producer */
1067 	qwakeup_iunlock(q);
1068 
1069 	poperror();
1070 	qunlock(&q->rlock);
1071 	return nb;
1072 }
1073 
1074 /*
1075  *  read a queue.  if no data is queued, post a Block
1076  *  and wait on its Rendez.
1077  */
1078 long
qread(Queue * q,void * vp,int len)1079 qread(Queue *q, void *vp, int len)
1080 {
1081 	Block *b, *first, **l;
1082 	int blen, n;
1083 
1084 	qlock(&q->rlock);
1085 	if(waserror()){
1086 		qunlock(&q->rlock);
1087 		nexterror();
1088 	}
1089 
1090 	ilock(q);
1091 again:
1092 	switch(qwait(q)){
1093 	case 0:
1094 		/* queue closed */
1095 		iunlock(q);
1096 		qunlock(&q->rlock);
1097 		poperror();
1098 		return 0;
1099 	case -1:
1100 		/* multiple reads on a closed queue */
1101 		iunlock(q);
1102 		error(q->err);
1103 	}
1104 
1105 	/* if we get here, there's at least one block in the queue */
1106 	if(q->state & Qcoalesce){
1107 		/* when coalescing, 0 length blocks just go away */
1108 		b = q->bfirst;
1109 		if(BLEN(b) <= 0){
1110 			freeb(qremove(q));
1111 			goto again;
1112 		}
1113 
1114 		/*  grab the first block plus as many
1115 		 *  following blocks as will completely
1116 		 *  fit in the read.
1117 		 */
1118 		n = 0;
1119 		l = &first;
1120 		blen = BLEN(b);
1121 		for(;;) {
1122 			*l = qremove(q);
1123 			l = &b->next;
1124 			n += blen;
1125 
1126 			b = q->bfirst;
1127 			if(b == nil)
1128 				break;
1129 			blen = BLEN(b);
1130 			if(n+blen > len)
1131 				break;
1132 		}
1133 	} else {
1134 		first = qremove(q);
1135 		n = BLEN(first);
1136 	}
1137 
1138 	/* copy to user space outside of the ilock */
1139 	iunlock(q);
1140 	b = bl2mem(vp, first, len);
1141 	ilock(q);
1142 
1143 	/* take care of any left over partial block */
1144 	if(b != nil){
1145 		n -= BLEN(b);
1146 		if(q->state & Qmsg)
1147 			freeb(b);
1148 		else
1149 			qputback(q, b);
1150 	}
1151 
1152 	/* restart producer */
1153 	qwakeup_iunlock(q);
1154 
1155 	poperror();
1156 	qunlock(&q->rlock);
1157 	return n;
1158 }
1159 
1160 static int
qnotfull(void * a)1161 qnotfull(void *a)
1162 {
1163 	Queue *q = a;
1164 
1165 	return q->len < q->limit || (q->state & Qclosed);
1166 }
1167 
1168 ulong noblockcnt;
1169 
1170 /*
1171  *  add a block to a queue obeying flow control
1172  */
1173 long
qbwrite(Queue * q,Block * b)1174 qbwrite(Queue *q, Block *b)
1175 {
1176 	int n, dowakeup;
1177 	Proc *p;
1178 
1179 	n = BLEN(b);
1180 
1181 	if(q->bypass){
1182 		(*q->bypass)(q->arg, b);
1183 		return n;
1184 	}
1185 
1186 	dowakeup = 0;
1187 	qlock(&q->wlock);
1188 	if(waserror()){
1189 		if(b != nil)
1190 			freeb(b);
1191 		qunlock(&q->wlock);
1192 		nexterror();
1193 	}
1194 
1195 	ilock(q);
1196 
1197 	/* give up if the queue is closed */
1198 	if(q->state & Qclosed){
1199 		iunlock(q);
1200 		error(q->err);
1201 	}
1202 
1203 	/* if nonblocking, don't queue over the limit */
1204 	if(q->len >= q->limit){
1205 		if(q->noblock){
1206 			iunlock(q);
1207 			freeb(b);
1208 			noblockcnt += n;
1209 			qunlock(&q->wlock);
1210 			poperror();
1211 			return n;
1212 		}
1213 	}
1214 
1215 	/* queue the block */
1216 	if(q->bfirst)
1217 		q->blast->next = b;
1218 	else
1219 		q->bfirst = b;
1220 	q->blast = b;
1221 	b->next = 0;
1222 	q->len += BALLOC(b);
1223 	q->dlen += n;
1224 	QDEBUG checkb(b, "qbwrite");
1225 	b = nil;
1226 
1227 	/* make sure other end gets awakened */
1228 	if(q->state & Qstarve){
1229 		q->state &= ~Qstarve;
1230 		dowakeup = 1;
1231 	}
1232 	iunlock(q);
1233 
1234 	/*  get output going again */
1235 	if(q->kick && (dowakeup || (q->state&Qkick)))
1236 		q->kick(q->arg);
1237 
1238 	/* wakeup anyone consuming at the other end */
1239 	if(dowakeup){
1240 		p = wakeup(&q->rr);
1241 
1242 		/* if we just wokeup a higher priority process, let it run */
1243 		if(p != nil && p->priority > up->priority)
1244 			sched();
1245 	}
1246 
1247 	/*
1248 	 *  flow control, wait for queue to get below the limit
1249 	 *  before allowing the process to continue and queue
1250 	 *  more.  We do this here so that postnote can only
1251 	 *  interrupt us after the data has been queued.  This
1252 	 *  means that things like 9p flushes and ssl messages
1253 	 *  will not be disrupted by software interrupts.
1254 	 *
1255 	 *  Note - this is moderately dangerous since a process
1256 	 *  that keeps getting interrupted and rewriting will
1257 	 *  queue infinite crud.
1258 	 */
1259 	for(;;){
1260 		if(q->noblock || qnotfull(q))
1261 			break;
1262 
1263 		ilock(q);
1264 		q->state |= Qflow;
1265 		iunlock(q);
1266 		sleep(&q->wr, qnotfull, q);
1267 	}
1268 	USED(b);
1269 
1270 	qunlock(&q->wlock);
1271 	poperror();
1272 	return n;
1273 }
1274 
1275 /*
1276  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1277  */
1278 int
qwrite(Queue * q,void * vp,int len)1279 qwrite(Queue *q, void *vp, int len)
1280 {
1281 	int n, sofar;
1282 	Block *b;
1283 	uchar *p = vp;
1284 
1285 	QDEBUG if(!islo())
1286 		print("qwrite hi %#p\n", getcallerpc(&q));
1287 
1288 	sofar = 0;
1289 	do {
1290 		n = len-sofar;
1291 		if(n > Maxatomic)
1292 			n = Maxatomic;
1293 
1294 		b = allocb(n);
1295 		setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
1296 		if(waserror()){
1297 			freeb(b);
1298 			nexterror();
1299 		}
1300 		memmove(b->wp, p+sofar, n);
1301 		poperror();
1302 		b->wp += n;
1303 
1304 		qbwrite(q, b);
1305 
1306 		sofar += n;
1307 	} while(sofar < len && (q->state & Qmsg) == 0);
1308 
1309 	return len;
1310 }
1311 
1312 /*
1313  *  used by print() to write to a queue.  Since we may be splhi or not in
1314  *  a process, don't qlock.
1315  *
1316  *  this routine merges adjacent blocks if block n+1 will fit into
1317  *  the free space of block n.
1318  */
1319 int
qiwrite(Queue * q,void * vp,int len)1320 qiwrite(Queue *q, void *vp, int len)
1321 {
1322 	int n, sofar, dowakeup;
1323 	Block *b;
1324 	uchar *p = vp;
1325 
1326 	dowakeup = 0;
1327 
1328 	sofar = 0;
1329 	do {
1330 		n = len-sofar;
1331 		if(n > Maxatomic)
1332 			n = Maxatomic;
1333 
1334 		b = iallocb(n);
1335 		if(b == nil)
1336 			break;
1337 		memmove(b->wp, p+sofar, n);
1338 		b->wp += n;
1339 
1340 		ilock(q);
1341 
1342 		/* we use an artificially high limit for kernel prints since anything
1343 		 * over the limit gets dropped
1344 		 */
1345 		if(q->dlen >= 16*1024){
1346 			iunlock(q);
1347 			freeb(b);
1348 			break;
1349 		}
1350 
1351 		QDEBUG checkb(b, "qiwrite");
1352 		if(q->bfirst)
1353 			q->blast->next = b;
1354 		else
1355 			q->bfirst = b;
1356 		q->blast = b;
1357 		q->len += BALLOC(b);
1358 		q->dlen += n;
1359 
1360 		if(q->state & Qstarve){
1361 			q->state &= ~Qstarve;
1362 			dowakeup = 1;
1363 		}
1364 
1365 		iunlock(q);
1366 
1367 		if(dowakeup){
1368 			if(q->kick)
1369 				q->kick(q->arg);
1370 			wakeup(&q->rr);
1371 		}
1372 
1373 		sofar += n;
1374 	} while(sofar < len && (q->state & Qmsg) == 0);
1375 
1376 	return sofar;
1377 }
1378 
1379 /*
1380  *  be extremely careful when calling this,
1381  *  as there is no reference accounting
1382  */
1383 void
qfree(Queue * q)1384 qfree(Queue *q)
1385 {
1386 	qclose(q);
1387 	free(q);
1388 }
1389 
1390 /*
1391  *  Mark a queue as closed.  No further IO is permitted.
1392  *  All blocks are released.
1393  */
1394 void
qclose(Queue * q)1395 qclose(Queue *q)
1396 {
1397 	Block *bfirst;
1398 
1399 	if(q == nil)
1400 		return;
1401 
1402 	/* mark it */
1403 	ilock(q);
1404 	q->state |= Qclosed;
1405 	q->state &= ~(Qflow|Qstarve);
1406 	strcpy(q->err, Ehungup);
1407 	bfirst = q->bfirst;
1408 	q->bfirst = 0;
1409 	q->len = 0;
1410 	q->dlen = 0;
1411 	q->noblock = 0;
1412 	iunlock(q);
1413 
1414 	/* free queued blocks */
1415 	freeblist(bfirst);
1416 
1417 	/* wake up readers/writers */
1418 	wakeup(&q->rr);
1419 	wakeup(&q->wr);
1420 }
1421 
1422 /*
1423  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1424  *  blocks.
1425  */
1426 void
qhangup(Queue * q,char * msg)1427 qhangup(Queue *q, char *msg)
1428 {
1429 	/* mark it */
1430 	ilock(q);
1431 	q->state |= Qclosed;
1432 	if(msg == 0 || *msg == 0)
1433 		strcpy(q->err, Ehungup);
1434 	else
1435 		strncpy(q->err, msg, ERRMAX-1);
1436 	iunlock(q);
1437 
1438 	/* wake up readers/writers */
1439 	wakeup(&q->rr);
1440 	wakeup(&q->wr);
1441 }
1442 
1443 /*
1444  *  return non-zero if the q is hungup
1445  */
1446 int
qisclosed(Queue * q)1447 qisclosed(Queue *q)
1448 {
1449 	return q->state & Qclosed;
1450 }
1451 
1452 /*
1453  *  mark a queue as no longer hung up
1454  */
1455 void
qreopen(Queue * q)1456 qreopen(Queue *q)
1457 {
1458 	ilock(q);
1459 	q->state &= ~Qclosed;
1460 	q->state |= Qstarve;
1461 	q->eof = 0;
1462 	q->limit = q->inilim;
1463 	iunlock(q);
1464 }
1465 
1466 /*
1467  *  return bytes queued
1468  */
1469 int
qlen(Queue * q)1470 qlen(Queue *q)
1471 {
1472 	return q->dlen;
1473 }
1474 
1475 /*
1476  * return space remaining before flow control
1477  */
1478 int
qwindow(Queue * q)1479 qwindow(Queue *q)
1480 {
1481 	int l;
1482 
1483 	l = q->limit - q->len;
1484 	if(l < 0)
1485 		l = 0;
1486 	return l;
1487 }
1488 
1489 /*
1490  *  return true if we can read without blocking
1491  */
1492 int
qcanread(Queue * q)1493 qcanread(Queue *q)
1494 {
1495 	return q->bfirst!=0;
1496 }
1497 
1498 /*
1499  *  change queue limit
1500  */
1501 void
qsetlimit(Queue * q,int limit)1502 qsetlimit(Queue *q, int limit)
1503 {
1504 	q->limit = limit;
1505 }
1506 
1507 /*
1508  *  set blocking/nonblocking
1509  */
1510 void
qnoblock(Queue * q,int onoff)1511 qnoblock(Queue *q, int onoff)
1512 {
1513 	q->noblock = onoff;
1514 }
1515 
1516 /*
1517  *  flush the output queue
1518  */
1519 void
qflush(Queue * q)1520 qflush(Queue *q)
1521 {
1522 	Block *bfirst;
1523 
1524 	/* mark it */
1525 	ilock(q);
1526 	bfirst = q->bfirst;
1527 	q->bfirst = 0;
1528 	q->len = 0;
1529 	q->dlen = 0;
1530 	iunlock(q);
1531 
1532 	/* free queued blocks */
1533 	freeblist(bfirst);
1534 
1535 	/* wake up readers/writers */
1536 	wakeup(&q->wr);
1537 }
1538 
1539 int
qfull(Queue * q)1540 qfull(Queue *q)
1541 {
1542 	return q->state & Qflow;
1543 }
1544 
1545 int
qstate(Queue * q)1546 qstate(Queue *q)
1547 {
1548 	return q->state;
1549 }
1550