1 #include "u.h"
2 #include "lib.h"
3 #include "dat.h"
4 #include "fns.h"
5 #include "error.h"
6
7 static ulong padblockcnt;
8 static ulong concatblockcnt;
9 static ulong pullupblockcnt;
10 static ulong copyblockcnt;
11 static ulong consumecnt;
12 static ulong producecnt;
13 static ulong qcopycnt;
14
15 static int debugging;
16
17 #define QDEBUG if(0)
18
19 /*
20 * IO queues
21 */
22 struct Queue
23 {
24 Lock lk;
25
26 Block* bfirst; /* buffer */
27 Block* blast;
28
29 int len; /* bytes allocated to queue */
30 int dlen; /* data bytes in queue */
31 int limit; /* max bytes in queue */
32 int inilim; /* initial limit */
33 int state;
34 int noblock; /* true if writes return immediately when q full */
35 int eof; /* number of eofs read by user */
36
37 void (*kick)(void*); /* restart output */
38 void (*bypass)(void*, Block*); /* bypass queue altogether */
39 void* arg; /* argument to kick */
40
41 QLock rlock; /* mutex for reading processes */
42 Rendez rr; /* process waiting to read */
43 QLock wlock; /* mutex for writing processes */
44 Rendez wr; /* process waiting to write */
45
46 char err[ERRMAX];
47 };
48
49 enum
50 {
51 Maxatomic = 64*1024,
52 };
53
54 uint qiomaxatomic = Maxatomic;
55
56 void
ixsummary(void)57 ixsummary(void)
58 {
59 debugging ^= 1;
60 iallocsummary();
61 print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
62 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
63 print("consume %lud, produce %lud, qcopy %lud\n",
64 consumecnt, producecnt, qcopycnt);
65 }
66
67 /*
68 * free a list of blocks
69 */
70 void
freeblist(Block * b)71 freeblist(Block *b)
72 {
73 Block *next;
74
75 for(; b != 0; b = next){
76 next = b->next;
77 b->next = 0;
78 freeb(b);
79 }
80 }
81
82 /*
83 * pad a block to the front (or the back if size is negative)
84 */
85 Block*
padblock(Block * bp,int size)86 padblock(Block *bp, int size)
87 {
88 int n;
89 Block *nbp;
90
91 QDEBUG checkb(bp, "padblock 1");
92 if(size >= 0){
93 if(bp->rp - bp->base >= size){
94 bp->rp -= size;
95 return bp;
96 }
97
98 if(bp->next)
99 panic("padblock 0x%p", getcallerpc(&bp));
100 n = BLEN(bp);
101 padblockcnt++;
102 nbp = allocb(size+n);
103 nbp->rp += size;
104 nbp->wp = nbp->rp;
105 memmove(nbp->wp, bp->rp, n);
106 nbp->wp += n;
107 freeb(bp);
108 nbp->rp -= size;
109 } else {
110 size = -size;
111
112 if(bp->next)
113 panic("padblock 0x%p", getcallerpc(&bp));
114
115 if(bp->lim - bp->wp >= size)
116 return bp;
117
118 n = BLEN(bp);
119 padblockcnt++;
120 nbp = allocb(size+n);
121 memmove(nbp->wp, bp->rp, n);
122 nbp->wp += n;
123 freeb(bp);
124 }
125 QDEBUG checkb(nbp, "padblock 1");
126 return nbp;
127 }
128
129 /*
130 * return count of bytes in a string of blocks
131 */
132 int
blocklen(Block * bp)133 blocklen(Block *bp)
134 {
135 int len;
136
137 len = 0;
138 while(bp) {
139 len += BLEN(bp);
140 bp = bp->next;
141 }
142 return len;
143 }
144
145 /*
146 * return count of space in blocks
147 */
148 int
blockalloclen(Block * bp)149 blockalloclen(Block *bp)
150 {
151 int len;
152
153 len = 0;
154 while(bp) {
155 len += BALLOC(bp);
156 bp = bp->next;
157 }
158 return len;
159 }
160
161 /*
162 * copy the string of blocks into
163 * a single block and free the string
164 */
165 Block*
concatblock(Block * bp)166 concatblock(Block *bp)
167 {
168 int len;
169 Block *nb, *f;
170
171 if(bp->next == 0)
172 return bp;
173
174 nb = allocb(blocklen(bp));
175 for(f = bp; f; f = f->next) {
176 len = BLEN(f);
177 memmove(nb->wp, f->rp, len);
178 nb->wp += len;
179 }
180 concatblockcnt += BLEN(nb);
181 freeblist(bp);
182 QDEBUG checkb(nb, "concatblock 1");
183 return nb;
184 }
185
186 /*
187 * make sure the first block has at least n bytes
188 */
189 Block*
pullupblock(Block * bp,int n)190 pullupblock(Block *bp, int n)
191 {
192 int i;
193 Block *nbp;
194
195 /*
196 * this should almost always be true, it's
197 * just to avoid every caller checking.
198 */
199 if(BLEN(bp) >= n)
200 return bp;
201
202 /*
203 * if not enough room in the first block,
204 * add another to the front of the list.
205 */
206 if(bp->lim - bp->rp < n){
207 nbp = allocb(n);
208 nbp->next = bp;
209 bp = nbp;
210 }
211
212 /*
213 * copy bytes from the trailing blocks into the first
214 */
215 n -= BLEN(bp);
216 while((nbp = bp->next)){
217 i = BLEN(nbp);
218 if(i > n) {
219 memmove(bp->wp, nbp->rp, n);
220 pullupblockcnt++;
221 bp->wp += n;
222 nbp->rp += n;
223 QDEBUG checkb(bp, "pullupblock 1");
224 return bp;
225 } else {
226 /* shouldn't happen but why crash if it does */
227 if(i < 0){
228 print("pullup negative length packet\n");
229 i = 0;
230 }
231 memmove(bp->wp, nbp->rp, i);
232 pullupblockcnt++;
233 bp->wp += i;
234 bp->next = nbp->next;
235 nbp->next = 0;
236 freeb(nbp);
237 n -= i;
238 if(n == 0){
239 QDEBUG checkb(bp, "pullupblock 2");
240 return bp;
241 }
242 }
243 }
244 freeb(bp);
245 return 0;
246 }
247
248 /*
249 * make sure the first block has at least n bytes
250 */
251 Block*
pullupqueue(Queue * q,int n)252 pullupqueue(Queue *q, int n)
253 {
254 Block *b;
255
256 if(BLEN(q->bfirst) >= n)
257 return q->bfirst;
258 q->bfirst = pullupblock(q->bfirst, n);
259 for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
260 ;
261 q->blast = b;
262 return q->bfirst;
263 }
264
265 /*
266 * trim to len bytes starting at offset
267 */
268 Block *
trimblock(Block * bp,int offset,int len)269 trimblock(Block *bp, int offset, int len)
270 {
271 ulong l;
272 Block *nb, *startb;
273
274 QDEBUG checkb(bp, "trimblock 1");
275 if(blocklen(bp) < offset+len) {
276 freeblist(bp);
277 return nil;
278 }
279
280 while((l = BLEN(bp)) < offset) {
281 offset -= l;
282 nb = bp->next;
283 bp->next = nil;
284 freeb(bp);
285 bp = nb;
286 }
287
288 startb = bp;
289 bp->rp += offset;
290
291 while((l = BLEN(bp)) < len) {
292 len -= l;
293 bp = bp->next;
294 }
295
296 bp->wp -= (BLEN(bp) - len);
297
298 if(bp->next) {
299 freeblist(bp->next);
300 bp->next = nil;
301 }
302
303 return startb;
304 }
305
306 /*
307 * copy 'count' bytes into a new block
308 */
309 Block*
copyblock(Block * bp,int count)310 copyblock(Block *bp, int count)
311 {
312 int l;
313 Block *nbp;
314
315 QDEBUG checkb(bp, "copyblock 0");
316 nbp = allocb(count);
317 for(; count > 0 && bp != 0; bp = bp->next){
318 l = BLEN(bp);
319 if(l > count)
320 l = count;
321 memmove(nbp->wp, bp->rp, l);
322 nbp->wp += l;
323 count -= l;
324 }
325 if(count > 0){
326 memset(nbp->wp, 0, count);
327 nbp->wp += count;
328 }
329 copyblockcnt++;
330 QDEBUG checkb(nbp, "copyblock 1");
331
332 return nbp;
333 }
334
335 Block*
adjustblock(Block * bp,int len)336 adjustblock(Block* bp, int len)
337 {
338 int n;
339 Block *nbp;
340
341 if(len < 0){
342 freeb(bp);
343 return nil;
344 }
345
346 if(bp->rp+len > bp->lim){
347 nbp = copyblock(bp, len);
348 freeblist(bp);
349 QDEBUG checkb(nbp, "adjustblock 1");
350
351 return nbp;
352 }
353
354 n = BLEN(bp);
355 if(len > n)
356 memset(bp->wp, 0, len-n);
357 bp->wp = bp->rp+len;
358 QDEBUG checkb(bp, "adjustblock 2");
359
360 return bp;
361 }
362
363
364 /*
365 * throw away up to count bytes from a
366 * list of blocks. Return count of bytes
367 * thrown away.
368 */
369 int
pullblock(Block ** bph,int count)370 pullblock(Block **bph, int count)
371 {
372 Block *bp;
373 int n, bytes;
374
375 bytes = 0;
376 if(bph == nil)
377 return 0;
378
379 while(*bph != nil && count != 0) {
380 bp = *bph;
381 n = BLEN(bp);
382 if(count < n)
383 n = count;
384 bytes += n;
385 count -= n;
386 bp->rp += n;
387 QDEBUG checkb(bp, "pullblock ");
388 if(BLEN(bp) == 0) {
389 *bph = bp->next;
390 bp->next = nil;
391 freeb(bp);
392 }
393 }
394 return bytes;
395 }
396
397 /*
398 * get next block from a queue, return null if nothing there
399 */
400 Block*
qget(Queue * q)401 qget(Queue *q)
402 {
403 int dowakeup;
404 Block *b;
405
406 /* sync with qwrite */
407 ilock(&q->lk);
408
409 b = q->bfirst;
410 if(b == nil){
411 q->state |= Qstarve;
412 iunlock(&q->lk);
413 return nil;
414 }
415 q->bfirst = b->next;
416 b->next = 0;
417 q->len -= BALLOC(b);
418 q->dlen -= BLEN(b);
419 QDEBUG checkb(b, "qget");
420
421 /* if writer flow controlled, restart */
422 if((q->state & Qflow) && q->len < q->limit/2){
423 q->state &= ~Qflow;
424 dowakeup = 1;
425 } else
426 dowakeup = 0;
427
428 iunlock(&q->lk);
429
430 if(dowakeup)
431 wakeup(&q->wr);
432
433 return b;
434 }
435
436 /*
437 * throw away the next 'len' bytes in the queue
438 */
439 int
qdiscard(Queue * q,int len)440 qdiscard(Queue *q, int len)
441 {
442 Block *b;
443 int dowakeup, n, sofar;
444
445 ilock(&q->lk);
446 for(sofar = 0; sofar < len; sofar += n){
447 b = q->bfirst;
448 if(b == nil)
449 break;
450 QDEBUG checkb(b, "qdiscard");
451 n = BLEN(b);
452 if(n <= len - sofar){
453 q->bfirst = b->next;
454 b->next = 0;
455 q->len -= BALLOC(b);
456 q->dlen -= BLEN(b);
457 freeb(b);
458 } else {
459 n = len - sofar;
460 b->rp += n;
461 q->dlen -= n;
462 }
463 }
464
465 /*
466 * if writer flow controlled, restart
467 *
468 * This used to be
469 * q->len < q->limit/2
470 * but it slows down tcp too much for certain write sizes.
471 * I really don't understand it completely. It may be
472 * due to the queue draining so fast that the transmission
473 * stalls waiting for the app to produce more data. - presotto
474 */
475 if((q->state & Qflow) && q->len < q->limit){
476 q->state &= ~Qflow;
477 dowakeup = 1;
478 } else
479 dowakeup = 0;
480
481 iunlock(&q->lk);
482
483 if(dowakeup)
484 wakeup(&q->wr);
485
486 return sofar;
487 }
488
489 /*
490 * Interrupt level copy out of a queue, return # bytes copied.
491 */
492 int
qconsume(Queue * q,void * vp,int len)493 qconsume(Queue *q, void *vp, int len)
494 {
495 Block *b;
496 int n, dowakeup;
497 uchar *p = vp;
498 Block *tofree = nil;
499
500 /* sync with qwrite */
501 ilock(&q->lk);
502
503 for(;;) {
504 b = q->bfirst;
505 if(b == 0){
506 q->state |= Qstarve;
507 iunlock(&q->lk);
508 return -1;
509 }
510 QDEBUG checkb(b, "qconsume 1");
511
512 n = BLEN(b);
513 if(n > 0)
514 break;
515 q->bfirst = b->next;
516 q->len -= BALLOC(b);
517
518 /* remember to free this */
519 b->next = tofree;
520 tofree = b;
521 };
522
523 if(n < len)
524 len = n;
525 memmove(p, b->rp, len);
526 consumecnt += n;
527 b->rp += len;
528 q->dlen -= len;
529
530 /* discard the block if we're done with it */
531 if((q->state & Qmsg) || len == n){
532 q->bfirst = b->next;
533 b->next = 0;
534 q->len -= BALLOC(b);
535 q->dlen -= BLEN(b);
536
537 /* remember to free this */
538 b->next = tofree;
539 tofree = b;
540 }
541
542 /* if writer flow controlled, restart */
543 if((q->state & Qflow) && q->len < q->limit/2){
544 q->state &= ~Qflow;
545 dowakeup = 1;
546 } else
547 dowakeup = 0;
548
549 iunlock(&q->lk);
550
551 if(dowakeup)
552 wakeup(&q->wr);
553
554 if(tofree != nil)
555 freeblist(tofree);
556
557 return len;
558 }
559
560 int
qpass(Queue * q,Block * b)561 qpass(Queue *q, Block *b)
562 {
563 int dlen, len, dowakeup;
564
565 /* sync with qread */
566 dowakeup = 0;
567 ilock(&q->lk);
568 if(q->len >= q->limit){
569 freeblist(b);
570 iunlock(&q->lk);
571 return -1;
572 }
573 if(q->state & Qclosed){
574 freeblist(b);
575 iunlock(&q->lk);
576 return BALLOC(b);
577 }
578
579 /* add buffer to queue */
580 if(q->bfirst)
581 q->blast->next = b;
582 else
583 q->bfirst = b;
584 len = BALLOC(b);
585 dlen = BLEN(b);
586 QDEBUG checkb(b, "qpass");
587 while(b->next){
588 b = b->next;
589 QDEBUG checkb(b, "qpass");
590 len += BALLOC(b);
591 dlen += BLEN(b);
592 }
593 q->blast = b;
594 q->len += len;
595 q->dlen += dlen;
596
597 if(q->len >= q->limit/2)
598 q->state |= Qflow;
599
600 if(q->state & Qstarve){
601 q->state &= ~Qstarve;
602 dowakeup = 1;
603 }
604 iunlock(&q->lk);
605
606 if(dowakeup)
607 wakeup(&q->rr);
608
609 return len;
610 }
611
612 int
qpassnolim(Queue * q,Block * b)613 qpassnolim(Queue *q, Block *b)
614 {
615 int dlen, len, dowakeup;
616
617 /* sync with qread */
618 dowakeup = 0;
619 ilock(&q->lk);
620
621 if(q->state & Qclosed){
622 freeblist(b);
623 iunlock(&q->lk);
624 return BALLOC(b);
625 }
626
627 /* add buffer to queue */
628 if(q->bfirst)
629 q->blast->next = b;
630 else
631 q->bfirst = b;
632 len = BALLOC(b);
633 dlen = BLEN(b);
634 QDEBUG checkb(b, "qpass");
635 while(b->next){
636 b = b->next;
637 QDEBUG checkb(b, "qpass");
638 len += BALLOC(b);
639 dlen += BLEN(b);
640 }
641 q->blast = b;
642 q->len += len;
643 q->dlen += dlen;
644
645 if(q->len >= q->limit/2)
646 q->state |= Qflow;
647
648 if(q->state & Qstarve){
649 q->state &= ~Qstarve;
650 dowakeup = 1;
651 }
652 iunlock(&q->lk);
653
654 if(dowakeup)
655 wakeup(&q->rr);
656
657 return len;
658 }
659
660 /*
661 * if the allocated space is way out of line with the used
662 * space, reallocate to a smaller block
663 */
664 Block*
packblock(Block * bp)665 packblock(Block *bp)
666 {
667 Block **l, *nbp;
668 int n;
669
670 for(l = &bp; *l; l = &(*l)->next){
671 nbp = *l;
672 n = BLEN(nbp);
673 if((n<<2) < BALLOC(nbp)){
674 *l = allocb(n);
675 memmove((*l)->wp, nbp->rp, n);
676 (*l)->wp += n;
677 (*l)->next = nbp->next;
678 freeb(nbp);
679 }
680 }
681
682 return bp;
683 }
684
685 int
qproduce(Queue * q,void * vp,int len)686 qproduce(Queue *q, void *vp, int len)
687 {
688 Block *b;
689 int dowakeup;
690 uchar *p = vp;
691
692 /* sync with qread */
693 dowakeup = 0;
694 ilock(&q->lk);
695
696 /* no waiting receivers, room in buffer? */
697 if(q->len >= q->limit){
698 q->state |= Qflow;
699 iunlock(&q->lk);
700 return -1;
701 }
702
703 /* save in buffer */
704 b = iallocb(len);
705 if(b == 0){
706 iunlock(&q->lk);
707 return 0;
708 }
709 memmove(b->wp, p, len);
710 producecnt += len;
711 b->wp += len;
712 if(q->bfirst)
713 q->blast->next = b;
714 else
715 q->bfirst = b;
716 q->blast = b;
717 /* b->next = 0; done by iallocb() */
718 q->len += BALLOC(b);
719 q->dlen += BLEN(b);
720 QDEBUG checkb(b, "qproduce");
721
722 if(q->state & Qstarve){
723 q->state &= ~Qstarve;
724 dowakeup = 1;
725 }
726
727 if(q->len >= q->limit)
728 q->state |= Qflow;
729 iunlock(&q->lk);
730
731 if(dowakeup)
732 wakeup(&q->rr);
733
734 return len;
735 }
736
737 /*
738 * copy from offset in the queue
739 */
740 Block*
qcopy(Queue * q,int len,ulong offset)741 qcopy(Queue *q, int len, ulong offset)
742 {
743 int sofar;
744 int n;
745 Block *b, *nb;
746 uchar *p;
747
748 nb = allocb(len);
749
750 ilock(&q->lk);
751
752 /* go to offset */
753 b = q->bfirst;
754 for(sofar = 0; ; sofar += n){
755 if(b == nil){
756 iunlock(&q->lk);
757 return nb;
758 }
759 n = BLEN(b);
760 if(sofar + n > offset){
761 p = b->rp + offset - sofar;
762 n -= offset - sofar;
763 break;
764 }
765 QDEBUG checkb(b, "qcopy");
766 b = b->next;
767 }
768
769 /* copy bytes from there */
770 for(sofar = 0; sofar < len;){
771 if(n > len - sofar)
772 n = len - sofar;
773 memmove(nb->wp, p, n);
774 qcopycnt += n;
775 sofar += n;
776 nb->wp += n;
777 b = b->next;
778 if(b == nil)
779 break;
780 n = BLEN(b);
781 p = b->rp;
782 }
783 iunlock(&q->lk);
784
785 return nb;
786 }
787
788 /*
789 * called by non-interrupt code
790 */
791 Queue*
qopen(int limit,int msg,void (* kick)(void *),void * arg)792 qopen(int limit, int msg, void (*kick)(void*), void *arg)
793 {
794 Queue *q;
795
796 q = malloc(sizeof(Queue));
797 if(q == 0)
798 return 0;
799
800 q->limit = q->inilim = limit;
801 q->kick = kick;
802 q->arg = arg;
803 q->state = msg;
804
805 q->state |= Qstarve;
806 q->eof = 0;
807 q->noblock = 0;
808
809 return q;
810 }
811
812 /* open a queue to be bypassed */
813 Queue*
qbypass(void (* bypass)(void *,Block *),void * arg)814 qbypass(void (*bypass)(void*, Block*), void *arg)
815 {
816 Queue *q;
817
818 q = malloc(sizeof(Queue));
819 if(q == 0)
820 return 0;
821
822 q->limit = 0;
823 q->arg = arg;
824 q->bypass = bypass;
825 q->state = 0;
826
827 return q;
828 }
829
830 static int
notempty(void * a)831 notempty(void *a)
832 {
833 Queue *q = a;
834
835 return (q->state & Qclosed) || q->bfirst != 0;
836 }
837
838 /*
839 * wait for the queue to be non-empty or closed.
840 * called with q ilocked.
841 */
842 static int
qwait(Queue * q)843 qwait(Queue *q)
844 {
845 /* wait for data */
846 for(;;){
847 if(q->bfirst != nil)
848 break;
849
850 if(q->state & Qclosed){
851 if(++q->eof > 3)
852 return -1;
853 if(*q->err && strcmp(q->err, Ehungup) != 0)
854 return -1;
855 return 0;
856 }
857
858 q->state |= Qstarve; /* flag requesting producer to wake me */
859 iunlock(&q->lk);
860 sleep(&q->rr, notempty, q);
861 ilock(&q->lk);
862 }
863 return 1;
864 }
865
866 /*
867 * add a block list to a queue
868 */
869 void
qaddlist(Queue * q,Block * b)870 qaddlist(Queue *q, Block *b)
871 {
872 /* queue the block */
873 if(q->bfirst)
874 q->blast->next = b;
875 else
876 q->bfirst = b;
877 q->len += blockalloclen(b);
878 q->dlen += blocklen(b);
879 while(b->next)
880 b = b->next;
881 q->blast = b;
882 }
883
884 /*
885 * called with q ilocked
886 */
887 Block*
qremove(Queue * q)888 qremove(Queue *q)
889 {
890 Block *b;
891
892 b = q->bfirst;
893 if(b == nil)
894 return nil;
895 q->bfirst = b->next;
896 b->next = nil;
897 q->dlen -= BLEN(b);
898 q->len -= BALLOC(b);
899 QDEBUG checkb(b, "qremove");
900 return b;
901 }
902
903 /*
904 * copy the contents of a string of blocks into
905 * memory. emptied blocks are freed. return
906 * pointer to first unconsumed block.
907 */
908 Block*
bl2mem(uchar * p,Block * b,int n)909 bl2mem(uchar *p, Block *b, int n)
910 {
911 int i;
912 Block *next;
913
914 for(; b != nil; b = next){
915 i = BLEN(b);
916 if(i > n){
917 memmove(p, b->rp, n);
918 b->rp += n;
919 return b;
920 }
921 memmove(p, b->rp, i);
922 n -= i;
923 p += i;
924 b->rp += i;
925 next = b->next;
926 freeb(b);
927 }
928 return nil;
929 }
930
931 /*
932 * copy the contents of memory into a string of blocks.
933 * return nil on error.
934 */
935 Block*
mem2bl(uchar * p,int len)936 mem2bl(uchar *p, int len)
937 {
938 int n;
939 Block *b, *first, **l;
940
941 first = nil;
942 l = &first;
943 if(waserror()){
944 freeblist(first);
945 nexterror();
946 }
947 do {
948 n = len;
949 if(n > Maxatomic)
950 n = Maxatomic;
951
952 *l = b = allocb(n);
953 /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
954 memmove(b->wp, p, n);
955 b->wp += n;
956 p += n;
957 len -= n;
958 l = &b->next;
959 } while(len > 0);
960 poperror();
961
962 return first;
963 }
964
965 /*
966 * put a block back to the front of the queue
967 * called with q ilocked
968 */
969 void
qputback(Queue * q,Block * b)970 qputback(Queue *q, Block *b)
971 {
972 b->next = q->bfirst;
973 if(q->bfirst == nil)
974 q->blast = b;
975 q->bfirst = b;
976 q->len += BALLOC(b);
977 q->dlen += BLEN(b);
978 }
979
980 /*
981 * flow control, get producer going again
982 * called with q ilocked
983 */
984 static void
qwakeup_iunlock(Queue * q)985 qwakeup_iunlock(Queue *q)
986 {
987 int dowakeup = 0;
988
989 /* if writer flow controlled, restart */
990 if((q->state & Qflow) && q->len < q->limit/2){
991 q->state &= ~Qflow;
992 dowakeup = 1;
993 }
994
995 iunlock(&q->lk);
996
997 /* wakeup flow controlled writers */
998 if(dowakeup){
999 if(q->kick)
1000 q->kick(q->arg);
1001 wakeup(&q->wr);
1002 }
1003 }
1004
1005 /*
1006 * get next block from a queue (up to a limit)
1007 */
1008 Block*
qbread(Queue * q,int len)1009 qbread(Queue *q, int len)
1010 {
1011 Block *b, *nb;
1012 int n;
1013
1014 qlock(&q->rlock);
1015 if(waserror()){
1016 qunlock(&q->rlock);
1017 nexterror();
1018 }
1019
1020 ilock(&q->lk);
1021 switch(qwait(q)){
1022 case 0:
1023 /* queue closed */
1024 iunlock(&q->lk);
1025 qunlock(&q->rlock);
1026 poperror();
1027 return nil;
1028 case -1:
1029 /* multiple reads on a closed queue */
1030 iunlock(&q->lk);
1031 error(q->err);
1032 }
1033
1034 /* if we get here, there's at least one block in the queue */
1035 b = qremove(q);
1036 n = BLEN(b);
1037
1038 /* split block if it's too big and this is not a message queue */
1039 nb = b;
1040 if(n > len){
1041 if((q->state&Qmsg) == 0){
1042 n -= len;
1043 b = allocb(n);
1044 memmove(b->wp, nb->rp+len, n);
1045 b->wp += n;
1046 qputback(q, b);
1047 }
1048 nb->wp = nb->rp + len;
1049 }
1050
1051 /* restart producer */
1052 qwakeup_iunlock(q);
1053
1054 poperror();
1055 qunlock(&q->rlock);
1056 return nb;
1057 }
1058
1059 /*
1060 * read a queue. if no data is queued, post a Block
1061 * and wait on its Rendez.
1062 */
1063 long
qread(Queue * q,void * vp,int len)1064 qread(Queue *q, void *vp, int len)
1065 {
1066 Block *b, *first, **l;
1067 int m, n;
1068
1069 qlock(&q->rlock);
1070 if(waserror()){
1071 qunlock(&q->rlock);
1072 nexterror();
1073 }
1074
1075 ilock(&q->lk);
1076 again:
1077 switch(qwait(q)){
1078 case 0:
1079 /* queue closed */
1080 iunlock(&q->lk);
1081 qunlock(&q->rlock);
1082 poperror();
1083 return 0;
1084 case -1:
1085 /* multiple reads on a closed queue */
1086 iunlock(&q->lk);
1087 error(q->err);
1088 }
1089
1090 /* if we get here, there's at least one block in the queue */
1091 if(q->state & Qcoalesce){
1092 /* when coalescing, 0 length blocks just go away */
1093 b = q->bfirst;
1094 if(BLEN(b) <= 0){
1095 freeb(qremove(q));
1096 goto again;
1097 }
1098
1099 /* grab the first block plus as many
1100 * following blocks as will completely
1101 * fit in the read.
1102 */
1103 n = 0;
1104 l = &first;
1105 m = BLEN(b);
1106 for(;;) {
1107 *l = qremove(q);
1108 l = &b->next;
1109 n += m;
1110
1111 b = q->bfirst;
1112 if(b == nil)
1113 break;
1114 m = BLEN(b);
1115 if(n+m > len)
1116 break;
1117 }
1118 } else {
1119 first = qremove(q);
1120 n = BLEN(first);
1121 }
1122
1123 /* copy to user space outside of the ilock */
1124 iunlock(&q->lk);
1125 b = bl2mem(vp, first, len);
1126 ilock(&q->lk);
1127
1128 /* take care of any left over partial block */
1129 if(b != nil){
1130 n -= BLEN(b);
1131 if(q->state & Qmsg)
1132 freeb(b);
1133 else
1134 qputback(q, b);
1135 }
1136
1137 /* restart producer */
1138 qwakeup_iunlock(q);
1139
1140 poperror();
1141 qunlock(&q->rlock);
1142 return n;
1143 }
1144
1145 static int
qnotfull(void * a)1146 qnotfull(void *a)
1147 {
1148 Queue *q = a;
1149
1150 return q->len < q->limit || (q->state & Qclosed);
1151 }
1152
1153 ulong noblockcnt;
1154
1155 /*
1156 * add a block to a queue obeying flow control
1157 */
1158 long
qbwrite(Queue * q,Block * b)1159 qbwrite(Queue *q, Block *b)
1160 {
1161 int n, dowakeup;
1162
1163 n = BLEN(b);
1164
1165 if(q->bypass){
1166 (*q->bypass)(q->arg, b);
1167 return n;
1168 }
1169
1170 dowakeup = 0;
1171 qlock(&q->wlock);
1172 if(waserror()){
1173 if(b != nil)
1174 freeb(b);
1175 qunlock(&q->wlock);
1176 nexterror();
1177 }
1178
1179 ilock(&q->lk);
1180
1181 /* give up if the queue is closed */
1182 if(q->state & Qclosed){
1183 iunlock(&q->lk);
1184 error(q->err);
1185 }
1186
1187 /* if nonblocking, don't queue over the limit */
1188 if(q->len >= q->limit){
1189 if(q->noblock){
1190 iunlock(&q->lk);
1191 freeb(b);
1192 noblockcnt += n;
1193 qunlock(&q->wlock);
1194 poperror();
1195 return n;
1196 }
1197 }
1198
1199 /* queue the block */
1200 if(q->bfirst)
1201 q->blast->next = b;
1202 else
1203 q->bfirst = b;
1204 q->blast = b;
1205 b->next = 0;
1206 q->len += BALLOC(b);
1207 q->dlen += n;
1208 QDEBUG checkb(b, "qbwrite");
1209 b = nil;
1210
1211 /* make sure other end gets awakened */
1212 if(q->state & Qstarve){
1213 q->state &= ~Qstarve;
1214 dowakeup = 1;
1215 }
1216 iunlock(&q->lk);
1217
1218 /* get output going again */
1219 if(q->kick && (dowakeup || (q->state&Qkick)))
1220 q->kick(q->arg);
1221
1222 /* wakeup anyone consuming at the other end */
1223 if(dowakeup){
1224 wakeup(&q->rr);
1225
1226 /* if we just wokeup a higher priority process, let it run */
1227 /*
1228 p = wakeup(&q->rr);
1229 if(p != nil && p->priority > up->priority)
1230 sched();
1231 */
1232 }
1233
1234 /*
1235 * flow control, wait for queue to get below the limit
1236 * before allowing the process to continue and queue
1237 * more. We do this here so that postnote can only
1238 * interrupt us after the data has been queued. This
1239 * means that things like 9p flushes and ssl messages
1240 * will not be disrupted by software interrupts.
1241 *
1242 * Note - this is moderately dangerous since a process
1243 * that keeps getting interrupted and rewriting will
1244 * queue infinite crud.
1245 */
1246 for(;;){
1247 if(q->noblock || qnotfull(q))
1248 break;
1249
1250 ilock(&q->lk);
1251 q->state |= Qflow;
1252 iunlock(&q->lk);
1253 sleep(&q->wr, qnotfull, q);
1254 }
1255 USED(b);
1256
1257 qunlock(&q->wlock);
1258 poperror();
1259 return n;
1260 }
1261
1262 /*
1263 * write to a queue. only Maxatomic bytes at a time is atomic.
1264 */
1265 int
qwrite(Queue * q,void * vp,int len)1266 qwrite(Queue *q, void *vp, int len)
1267 {
1268 int n, sofar;
1269 Block *b;
1270 uchar *p = vp;
1271
1272 QDEBUG if(!islo())
1273 print("qwrite hi %p\n", getcallerpc(&q));
1274
1275 sofar = 0;
1276 do {
1277 n = len-sofar;
1278 if(n > Maxatomic)
1279 n = Maxatomic;
1280
1281 b = allocb(n);
1282 /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
1283 if(waserror()){
1284 freeb(b);
1285 nexterror();
1286 }
1287 memmove(b->wp, p+sofar, n);
1288 poperror();
1289 b->wp += n;
1290
1291 qbwrite(q, b);
1292
1293 sofar += n;
1294 } while(sofar < len && (q->state & Qmsg) == 0);
1295
1296 return len;
1297 }
1298
1299 /*
1300 * used by print() to write to a queue. Since we may be splhi or not in
1301 * a process, don't qlock.
1302 */
1303 int
qiwrite(Queue * q,void * vp,int len)1304 qiwrite(Queue *q, void *vp, int len)
1305 {
1306 int n, sofar, dowakeup;
1307 Block *b;
1308 uchar *p = vp;
1309
1310 dowakeup = 0;
1311
1312 sofar = 0;
1313 do {
1314 n = len-sofar;
1315 if(n > Maxatomic)
1316 n = Maxatomic;
1317
1318 b = iallocb(n);
1319 if(b == nil)
1320 break;
1321 memmove(b->wp, p+sofar, n);
1322 b->wp += n;
1323
1324 ilock(&q->lk);
1325
1326 QDEBUG checkb(b, "qiwrite");
1327 if(q->bfirst)
1328 q->blast->next = b;
1329 else
1330 q->bfirst = b;
1331 q->blast = b;
1332 q->len += BALLOC(b);
1333 q->dlen += n;
1334
1335 if(q->state & Qstarve){
1336 q->state &= ~Qstarve;
1337 dowakeup = 1;
1338 }
1339
1340 iunlock(&q->lk);
1341
1342 if(dowakeup){
1343 if(q->kick)
1344 q->kick(q->arg);
1345 wakeup(&q->rr);
1346 }
1347
1348 sofar += n;
1349 } while(sofar < len && (q->state & Qmsg) == 0);
1350
1351 return sofar;
1352 }
1353
1354 /*
1355 * be extremely careful when calling this,
1356 * as there is no reference accounting
1357 */
1358 void
qfree(Queue * q)1359 qfree(Queue *q)
1360 {
1361 qclose(q);
1362 free(q);
1363 }
1364
1365 /*
1366 * Mark a queue as closed. No further IO is permitted.
1367 * All blocks are released.
1368 */
1369 void
qclose(Queue * q)1370 qclose(Queue *q)
1371 {
1372 Block *bfirst;
1373
1374 if(q == nil)
1375 return;
1376
1377 /* mark it */
1378 ilock(&q->lk);
1379 q->state |= Qclosed;
1380 q->state &= ~(Qflow|Qstarve);
1381 strcpy(q->err, Ehungup);
1382 bfirst = q->bfirst;
1383 q->bfirst = 0;
1384 q->len = 0;
1385 q->dlen = 0;
1386 q->noblock = 0;
1387 iunlock(&q->lk);
1388
1389 /* free queued blocks */
1390 freeblist(bfirst);
1391
1392 /* wake up readers/writers */
1393 wakeup(&q->rr);
1394 wakeup(&q->wr);
1395 }
1396
1397 /*
1398 * Mark a queue as closed. Wakeup any readers. Don't remove queued
1399 * blocks.
1400 */
1401 void
qhangup(Queue * q,char * msg)1402 qhangup(Queue *q, char *msg)
1403 {
1404 /* mark it */
1405 ilock(&q->lk);
1406 q->state |= Qclosed;
1407 if(msg == 0 || *msg == 0)
1408 strcpy(q->err, Ehungup);
1409 else
1410 strncpy(q->err, msg, ERRMAX-1);
1411 iunlock(&q->lk);
1412
1413 /* wake up readers/writers */
1414 wakeup(&q->rr);
1415 wakeup(&q->wr);
1416 }
1417
1418 /*
1419 * return non-zero if the q is hungup
1420 */
1421 int
qisclosed(Queue * q)1422 qisclosed(Queue *q)
1423 {
1424 return q->state & Qclosed;
1425 }
1426
1427 /*
1428 * mark a queue as no longer hung up
1429 */
1430 void
qreopen(Queue * q)1431 qreopen(Queue *q)
1432 {
1433 ilock(&q->lk);
1434 q->state &= ~Qclosed;
1435 q->state |= Qstarve;
1436 q->eof = 0;
1437 q->limit = q->inilim;
1438 iunlock(&q->lk);
1439 }
1440
1441 /*
1442 * return bytes queued
1443 */
1444 int
qlen(Queue * q)1445 qlen(Queue *q)
1446 {
1447 return q->dlen;
1448 }
1449
1450 /*
1451 * return space remaining before flow control
1452 */
1453 int
qwindow(Queue * q)1454 qwindow(Queue *q)
1455 {
1456 int l;
1457
1458 l = q->limit - q->len;
1459 if(l < 0)
1460 l = 0;
1461 return l;
1462 }
1463
1464 /*
1465 * return true if we can read without blocking
1466 */
1467 int
qcanread(Queue * q)1468 qcanread(Queue *q)
1469 {
1470 return q->bfirst!=0;
1471 }
1472
1473 /*
1474 * change queue limit
1475 */
1476 void
qsetlimit(Queue * q,int limit)1477 qsetlimit(Queue *q, int limit)
1478 {
1479 q->limit = limit;
1480 }
1481
1482 /*
1483 * set blocking/nonblocking
1484 */
1485 void
qnoblock(Queue * q,int onoff)1486 qnoblock(Queue *q, int onoff)
1487 {
1488 q->noblock = onoff;
1489 }
1490
1491 /*
1492 * flush the output queue
1493 */
1494 void
qflush(Queue * q)1495 qflush(Queue *q)
1496 {
1497 Block *bfirst;
1498
1499 /* mark it */
1500 ilock(&q->lk);
1501 bfirst = q->bfirst;
1502 q->bfirst = 0;
1503 q->len = 0;
1504 q->dlen = 0;
1505 iunlock(&q->lk);
1506
1507 /* free queued blocks */
1508 freeblist(bfirst);
1509
1510 /* wake up readers/writers */
1511 wakeup(&q->wr);
1512 }
1513
1514 int
qfull(Queue * q)1515 qfull(Queue *q)
1516 {
1517 return q->state & Qflow;
1518 }
1519
1520 int
qstate(Queue * q)1521 qstate(Queue *q)
1522 {
1523 return q->state;
1524 }
1525