1 #include "u.h"
2 #include "../port/lib.h"
3 #include "mem.h"
4 #include "dat.h"
5 #include "fns.h"
6 #include "../port/error.h"
7
/*
 * statistics printed by ixsummary()
 */
static ulong padblockcnt;	/* times padblock had to allocate a replacement block */
static ulong concatblockcnt;	/* bytes gathered into a single block by concatblock */
static ulong pullupblockcnt;	/* copies made by pullupblock */
static ulong copyblockcnt;	/* blocks produced by copyblock */
static ulong consumecnt;	/* accumulated by qconsume */
static ulong producecnt;	/* bytes copied into queues by qproduce */
static ulong qcopycnt;	/* bytes copied out by qcopy */

static int debugging;	/* flipped on every call to ixsummary */

/* prefix for debugging statements; if(0) leaves them compiled but never run */
#define QDEBUG if(0)
19
/*
 * IO queues
 *
 * A Queue is a singly linked list of Blocks (bfirst .. blast) together
 * with the byte accounting, state bits and sleep/wakeup rendezvous
 * used by the routines in this file.
 */
typedef struct Queue Queue;

struct Queue
{
	Lock;	/* anonymous lock; taken with ilock(q)/iunlock(q) */

	Block*	bfirst;	/* buffer */
	Block*	blast;	/* last block of the list; only meaningful when bfirst is non-nil */

	int	len;	/* bytes allocated to queue */
	int	dlen;	/* data bytes in queue */
	int	limit;	/* max bytes in queue */
	int	inilim;	/* initial limit */
	int	state;	/* Qstarve, Qflow, Qclosed, Qmsg, Qcoalesce, Qkick bits */
	int	noblock;	/* true if writes return immediately when q full */
	int	eof;	/* number of eofs read by user */

	void	(*kick)(void*);	/* restart output */
	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
	void*	arg;	/* argument to kick */

	QLock	rlock;	/* mutex for reading processes */
	Rendez	rr;	/* process waiting to read */
	QLock	wlock;	/* mutex for writing processes */
	Rendez	wr;	/* process waiting to write */

	char	err[ERRMAX];	/* error string set by qclose/qhangup and raised to callers */
};
51
enum
{
	/* largest chunk that qwrite, qiwrite and mem2bl place in a single block */
	Maxatomic = 64*1024,
};

