xref: /plan9/sys/src/cmd/unix/drawterm/kern/qio.c (revision ec59a3ddbfceee0efe34584c2c9981a5e5ff1ec4)
1 #include	"u.h"
2 #include	"lib.h"
3 #include	"dat.h"
4 #include	"fns.h"
5 #include	"error.h"
6 
7 static ulong padblockcnt;
8 static ulong concatblockcnt;
9 static ulong pullupblockcnt;
10 static ulong copyblockcnt;
11 static ulong consumecnt;
12 static ulong producecnt;
13 static ulong qcopycnt;
14 
15 static int debugging;
16 
17 #define QDEBUG	if(0)
18 
19 /*
20  *  IO queues
21  */
22 struct Queue
23 {
24 	Lock lk;
25 
26 	Block*	bfirst;		/* buffer */
27 	Block*	blast;
28 
29 	int	len;		/* bytes allocated to queue */
30 	int	dlen;		/* data bytes in queue */
31 	int	limit;		/* max bytes in queue */
32 	int	inilim;		/* initial limit */
33 	int	state;
34 	int	noblock;	/* true if writes return immediately when q full */
35 	int	eof;		/* number of eofs read by user */
36 
37 	void	(*kick)(void*);	/* restart output */
38 	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
39 	void*	arg;		/* argument to kick */
40 
41 	QLock	rlock;		/* mutex for reading processes */
42 	Rendez	rr;		/* process waiting to read */
43 	QLock	wlock;		/* mutex for writing processes */
44 	Rendez	wr;		/* process waiting to write */
45 
46 	char	err[ERRMAX];
47 };
48 
49 enum
50 {
51 	Maxatomic	= 64*1024,
52 };
53 
54 uint	qiomaxatomic = Maxatomic;
55 
56 void
ixsummary(void)57 ixsummary(void)
58 {
59 	debugging ^= 1;
60 	iallocsummary();
61 	print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
62 		padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
63 	print("consume %lud, produce %lud, qcopy %lud\n",
64 		consumecnt, producecnt, qcopycnt);
65 }
66 
67 /*
68  *  free a list of blocks
69  */
70 void
freeblist(Block * b)71 freeblist(Block *b)
72 {
73 	Block *next;
74 
75 	for(; b != 0; b = next){
76 		next = b->next;
77 		b->next = 0;
78 		freeb(b);
79 	}
80 }
81 
82 /*
83  *  pad a block to the front (or the back if size is negative)
84  */
85 Block*
padblock(Block * bp,int size)86 padblock(Block *bp, int size)
87 {
88 	int n;
89 	Block *nbp;
90 
91 	QDEBUG checkb(bp, "padblock 1");
92 	if(size >= 0){
93 		if(bp->rp - bp->base >= size){
94 			bp->rp -= size;
95 			return bp;
96 		}
97 
98 		if(bp->next)
99 			panic("padblock 0x%p", getcallerpc(&bp));
100 		n = BLEN(bp);
101 		padblockcnt++;
102 		nbp = allocb(size+n);
103 		nbp->rp += size;
104 		nbp->wp = nbp->rp;
105 		memmove(nbp->wp, bp->rp, n);
106 		nbp->wp += n;
107 		freeb(bp);
108 		nbp->rp -= size;
109 	} else {
110 		size = -size;
111 
112 		if(bp->next)
113 			panic("padblock 0x%p", getcallerpc(&bp));
114 
115 		if(bp->lim - bp->wp >= size)
116 			return bp;
117 
118 		n = BLEN(bp);
119 		padblockcnt++;
120 		nbp = allocb(size+n);
121 		memmove(nbp->wp, bp->rp, n);
122 		nbp->wp += n;
123 		freeb(bp);
124 	}
125 	QDEBUG checkb(nbp, "padblock 1");
126 	return nbp;
127 }
128 
129 /*
130  *  return count of bytes in a string of blocks
131  */
132 int
blocklen(Block * bp)133 blocklen(Block *bp)
134 {
135 	int len;
136 
137 	len = 0;
138 	while(bp) {
139 		len += BLEN(bp);
140 		bp = bp->next;
141 	}
142 	return len;
143 }
144 
145 /*
146  * return count of space in blocks
147  */
148 int
blockalloclen(Block * bp)149 blockalloclen(Block *bp)
150 {
151 	int len;
152 
153 	len = 0;
154 	while(bp) {
155 		len += BALLOC(bp);
156 		bp = bp->next;
157 	}
158 	return len;
159 }
160 
161 /*
162  *  copy the  string of blocks into
163  *  a single block and free the string
164  */
165 Block*
concatblock(Block * bp)166 concatblock(Block *bp)
167 {
168 	int len;
169 	Block *nb, *f;
170 
171 	if(bp->next == 0)
172 		return bp;
173 
174 	nb = allocb(blocklen(bp));
175 	for(f = bp; f; f = f->next) {
176 		len = BLEN(f);
177 		memmove(nb->wp, f->rp, len);
178 		nb->wp += len;
179 	}
180 	concatblockcnt += BLEN(nb);
181 	freeblist(bp);
182 	QDEBUG checkb(nb, "concatblock 1");
183 	return nb;
184 }
185 
186 /*
187  *  make sure the first block has at least n bytes
188  */
189 Block*
pullupblock(Block * bp,int n)190 pullupblock(Block *bp, int n)
191 {
192 	int i;
193 	Block *nbp;
194 
195 	/*
196 	 *  this should almost always be true, it's
197 	 *  just to avoid every caller checking.
198 	 */
199 	if(BLEN(bp) >= n)
200 		return bp;
201 
202 	/*
203 	 *  if not enough room in the first block,
204 	 *  add another to the front of the list.
205 	 */
206 	if(bp->lim - bp->rp < n){
207 		nbp = allocb(n);
208 		nbp->next = bp;
209 		bp = nbp;
210 	}
211 
212 	/*
213 	 *  copy bytes from the trailing blocks into the first
214 	 */
215 	n -= BLEN(bp);
216 	while((nbp = bp->next)){
217 		i = BLEN(nbp);
218 		if(i > n) {
219 			memmove(bp->wp, nbp->rp, n);
220 			pullupblockcnt++;
221 			bp->wp += n;
222 			nbp->rp += n;
223 			QDEBUG checkb(bp, "pullupblock 1");
224 			return bp;
225 		} else {
226 			/* shouldn't happen but why crash if it does */
227 			if(i < 0){
228 				print("pullup negative length packet\n");
229 				i = 0;
230 			}
231 			memmove(bp->wp, nbp->rp, i);
232 			pullupblockcnt++;
233 			bp->wp += i;
234 			bp->next = nbp->next;
235 			nbp->next = 0;
236 			freeb(nbp);
237 			n -= i;
238 			if(n == 0){
239 				QDEBUG checkb(bp, "pullupblock 2");
240 				return bp;
241 			}
242 		}
243 	}
244 	freeb(bp);
245 	return 0;
246 }
247 
248 /*
249  *  make sure the first block has at least n bytes
250  */
251 Block*
pullupqueue(Queue * q,int n)252 pullupqueue(Queue *q, int n)
253 {
254 	Block *b;
255 
256 	if(BLEN(q->bfirst) >= n)
257 		return q->bfirst;
258 	q->bfirst = pullupblock(q->bfirst, n);
259 	for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
260 		;
261 	q->blast = b;
262 	return q->bfirst;
263 }
264 
265 /*
266  *  trim to len bytes starting at offset
267  */
268 Block *
trimblock(Block * bp,int offset,int len)269 trimblock(Block *bp, int offset, int len)
270 {
271 	ulong l;
272 	Block *nb, *startb;
273 
274 	QDEBUG checkb(bp, "trimblock 1");
275 	if(blocklen(bp) < offset+len) {
276 		freeblist(bp);
277 		return nil;
278 	}
279 
280 	while((l = BLEN(bp)) < offset) {
281 		offset -= l;
282 		nb = bp->next;
283 		bp->next = nil;
284 		freeb(bp);
285 		bp = nb;
286 	}
287 
288 	startb = bp;
289 	bp->rp += offset;
290 
291 	while((l = BLEN(bp)) < len) {
292 		len -= l;
293 		bp = bp->next;
294 	}
295 
296 	bp->wp -= (BLEN(bp) - len);
297 
298 	if(bp->next) {
299 		freeblist(bp->next);
300 		bp->next = nil;
301 	}
302 
303 	return startb;
304 }
305 
306 /*
307  *  copy 'count' bytes into a new block
308  */
309 Block*
copyblock(Block * bp,int count)310 copyblock(Block *bp, int count)
311 {
312 	int l;
313 	Block *nbp;
314 
315 	QDEBUG checkb(bp, "copyblock 0");
316 	nbp = allocb(count);
317 	for(; count > 0 && bp != 0; bp = bp->next){
318 		l = BLEN(bp);
319 		if(l > count)
320 			l = count;
321 		memmove(nbp->wp, bp->rp, l);
322 		nbp->wp += l;
323 		count -= l;
324 	}
325 	if(count > 0){
326 		memset(nbp->wp, 0, count);
327 		nbp->wp += count;
328 	}
329 	copyblockcnt++;
330 	QDEBUG checkb(nbp, "copyblock 1");
331 
332 	return nbp;
333 }
334 
335 Block*
adjustblock(Block * bp,int len)336 adjustblock(Block* bp, int len)
337 {
338 	int n;
339 	Block *nbp;
340 
341 	if(len < 0){
342 		freeb(bp);
343 		return nil;
344 	}
345 
346 	if(bp->rp+len > bp->lim){
347 		nbp = copyblock(bp, len);
348 		freeblist(bp);
349 		QDEBUG checkb(nbp, "adjustblock 1");
350 
351 		return nbp;
352 	}
353 
354 	n = BLEN(bp);
355 	if(len > n)
356 		memset(bp->wp, 0, len-n);
357 	bp->wp = bp->rp+len;
358 	QDEBUG checkb(bp, "adjustblock 2");
359 
360 	return bp;
361 }
362 
363 
364 /*
365  *  throw away up to count bytes from a
366  *  list of blocks.  Return count of bytes
367  *  thrown away.
368  */
369 int
pullblock(Block ** bph,int count)370 pullblock(Block **bph, int count)
371 {
372 	Block *bp;
373 	int n, bytes;
374 
375 	bytes = 0;
376 	if(bph == nil)
377 		return 0;
378 
379 	while(*bph != nil && count != 0) {
380 		bp = *bph;
381 		n = BLEN(bp);
382 		if(count < n)
383 			n = count;
384 		bytes += n;
385 		count -= n;
386 		bp->rp += n;
387 		QDEBUG checkb(bp, "pullblock ");
388 		if(BLEN(bp) == 0) {
389 			*bph = bp->next;
390 			bp->next = nil;
391 			freeb(bp);
392 		}
393 	}
394 	return bytes;
395 }
396 
397 /*
398  *  get next block from a queue, return null if nothing there
399  */
400 Block*
qget(Queue * q)401 qget(Queue *q)
402 {
403 	int dowakeup;
404 	Block *b;
405 
406 	/* sync with qwrite */
407 	ilock(&q->lk);
408 
409 	b = q->bfirst;
410 	if(b == nil){
411 		q->state |= Qstarve;
412 		iunlock(&q->lk);
413 		return nil;
414 	}
415 	q->bfirst = b->next;
416 	b->next = 0;
417 	q->len -= BALLOC(b);
418 	q->dlen -= BLEN(b);
419 	QDEBUG checkb(b, "qget");
420 
421 	/* if writer flow controlled, restart */
422 	if((q->state & Qflow) && q->len < q->limit/2){
423 		q->state &= ~Qflow;
424 		dowakeup = 1;
425 	} else
426 		dowakeup = 0;
427 
428 	iunlock(&q->lk);
429 
430 	if(dowakeup)
431 		wakeup(&q->wr);
432 
433 	return b;
434 }
435 
436 /*
437  *  throw away the next 'len' bytes in the queue
438  */
439 int
qdiscard(Queue * q,int len)440 qdiscard(Queue *q, int len)
441 {
442 	Block *b;
443 	int dowakeup, n, sofar;
444 
445 	ilock(&q->lk);
446 	for(sofar = 0; sofar < len; sofar += n){
447 		b = q->bfirst;
448 		if(b == nil)
449 			break;
450 		QDEBUG checkb(b, "qdiscard");
451 		n = BLEN(b);
452 		if(n <= len - sofar){
453 			q->bfirst = b->next;
454 			b->next = 0;
455 			q->len -= BALLOC(b);
456 			q->dlen -= BLEN(b);
457 			freeb(b);
458 		} else {
459 			n = len - sofar;
460 			b->rp += n;
461 			q->dlen -= n;
462 		}
463 	}
464 
465 	/*
466 	 *  if writer flow controlled, restart
467 	 *
468 	 *  This used to be
469 	 *	q->len < q->limit/2
470 	 *  but it slows down tcp too much for certain write sizes.
471 	 *  I really don't understand it completely.  It may be
472 	 *  due to the queue draining so fast that the transmission
473 	 *  stalls waiting for the app to produce more data.  - presotto
474 	 */
475 	if((q->state & Qflow) && q->len < q->limit){
476 		q->state &= ~Qflow;
477 		dowakeup = 1;
478 	} else
479 		dowakeup = 0;
480 
481 	iunlock(&q->lk);
482 
483 	if(dowakeup)
484 		wakeup(&q->wr);
485 
486 	return sofar;
487 }
488 
489 /*
490  *  Interrupt level copy out of a queue, return # bytes copied.
491  */
492 int
qconsume(Queue * q,void * vp,int len)493 qconsume(Queue *q, void *vp, int len)
494 {
495 	Block *b;
496 	int n, dowakeup;
497 	uchar *p = vp;
498 	Block *tofree = nil;
499 
500 	/* sync with qwrite */
501 	ilock(&q->lk);
502 
503 	for(;;) {
504 		b = q->bfirst;
505 		if(b == 0){
506 			q->state |= Qstarve;
507 			iunlock(&q->lk);
508 			return -1;
509 		}
510 		QDEBUG checkb(b, "qconsume 1");
511 
512 		n = BLEN(b);
513 		if(n > 0)
514 			break;
515 		q->bfirst = b->next;
516 		q->len -= BALLOC(b);
517 
518 		/* remember to free this */
519 		b->next = tofree;
520 		tofree = b;
521 	};
522 
523 	if(n < len)
524 		len = n;
525 	memmove(p, b->rp, len);
526 	consumecnt += n;
527 	b->rp += len;
528 	q->dlen -= len;
529 
530 	/* discard the block if we're done with it */
531 	if((q->state & Qmsg) || len == n){
532 		q->bfirst = b->next;
533 		b->next = 0;
534 		q->len -= BALLOC(b);
535 		q->dlen -= BLEN(b);
536 
537 		/* remember to free this */
538 		b->next = tofree;
539 		tofree = b;
540 	}
541 
542 	/* if writer flow controlled, restart */
543 	if((q->state & Qflow) && q->len < q->limit/2){
544 		q->state &= ~Qflow;
545 		dowakeup = 1;
546 	} else
547 		dowakeup = 0;
548 
549 	iunlock(&q->lk);
550 
551 	if(dowakeup)
552 		wakeup(&q->wr);
553 
554 	if(tofree != nil)
555 		freeblist(tofree);
556 
557 	return len;
558 }
559 
560 int
qpass(Queue * q,Block * b)561 qpass(Queue *q, Block *b)
562 {
563 	int dlen, len, dowakeup;
564 
565 	/* sync with qread */
566 	dowakeup = 0;
567 	ilock(&q->lk);
568 	if(q->len >= q->limit){
569 		freeblist(b);
570 		iunlock(&q->lk);
571 		return -1;
572 	}
573 	if(q->state & Qclosed){
574 		freeblist(b);
575 		iunlock(&q->lk);
576 		return BALLOC(b);
577 	}
578 
579 	/* add buffer to queue */
580 	if(q->bfirst)
581 		q->blast->next = b;
582 	else
583 		q->bfirst = b;
584 	len = BALLOC(b);
585 	dlen = BLEN(b);
586 	QDEBUG checkb(b, "qpass");
587 	while(b->next){
588 		b = b->next;
589 		QDEBUG checkb(b, "qpass");
590 		len += BALLOC(b);
591 		dlen += BLEN(b);
592 	}
593 	q->blast = b;
594 	q->len += len;
595 	q->dlen += dlen;
596 
597 	if(q->len >= q->limit/2)
598 		q->state |= Qflow;
599 
600 	if(q->state & Qstarve){
601 		q->state &= ~Qstarve;
602 		dowakeup = 1;
603 	}
604 	iunlock(&q->lk);
605 
606 	if(dowakeup)
607 		wakeup(&q->rr);
608 
609 	return len;
610 }
611 
612 int
qpassnolim(Queue * q,Block * b)613 qpassnolim(Queue *q, Block *b)
614 {
615 	int dlen, len, dowakeup;
616 
617 	/* sync with qread */
618 	dowakeup = 0;
619 	ilock(&q->lk);
620 
621 	if(q->state & Qclosed){
622 		freeblist(b);
623 		iunlock(&q->lk);
624 		return BALLOC(b);
625 	}
626 
627 	/* add buffer to queue */
628 	if(q->bfirst)
629 		q->blast->next = b;
630 	else
631 		q->bfirst = b;
632 	len = BALLOC(b);
633 	dlen = BLEN(b);
634 	QDEBUG checkb(b, "qpass");
635 	while(b->next){
636 		b = b->next;
637 		QDEBUG checkb(b, "qpass");
638 		len += BALLOC(b);
639 		dlen += BLEN(b);
640 	}
641 	q->blast = b;
642 	q->len += len;
643 	q->dlen += dlen;
644 
645 	if(q->len >= q->limit/2)
646 		q->state |= Qflow;
647 
648 	if(q->state & Qstarve){
649 		q->state &= ~Qstarve;
650 		dowakeup = 1;
651 	}
652 	iunlock(&q->lk);
653 
654 	if(dowakeup)
655 		wakeup(&q->rr);
656 
657 	return len;
658 }
659 
660 /*
661  *  if the allocated space is way out of line with the used
662  *  space, reallocate to a smaller block
663  */
664 Block*
packblock(Block * bp)665 packblock(Block *bp)
666 {
667 	Block **l, *nbp;
668 	int n;
669 
670 	for(l = &bp; *l; l = &(*l)->next){
671 		nbp = *l;
672 		n = BLEN(nbp);
673 		if((n<<2) < BALLOC(nbp)){
674 			*l = allocb(n);
675 			memmove((*l)->wp, nbp->rp, n);
676 			(*l)->wp += n;
677 			(*l)->next = nbp->next;
678 			freeb(nbp);
679 		}
680 	}
681 
682 	return bp;
683 }
684 
685 int
qproduce(Queue * q,void * vp,int len)686 qproduce(Queue *q, void *vp, int len)
687 {
688 	Block *b;
689 	int dowakeup;
690 	uchar *p = vp;
691 
692 	/* sync with qread */
693 	dowakeup = 0;
694 	ilock(&q->lk);
695 
696 	/* no waiting receivers, room in buffer? */
697 	if(q->len >= q->limit){
698 		q->state |= Qflow;
699 		iunlock(&q->lk);
700 		return -1;
701 	}
702 
703 	/* save in buffer */
704 	b = iallocb(len);
705 	if(b == 0){
706 		iunlock(&q->lk);
707 		return 0;
708 	}
709 	memmove(b->wp, p, len);
710 	producecnt += len;
711 	b->wp += len;
712 	if(q->bfirst)
713 		q->blast->next = b;
714 	else
715 		q->bfirst = b;
716 	q->blast = b;
717 	/* b->next = 0; done by iallocb() */
718 	q->len += BALLOC(b);
719 	q->dlen += BLEN(b);
720 	QDEBUG checkb(b, "qproduce");
721 
722 	if(q->state & Qstarve){
723 		q->state &= ~Qstarve;
724 		dowakeup = 1;
725 	}
726 
727 	if(q->len >= q->limit)
728 		q->state |= Qflow;
729 	iunlock(&q->lk);
730 
731 	if(dowakeup)
732 		wakeup(&q->rr);
733 
734 	return len;
735 }
736 
737 /*
738  *  copy from offset in the queue
739  */
740 Block*
qcopy(Queue * q,int len,ulong offset)741 qcopy(Queue *q, int len, ulong offset)
742 {
743 	int sofar;
744 	int n;
745 	Block *b, *nb;
746 	uchar *p;
747 
748 	nb = allocb(len);
749 
750 	ilock(&q->lk);
751 
752 	/* go to offset */
753 	b = q->bfirst;
754 	for(sofar = 0; ; sofar += n){
755 		if(b == nil){
756 			iunlock(&q->lk);
757 			return nb;
758 		}
759 		n = BLEN(b);
760 		if(sofar + n > offset){
761 			p = b->rp + offset - sofar;
762 			n -= offset - sofar;
763 			break;
764 		}
765 		QDEBUG checkb(b, "qcopy");
766 		b = b->next;
767 	}
768 
769 	/* copy bytes from there */
770 	for(sofar = 0; sofar < len;){
771 		if(n > len - sofar)
772 			n = len - sofar;
773 		memmove(nb->wp, p, n);
774 		qcopycnt += n;
775 		sofar += n;
776 		nb->wp += n;
777 		b = b->next;
778 		if(b == nil)
779 			break;
780 		n = BLEN(b);
781 		p = b->rp;
782 	}
783 	iunlock(&q->lk);
784 
785 	return nb;
786 }
787 
788 /*
789  *  called by non-interrupt code
790  */
791 Queue*
qopen(int limit,int msg,void (* kick)(void *),void * arg)792 qopen(int limit, int msg, void (*kick)(void*), void *arg)
793 {
794 	Queue *q;
795 
796 	q = malloc(sizeof(Queue));
797 	if(q == 0)
798 		return 0;
799 
800 	q->limit = q->inilim = limit;
801 	q->kick = kick;
802 	q->arg = arg;
803 	q->state = msg;
804 
805 	q->state |= Qstarve;
806 	q->eof = 0;
807 	q->noblock = 0;
808 
809 	return q;
810 }
811 
812 /* open a queue to be bypassed */
813 Queue*
qbypass(void (* bypass)(void *,Block *),void * arg)814 qbypass(void (*bypass)(void*, Block*), void *arg)
815 {
816 	Queue *q;
817 
818 	q = malloc(sizeof(Queue));
819 	if(q == 0)
820 		return 0;
821 
822 	q->limit = 0;
823 	q->arg = arg;
824 	q->bypass = bypass;
825 	q->state = 0;
826 
827 	return q;
828 }
829 
830 static int
notempty(void * a)831 notempty(void *a)
832 {
833 	Queue *q = a;
834 
835 	return (q->state & Qclosed) || q->bfirst != 0;
836 }
837 
838 /*
839  *  wait for the queue to be non-empty or closed.
840  *  called with q ilocked.
841  */
842 static int
qwait(Queue * q)843 qwait(Queue *q)
844 {
845 	/* wait for data */
846 	for(;;){
847 		if(q->bfirst != nil)
848 			break;
849 
850 		if(q->state & Qclosed){
851 			if(++q->eof > 3)
852 				return -1;
853 			if(*q->err && strcmp(q->err, Ehungup) != 0)
854 				return -1;
855 			return 0;
856 		}
857 
858 		q->state |= Qstarve;	/* flag requesting producer to wake me */
859 		iunlock(&q->lk);
860 		sleep(&q->rr, notempty, q);
861 		ilock(&q->lk);
862 	}
863 	return 1;
864 }
865 
866 /*
867  * add a block list to a queue
868  */
869 void
qaddlist(Queue * q,Block * b)870 qaddlist(Queue *q, Block *b)
871 {
872 	/* queue the block */
873 	if(q->bfirst)
874 		q->blast->next = b;
875 	else
876 		q->bfirst = b;
877 	q->len += blockalloclen(b);
878 	q->dlen += blocklen(b);
879 	while(b->next)
880 		b = b->next;
881 	q->blast = b;
882 }
883 
884 /*
885  *  called with q ilocked
886  */
887 Block*
qremove(Queue * q)888 qremove(Queue *q)
889 {
890 	Block *b;
891 
892 	b = q->bfirst;
893 	if(b == nil)
894 		return nil;
895 	q->bfirst = b->next;
896 	b->next = nil;
897 	q->dlen -= BLEN(b);
898 	q->len -= BALLOC(b);
899 	QDEBUG checkb(b, "qremove");
900 	return b;
901 }
902 
903 /*
904  *  copy the contents of a string of blocks into
905  *  memory.  emptied blocks are freed.  return
906  *  pointer to first unconsumed block.
907  */
908 Block*
bl2mem(uchar * p,Block * b,int n)909 bl2mem(uchar *p, Block *b, int n)
910 {
911 	int i;
912 	Block *next;
913 
914 	for(; b != nil; b = next){
915 		i = BLEN(b);
916 		if(i > n){
917 			memmove(p, b->rp, n);
918 			b->rp += n;
919 			return b;
920 		}
921 		memmove(p, b->rp, i);
922 		n -= i;
923 		p += i;
924 		b->rp += i;
925 		next = b->next;
926 		freeb(b);
927 	}
928 	return nil;
929 }
930 
931 /*
932  *  copy the contents of memory into a string of blocks.
933  *  return nil on error.
934  */
935 Block*
mem2bl(uchar * p,int len)936 mem2bl(uchar *p, int len)
937 {
938 	int n;
939 	Block *b, *first, **l;
940 
941 	first = nil;
942 	l = &first;
943 	if(waserror()){
944 		freeblist(first);
945 		nexterror();
946 	}
947 	do {
948 		n = len;
949 		if(n > Maxatomic)
950 			n = Maxatomic;
951 
952 		*l = b = allocb(n);
953 	/*	setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
954 		memmove(b->wp, p, n);
955 		b->wp += n;
956 		p += n;
957 		len -= n;
958 		l = &b->next;
959 	} while(len > 0);
960 	poperror();
961 
962 	return first;
963 }
964 
965 /*
966  *  put a block back to the front of the queue
967  *  called with q ilocked
968  */
969 void
qputback(Queue * q,Block * b)970 qputback(Queue *q, Block *b)
971 {
972 	b->next = q->bfirst;
973 	if(q->bfirst == nil)
974 		q->blast = b;
975 	q->bfirst = b;
976 	q->len += BALLOC(b);
977 	q->dlen += BLEN(b);
978 }
979 
980 /*
981  *  flow control, get producer going again
982  *  called with q ilocked
983  */
984 static void
qwakeup_iunlock(Queue * q)985 qwakeup_iunlock(Queue *q)
986 {
987 	int dowakeup = 0;
988 
989 	/* if writer flow controlled, restart */
990 	if((q->state & Qflow) && q->len < q->limit/2){
991 		q->state &= ~Qflow;
992 		dowakeup = 1;
993 	}
994 
995 	iunlock(&q->lk);
996 
997 	/* wakeup flow controlled writers */
998 	if(dowakeup){
999 		if(q->kick)
1000 			q->kick(q->arg);
1001 		wakeup(&q->wr);
1002 	}
1003 }
1004 
1005 /*
1006  *  get next block from a queue (up to a limit)
1007  */
1008 Block*
qbread(Queue * q,int len)1009 qbread(Queue *q, int len)
1010 {
1011 	Block *b, *nb;
1012 	int n;
1013 
1014 	qlock(&q->rlock);
1015 	if(waserror()){
1016 		qunlock(&q->rlock);
1017 		nexterror();
1018 	}
1019 
1020 	ilock(&q->lk);
1021 	switch(qwait(q)){
1022 	case 0:
1023 		/* queue closed */
1024 		iunlock(&q->lk);
1025 		qunlock(&q->rlock);
1026 		poperror();
1027 		return nil;
1028 	case -1:
1029 		/* multiple reads on a closed queue */
1030 		iunlock(&q->lk);
1031 		error(q->err);
1032 	}
1033 
1034 	/* if we get here, there's at least one block in the queue */
1035 	b = qremove(q);
1036 	n = BLEN(b);
1037 
1038 	/* split block if it's too big and this is not a message queue */
1039 	nb = b;
1040 	if(n > len){
1041 		if((q->state&Qmsg) == 0){
1042 			n -= len;
1043 			b = allocb(n);
1044 			memmove(b->wp, nb->rp+len, n);
1045 			b->wp += n;
1046 			qputback(q, b);
1047 		}
1048 		nb->wp = nb->rp + len;
1049 	}
1050 
1051 	/* restart producer */
1052 	qwakeup_iunlock(q);
1053 
1054 	poperror();
1055 	qunlock(&q->rlock);
1056 	return nb;
1057 }
1058 
1059 /*
1060  *  read a queue.  if no data is queued, post a Block
1061  *  and wait on its Rendez.
1062  */
1063 long
qread(Queue * q,void * vp,int len)1064 qread(Queue *q, void *vp, int len)
1065 {
1066 	Block *b, *first, **l;
1067 	int m, n;
1068 
1069 	qlock(&q->rlock);
1070 	if(waserror()){
1071 		qunlock(&q->rlock);
1072 		nexterror();
1073 	}
1074 
1075 	ilock(&q->lk);
1076 again:
1077 	switch(qwait(q)){
1078 	case 0:
1079 		/* queue closed */
1080 		iunlock(&q->lk);
1081 		qunlock(&q->rlock);
1082 		poperror();
1083 		return 0;
1084 	case -1:
1085 		/* multiple reads on a closed queue */
1086 		iunlock(&q->lk);
1087 		error(q->err);
1088 	}
1089 
1090 	/* if we get here, there's at least one block in the queue */
1091 	if(q->state & Qcoalesce){
1092 		/* when coalescing, 0 length blocks just go away */
1093 		b = q->bfirst;
1094 		if(BLEN(b) <= 0){
1095 			freeb(qremove(q));
1096 			goto again;
1097 		}
1098 
1099 		/*  grab the first block plus as many
1100 		 *  following blocks as will completely
1101 		 *  fit in the read.
1102 		 */
1103 		n = 0;
1104 		l = &first;
1105 		m = BLEN(b);
1106 		for(;;) {
1107 			*l = qremove(q);
1108 			l = &b->next;
1109 			n += m;
1110 
1111 			b = q->bfirst;
1112 			if(b == nil)
1113 				break;
1114 			m = BLEN(b);
1115 			if(n+m > len)
1116 				break;
1117 		}
1118 	} else {
1119 		first = qremove(q);
1120 		n = BLEN(first);
1121 	}
1122 
1123 	/* copy to user space outside of the ilock */
1124 	iunlock(&q->lk);
1125 	b = bl2mem(vp, first, len);
1126 	ilock(&q->lk);
1127 
1128 	/* take care of any left over partial block */
1129 	if(b != nil){
1130 		n -= BLEN(b);
1131 		if(q->state & Qmsg)
1132 			freeb(b);
1133 		else
1134 			qputback(q, b);
1135 	}
1136 
1137 	/* restart producer */
1138 	qwakeup_iunlock(q);
1139 
1140 	poperror();
1141 	qunlock(&q->rlock);
1142 	return n;
1143 }
1144 
1145 static int
qnotfull(void * a)1146 qnotfull(void *a)
1147 {
1148 	Queue *q = a;
1149 
1150 	return q->len < q->limit || (q->state & Qclosed);
1151 }
1152 
1153 ulong noblockcnt;
1154 
1155 /*
1156  *  add a block to a queue obeying flow control
1157  */
1158 long
qbwrite(Queue * q,Block * b)1159 qbwrite(Queue *q, Block *b)
1160 {
1161 	int n, dowakeup;
1162 
1163 	n = BLEN(b);
1164 
1165 	if(q->bypass){
1166 		(*q->bypass)(q->arg, b);
1167 		return n;
1168 	}
1169 
1170 	dowakeup = 0;
1171 	qlock(&q->wlock);
1172 	if(waserror()){
1173 		if(b != nil)
1174 			freeb(b);
1175 		qunlock(&q->wlock);
1176 		nexterror();
1177 	}
1178 
1179 	ilock(&q->lk);
1180 
1181 	/* give up if the queue is closed */
1182 	if(q->state & Qclosed){
1183 		iunlock(&q->lk);
1184 		error(q->err);
1185 	}
1186 
1187 	/* if nonblocking, don't queue over the limit */
1188 	if(q->len >= q->limit){
1189 		if(q->noblock){
1190 			iunlock(&q->lk);
1191 			freeb(b);
1192 			noblockcnt += n;
1193 			qunlock(&q->wlock);
1194 			poperror();
1195 			return n;
1196 		}
1197 	}
1198 
1199 	/* queue the block */
1200 	if(q->bfirst)
1201 		q->blast->next = b;
1202 	else
1203 		q->bfirst = b;
1204 	q->blast = b;
1205 	b->next = 0;
1206 	q->len += BALLOC(b);
1207 	q->dlen += n;
1208 	QDEBUG checkb(b, "qbwrite");
1209 	b = nil;
1210 
1211 	/* make sure other end gets awakened */
1212 	if(q->state & Qstarve){
1213 		q->state &= ~Qstarve;
1214 		dowakeup = 1;
1215 	}
1216 	iunlock(&q->lk);
1217 
1218 	/*  get output going again */
1219 	if(q->kick && (dowakeup || (q->state&Qkick)))
1220 		q->kick(q->arg);
1221 
1222 	/* wakeup anyone consuming at the other end */
1223 	if(dowakeup){
1224 		wakeup(&q->rr);
1225 
1226 		/* if we just wokeup a higher priority process, let it run */
1227 	/*
1228 		p = wakeup(&q->rr);
1229 		if(p != nil && p->priority > up->priority)
1230 			sched();
1231 	 */
1232 	}
1233 
1234 	/*
1235 	 *  flow control, wait for queue to get below the limit
1236 	 *  before allowing the process to continue and queue
1237 	 *  more.  We do this here so that postnote can only
1238 	 *  interrupt us after the data has been queued.  This
1239 	 *  means that things like 9p flushes and ssl messages
1240 	 *  will not be disrupted by software interrupts.
1241 	 *
1242 	 *  Note - this is moderately dangerous since a process
1243 	 *  that keeps getting interrupted and rewriting will
1244 	 *  queue infinite crud.
1245 	 */
1246 	for(;;){
1247 		if(q->noblock || qnotfull(q))
1248 			break;
1249 
1250 		ilock(&q->lk);
1251 		q->state |= Qflow;
1252 		iunlock(&q->lk);
1253 		sleep(&q->wr, qnotfull, q);
1254 	}
1255 	USED(b);
1256 
1257 	qunlock(&q->wlock);
1258 	poperror();
1259 	return n;
1260 }
1261 
1262 /*
1263  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1264  */
1265 int
qwrite(Queue * q,void * vp,int len)1266 qwrite(Queue *q, void *vp, int len)
1267 {
1268 	int n, sofar;
1269 	Block *b;
1270 	uchar *p = vp;
1271 
1272 	QDEBUG if(!islo())
1273 		print("qwrite hi %p\n", getcallerpc(&q));
1274 
1275 	sofar = 0;
1276 	do {
1277 		n = len-sofar;
1278 		if(n > Maxatomic)
1279 			n = Maxatomic;
1280 
1281 		b = allocb(n);
1282 	/*	setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
1283 		if(waserror()){
1284 			freeb(b);
1285 			nexterror();
1286 		}
1287 		memmove(b->wp, p+sofar, n);
1288 		poperror();
1289 		b->wp += n;
1290 
1291 		qbwrite(q, b);
1292 
1293 		sofar += n;
1294 	} while(sofar < len && (q->state & Qmsg) == 0);
1295 
1296 	return len;
1297 }
1298 
1299 /*
1300  *  used by print() to write to a queue.  Since we may be splhi or not in
1301  *  a process, don't qlock.
1302  */
1303 int
qiwrite(Queue * q,void * vp,int len)1304 qiwrite(Queue *q, void *vp, int len)
1305 {
1306 	int n, sofar, dowakeup;
1307 	Block *b;
1308 	uchar *p = vp;
1309 
1310 	dowakeup = 0;
1311 
1312 	sofar = 0;
1313 	do {
1314 		n = len-sofar;
1315 		if(n > Maxatomic)
1316 			n = Maxatomic;
1317 
1318 		b = iallocb(n);
1319 		if(b == nil)
1320 			break;
1321 		memmove(b->wp, p+sofar, n);
1322 		b->wp += n;
1323 
1324 		ilock(&q->lk);
1325 
1326 		QDEBUG checkb(b, "qiwrite");
1327 		if(q->bfirst)
1328 			q->blast->next = b;
1329 		else
1330 			q->bfirst = b;
1331 		q->blast = b;
1332 		q->len += BALLOC(b);
1333 		q->dlen += n;
1334 
1335 		if(q->state & Qstarve){
1336 			q->state &= ~Qstarve;
1337 			dowakeup = 1;
1338 		}
1339 
1340 		iunlock(&q->lk);
1341 
1342 		if(dowakeup){
1343 			if(q->kick)
1344 				q->kick(q->arg);
1345 			wakeup(&q->rr);
1346 		}
1347 
1348 		sofar += n;
1349 	} while(sofar < len && (q->state & Qmsg) == 0);
1350 
1351 	return sofar;
1352 }
1353 
1354 /*
1355  *  be extremely careful when calling this,
1356  *  as there is no reference accounting
1357  */
1358 void
qfree(Queue * q)1359 qfree(Queue *q)
1360 {
1361 	qclose(q);
1362 	free(q);
1363 }
1364 
1365 /*
1366  *  Mark a queue as closed.  No further IO is permitted.
1367  *  All blocks are released.
1368  */
1369 void
qclose(Queue * q)1370 qclose(Queue *q)
1371 {
1372 	Block *bfirst;
1373 
1374 	if(q == nil)
1375 		return;
1376 
1377 	/* mark it */
1378 	ilock(&q->lk);
1379 	q->state |= Qclosed;
1380 	q->state &= ~(Qflow|Qstarve);
1381 	strcpy(q->err, Ehungup);
1382 	bfirst = q->bfirst;
1383 	q->bfirst = 0;
1384 	q->len = 0;
1385 	q->dlen = 0;
1386 	q->noblock = 0;
1387 	iunlock(&q->lk);
1388 
1389 	/* free queued blocks */
1390 	freeblist(bfirst);
1391 
1392 	/* wake up readers/writers */
1393 	wakeup(&q->rr);
1394 	wakeup(&q->wr);
1395 }
1396 
1397 /*
1398  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1399  *  blocks.
1400  */
1401 void
qhangup(Queue * q,char * msg)1402 qhangup(Queue *q, char *msg)
1403 {
1404 	/* mark it */
1405 	ilock(&q->lk);
1406 	q->state |= Qclosed;
1407 	if(msg == 0 || *msg == 0)
1408 		strcpy(q->err, Ehungup);
1409 	else
1410 		strncpy(q->err, msg, ERRMAX-1);
1411 	iunlock(&q->lk);
1412 
1413 	/* wake up readers/writers */
1414 	wakeup(&q->rr);
1415 	wakeup(&q->wr);
1416 }
1417 
1418 /*
1419  *  return non-zero if the q is hungup
1420  */
1421 int
qisclosed(Queue * q)1422 qisclosed(Queue *q)
1423 {
1424 	return q->state & Qclosed;
1425 }
1426 
1427 /*
1428  *  mark a queue as no longer hung up
1429  */
1430 void
qreopen(Queue * q)1431 qreopen(Queue *q)
1432 {
1433 	ilock(&q->lk);
1434 	q->state &= ~Qclosed;
1435 	q->state |= Qstarve;
1436 	q->eof = 0;
1437 	q->limit = q->inilim;
1438 	iunlock(&q->lk);
1439 }
1440 
1441 /*
1442  *  return bytes queued
1443  */
1444 int
qlen(Queue * q)1445 qlen(Queue *q)
1446 {
1447 	return q->dlen;
1448 }
1449 
1450 /*
1451  * return space remaining before flow control
1452  */
1453 int
qwindow(Queue * q)1454 qwindow(Queue *q)
1455 {
1456 	int l;
1457 
1458 	l = q->limit - q->len;
1459 	if(l < 0)
1460 		l = 0;
1461 	return l;
1462 }
1463 
1464 /*
1465  *  return true if we can read without blocking
1466  */
1467 int
qcanread(Queue * q)1468 qcanread(Queue *q)
1469 {
1470 	return q->bfirst!=0;
1471 }
1472 
1473 /*
1474  *  change queue limit
1475  */
1476 void
qsetlimit(Queue * q,int limit)1477 qsetlimit(Queue *q, int limit)
1478 {
1479 	q->limit = limit;
1480 }
1481 
1482 /*
1483  *  set blocking/nonblocking
1484  */
1485 void
qnoblock(Queue * q,int onoff)1486 qnoblock(Queue *q, int onoff)
1487 {
1488 	q->noblock = onoff;
1489 }
1490 
1491 /*
1492  *  flush the output queue
1493  */
1494 void
qflush(Queue * q)1495 qflush(Queue *q)
1496 {
1497 	Block *bfirst;
1498 
1499 	/* mark it */
1500 	ilock(&q->lk);
1501 	bfirst = q->bfirst;
1502 	q->bfirst = 0;
1503 	q->len = 0;
1504 	q->dlen = 0;
1505 	iunlock(&q->lk);
1506 
1507 	/* free queued blocks */
1508 	freeblist(bfirst);
1509 
1510 	/* wake up readers/writers */
1511 	wakeup(&q->wr);
1512 }
1513 
1514 int
qfull(Queue * q)1515 qfull(Queue *q)
1516 {
1517 	return q->state & Qflow;
1518 }
1519 
1520 int
qstate(Queue * q)1521 qstate(Queue *q)
1522 {
1523 	return q->state;
1524 }
1525