1 #include "u.h" 2 #include "lib.h" 3 #include "dat.h" 4 #include "fns.h" 5 #include "error.h" 6 7 static ulong padblockcnt; 8 static ulong concatblockcnt; 9 static ulong pullupblockcnt; 10 static ulong copyblockcnt; 11 static ulong consumecnt; 12 static ulong producecnt; 13 static ulong qcopycnt; 14 15 static int debugging; 16 17 #define QDEBUG if(0) 18 19 /* 20 * IO queues 21 */ 22 struct Queue 23 { 24 Lock lk; 25 26 Block* bfirst; /* buffer */ 27 Block* blast; 28 29 int len; /* bytes allocated to queue */ 30 int dlen; /* data bytes in queue */ 31 int limit; /* max bytes in queue */ 32 int inilim; /* initial limit */ 33 int state; 34 int noblock; /* true if writes return immediately when q full */ 35 int eof; /* number of eofs read by user */ 36 37 void (*kick)(void*); /* restart output */ 38 void (*bypass)(void*, Block*); /* bypass queue altogether */ 39 void* arg; /* argument to kick */ 40 41 QLock rlock; /* mutex for reading processes */ 42 Rendez rr; /* process waiting to read */ 43 QLock wlock; /* mutex for writing processes */ 44 Rendez wr; /* process waiting to write */ 45 46 char err[ERRMAX]; 47 }; 48 49 enum 50 { 51 Maxatomic = 64*1024, 52 }; 53 54 uint qiomaxatomic = Maxatomic; 55 56 void 57 ixsummary(void) 58 { 59 debugging ^= 1; 60 iallocsummary(); 61 print("pad %lud, concat %lud, pullup %lud, copy %lud\n", 62 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt); 63 print("consume %lud, produce %lud, qcopy %lud\n", 64 consumecnt, producecnt, qcopycnt); 65 } 66 67 /* 68 * free a list of blocks 69 */ 70 void 71 freeblist(Block *b) 72 { 73 Block *next; 74 75 for(; b != 0; b = next){ 76 next = b->next; 77 b->next = 0; 78 freeb(b); 79 } 80 } 81 82 /* 83 * pad a block to the front (or the back if size is negative) 84 */ 85 Block* 86 padblock(Block *bp, int size) 87 { 88 int n; 89 Block *nbp; 90 91 QDEBUG checkb(bp, "padblock 1"); 92 if(size >= 0){ 93 if(bp->rp - bp->base >= size){ 94 bp->rp -= size; 95 return bp; 96 } 97 98 if(bp->next) 99 panic("padblock 0x%p", getcallerpc(&bp)); 100 n = BLEN(bp); 101 padblockcnt++; 102 nbp = allocb(size+n); 103 nbp->rp += size; 104 nbp->wp = nbp->rp; 105 memmove(nbp->wp, bp->rp, n); 106 nbp->wp += n; 107 freeb(bp); 108 nbp->rp -= size; 109 } else { 110 size = -size; 111 112 if(bp->next) 113 panic("padblock 0x%p", getcallerpc(&bp)); 114 115 if(bp->lim - bp->wp >= size) 116 return bp; 117 118 n = BLEN(bp); 119 padblockcnt++; 120 nbp = allocb(size+n); 121 memmove(nbp->wp, bp->rp, n); 122 nbp->wp += n; 123 freeb(bp); 124 } 125 QDEBUG checkb(nbp, "padblock 1"); 126 return nbp; 127 } 128 129 /* 130 * return count of bytes in a string of blocks 131 */ 132 int 133 blocklen(Block *bp) 134 { 135 int len; 136 137 len = 0; 138 while(bp) { 139 len += BLEN(bp); 140 bp = bp->next; 141 } 142 return len; 143 } 144 145 /* 146 * return count of space in blocks 147 */ 148 int 149 blockalloclen(Block *bp) 150 { 151 int len; 152 153 len = 0; 154 while(bp) { 155 len += BALLOC(bp); 156 bp = bp->next; 157 } 158 return len; 159 } 160 161 /* 162 * copy the string of blocks into 163 * a single block and free the string 164 */ 165 Block* 166 concatblock(Block *bp) 167 { 168 int len; 169 Block *nb, *f; 170 171 if(bp->next == 0) 172 return bp; 173 174 nb = allocb(blocklen(bp)); 175 for(f = bp; f; f = f->next) { 176 len = BLEN(f); 177 memmove(nb->wp, f->rp, len); 178 nb->wp += len; 179 } 180 concatblockcnt += BLEN(nb); 181 freeblist(bp); 182 QDEBUG checkb(nb, "concatblock 1"); 183 return nb; 184 } 185 186 /* 187 * make sure the first block has at least n bytes 188 */ 189 Block* 190 pullupblock(Block *bp, int n) 191 { 192 int i; 193 Block *nbp; 194 195 /* 196 * this should almost always be true, it's 197 * just to avoid every caller checking. 198 */ 199 if(BLEN(bp) >= n) 200 return bp; 201 202 /* 203 * if not enough room in the first block, 204 * add another to the front of the list. 205 */ 206 if(bp->lim - bp->rp < n){ 207 nbp = allocb(n); 208 nbp->next = bp; 209 bp = nbp; 210 } 211 212 /* 213 * copy bytes from the trailing blocks into the first 214 */ 215 n -= BLEN(bp); 216 while((nbp = bp->next)){ 217 i = BLEN(nbp); 218 if(i > n) { 219 memmove(bp->wp, nbp->rp, n); 220 pullupblockcnt++; 221 bp->wp += n; 222 nbp->rp += n; 223 QDEBUG checkb(bp, "pullupblock 1"); 224 return bp; 225 } else { 226 /* shouldn't happen but why crash if it does */ 227 if(i < 0){ 228 print("pullup negative length packet\n"); 229 i = 0; 230 } 231 memmove(bp->wp, nbp->rp, i); 232 pullupblockcnt++; 233 bp->wp += i; 234 bp->next = nbp->next; 235 nbp->next = 0; 236 freeb(nbp); 237 n -= i; 238 if(n == 0){ 239 QDEBUG checkb(bp, "pullupblock 2"); 240 return bp; 241 } 242 } 243 } 244 freeb(bp); 245 return 0; 246 } 247 248 /* 249 * make sure the first block has at least n bytes 250 */ 251 Block* 252 pullupqueue(Queue *q, int n) 253 { 254 Block *b; 255 256 if(BLEN(q->bfirst) >= n) 257 return q->bfirst; 258 q->bfirst = pullupblock(q->bfirst, n); 259 for(b = q->bfirst; b != nil && b->next != nil; b = b->next) 260 ; 261 q->blast = b; 262 return q->bfirst; 263 } 264 265 /* 266 * trim to len bytes starting at offset 267 */ 268 Block * 269 trimblock(Block *bp, int offset, int len) 270 { 271 ulong l; 272 Block *nb, *startb; 273 274 QDEBUG checkb(bp, "trimblock 1"); 275 if(blocklen(bp) < offset+len) { 276 freeblist(bp); 277 return nil; 278 } 279 280 while((l = BLEN(bp)) < offset) { 281 offset -= l; 282 nb = bp->next; 283 bp->next = nil; 284 freeb(bp); 285 bp = nb; 286 } 287 288 startb = bp; 289 bp->rp += offset; 290 291 while((l = BLEN(bp)) < len) { 292 len -= l; 293 bp = bp->next; 294 } 295 296 bp->wp -= (BLEN(bp) - len); 297 298 if(bp->next) { 299 freeblist(bp->next); 300 bp->next = nil; 301 } 302 303 return startb; 304 } 305 306 /* 307 * copy 'count' bytes into a new block 308 */ 309 Block* 310 copyblock(Block *bp, int count) 311 { 312 int l; 313 Block *nbp; 314 315 QDEBUG checkb(bp, "copyblock 0"); 316 nbp = allocb(count); 317 for(; count > 0 && bp != 0; bp = bp->next){ 318 l = BLEN(bp); 319 if(l > count) 320 l = count; 321 memmove(nbp->wp, bp->rp, l); 322 nbp->wp += l; 323 count -= l; 324 } 325 if(count > 0){ 326 memset(nbp->wp, 0, count); 327 nbp->wp += count; 328 } 329 copyblockcnt++; 330 QDEBUG checkb(nbp, "copyblock 1"); 331 332 return nbp; 333 } 334 335 Block* 336 adjustblock(Block* bp, int len) 337 { 338 int n; 339 Block *nbp; 340 341 if(len < 0){ 342 freeb(bp); 343 return nil; 344 } 345 346 if(bp->rp+len > bp->lim){ 347 nbp = copyblock(bp, len); 348 freeblist(bp); 349 QDEBUG checkb(nbp, "adjustblock 1"); 350 351 return nbp; 352 } 353 354 n = BLEN(bp); 355 if(len > n) 356 memset(bp->wp, 0, len-n); 357 bp->wp = bp->rp+len; 358 QDEBUG checkb(bp, "adjustblock 2"); 359 360 return bp; 361 } 362 363 364 /* 365 * throw away up to count bytes from a 366 * list of blocks. Return count of bytes 367 * thrown away. 368 */ 369 int 370 pullblock(Block **bph, int count) 371 { 372 Block *bp; 373 int n, bytes; 374 375 bytes = 0; 376 if(bph == nil) 377 return 0; 378 379 while(*bph != nil && count != 0) { 380 bp = *bph; 381 n = BLEN(bp); 382 if(count < n) 383 n = count; 384 bytes += n; 385 count -= n; 386 bp->rp += n; 387 QDEBUG checkb(bp, "pullblock "); 388 if(BLEN(bp) == 0) { 389 *bph = bp->next; 390 bp->next = nil; 391 freeb(bp); 392 } 393 } 394 return bytes; 395 } 396 397 /* 398 * get next block from a queue, return null if nothing there 399 */ 400 Block* 401 qget(Queue *q) 402 { 403 int dowakeup; 404 Block *b; 405 406 /* sync with qwrite */ 407 ilock(&q->lk); 408 409 b = q->bfirst; 410 if(b == nil){ 411 q->state |= Qstarve; 412 iunlock(&q->lk); 413 return nil; 414 } 415 q->bfirst = b->next; 416 b->next = 0; 417 q->len -= BALLOC(b); 418 q->dlen -= BLEN(b); 419 QDEBUG checkb(b, "qget"); 420 421 /* if writer flow controlled, restart */ 422 if((q->state & Qflow) && q->len < q->limit/2){ 423 q->state &= ~Qflow; 424 dowakeup = 1; 425 } else 426 dowakeup = 0; 427 428 iunlock(&q->lk); 429 430 if(dowakeup) 431 wakeup(&q->wr); 432 433 return b; 434 } 435 436 /* 437 * throw away the next 'len' bytes in the queue 438 */ 439 int 440 qdiscard(Queue *q, int len) 441 { 442 Block *b; 443 int dowakeup, n, sofar; 444 445 ilock(&q->lk); 446 for(sofar = 0; sofar < len; sofar += n){ 447 b = q->bfirst; 448 if(b == nil) 449 break; 450 QDEBUG checkb(b, "qdiscard"); 451 n = BLEN(b); 452 if(n <= len - sofar){ 453 q->bfirst = b->next; 454 b->next = 0; 455 q->len -= BALLOC(b); 456 q->dlen -= BLEN(b); 457 freeb(b); 458 } else { 459 n = len - sofar; 460 b->rp += n; 461 q->dlen -= n; 462 } 463 } 464 465 /* 466 * if writer flow controlled, restart 467 * 468 * This used to be 469 * q->len < q->limit/2 470 * but it slows down tcp too much for certain write sizes. 471 * I really don't understand it completely. It may be 472 * due to the queue draining so fast that the transmission 473 * stalls waiting for the app to produce more data. - presotto 474 */ 475 if((q->state & Qflow) && q->len < q->limit){ 476 q->state &= ~Qflow; 477 dowakeup = 1; 478 } else 479 dowakeup = 0; 480 481 iunlock(&q->lk); 482 483 if(dowakeup) 484 wakeup(&q->wr); 485 486 return sofar; 487 } 488 489 /* 490 * Interrupt level copy out of a queue, return # bytes copied. 491 */ 492 int 493 qconsume(Queue *q, void *vp, int len) 494 { 495 Block *b; 496 int n, dowakeup; 497 uchar *p = vp; 498 Block *tofree = nil; 499 500 /* sync with qwrite */ 501 ilock(&q->lk); 502 503 for(;;) { 504 b = q->bfirst; 505 if(b == 0){ 506 q->state |= Qstarve; 507 iunlock(&q->lk); 508 return -1; 509 } 510 QDEBUG checkb(b, "qconsume 1"); 511 512 n = BLEN(b); 513 if(n > 0) 514 break; 515 q->bfirst = b->next; 516 q->len -= BALLOC(b); 517 518 /* remember to free this */ 519 b->next = tofree; 520 tofree = b; 521 }; 522 523 if(n < len) 524 len = n; 525 memmove(p, b->rp, len); 526 consumecnt += n; 527 b->rp += len; 528 q->dlen -= len; 529 530 /* discard the block if we're done with it */ 531 if((q->state & Qmsg) || len == n){ 532 q->bfirst = b->next; 533 b->next = 0; 534 q->len -= BALLOC(b); 535 q->dlen -= BLEN(b); 536 537 /* remember to free this */ 538 b->next = tofree; 539 tofree = b; 540 } 541 542 /* if writer flow controlled, restart */ 543 if((q->state & Qflow) && q->len < q->limit/2){ 544 q->state &= ~Qflow; 545 dowakeup = 1; 546 } else 547 dowakeup = 0; 548 549 iunlock(&q->lk); 550 551 if(dowakeup) 552 wakeup(&q->wr); 553 554 if(tofree != nil) 555 freeblist(tofree); 556 557 return len; 558 } 559 560 int 561 qpass(Queue *q, Block *b) 562 { 563 int dlen, len, dowakeup; 564 565 /* sync with qread */ 566 dowakeup = 0; 567 ilock(&q->lk); 568 if(q->len >= q->limit){ 569 freeblist(b); 570 iunlock(&q->lk); 571 return -1; 572 } 573 if(q->state & Qclosed){ 574 freeblist(b); 575 iunlock(&q->lk); 576 return BALLOC(b); 577 } 578 579 /* add buffer to queue */ 580 if(q->bfirst) 581 q->blast->next = b; 582 else 583 q->bfirst = b; 584 len = BALLOC(b); 585 dlen = BLEN(b); 586 QDEBUG checkb(b, "qpass"); 587 while(b->next){ 588 b = b->next; 589 QDEBUG checkb(b, "qpass"); 590 len += BALLOC(b); 591 dlen += BLEN(b); 592 } 593 q->blast = b; 594 q->len += len; 595 q->dlen += dlen; 596 597 if(q->len >= q->limit/2) 598 q->state |= Qflow; 599 600 if(q->state & Qstarve){ 601 q->state &= ~Qstarve; 602 dowakeup = 1; 603 } 604 iunlock(&q->lk); 605 606 if(dowakeup) 607 wakeup(&q->rr); 608 609 return len; 610 } 611 612 int 613 qpassnolim(Queue *q, Block *b) 614 { 615 int dlen, len, dowakeup; 616 617 /* sync with qread */ 618 dowakeup = 0; 619 ilock(&q->lk); 620 621 if(q->state & Qclosed){ 622 freeblist(b); 623 iunlock(&q->lk); 624 return BALLOC(b); 625 } 626 627 /* add buffer to queue */ 628 if(q->bfirst) 629 q->blast->next = b; 630 else 631 q->bfirst = b; 632 len = BALLOC(b); 633 dlen = BLEN(b); 634 QDEBUG checkb(b, "qpass"); 635 while(b->next){ 636 b = b->next; 637 QDEBUG checkb(b, "qpass"); 638 len += BALLOC(b); 639 dlen += BLEN(b); 640 } 641 q->blast = b; 642 q->len += len; 643 q->dlen += dlen; 644 645 if(q->len >= q->limit/2) 646 q->state |= Qflow; 647 648 if(q->state & Qstarve){ 649 q->state &= ~Qstarve; 650 dowakeup = 1; 651 } 652 iunlock(&q->lk); 653 654 if(dowakeup) 655 wakeup(&q->rr); 656 657 return len; 658 } 659 660 /* 661 * if the allocated space is way out of line with the used 662 * space, reallocate to a smaller block 663 */ 664 Block* 665 packblock(Block *bp) 666 { 667 Block **l, *nbp; 668 int n; 669 670 for(l = &bp; *l; l = &(*l)->next){ 671 nbp = *l; 672 n = BLEN(nbp); 673 if((n<<2) < BALLOC(nbp)){ 674 *l = allocb(n); 675 memmove((*l)->wp, nbp->rp, n); 676 (*l)->wp += n; 677 (*l)->next = nbp->next; 678 freeb(nbp); 679 } 680 } 681 682 return bp; 683 } 684 685 int 686 qproduce(Queue *q, void *vp, int len) 687 { 688 Block *b; 689 int dowakeup; 690 uchar *p = vp; 691 692 /* sync with qread */ 693 dowakeup = 0; 694 ilock(&q->lk); 695 696 /* no waiting receivers, room in buffer? */ 697 if(q->len >= q->limit){ 698 q->state |= Qflow; 699 iunlock(&q->lk); 700 return -1; 701 } 702 703 /* save in buffer */ 704 b = iallocb(len); 705 if(b == 0){ 706 iunlock(&q->lk); 707 return 0; 708 } 709 memmove(b->wp, p, len); 710 producecnt += len; 711 b->wp += len; 712 if(q->bfirst) 713 q->blast->next = b; 714 else 715 q->bfirst = b; 716 q->blast = b; 717 /* b->next = 0; done by iallocb() */ 718 q->len += BALLOC(b); 719 q->dlen += BLEN(b); 720 QDEBUG checkb(b, "qproduce"); 721 722 if(q->state & Qstarve){ 723 q->state &= ~Qstarve; 724 dowakeup = 1; 725 } 726 727 if(q->len >= q->limit) 728 q->state |= Qflow; 729 iunlock(&q->lk); 730 731 if(dowakeup) 732 wakeup(&q->rr); 733 734 return len; 735 } 736 737 /* 738 * copy from offset in the queue 739 */ 740 Block* 741 qcopy(Queue *q, int len, ulong offset) 742 { 743 int sofar; 744 int n; 745 Block *b, *nb; 746 uchar *p; 747 748 nb = allocb(len); 749 750 ilock(&q->lk); 751 752 /* go to offset */ 753 b = q->bfirst; 754 for(sofar = 0; ; sofar += n){ 755 if(b == nil){ 756 iunlock(&q->lk); 757 return nb; 758 } 759 n = BLEN(b); 760 if(sofar + n > offset){ 761 p = b->rp + offset - sofar; 762 n -= offset - sofar; 763 break; 764 } 765 QDEBUG checkb(b, "qcopy"); 766 b = b->next; 767 } 768 769 /* copy bytes from there */ 770 for(sofar = 0; sofar < len;){ 771 if(n > len - sofar) 772 n = len - sofar; 773 memmove(nb->wp, p, n); 774 qcopycnt += n; 775 sofar += n; 776 nb->wp += n; 777 b = b->next; 778 if(b == nil) 779 break; 780 n = BLEN(b); 781 p = b->rp; 782 } 783 iunlock(&q->lk); 784 785 return nb; 786 } 787 788 /* 789 * called by non-interrupt code 790 */ 791 Queue* 792 qopen(int limit, int msg, void (*kick)(void*), void *arg) 793 { 794 Queue *q; 795 796 q = malloc(sizeof(Queue)); 797 if(q == 0) 798 return 0; 799 800 q->limit = q->inilim = limit; 801 q->kick = kick; 802 q->arg = arg; 803 q->state = msg; 804 805 q->state |= Qstarve; 806 q->eof = 0; 807 q->noblock = 0; 808 809 return q; 810 } 811 812 /* open a queue to be bypassed */ 813 Queue* 814 qbypass(void (*bypass)(void*, Block*), void *arg) 815 { 816 Queue *q; 817 818 q = malloc(sizeof(Queue)); 819 if(q == 0) 820 return 0; 821 822 q->limit = 0; 823 q->arg = arg; 824 q->bypass = bypass; 825 q->state = 0; 826 827 return q; 828 } 829 830 static int 831 notempty(void *a) 832 { 833 Queue *q = a; 834 835 return (q->state & Qclosed) || q->bfirst != 0; 836 } 837 838 /* 839 * wait for the queue to be non-empty or closed. 840 * called with q ilocked. 841 */ 842 static int 843 qwait(Queue *q) 844 { 845 /* wait for data */ 846 for(;;){ 847 if(q->bfirst != nil) 848 break; 849 850 if(q->state & Qclosed){ 851 if(++q->eof > 3) 852 return -1; 853 if(*q->err && strcmp(q->err, Ehungup) != 0) 854 return -1; 855 return 0; 856 } 857 858 q->state |= Qstarve; /* flag requesting producer to wake me */ 859 iunlock(&q->lk); 860 sleep(&q->rr, notempty, q); 861 ilock(&q->lk); 862 } 863 return 1; 864 } 865 866 /* 867 * add a block list to a queue 868 */ 869 void 870 qaddlist(Queue *q, Block *b) 871 { 872 /* queue the block */ 873 if(q->bfirst) 874 q->blast->next = b; 875 else 876 q->bfirst = b; 877 q->len += blockalloclen(b); 878 q->dlen += blocklen(b); 879 while(b->next) 880 b = b->next; 881 q->blast = b; 882 } 883 884 /* 885 * called with q ilocked 886 */ 887 Block* 888 qremove(Queue *q) 889 { 890 Block *b; 891 892 b = q->bfirst; 893 if(b == nil) 894 return nil; 895 q->bfirst = b->next; 896 b->next = nil; 897 q->dlen -= BLEN(b); 898 q->len -= BALLOC(b); 899 QDEBUG checkb(b, "qremove"); 900 return b; 901 } 902 903 /* 904 * copy the contents of a string of blocks into 905 * memory. emptied blocks are freed. return 906 * pointer to first unconsumed block. 907 */ 908 Block* 909 bl2mem(uchar *p, Block *b, int n) 910 { 911 int i; 912 Block *next; 913 914 for(; b != nil; b = next){ 915 i = BLEN(b); 916 if(i > n){ 917 memmove(p, b->rp, n); 918 b->rp += n; 919 return b; 920 } 921 memmove(p, b->rp, i); 922 n -= i; 923 p += i; 924 b->rp += i; 925 next = b->next; 926 freeb(b); 927 } 928 return nil; 929 } 930 931 /* 932 * copy the contents of memory into a string of blocks. 933 * return nil on error. 934 */ 935 Block* 936 mem2bl(uchar *p, int len) 937 { 938 int n; 939 Block *b, *first, **l; 940 941 first = nil; 942 l = &first; 943 if(waserror()){ 944 freeblist(first); 945 nexterror(); 946 } 947 do { 948 n = len; 949 if(n > Maxatomic) 950 n = Maxatomic; 951 952 *l = b = allocb(n); 953 /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */ 954 memmove(b->wp, p, n); 955 b->wp += n; 956 p += n; 957 len -= n; 958 l = &b->next; 959 } while(len > 0); 960 poperror(); 961 962 return first; 963 } 964 965 /* 966 * put a block back to the front of the queue 967 * called with q ilocked 968 */ 969 void 970 qputback(Queue *q, Block *b) 971 { 972 b->next = q->bfirst; 973 if(q->bfirst == nil) 974 q->blast = b; 975 q->bfirst = b; 976 q->len += BALLOC(b); 977 q->dlen += BLEN(b); 978 } 979 980 /* 981 * flow control, get producer going again 982 * called with q ilocked 983 */ 984 static void 985 qwakeup_iunlock(Queue *q) 986 { 987 int dowakeup = 0; 988 989 /* if writer flow controlled, restart */ 990 if((q->state & Qflow) && q->len < q->limit/2){ 991 q->state &= ~Qflow; 992 dowakeup = 1; 993 } 994 995 iunlock(&q->lk); 996 997 /* wakeup flow controlled writers */ 998 if(dowakeup){ 999 if(q->kick) 1000 q->kick(q->arg); 1001 wakeup(&q->wr); 1002 } 1003 } 1004 1005 /* 1006 * get next block from a queue (up to a limit) 1007 */ 1008 Block* 1009 qbread(Queue *q, int len) 1010 { 1011 Block *b, *nb; 1012 int n; 1013 1014 qlock(&q->rlock); 1015 if(waserror()){ 1016 qunlock(&q->rlock); 1017 nexterror(); 1018 } 1019 1020 ilock(&q->lk); 1021 switch(qwait(q)){ 1022 case 0: 1023 /* queue closed */ 1024 iunlock(&q->lk); 1025 qunlock(&q->rlock); 1026 poperror(); 1027 return nil; 1028 case -1: 1029 /* multiple reads on a closed queue */ 1030 iunlock(&q->lk); 1031 error(q->err); 1032 } 1033 1034 /* if we get here, there's at least one block in the queue */ 1035 b = qremove(q); 1036 n = BLEN(b); 1037 1038 /* split block if it's too big and this is not a message queue */ 1039 nb = b; 1040 if(n > len){ 1041 if((q->state&Qmsg) == 0){ 1042 n -= len; 1043 b = allocb(n); 1044 memmove(b->wp, nb->rp+len, n); 1045 b->wp += n; 1046 qputback(q, b); 1047 } 1048 nb->wp = nb->rp + len; 1049 } 1050 1051 /* restart producer */ 1052 qwakeup_iunlock(q); 1053 1054 poperror(); 1055 qunlock(&q->rlock); 1056 return nb; 1057 } 1058 1059 /* 1060 * read a queue. if no data is queued, post a Block 1061 * and wait on its Rendez. 1062 */ 1063 long 1064 qread(Queue *q, void *vp, int len) 1065 { 1066 Block *b, *first, **l; 1067 int m, n; 1068 1069 qlock(&q->rlock); 1070 if(waserror()){ 1071 qunlock(&q->rlock); 1072 nexterror(); 1073 } 1074 1075 ilock(&q->lk); 1076 again: 1077 switch(qwait(q)){ 1078 case 0: 1079 /* queue closed */ 1080 iunlock(&q->lk); 1081 qunlock(&q->rlock); 1082 poperror(); 1083 return 0; 1084 case -1: 1085 /* multiple reads on a closed queue */ 1086 iunlock(&q->lk); 1087 error(q->err); 1088 } 1089 1090 /* if we get here, there's at least one block in the queue */ 1091 if(q->state & Qcoalesce){ 1092 /* when coalescing, 0 length blocks just go away */ 1093 b = q->bfirst; 1094 if(BLEN(b) <= 0){ 1095 freeb(qremove(q)); 1096 goto again; 1097 } 1098 1099 /* grab the first block plus as many 1100 * following blocks as will completely 1101 * fit in the read. 1102 */ 1103 n = 0; 1104 l = &first; 1105 m = BLEN(b); 1106 for(;;) { 1107 *l = qremove(q); 1108 l = &b->next; 1109 n += m; 1110 1111 b = q->bfirst; 1112 if(b == nil) 1113 break; 1114 m = BLEN(b); 1115 if(n+m > len) 1116 break; 1117 } 1118 } else { 1119 first = qremove(q); 1120 n = BLEN(first); 1121 } 1122 1123 /* copy to user space outside of the ilock */ 1124 iunlock(&q->lk); 1125 b = bl2mem(vp, first, len); 1126 ilock(&q->lk); 1127 1128 /* take care of any left over partial block */ 1129 if(b != nil){ 1130 n -= BLEN(b); 1131 if(q->state & Qmsg) 1132 freeb(b); 1133 else 1134 qputback(q, b); 1135 } 1136 1137 /* restart producer */ 1138 qwakeup_iunlock(q); 1139 1140 poperror(); 1141 qunlock(&q->rlock); 1142 return n; 1143 } 1144 1145 static int 1146 qnotfull(void *a) 1147 { 1148 Queue *q = a; 1149 1150 return q->len < q->limit || (q->state & Qclosed); 1151 } 1152 1153 ulong noblockcnt; 1154 1155 /* 1156 * add a block to a queue obeying flow control 1157 */ 1158 long 1159 qbwrite(Queue *q, Block *b) 1160 { 1161 int n, dowakeup; 1162 1163 n = BLEN(b); 1164 1165 if(q->bypass){ 1166 (*q->bypass)(q->arg, b); 1167 return n; 1168 } 1169 1170 dowakeup = 0; 1171 qlock(&q->wlock); 1172 if(waserror()){ 1173 if(b != nil) 1174 freeb(b); 1175 qunlock(&q->wlock); 1176 nexterror(); 1177 } 1178 1179 ilock(&q->lk); 1180 1181 /* give up if the queue is closed */ 1182 if(q->state & Qclosed){ 1183 iunlock(&q->lk); 1184 error(q->err); 1185 } 1186 1187 /* if nonblocking, don't queue over the limit */ 1188 if(q->len >= q->limit){ 1189 if(q->noblock){ 1190 iunlock(&q->lk); 1191 freeb(b); 1192 noblockcnt += n; 1193 qunlock(&q->wlock); 1194 poperror(); 1195 return n; 1196 } 1197 } 1198 1199 /* queue the block */ 1200 if(q->bfirst) 1201 q->blast->next = b; 1202 else 1203 q->bfirst = b; 1204 q->blast = b; 1205 b->next = 0; 1206 q->len += BALLOC(b); 1207 q->dlen += n; 1208 QDEBUG checkb(b, "qbwrite"); 1209 b = nil; 1210 1211 /* make sure other end gets awakened */ 1212 if(q->state & Qstarve){ 1213 q->state &= ~Qstarve; 1214 dowakeup = 1; 1215 } 1216 iunlock(&q->lk); 1217 1218 /* get output going again */ 1219 if(q->kick && (dowakeup || (q->state&Qkick))) 1220 q->kick(q->arg); 1221 1222 /* wakeup anyone consuming at the other end */ 1223 if(dowakeup){ 1224 wakeup(&q->rr); 1225 1226 /* if we just wokeup a higher priority process, let it run */ 1227 /* 1228 p = wakeup(&q->rr); 1229 if(p != nil && p->priority > up->priority) 1230 sched(); 1231 */ 1232 } 1233 1234 /* 1235 * flow control, wait for queue to get below the limit 1236 * before allowing the process to continue and queue 1237 * more. We do this here so that postnote can only 1238 * interrupt us after the data has been queued. This 1239 * means that things like 9p flushes and ssl messages 1240 * will not be disrupted by software interrupts. 1241 * 1242 * Note - this is moderately dangerous since a process 1243 * that keeps getting interrupted and rewriting will 1244 * queue infinite crud. 1245 */ 1246 for(;;){ 1247 if(q->noblock || qnotfull(q)) 1248 break; 1249 1250 ilock(&q->lk); 1251 q->state |= Qflow; 1252 iunlock(&q->lk); 1253 sleep(&q->wr, qnotfull, q); 1254 } 1255 USED(b); 1256 1257 qunlock(&q->wlock); 1258 poperror(); 1259 return n; 1260 } 1261 1262 /* 1263 * write to a queue. only Maxatomic bytes at a time is atomic. 1264 */ 1265 int 1266 qwrite(Queue *q, void *vp, int len) 1267 { 1268 int n, sofar; 1269 Block *b; 1270 uchar *p = vp; 1271 1272 QDEBUG if(!islo()) 1273 print("qwrite hi %p\n", getcallerpc(&q)); 1274 1275 sofar = 0; 1276 do { 1277 n = len-sofar; 1278 if(n > Maxatomic) 1279 n = Maxatomic; 1280 1281 b = allocb(n); 1282 /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */ 1283 if(waserror()){ 1284 freeb(b); 1285 nexterror(); 1286 } 1287 memmove(b->wp, p+sofar, n); 1288 poperror(); 1289 b->wp += n; 1290 1291 qbwrite(q, b); 1292 1293 sofar += n; 1294 } while(sofar < len && (q->state & Qmsg) == 0); 1295 1296 return len; 1297 } 1298 1299 /* 1300 * used by print() to write to a queue. Since we may be splhi or not in 1301 * a process, don't qlock. 1302 */ 1303 int 1304 qiwrite(Queue *q, void *vp, int len) 1305 { 1306 int n, sofar, dowakeup; 1307 Block *b; 1308 uchar *p = vp; 1309 1310 dowakeup = 0; 1311 1312 sofar = 0; 1313 do { 1314 n = len-sofar; 1315 if(n > Maxatomic) 1316 n = Maxatomic; 1317 1318 b = iallocb(n); 1319 if(b == nil) 1320 break; 1321 memmove(b->wp, p+sofar, n); 1322 b->wp += n; 1323 1324 ilock(&q->lk); 1325 1326 QDEBUG checkb(b, "qiwrite"); 1327 if(q->bfirst) 1328 q->blast->next = b; 1329 else 1330 q->bfirst = b; 1331 q->blast = b; 1332 q->len += BALLOC(b); 1333 q->dlen += n; 1334 1335 if(q->state & Qstarve){ 1336 q->state &= ~Qstarve; 1337 dowakeup = 1; 1338 } 1339 1340 iunlock(&q->lk); 1341 1342 if(dowakeup){ 1343 if(q->kick) 1344 q->kick(q->arg); 1345 wakeup(&q->rr); 1346 } 1347 1348 sofar += n; 1349 } while(sofar < len && (q->state & Qmsg) == 0); 1350 1351 return sofar; 1352 } 1353 1354 /* 1355 * be extremely careful when calling this, 1356 * as there is no reference accounting 1357 */ 1358 void 1359 qfree(Queue *q) 1360 { 1361 qclose(q); 1362 free(q); 1363 } 1364 1365 /* 1366 * Mark a queue as closed. No further IO is permitted. 1367 * All blocks are released. 1368 */ 1369 void 1370 qclose(Queue *q) 1371 { 1372 Block *bfirst; 1373 1374 if(q == nil) 1375 return; 1376 1377 /* mark it */ 1378 ilock(&q->lk); 1379 q->state |= Qclosed; 1380 q->state &= ~(Qflow|Qstarve); 1381 strcpy(q->err, Ehungup); 1382 bfirst = q->bfirst; 1383 q->bfirst = 0; 1384 q->len = 0; 1385 q->dlen = 0; 1386 q->noblock = 0; 1387 iunlock(&q->lk); 1388 1389 /* free queued blocks */ 1390 freeblist(bfirst); 1391 1392 /* wake up readers/writers */ 1393 wakeup(&q->rr); 1394 wakeup(&q->wr); 1395 } 1396 1397 /* 1398 * Mark a queue as closed. Wakeup any readers. Don't remove queued 1399 * blocks. 1400 */ 1401 void 1402 qhangup(Queue *q, char *msg) 1403 { 1404 /* mark it */ 1405 ilock(&q->lk); 1406 q->state |= Qclosed; 1407 if(msg == 0 || *msg == 0) 1408 strcpy(q->err, Ehungup); 1409 else 1410 strncpy(q->err, msg, ERRMAX-1); 1411 iunlock(&q->lk); 1412 1413 /* wake up readers/writers */ 1414 wakeup(&q->rr); 1415 wakeup(&q->wr); 1416 } 1417 1418 /* 1419 * return non-zero if the q is hungup 1420 */ 1421 int 1422 qisclosed(Queue *q) 1423 { 1424 return q->state & Qclosed; 1425 } 1426 1427 /* 1428 * mark a queue as no longer hung up 1429 */ 1430 void 1431 qreopen(Queue *q) 1432 { 1433 ilock(&q->lk); 1434 q->state &= ~Qclosed; 1435 q->state |= Qstarve; 1436 q->eof = 0; 1437 q->limit = q->inilim; 1438 iunlock(&q->lk); 1439 } 1440 1441 /* 1442 * return bytes queued 1443 */ 1444 int 1445 qlen(Queue *q) 1446 { 1447 return q->dlen; 1448 } 1449 1450 /* 1451 * return space remaining before flow control 1452 */ 1453 int 1454 qwindow(Queue *q) 1455 { 1456 int l; 1457 1458 l = q->limit - q->len; 1459 if(l < 0) 1460 l = 0; 1461 return l; 1462 } 1463 1464 /* 1465 * return true if we can read without blocking 1466 */ 1467 int 1468 qcanread(Queue *q) 1469 { 1470 return q->bfirst!=0; 1471 } 1472 1473 /* 1474 * change queue limit 1475 */ 1476 void 1477 qsetlimit(Queue *q, int limit) 1478 { 1479 q->limit = limit; 1480 } 1481 1482 /* 1483 * set blocking/nonblocking 1484 */ 1485 void 1486 qnoblock(Queue *q, int onoff) 1487 { 1488 q->noblock = onoff; 1489 } 1490 1491 /* 1492 * flush the output queue 1493 */ 1494 void 1495 qflush(Queue *q) 1496 { 1497 Block *bfirst; 1498 1499 /* mark it */ 1500 ilock(&q->lk); 1501 bfirst = q->bfirst; 1502 q->bfirst = 0; 1503 q->len = 0; 1504 q->dlen = 0; 1505 iunlock(&q->lk); 1506 1507 /* free queued blocks */ 1508 freeblist(bfirst); 1509 1510 /* wake up readers/writers */ 1511 wakeup(&q->wr); 1512 } 1513 1514 int 1515 qfull(Queue *q) 1516 { 1517 return q->state & Qflow; 1518 } 1519 1520 int 1521 qstate(Queue *q) 1522 { 1523 return q->state; 1524 } 1525