/* global copy of Maxatomic; not read by any routine visible in this file */
uint qiomaxatomic = Maxatomic;
58
59 void
ixsummary(void)60 ixsummary(void)
61 {
62 debugging ^= 1;
63 iallocsummary();
64 print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
65 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
66 print("consume %lud, produce %lud, qcopy %lud\n",
67 consumecnt, producecnt, qcopycnt);
68 }
69
70 /*
71 * free a list of blocks
72 */
73 void
freeblist(Block * b)74 freeblist(Block *b)
75 {
76 Block *next;
77
78 for(; b != 0; b = next){
79 next = b->next;
80 b->next = 0;
81 freeb(b);
82 }
83 }
84
85 /*
86 * pad a block to the front (or the back if size is negative)
87 */
88 Block*
padblock(Block * bp,int size)89 padblock(Block *bp, int size)
90 {
91 int n;
92 Block *nbp;
93
94 QDEBUG checkb(bp, "padblock 1");
95 if(size >= 0){
96 if(bp->rp - bp->base >= size){
97 bp->rp -= size;
98 return bp;
99 }
100
101 if(bp->next)
102 panic("padblock %#p", getcallerpc(&bp));
103 n = BLEN(bp);
104 padblockcnt++;
105 nbp = allocb(size+n);
106 nbp->rp += size;
107 nbp->wp = nbp->rp;
108 memmove(nbp->wp, bp->rp, n);
109 nbp->wp += n;
110 freeb(bp);
111 nbp->rp -= size;
112 } else {
113 size = -size;
114
115 if(bp->next)
116 panic("padblock %#p", getcallerpc(&bp));
117
118 if(bp->lim - bp->wp >= size)
119 return bp;
120
121 n = BLEN(bp);
122 padblockcnt++;
123 nbp = allocb(size+n);
124 memmove(nbp->wp, bp->rp, n);
125 nbp->wp += n;
126 freeb(bp);
127 }
128 QDEBUG checkb(nbp, "padblock 1");
129 return nbp;
130 }
131
132 /*
133 * return count of bytes in a string of blocks
134 */
135 int
blocklen(Block * bp)136 blocklen(Block *bp)
137 {
138 int len;
139
140 len = 0;
141 while(bp) {
142 len += BLEN(bp);
143 bp = bp->next;
144 }
145 return len;
146 }
147
148 /*
149 * return count of space in blocks
150 */
151 int
blockalloclen(Block * bp)152 blockalloclen(Block *bp)
153 {
154 int len;
155
156 len = 0;
157 while(bp) {
158 len += BALLOC(bp);
159 bp = bp->next;
160 }
161 return len;
162 }
163
164 /*
165 * copy the string of blocks into
166 * a single block and free the string
167 */
168 Block*
concatblock(Block * bp)169 concatblock(Block *bp)
170 {
171 int len;
172 Block *nb, *f;
173
174 if(bp->next == 0)
175 return bp;
176
177 nb = allocb(blocklen(bp));
178 for(f = bp; f; f = f->next) {
179 len = BLEN(f);
180 memmove(nb->wp, f->rp, len);
181 nb->wp += len;
182 }
183 concatblockcnt += BLEN(nb);
184 freeblist(bp);
185 QDEBUG checkb(nb, "concatblock 1");
186 return nb;
187 }
188
189 /*
190 * make sure the first block has at least n bytes
191 */
192 Block*
pullupblock(Block * bp,int n)193 pullupblock(Block *bp, int n)
194 {
195 int i;
196 Block *nbp;
197
198 /*
199 * this should almost always be true, it's
200 * just to avoid every caller checking.
201 */
202 if(BLEN(bp) >= n)
203 return bp;
204
205 /*
206 * if not enough room in the first block,
207 * add another to the front of the list.
208 */
209 if(bp->lim - bp->rp < n){
210 nbp = allocb(n);
211 nbp->next = bp;
212 bp = nbp;
213 }
214
215 /*
216 * copy bytes from the trailing blocks into the first
217 */
218 n -= BLEN(bp);
219 while(nbp = bp->next){
220 i = BLEN(nbp);
221 if(i > n) {
222 memmove(bp->wp, nbp->rp, n);
223 pullupblockcnt++;
224 bp->wp += n;
225 nbp->rp += n;
226 QDEBUG checkb(bp, "pullupblock 1");
227 return bp;
228 } else {
229 /* shouldn't happen but why crash if it does */
230 if(i < 0){
231 print("pullupblock -ve length, from %#p\n",
232 getcallerpc(&bp));
233 i = 0;
234 }
235 memmove(bp->wp, nbp->rp, i);
236 pullupblockcnt++;
237 bp->wp += i;
238 bp->next = nbp->next;
239 nbp->next = 0;
240 freeb(nbp);
241 n -= i;
242 if(n == 0){
243 QDEBUG checkb(bp, "pullupblock 2");
244 return bp;
245 }
246 }
247 }
248 freeb(bp);
249 return 0;
250 }
251
252 /*
253 * make sure the first block has at least n bytes
254 */
255 Block*
pullupqueue(Queue * q,int n)256 pullupqueue(Queue *q, int n)
257 {
258 Block *b;
259
260 if(BLEN(q->bfirst) >= n)
261 return q->bfirst;
262 q->bfirst = pullupblock(q->bfirst, n);
263 for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
264 ;
265 q->blast = b;
266 return q->bfirst;
267 }
268
269 /*
270 * trim to len bytes starting at offset
271 */
272 Block *
trimblock(Block * bp,int offset,int len)273 trimblock(Block *bp, int offset, int len)
274 {
275 long l;
276 Block *nb, *startb;
277
278 QDEBUG checkb(bp, "trimblock 1");
279 if(blocklen(bp) < offset+len) {
280 freeblist(bp);
281 return nil;
282 }
283
284 while((l = BLEN(bp)) < offset) {
285 offset -= l;
286 nb = bp->next;
287 bp->next = nil;
288 freeb(bp);
289 bp = nb;
290 }
291
292 startb = bp;
293 bp->rp += offset;
294
295 while((l = BLEN(bp)) < len) {
296 len -= l;
297 bp = bp->next;
298 }
299
300 bp->wp -= (BLEN(bp) - len);
301
302 if(bp->next) {
303 freeblist(bp->next);
304 bp->next = nil;
305 }
306
307 return startb;
308 }
309
310 /*
311 * copy 'count' bytes into a new block
312 */
313 Block*
copyblock(Block * bp,int count)314 copyblock(Block *bp, int count)
315 {
316 int l;
317 Block *nbp;
318
319 QDEBUG checkb(bp, "copyblock 0");
320 if(bp->flag & BINTR){
321 nbp = iallocb(count);
322 if(nbp == nil)
323 return nil;
324 }else
325 nbp = allocb(count);
326 for(; count > 0 && bp != 0; bp = bp->next){
327 l = BLEN(bp);
328 if(l > count)
329 l = count;
330 memmove(nbp->wp, bp->rp, l);
331 nbp->wp += l;
332 count -= l;
333 }
334 if(count > 0){
335 memset(nbp->wp, 0, count);
336 nbp->wp += count;
337 }
338 copyblockcnt++;
339 QDEBUG checkb(nbp, "copyblock 1");
340
341 return nbp;
342 }
343
344 Block*
adjustblock(Block * bp,int len)345 adjustblock(Block* bp, int len)
346 {
347 int n;
348 Block *nbp;
349
350 if(len < 0){
351 freeb(bp);
352 return nil;
353 }
354
355 if(bp->rp+len > bp->lim){
356 nbp = copyblock(bp, len);
357 freeblist(bp);
358 QDEBUG checkb(nbp, "adjustblock 1");
359
360 return nbp;
361 }
362
363 n = BLEN(bp);
364 if(len > n)
365 memset(bp->wp, 0, len-n);
366 bp->wp = bp->rp+len;
367 QDEBUG checkb(bp, "adjustblock 2");
368
369 return bp;
370 }
371
372
373 /*
374 * throw away up to count bytes from a
375 * list of blocks. Return count of bytes
376 * thrown away.
377 */
378 int
pullblock(Block ** bph,int count)379 pullblock(Block **bph, int count)
380 {
381 Block *bp;
382 int n, bytes;
383
384 bytes = 0;
385 if(bph == nil)
386 return 0;
387
388 while(*bph != nil && count != 0) {
389 bp = *bph;
390 n = BLEN(bp);
391 if(count < n)
392 n = count;
393 bytes += n;
394 count -= n;
395 bp->rp += n;
396 QDEBUG checkb(bp, "pullblock ");
397 if(BLEN(bp) == 0) {
398 *bph = bp->next;
399 bp->next = nil;
400 freeb(bp);
401 }
402 }
403 return bytes;
404 }
405
406 /*
407 * get next block from a queue, return null if nothing there
408 */
409 Block*
qget(Queue * q)410 qget(Queue *q)
411 {
412 int dowakeup;
413 Block *b;
414
415 /* sync with qwrite */
416 ilock(q);
417
418 b = q->bfirst;
419 if(b == nil){
420 q->state |= Qstarve;
421 iunlock(q);
422 return nil;
423 }
424 q->bfirst = b->next;
425 b->next = 0;
426 q->len -= BALLOC(b);
427 q->dlen -= BLEN(b);
428 QDEBUG checkb(b, "qget");
429
430 /* if writer flow controlled, restart */
431 if((q->state & Qflow) && q->len < q->limit/2){
432 q->state &= ~Qflow;
433 dowakeup = 1;
434 } else
435 dowakeup = 0;
436
437 iunlock(q);
438
439 if(dowakeup)
440 wakeup(&q->wr);
441
442 return b;
443 }
444
445 /*
446 * throw away the next 'len' bytes in the queue
447 */
448 int
qdiscard(Queue * q,int len)449 qdiscard(Queue *q, int len)
450 {
451 Block *b;
452 int dowakeup, n, sofar;
453
454 ilock(q);
455 for(sofar = 0; sofar < len; sofar += n){
456 b = q->bfirst;
457 if(b == nil)
458 break;
459 QDEBUG checkb(b, "qdiscard");
460 n = BLEN(b);
461 if(n <= len - sofar){
462 q->bfirst = b->next;
463 b->next = 0;
464 q->len -= BALLOC(b);
465 q->dlen -= BLEN(b);
466 freeb(b);
467 } else {
468 n = len - sofar;
469 b->rp += n;
470 q->dlen -= n;
471 }
472 }
473
474 /*
475 * if writer flow controlled, restart
476 *
477 * This used to be
478 * q->len < q->limit/2
479 * but it slows down tcp too much for certain write sizes.
480 * I really don't understand it completely. It may be
481 * due to the queue draining so fast that the transmission
482 * stalls waiting for the app to produce more data. - presotto
483 *
484 * changed back from q->len < q->limit for reno tcp. - jmk
485 */
486 if((q->state & Qflow) && q->len < q->limit/2){
487 q->state &= ~Qflow;
488 dowakeup = 1;
489 } else
490 dowakeup = 0;
491
492 iunlock(q);
493
494 if(dowakeup)
495 wakeup(&q->wr);
496
497 return sofar;
498 }
499
500 /*
501 * Interrupt level copy out of a queue, return # bytes copied.
502 */
503 int
qconsume(Queue * q,void * vp,int len)504 qconsume(Queue *q, void *vp, int len)
505 {
506 Block *b;
507 int n, dowakeup;
508 uchar *p = vp;
509 Block *tofree = nil;
510
511 /* sync with qwrite */
512 ilock(q);
513
514 for(;;) {
515 b = q->bfirst;
516 if(b == 0){
517 q->state |= Qstarve;
518 iunlock(q);
519 return -1;
520 }
521 QDEBUG checkb(b, "qconsume 1");
522
523 n = BLEN(b);
524 if(n > 0)
525 break;
526 q->bfirst = b->next;
527 q->len -= BALLOC(b);
528
529 /* remember to free this */
530 b->next = tofree;
531 tofree = b;
532 };
533
534 if(n < len)
535 len = n;
536 memmove(p, b->rp, len);
537 consumecnt += n;
538 b->rp += len;
539 q->dlen -= len;
540
541 /* discard the block if we're done with it */
542 if((q->state & Qmsg) || len == n){
543 q->bfirst = b->next;
544 b->next = 0;
545 q->len -= BALLOC(b);
546 q->dlen -= BLEN(b);
547
548 /* remember to free this */
549 b->next = tofree;
550 tofree = b;
551 }
552
553 /* if writer flow controlled, restart */
554 if((q->state & Qflow) && q->len < q->limit/2){
555 q->state &= ~Qflow;
556 dowakeup = 1;
557 } else
558 dowakeup = 0;
559
560 iunlock(q);
561
562 if(dowakeup)
563 wakeup(&q->wr);
564
565 if(tofree != nil)
566 freeblist(tofree);
567
568 return len;
569 }
570
571 int
qpass(Queue * q,Block * b)572 qpass(Queue *q, Block *b)
573 {
574 int dlen, len, dowakeup;
575
576 /* sync with qread */
577 dowakeup = 0;
578 ilock(q);
579 if(q->len >= q->limit){
580 iunlock(q);
581 freeblist(b);
582 return -1;
583 }
584 if(q->state & Qclosed){
585 len = BALLOC(b);
586 iunlock(q);
587 freeblist(b);
588 return len;
589 }
590
591 /* add buffer to queue */
592 if(q->bfirst)
593 q->blast->next = b;
594 else
595 q->bfirst = b;
596 len = BALLOC(b);
597 dlen = BLEN(b);
598 QDEBUG checkb(b, "qpass");
599 while(b->next){
600 b = b->next;
601 QDEBUG checkb(b, "qpass");
602 len += BALLOC(b);
603 dlen += BLEN(b);
604 }
605 q->blast = b;
606 q->len += len;
607 q->dlen += dlen;
608
609 if(q->len >= q->limit/2)
610 q->state |= Qflow;
611
612 if(q->state & Qstarve){
613 q->state &= ~Qstarve;
614 dowakeup = 1;
615 }
616 iunlock(q);
617
618 if(dowakeup)
619 wakeup(&q->rr);
620
621 return len;
622 }
623
624 int
qpassnolim(Queue * q,Block * b)625 qpassnolim(Queue *q, Block *b)
626 {
627 int dlen, len, dowakeup;
628
629 /* sync with qread */
630 dowakeup = 0;
631 ilock(q);
632
633 if(q->state & Qclosed){
634 len = BALLOC(b);
635 iunlock(q);
636 freeblist(b);
637 return len;
638 }
639
640 /* add buffer to queue */
641 if(q->bfirst)
642 q->blast->next = b;
643 else
644 q->bfirst = b;
645 len = BALLOC(b);
646 dlen = BLEN(b);
647 QDEBUG checkb(b, "qpass");
648 while(b->next){
649 b = b->next;
650 QDEBUG checkb(b, "qpass");
651 len += BALLOC(b);
652 dlen += BLEN(b);
653 }
654 q->blast = b;
655 q->len += len;
656 q->dlen += dlen;
657
658 if(q->len >= q->limit/2)
659 q->state |= Qflow;
660
661 if(q->state & Qstarve){
662 q->state &= ~Qstarve;
663 dowakeup = 1;
664 }
665 iunlock(q);
666
667 if(dowakeup)
668 wakeup(&q->rr);
669
670 return len;
671 }
672
673 /*
674 * if the allocated space is way out of line with the used
675 * space, reallocate to a smaller block
676 */
677 Block*
packblock(Block * bp)678 packblock(Block *bp)
679 {
680 Block **l, *nbp;
681 int n;
682
683 for(l = &bp; *l; l = &(*l)->next){
684 nbp = *l;
685 n = BLEN(nbp);
686 if((n<<2) < BALLOC(nbp)){
687 *l = allocb(n);
688 memmove((*l)->wp, nbp->rp, n);
689 (*l)->wp += n;
690 (*l)->next = nbp->next;
691 freeb(nbp);
692 }
693 }
694
695 return bp;
696 }
697
698 int
qproduce(Queue * q,void * vp,int len)699 qproduce(Queue *q, void *vp, int len)
700 {
701 Block *b;
702 int dowakeup;
703 uchar *p = vp;
704
705 /* sync with qread */
706 dowakeup = 0;
707 ilock(q);
708
709 /* no waiting receivers, room in buffer? */
710 if(q->len >= q->limit){
711 q->state |= Qflow;
712 iunlock(q);
713 return -1;
714 }
715
716 /* save in buffer */
717 b = iallocb(len);
718 if(b == 0){
719 iunlock(q);
720 return 0;
721 }
722 memmove(b->wp, p, len);
723 producecnt += len;
724 b->wp += len;
725 if(q->bfirst)
726 q->blast->next = b;
727 else
728 q->bfirst = b;
729 q->blast = b;
730 /* b->next = 0; done by iallocb() */
731 q->len += BALLOC(b);
732 q->dlen += BLEN(b);
733 QDEBUG checkb(b, "qproduce");
734
735 if(q->state & Qstarve){
736 q->state &= ~Qstarve;
737 dowakeup = 1;
738 }
739
740 if(q->len >= q->limit)
741 q->state |= Qflow;
742 iunlock(q);
743
744 if(dowakeup)
745 wakeup(&q->rr);
746
747 return len;
748 }
749
750 /*
751 * copy from offset in the queue
752 */
753 Block*
qcopy(Queue * q,int len,ulong offset)754 qcopy(Queue *q, int len, ulong offset)
755 {
756 int sofar;
757 int n;
758 Block *b, *nb;
759 uchar *p;
760
761 nb = allocb(len);
762
763 ilock(q);
764
765 /* go to offset */
766 b = q->bfirst;
767 for(sofar = 0; ; sofar += n){
768 if(b == nil){
769 iunlock(q);
770 return nb;
771 }
772 n = BLEN(b);
773 if(sofar + n > offset){
774 p = b->rp + offset - sofar;
775 n -= offset - sofar;
776 break;
777 }
778 QDEBUG checkb(b, "qcopy");
779 b = b->next;
780 }
781
782 /* copy bytes from there */
783 for(sofar = 0; sofar < len;){
784 if(n > len - sofar)
785 n = len - sofar;
786 memmove(nb->wp, p, n);
787 qcopycnt += n;
788 sofar += n;
789 nb->wp += n;
790 b = b->next;
791 if(b == nil)
792 break;
793 n = BLEN(b);
794 p = b->rp;
795 }
796 iunlock(q);
797
798 return nb;
799 }
800
801 /*
802 * called by non-interrupt code
803 */
804 Queue*
qopen(int limit,int msg,void (* kick)(void *),void * arg)805 qopen(int limit, int msg, void (*kick)(void*), void *arg)
806 {
807 Queue *q;
808
809 q = malloc(sizeof(Queue));
810 if(q == 0)
811 return 0;
812
813 q->limit = q->inilim = limit;
814 q->kick = kick;
815 q->arg = arg;
816 q->state = msg;
817
818 q->state |= Qstarve;
819 q->eof = 0;
820 q->noblock = 0;
821
822 return q;
823 }
824
825 /* open a queue to be bypassed */
826 Queue*
qbypass(void (* bypass)(void *,Block *),void * arg)827 qbypass(void (*bypass)(void*, Block*), void *arg)
828 {
829 Queue *q;
830
831 q = malloc(sizeof(Queue));
832 if(q == 0)
833 return 0;
834
835 q->limit = 0;
836 q->arg = arg;
837 q->bypass = bypass;
838 q->state = 0;
839
840 return q;
841 }
842
843 static int
notempty(void * a)844 notempty(void *a)
845 {
846 Queue *q = a;
847
848 return (q->state & Qclosed) || q->bfirst != 0;
849 }
850
851 /*
852 * wait for the queue to be non-empty or closed.
853 * called with q ilocked.
854 */
855 static int
qwait(Queue * q)856 qwait(Queue *q)
857 {
858 /* wait for data */
859 for(;;){
860 if(q->bfirst != nil)
861 break;
862
863 if(q->state & Qclosed){
864 if(++q->eof > 3)
865 return -1;
866 if(*q->err && strcmp(q->err, Ehungup) != 0)
867 return -1;
868 return 0;
869 }
870
871 q->state |= Qstarve; /* flag requesting producer to wake me */
872 iunlock(q);
873 sleep(&q->rr, notempty, q);
874 ilock(q);
875 }
876 return 1;
877 }
878
879 /*
880 * add a block list to a queue
881 */
882 void
qaddlist(Queue * q,Block * b)883 qaddlist(Queue *q, Block *b)
884 {
885 /* queue the block */
886 if(q->bfirst)
887 q->blast->next = b;
888 else
889 q->bfirst = b;
890 q->len += blockalloclen(b);
891 q->dlen += blocklen(b);
892 while(b->next)
893 b = b->next;
894 q->blast = b;
895 }
896
897 /*
898 * called with q ilocked
899 */
900 Block*
qremove(Queue * q)901 qremove(Queue *q)
902 {
903 Block *b;
904
905 b = q->bfirst;
906 if(b == nil)
907 return nil;
908 q->bfirst = b->next;
909 b->next = nil;
910 q->dlen -= BLEN(b);
911 q->len -= BALLOC(b);
912 QDEBUG checkb(b, "qremove");
913 return b;
914 }
915
916 /*
917 * copy the contents of a string of blocks into
918 * memory. emptied blocks are freed. return
919 * pointer to first unconsumed block.
920 */
921 Block*
bl2mem(uchar * p,Block * b,int n)922 bl2mem(uchar *p, Block *b, int n)
923 {
924 int i;
925 Block *next;
926
927 for(; b != nil; b = next){
928 i = BLEN(b);
929 if(i > n){
930 memmove(p, b->rp, n);
931 b->rp += n;
932 return b;
933 }
934 memmove(p, b->rp, i);
935 n -= i;
936 p += i;
937 b->rp += i;
938 next = b->next;
939 freeb(b);
940 }
941 return nil;
942 }
943
944 /*
945 * copy the contents of memory into a string of blocks.
946 * return nil on error.
947 */
948 Block*
mem2bl(uchar * p,int len)949 mem2bl(uchar *p, int len)
950 {
951 int n;
952 Block *b, *first, **l;
953
954 first = nil;
955 l = &first;
956 if(waserror()){
957 freeblist(first);
958 nexterror();
959 }
960 do {
961 n = len;
962 if(n > Maxatomic)
963 n = Maxatomic;
964
965 *l = b = allocb(n);
966 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
967 memmove(b->wp, p, n);
968 b->wp += n;
969 p += n;
970 len -= n;
971 l = &b->next;
972 } while(len > 0);
973 poperror();
974
975 return first;
976 }
977
978 /*
979 * put a block back to the front of the queue
980 * called with q ilocked
981 */
982 void
qputback(Queue * q,Block * b)983 qputback(Queue *q, Block *b)
984 {
985 b->next = q->bfirst;
986 if(q->bfirst == nil)
987 q->blast = b;
988 q->bfirst = b;
989 q->len += BALLOC(b);
990 q->dlen += BLEN(b);
991 }
992
993 /*
994 * flow control, get producer going again
995 * called with q ilocked
996 */
997 static void
qwakeup_iunlock(Queue * q)998 qwakeup_iunlock(Queue *q)
999 {
1000 int dowakeup;
1001
1002 /* if writer flow controlled, restart */
1003 if((q->state & Qflow) && q->len < q->limit/2){
1004 q->state &= ~Qflow;
1005 dowakeup = 1;
1006 }
1007 else
1008 dowakeup = 0;
1009
1010 iunlock(q);
1011
1012 /* wakeup flow controlled writers */
1013 if(dowakeup){
1014 if(q->kick)
1015 q->kick(q->arg);
1016 wakeup(&q->wr);
1017 }
1018 }
1019
1020 /*
1021 * get next block from a queue (up to a limit)
1022 */
1023 Block*
qbread(Queue * q,int len)1024 qbread(Queue *q, int len)
1025 {
1026 Block *b, *nb;
1027 int n;
1028
1029 qlock(&q->rlock);
1030 if(waserror()){
1031 qunlock(&q->rlock);
1032 nexterror();
1033 }
1034
1035 ilock(q);
1036 switch(qwait(q)){
1037 case 0:
1038 /* queue closed */
1039 iunlock(q);
1040 qunlock(&q->rlock);
1041 poperror();
1042 return nil;
1043 case -1:
1044 /* multiple reads on a closed queue */
1045 iunlock(q);
1046 error(q->err);
1047 }
1048
1049 /* if we get here, there's at least one block in the queue */
1050 b = qremove(q);
1051 n = BLEN(b);
1052
1053 /* split block if it's too big and this is not a message queue */
1054 nb = b;
1055 if(n > len){
1056 if((q->state&Qmsg) == 0){
1057 n -= len;
1058 b = allocb(n);
1059 memmove(b->wp, nb->rp+len, n);
1060 b->wp += n;
1061 qputback(q, b);
1062 }
1063 nb->wp = nb->rp + len;
1064 }
1065
1066 /* restart producer */
1067 qwakeup_iunlock(q);
1068
1069 poperror();
1070 qunlock(&q->rlock);
1071 return nb;
1072 }
1073
1074 /*
1075 * read a queue. if no data is queued, post a Block
1076 * and wait on its Rendez.
1077 */
1078 long
qread(Queue * q,void * vp,int len)1079 qread(Queue *q, void *vp, int len)
1080 {
1081 Block *b, *first, **l;
1082 int blen, n;
1083
1084 qlock(&q->rlock);
1085 if(waserror()){
1086 qunlock(&q->rlock);
1087 nexterror();
1088 }
1089
1090 ilock(q);
1091 again:
1092 switch(qwait(q)){
1093 case 0:
1094 /* queue closed */
1095 iunlock(q);
1096 qunlock(&q->rlock);
1097 poperror();
1098 return 0;
1099 case -1:
1100 /* multiple reads on a closed queue */
1101 iunlock(q);
1102 error(q->err);
1103 }
1104
1105 /* if we get here, there's at least one block in the queue */
1106 if(q->state & Qcoalesce){
1107 /* when coalescing, 0 length blocks just go away */
1108 b = q->bfirst;
1109 if(BLEN(b) <= 0){
1110 freeb(qremove(q));
1111 goto again;
1112 }
1113
1114 /* grab the first block plus as many
1115 * following blocks as will completely
1116 * fit in the read.
1117 */
1118 n = 0;
1119 l = &first;
1120 blen = BLEN(b);
1121 for(;;) {
1122 *l = qremove(q);
1123 l = &b->next;
1124 n += blen;
1125
1126 b = q->bfirst;
1127 if(b == nil)
1128 break;
1129 blen = BLEN(b);
1130 if(n+blen > len)
1131 break;
1132 }
1133 } else {
1134 first = qremove(q);
1135 n = BLEN(first);
1136 }
1137
1138 /* copy to user space outside of the ilock */
1139 iunlock(q);
1140 b = bl2mem(vp, first, len);
1141 ilock(q);
1142
1143 /* take care of any left over partial block */
1144 if(b != nil){
1145 n -= BLEN(b);
1146 if(q->state & Qmsg)
1147 freeb(b);
1148 else
1149 qputback(q, b);
1150 }
1151
1152 /* restart producer */
1153 qwakeup_iunlock(q);
1154
1155 poperror();
1156 qunlock(&q->rlock);
1157 return n;
1158 }
1159
1160 static int
qnotfull(void * a)1161 qnotfull(void *a)
1162 {
1163 Queue *q = a;
1164
1165 return q->len < q->limit || (q->state & Qclosed);
1166 }
1167
1168 ulong noblockcnt;
1169
1170 /*
1171 * add a block to a queue obeying flow control
1172 */
1173 long
qbwrite(Queue * q,Block * b)1174 qbwrite(Queue *q, Block *b)
1175 {
1176 int n, dowakeup;
1177 Proc *p;
1178
1179 n = BLEN(b);
1180
1181 if(q->bypass){
1182 (*q->bypass)(q->arg, b);
1183 return n;
1184 }
1185
1186 dowakeup = 0;
1187 qlock(&q->wlock);
1188 if(waserror()){
1189 if(b != nil)
1190 freeb(b);
1191 qunlock(&q->wlock);
1192 nexterror();
1193 }
1194
1195 ilock(q);
1196
1197 /* give up if the queue is closed */
1198 if(q->state & Qclosed){
1199 iunlock(q);
1200 error(q->err);
1201 }
1202
1203 /* if nonblocking, don't queue over the limit */
1204 if(q->len >= q->limit){
1205 if(q->noblock){
1206 iunlock(q);
1207 freeb(b);
1208 noblockcnt += n;
1209 qunlock(&q->wlock);
1210 poperror();
1211 return n;
1212 }
1213 }
1214
1215 /* queue the block */
1216 if(q->bfirst)
1217 q->blast->next = b;
1218 else
1219 q->bfirst = b;
1220 q->blast = b;
1221 b->next = 0;
1222 q->len += BALLOC(b);
1223 q->dlen += n;
1224 QDEBUG checkb(b, "qbwrite");
1225 b = nil;
1226
1227 /* make sure other end gets awakened */
1228 if(q->state & Qstarve){
1229 q->state &= ~Qstarve;
1230 dowakeup = 1;
1231 }
1232 iunlock(q);
1233
1234 /* get output going again */
1235 if(q->kick && (dowakeup || (q->state&Qkick)))
1236 q->kick(q->arg);
1237
1238 /* wakeup anyone consuming at the other end */
1239 if(dowakeup){
1240 p = wakeup(&q->rr);
1241
1242 /* if we just wokeup a higher priority process, let it run */
1243 if(p != nil && p->priority > up->priority)
1244 sched();
1245 }
1246
1247 /*
1248 * flow control, wait for queue to get below the limit
1249 * before allowing the process to continue and queue
1250 * more. We do this here so that postnote can only
1251 * interrupt us after the data has been queued. This
1252 * means that things like 9p flushes and ssl messages
1253 * will not be disrupted by software interrupts.
1254 *
1255 * Note - this is moderately dangerous since a process
1256 * that keeps getting interrupted and rewriting will
1257 * queue infinite crud.
1258 */
1259 for(;;){
1260 if(q->noblock || qnotfull(q))
1261 break;
1262
1263 ilock(q);
1264 q->state |= Qflow;
1265 iunlock(q);
1266 sleep(&q->wr, qnotfull, q);
1267 }
1268 USED(b);
1269
1270 qunlock(&q->wlock);
1271 poperror();
1272 return n;
1273 }
1274
1275 /*
1276 * write to a queue. only Maxatomic bytes at a time is atomic.
1277 */
1278 int
qwrite(Queue * q,void * vp,int len)1279 qwrite(Queue *q, void *vp, int len)
1280 {
1281 int n, sofar;
1282 Block *b;
1283 uchar *p = vp;
1284
1285 QDEBUG if(!islo())
1286 print("qwrite hi %#p\n", getcallerpc(&q));
1287
1288 sofar = 0;
1289 do {
1290 n = len-sofar;
1291 if(n > Maxatomic)
1292 n = Maxatomic;
1293
1294 b = allocb(n);
1295 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
1296 if(waserror()){
1297 freeb(b);
1298 nexterror();
1299 }
1300 memmove(b->wp, p+sofar, n);
1301 poperror();
1302 b->wp += n;
1303
1304 qbwrite(q, b);
1305
1306 sofar += n;
1307 } while(sofar < len && (q->state & Qmsg) == 0);
1308
1309 return len;
1310 }
1311
1312 /*
1313 * used by print() to write to a queue. Since we may be splhi or not in
1314 * a process, don't qlock.
1315 *
1316 * this routine merges adjacent blocks if block n+1 will fit into
1317 * the free space of block n.
1318 */
1319 int
qiwrite(Queue * q,void * vp,int len)1320 qiwrite(Queue *q, void *vp, int len)
1321 {
1322 int n, sofar, dowakeup;
1323 Block *b;
1324 uchar *p = vp;
1325
1326 dowakeup = 0;
1327
1328 sofar = 0;
1329 do {
1330 n = len-sofar;
1331 if(n > Maxatomic)
1332 n = Maxatomic;
1333
1334 b = iallocb(n);
1335 if(b == nil)
1336 break;
1337 memmove(b->wp, p+sofar, n);
1338 b->wp += n;
1339
1340 ilock(q);
1341
1342 /* we use an artificially high limit for kernel prints since anything
1343 * over the limit gets dropped
1344 */
1345 if(q->dlen >= 16*1024){
1346 iunlock(q);
1347 freeb(b);
1348 break;
1349 }
1350
1351 QDEBUG checkb(b, "qiwrite");
1352 if(q->bfirst)
1353 q->blast->next = b;
1354 else
1355 q->bfirst = b;
1356 q->blast = b;
1357 q->len += BALLOC(b);
1358 q->dlen += n;
1359
1360 if(q->state & Qstarve){
1361 q->state &= ~Qstarve;
1362 dowakeup = 1;
1363 }
1364
1365 iunlock(q);
1366
1367 if(dowakeup){
1368 if(q->kick)
1369 q->kick(q->arg);
1370 wakeup(&q->rr);
1371 }
1372
1373 sofar += n;
1374 } while(sofar < len && (q->state & Qmsg) == 0);
1375
1376 return sofar;
1377 }
1378
/*
 * close a queue and release its memory.
 *
 * be extremely careful when calling this,
 * as there is no reference accounting
 */
void
qfree(Queue *q)
{
	qclose(q);
	free(q);
}
1389
1390 /*
1391 * Mark a queue as closed. No further IO is permitted.
1392 * All blocks are released.
1393 */
1394 void
qclose(Queue * q)1395 qclose(Queue *q)
1396 {
1397 Block *bfirst;
1398
1399 if(q == nil)
1400 return;
1401
1402 /* mark it */
1403 ilock(q);
1404 q->state |= Qclosed;
1405 q->state &= ~(Qflow|Qstarve);
1406 strcpy(q->err, Ehungup);
1407 bfirst = q->bfirst;
1408 q->bfirst = 0;
1409 q->len = 0;
1410 q->dlen = 0;
1411 q->noblock = 0;
1412 iunlock(q);
1413
1414 /* free queued blocks */
1415 freeblist(bfirst);
1416
1417 /* wake up readers/writers */
1418 wakeup(&q->rr);
1419 wakeup(&q->wr);
1420 }
1421
1422 /*
1423 * Mark a queue as closed. Wakeup any readers. Don't remove queued
1424 * blocks.
1425 */
1426 void
qhangup(Queue * q,char * msg)1427 qhangup(Queue *q, char *msg)
1428 {
1429 /* mark it */
1430 ilock(q);
1431 q->state |= Qclosed;
1432 if(msg == 0 || *msg == 0)
1433 strcpy(q->err, Ehungup);
1434 else
1435 strncpy(q->err, msg, ERRMAX-1);
1436 iunlock(q);
1437
1438 /* wake up readers/writers */
1439 wakeup(&q->rr);
1440 wakeup(&q->wr);
1441 }
1442
/*
 * return non-zero if the q is hungup, i.e. its Qclosed
 * state bit is set.  the state is read without taking the lock.
 */
int
qisclosed(Queue *q)
{
	return q->state & Qclosed;
}
1451
1452 /*
1453 * mark a queue as no longer hung up
1454 */
1455 void
qreopen(Queue * q)1456 qreopen(Queue *q)
1457 {
1458 ilock(q);
1459 q->state &= ~Qclosed;
1460 q->state |= Qstarve;
1461 q->eof = 0;
1462 q->limit = q->inilim;
1463 iunlock(q);
1464 }
1465
/*
 * return bytes queued (the data byte count, not the
 * allocated size).  read without taking the lock.
 */
int
qlen(Queue *q)
{
	return q->dlen;
}
1474
1475 /*
1476 * return space remaining before flow control
1477 */
1478 int
qwindow(Queue * q)1479 qwindow(Queue *q)
1480 {
1481 int l;
1482
1483 l = q->limit - q->len;
1484 if(l < 0)
1485 l = 0;
1486 return l;
1487 }
1488
1489 /*
1490 * return true if we can read without blocking
1491 */
1492 int
qcanread(Queue * q)1493 qcanread(Queue *q)
1494 {
1495 return q->bfirst!=0;
1496 }
1497
/*
 * change queue limit.  only the current limit is changed;
 * the initial limit restored by qreopen is left alone.
 * the store is made without taking the lock.
 */
void
qsetlimit(Queue *q, int limit)
{
	q->limit = limit;
}
1506
/*
 * set blocking/nonblocking.  a non-zero onoff makes qbwrite
 * drop blocks instead of sleeping when the queue is full.
 * the store is made without taking the lock.
 */
void
qnoblock(Queue *q, int onoff)
{
	q->noblock = onoff;
}
1515
1516 /*
1517 * flush the output queue
1518 */
1519 void
qflush(Queue * q)1520 qflush(Queue *q)
1521 {
1522 Block *bfirst;
1523
1524 /* mark it */
1525 ilock(q);
1526 bfirst = q->bfirst;
1527 q->bfirst = 0;
1528 q->len = 0;
1529 q->dlen = 0;
1530 iunlock(q);
1531
1532 /* free queued blocks */
1533 freeblist(bfirst);
1534
1535 /* wake up readers/writers */
1536 wakeup(&q->wr);
1537 }
1538
/*
 * return non-zero if the queue is flow controlled,
 * i.e. its Qflow state bit is set
 */
int
qfull(Queue *q)
{
	return q->state & Qflow;
}
1544
/*
 * return the queue's state bits, read without taking the lock
 */
int
qstate(Queue *q)
{
	return q->state;
}
1550